<a href="https://colab.research.google.com/github/FaisalWant/Apache-kafka/blob/main/kafka.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Set up Kafka:


- Kafka (Brokers: 127.0.0.1:9092)
- Zookeeper (Node: 127.0.0.1:2181)

In [5]:
!curl -sSOL https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz
!tar -xzf kafka_2.13-3.1.0.tgz

In [3]:
!ls

kafka_2.13-3.1.0  kafka_2.13-3.1.0.tgz	sample_data


Start the ZooKeeper and Kafka instances using the default configurations that ship with Apache Kafka.

In [6]:
!./kafka_2.13-3.1.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.1.0/config/zookeeper.properties
!./kafka_2.13-3.1.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.1.0/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10

Waiting for 10 secs until kafka and zookeeper services are up and running
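
A fixed `sleep 10` can be too short on a slow machine and needlessly long on a fast one. A minimal sketch of an alternative, assuming the default ports listed at the top (2181 for ZooKeeper, 9092 for the broker): poll each port until it accepts a TCP connection. `wait_for_port` is a hypothetical helper name, and an open port only shows that the process is listening, not that the broker has fully finished starting.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once host:port accepts a TCP connection, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: wait briefly and retry.
            time.sleep(interval)
    return False


# Hypothetical usage in a notebook cell after starting the daemons:
#   wait_for_port("127.0.0.1", 2181)
#   wait_for_port("127.0.0.1", 9092)
```

The helper returns a boolean rather than raising, so a cell can decide whether to continue or to print the broker logs.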


In [7]:
!ps -ef | grep kafka

root        1531       1  0 12:27 ?        00:00:03 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/content/kafka_2.13-3.1.0/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/content/kafka_2.13-3.1.0/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-3.1.0/bin/../config/log4j.properties -cp /content/kafka_2.13-3.1.0/bin/../libs/activation-1.1.1.jar:/content/kafka_2.13-3.1.0/bin/../libs/aopalliance-repackaged-2.6.1.jar:/content/kafka_2.13-3.1.0/bin/../libs/argparse4j-0.7.0.jar:/content/kafka_2.13-3.1.0/bin/../libs/audience-annotations-0.5.0.jar:/content/kafka_2.13-3.1.0/bin/../libs/commons-cli-1.4.jar:/content/kafka_2.13-3.1.0/bin/../libs/commons-lang3-3.8.1.jar:/content/kafka_2.13

In [8]:
!./kafka_2.13-3.1.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train
!./kafka_2.13-3.1.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test


Created topic susy-train.
Created topic susy-test.


In [9]:
!./kafka_2.13-3.1.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train
!./kafka_2.13-3.1.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test

Topic: susy-train	TopicId: 2RrhvnwcRqmy9SRBmlFB9g	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: susy-train	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic: susy-test	TopicId: o2mOnDOpSFWOgSqLH8fyEQ	PartitionCount: 2	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: susy-test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: susy-test	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
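
The `--describe` output above consists of tab-separated `Key: value` pairs: one summary line per topic, followed by one indented line per partition. A minimal sketch of parsing it, for example to check partition counts in a later cell. `parse_describe` is a hypothetical helper, and it assumes the format shown here, which may differ in other Kafka versions.

```python
def parse_describe(text):
    """Parse `kafka-topics.sh --describe` output into {topic: info}."""
    topics = {}
    for line in text.splitlines():
        if not line.strip():
            continue
        # Each tab-separated field looks like "Key: value".
        fields = {}
        for part in line.strip().split("\t"):
            key, _, value = part.partition(":")
            fields[key.strip()] = value.strip()
        name = fields["Topic"]
        if line[:1].isspace():
            # Indented line: details for one partition of the topic.
            topics[name]["partitions"].append({
                "partition": int(fields["Partition"]),
                "leader": int(fields["Leader"]),
                "replicas": fields["Replicas"],
                "isr": fields["Isr"],
            })
        else:
            # Unindented line: topic-level summary.
            topics[name] = {
                "partition_count": int(fields["PartitionCount"]),
                "replication_factor": int(fields["ReplicationFactor"]),
                "partitions": [],
            }
    return topics


# Sample copied from the susy-test output above.
sample = (
    "Topic: susy-test\tTopicId: o2mOnDOpSFWOgSqLH8fyEQ\tPartitionCount: 2\t"
    "ReplicationFactor: 1\tConfigs: segment.bytes=1073741824\n"
    "\tTopic: susy-test\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0\n"
    "\tTopic: susy-test\tPartition: 1\tLeader: 0\tReplicas: 0\tIsr: 0\n"
)
info = parse_describe(sample)
print(info["susy-test"]["partition_count"])
```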


In [10]:
!./kafka_2.13-3.1.0/bin/kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092


susy-test
susy-train


In [11]:
!./kafka_2.13-3.1.0/bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic susy-train

>new
>thread
>images
>soon to follow
>

In [12]:
!./kafka_2.13-3.1.0/bin/kafka-console-consumer.sh --topic susy-train --bootstrap-server localhost:9092 --from-beginning

new
thread
images
soon to follow
Processed a total of 4 messages


Using Python to interact with Kafka

In [2]:
!pip install confluent-kafka

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting confluent-kafka
  Downloading confluent_kafka-1.9.0-cp37-cp37m-manylinux2010_x86_64.whl (3.1 MB)
     |████████████████████████████████| 3.1 MB 13.8 MB/s
Installing collected packages: confluent-kafka
Successfully installed confluent-kafka-1.9.0


In [None]:
import asyncio 

from confluent_kafka import Consumer, Producer 
from confluent_kafka.admin import AdminClient, NewTopic 

BROKER_URL= "PLAINTEXT://localhost:9092" 
TOPIC_NAME= "susy-train" 

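async def produce(topic_name):
  """Produce one message per second into the given Kafka topic."""
  p = Producer({"bootstrap.servers": BROKER_URL})
  curr_iteration = 0

  while True:
    # Use the topic passed in rather than the module-level constant.
    p.produce(topic_name, f"Message:{curr_iteration}")
    # Serve delivery callbacks and keep the internal queue draining.
    p.poll(0)
    curr_iteration += 1

    await asyncio.sleep(1)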




async def consume(topic_name):
  """Consume data from the given Kafka topic."""
  c = Consumer({"bootstrap.servers": BROKER_URL, "group.id": "First-Python_consumer"})
  c.subscribe([topic_name])

  while True:
    # poll() blocks for up to 1 s; other coroutines cannot run while it waits.
    message = c.poll(1.0)
    if message is None:
      print("No message received")
    elif message.error() is not None:
      print(f"Message had an error: {message.error()}")
    else:
      print(f"Key: {message.key()}, Value: {message.value()}")

    await asyncio.sleep(1)


async def produce_consume():
  """ Runs the producer and consumer task"""
  t1= asyncio.create_task(produce(TOPIC_NAME)) 
  t2= asyncio.create_task(consume(TOPIC_NAME))

  await t1
  await t2



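def main():
  client = AdminClient({"bootstrap.servers": BROKER_URL})
  topic = NewTopic(TOPIC_NAME, num_partitions=1, replication_factor=1)

  # create_topics() is asynchronous and returns a dict of futures;
  # the topic may already exist from the CLI step earlier.
  client.create_topics([topic])

  try:
    # In a notebook where an event loop is already running,
    # use `await produce_consume()` instead of asyncio.run().
    asyncio.run(produce_consume())
  except KeyboardInterrupt:
    print("Shutting Down")
  finally:
    # delete_topics() takes topic names (strings), not NewTopic objects.
    client.delete_topics([TOPIC_NAME])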



main()
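
`Consumer.poll(1.0)` is a blocking call, so while the consumer waits for a message the producer coroutine in the cell above cannot run. One common workaround, sketched here under assumptions, is to hand the blocking call to a worker thread with `loop.run_in_executor`. `FakeConsumer` is a hypothetical stand-in so the sketch runs without a broker; with confluent-kafka the real consumer's `poll` would be passed instead.

```python
import asyncio
import time


class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer: poll() blocks, then returns."""

    def __init__(self, messages):
        self._messages = list(messages)

    def poll(self, timeout):
        time.sleep(min(timeout, 0.05))  # simulate a blocking network wait
        return self._messages.pop(0) if self._messages else None


async def consume(consumer, max_polls):
    """Poll in a worker thread so the event loop stays free for other tasks."""
    loop = asyncio.get_running_loop()
    received = []
    for _ in range(max_polls):
        message = await loop.run_in_executor(None, consumer.poll, 1.0)
        if message is not None:
            received.append(message)
    return received


async def ticker(count):
    """A second task that keeps running while poll() blocks in its thread."""
    ticks = 0
    for _ in range(count):
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks


async def demo():
    consumer = FakeConsumer(["Message:0", "Message:1"])
    return await asyncio.gather(consume(consumer, 3), ticker(5))


# In a notebook cell with a running event loop, use `await demo()` instead.
received, ticks = asyncio.run(demo())
print(received, ticks)
```

The trade-off is one extra thread hop per poll, which is negligible next to the network wait it replaces.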

In [None]:
#!tail -f /var/log/journal/confluent-kafka.service.log  # Kafka logs

Create Topic 

In [6]:
!./kafka_2.13-3.1.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic kafka-arch 


Created topic kafka-arch.


In [10]:
!./kafka_2.13-3.1.0/bin/kafka-topics.sh --alter --topic kafka-arch --partitions 3 --bootstrap-server 127.0.0.1:9092
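
Raising the partition count, as the `--alter` command above does, changes which partition a keyed message lands on when the client picks partitions by hashing the key modulo the partition count. The sketch below illustrates that effect with CRC32 as an assumed hash; it is not guaranteed to match the assignment of any real client (Kafka's Java client, for instance, hashes keys with murmur2). `partition_for` and the `user-N` keys are hypothetical.

```python
import zlib


def partition_for(key, num_partitions):
    """Illustrative hash-modulo partitioner (assumption: CRC32 of the key)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


keys = [f"user-{i}" for i in range(10)]
before = {k: partition_for(k, 1) for k in keys}  # 1 partition: every key maps to 0
after = {k: partition_for(k, 3) for k in keys}   # 3 partitions: keys spread out
moved = [k for k in keys if before[k] != after[k]]
print(f"{len(moved)} of {len(keys)} keys now map to a different partition")
```

Existing messages are not moved when partitions are added, so per-key ordering can be split across the old and new partitions.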

In [None]:
! ls kafka_2.13-3.1.0/bin

In [None]:
! ls kafka_2.13-3.1.0/config

In [None]:
! ls kafka_2.13-3.1.0/libs

In [29]:
! ls kafka_2.13-3.1.0/

bin  config  libs  LICENSE  licenses  logs  NOTICE  site-docs


In [39]:
! ls /usr

bin  games  grte  include  lib	lib32  local  sbin  share  src


In [42]:
! find / -name kafka-arch



find: ‘/proc/28/task/28/net’: Invalid argument
find: ‘/proc/28/net’: Invalid argument


In [None]:
import asyncio 

from confluent_kafka import Consumer, Producer 
from confluent_kafka.admin import AdminClient, NewTopic 

BROKER_URL= "PLAINTEXT://localhost:9092" 
TOPIC_NAME="sample2" 

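async def produce(topic_name):
  """Produce one keyed message per second into the given Kafka topic."""
  p = Producer({"bootstrap.servers": BROKER_URL})
  curr_iteration = 0

  while True:
    # The topic below is created with cleanup.policy=compact, and compacted
    # topics reject records without a key, so send one explicitly.
    p.produce(topic_name, key=str(curr_iteration), value=f"Message:{curr_iteration}")
    # Serve delivery callbacks and keep the internal queue draining.
    p.poll(0)
    curr_iteration += 1

    await asyncio.sleep(1)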




async def consume(topic_name):
  """Consume data from the given Kafka topic."""
  c = Consumer({"bootstrap.servers": BROKER_URL, "group.id": "First-Python_consumer"})
  c.subscribe([topic_name])

  while True:
    # poll() blocks for up to 1 s; other coroutines cannot run while it waits.
    message = c.poll(1.0)
    if message is None:
      print("No message received")
    elif message.error() is not None:
      print(f"Message had an error: {message.error()}")
    else:
      print(f"Key: {message.key()}, Value: {message.value()}")

    await asyncio.sleep(1)


async def produce_consume():
  """ Runs the producer and consumer task"""
  t1= asyncio.create_task(produce(TOPIC_NAME)) 
  t2= asyncio.create_task(consume(TOPIC_NAME))

  await t1
  await t2



#--------------------------------------------------
def topic_exists(client, topic_name): 
  """checks if the given topic exists"""
  topic_metadata= client.list_topics(timeout=5) 
  return topic_metadata.topics.get(topic_name) is not None 



def create_topic(client, topic_name):
  """Create the topic with the given topic name."""
  futures = client.create_topics(
      [
       NewTopic(
           topic=topic_name,
           num_partitions=5,
           replication_factor=1,
           # Topic config values are passed as strings.
           config={
               "cleanup.policy": "compact",
               "compression.type": "lz4",
               "delete.retention.ms": "100",
               "file.delete.delay.ms": "100",
           }
       )
      ]
  )

  # create_topics() returns one future per topic; result() raises on failure.
  for _, future in futures.items():
    try:
      future.result()
      print("Topic created")
    except Exception as e:
      print(f"Failed to create topic {topic_name}: {e}")
      raise



def main():
  """Check for the topic and create it if it does not exist."""

  client = AdminClient({"bootstrap.servers": BROKER_URL})

  exists = topic_exists(client, TOPIC_NAME)
  print(f"Topic {TOPIC_NAME} exists: {exists}")

  if not exists:
    create_topic(client, TOPIC_NAME)

  try:
    # In a notebook where an event loop is already running,
    # use `await produce_consume()` instead of asyncio.run().
    asyncio.run(produce_consume())
  except KeyboardInterrupt:
    print("Shutting down")



main()
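
The `create_topic` function above sets `cleanup.policy` to `compact`. Roughly, compaction retains at least the latest value for each key and eventually drops keys whose latest value is a tombstone (a null value). The sketch below models that idea on an in-memory list; it is a simplified illustration, not Kafka's actual log cleaner, and `compact` is a hypothetical helper.

```python
def compact(records):
    """Keep only the last record per key, preserving log order.

    records: list of (key, value) tuples; a value of None is a tombstone.
    """
    latest = {}
    for offset, (key, value) in enumerate(records):
        if key is None:
            raise ValueError("compacted topics require a key on every record")
        # Later offsets overwrite earlier ones for the same key.
        latest[key] = (offset, value)
    survivors = sorted(latest.items(), key=lambda item: item[1][0])
    # Drop keys whose most recent record is a tombstone.
    return [(key, value) for key, (_, value) in survivors if value is not None]


log = [("a", "1"), ("b", "1"), ("a", "2"), ("b", None), ("c", "1")]
print(compact(log))
```

This is also why every message sent to a compacted topic needs a key: without one there is nothing to compact on.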