## Kafka Producer

In [7]:
!pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl.metadata (7.8 kB)
Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2


In [8]:
# load important libraries
from time import sleep
from json import dumps
from kafka import KafkaProducer

In [10]:
# create producer
producer = KafkaProducer(bootstrap_servers=['172.29.16.101:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

### Define topic

In [21]:
topic = 'traffic_data_group7'

### Send message

In [27]:
# write a test message to an existing topic;
# send() is asynchronous and returns a future rather than waiting for delivery
data = {'test': "traffic data test"}
producer.send(topic, value=data)

<kafka.producer.future.FutureRecordMetadata at 0x18611fe7ad0>
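
The `FutureRecordMetadata` shown above reflects that `send()` only queues the record; delivery happens in the background. A minimal sketch of sending several records with a pause between them and waiting for each acknowledgement follows. The helper name `send_with_delay` and the `_StubProducer` stand-in are hypothetical, used only so the example runs without a broker. With a real `KafkaProducer`, `future.get(timeout=...)` blocks until the broker confirms the write or raises an error.

```python
from time import sleep
from types import SimpleNamespace


def send_with_delay(producer, topic, records, delay_seconds=5, timeout=10):
    """Send each record, wait for its acknowledgement, then pause."""
    acknowledged = []
    for record in records:
        future = producer.send(topic, value=record)
        # get() blocks until the record is acknowledged or the timeout expires
        metadata = future.get(timeout=timeout)
        acknowledged.append((metadata.partition, metadata.offset))
        sleep(delay_seconds)
    # flush() pushes out anything still buffered before returning
    producer.flush()
    return acknowledged


class _StubProducer:
    """Hypothetical stand-in mimicking the two KafkaProducer calls used above."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value=None):
        self.sent.append((topic, value))
        offset = len(self.sent) - 1
        metadata = SimpleNamespace(topic=topic, partition=0, offset=offset)
        return SimpleNamespace(get=lambda timeout=None: metadata)

    def flush(self):
        pass


stub = _StubProducer()
acks = send_with_delay(stub, 'traffic_data_group7',
                       [{'test': 'a'}, {'test': 'b'}], delay_seconds=0)
print(acks)
```

Against the live cluster the same helper would be called as `send_with_delay(producer, topic, [data])`, keeping the default five-second pause.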

## Current Topics

Kafka topics act as channels for messages in an event streaming platform. Producers send data streams to specific topics, and consumers subscribe to those topics to receive the data. Think of a topic as a labeled folder into which related messages are continuously filed.

You can list topics programmatically from Python with the `AdminClient` class in the `confluent_kafka.admin` module (part of the `confluent-kafka` package, which is separate from `kafka-python`). This approach is useful if you want to integrate topic listing within your application.

In [2]:
from confluent_kafka.admin import AdminClient

In [3]:
conf = {'bootstrap.servers': '172.29.16.101:9092'}

In [4]:
kadmin = AdminClient(conf)

In [5]:
kadmin.list_topics().topics

{'here-api-routes': TopicMetadata(here-api-routes, 1 partitions),
 'traffic_data_group7': TopicMetadata(traffic_data_group7, 1 partitions),
 'stock-prices': TopicMetadata(stock-prices, 1 partitions),
 'hello-world': TopicMetadata(hello-world, 1 partitions),
 'london_routes': TopicMetadata(london_routes, 1 partitions),
 'yahoo-finance': TopicMetadata(yahoo-finance, 1 partitions),
 'wikimedia-changes': TopicMetadata(wikimedia-changes, 1 partitions),
 'delhi_routes': TopicMetadata(delhi_routes, 1 partitions),
 'testTopic': TopicMetadata(testTopic, 1 partitions),
 'roulette': TopicMetadata(roulette, 1 partitions),
 'berlin_routes': TopicMetadata(berlin_routes, 1 partitions),
 'here_api_data': TopicMetadata(here_api_data, 1 partitions),
 'traffic-data': TopicMetadata(traffic-data, 1 partitions),
 'hello': TopicMetadata(hello, 1 partitions),
 'nyt_article_publishes': TopicMetadata(nyt_article_publishes, 1 partitions),
 '__consumer_offsets': TopicMetadata(__consumer_offsets, 50 partitions)}
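
The dictionary returned above maps topic names to `TopicMetadata` objects, whose `partitions` attribute is itself a dictionary keyed by partition id. A small sketch for turning that into a name-to-partition-count mapping, skipping internal topics (names starting with `__`), might look like this. `summarize_topics` is a hypothetical helper, and the `SimpleNamespace` objects only imitate the metadata shape so the block runs without a broker.

```python
from types import SimpleNamespace


def summarize_topics(topics, include_internal=False):
    """Map each topic name to its partition count, sorted by name."""
    summary = {}
    for name in sorted(topics):
        if name.startswith('__') and not include_internal:
            continue
        summary[name] = len(topics[name].partitions)
    return summary


# Fake metadata imitating the shape of kadmin.list_topics().topics
fake_topics = {
    'traffic_data_group7': SimpleNamespace(partitions={0: None}),
    'hello-world': SimpleNamespace(partitions={0: None}),
    '__consumer_offsets': SimpleNamespace(
        partitions={i: None for i in range(50)}),
}
summary = summarize_topics(fake_topics)
print(summary)
```

With the live client this would be called as `summarize_topics(kadmin.list_topics().topics)`.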

In [22]:
kadmin.list_consumer_groups()

<Future at 0x269bc6c9f50 state=running>
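
`list_consumer_groups()` is asynchronous, which is why the cell shows a `Future` rather than the groups themselves. Calling `result()` on it waits for the answer. The sketch below assumes the result object exposes a `valid` list whose entries carry a `group_id`, as in recent `confluent_kafka` releases. `consumer_group_ids` is a hypothetical helper, and the hand-built future stands in for a real broker response.

```python
from concurrent.futures import Future
from types import SimpleNamespace


def consumer_group_ids(future, timeout=10):
    """Wait for a list_consumer_groups() future and return the group ids."""
    result = future.result(timeout=timeout)
    return sorted(group.group_id for group in result.valid)


# Hand-built future imitating the object returned by
# kadmin.list_consumer_groups(); the group names are made up
fake_future = Future()
fake_future.set_result(SimpleNamespace(
    valid=[SimpleNamespace(group_id='group7'),
           SimpleNamespace(group_id='demo')],
    errors=[],
))
ids = consumer_group_ids(fake_future)
print(ids)
```

Against the cluster, the call would be `consumer_group_ids(kadmin.list_consumer_groups())`.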

## Kafka Consumer

A Kafka consumer is a program that listens for and processes messages from specific Kafka topics. Think of it as an ear in a bustling marketplace, tuned in to receive messages of interest published by producers.

In [8]:
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    bootstrap_servers='172.29.16.101:9092',
    # decode with the same encoding the producer used (UTF-8)
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    auto_offset_reset='earliest'
)
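
The deserializer has to reverse exactly what the producer's serializer did. The following sketch runs both functions back to back without a broker; the names `serialize` and `deserialize` are illustrative and not part of either library. Because `json.dumps` escapes non-ASCII characters by default, the bytes on the wire are plain ASCII, so decoding them as ASCII or as UTF-8 gives the same result for these messages.

```python
import json


def serialize(value):
    """Same logic as the producer's value_serializer."""
    return json.dumps(value).encode('utf-8')


def deserialize(raw):
    """Inverse of serialize(); UTF-8 also covers pure-ASCII payloads."""
    return json.loads(raw.decode('utf-8'))


# The 'city' field is an invented example with a non-ASCII character
message = {'test': 'traffic data test', 'city': 'Zürich'}
raw = serialize(message)
restored = deserialize(raw)
print(raw)
print(restored == message)
```

Decoding as UTF-8 on the consumer side stays correct even if the producer is later changed to emit unescaped characters with `ensure_ascii=False`.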

In [None]:
# `topic` is the name defined in the producer section ('traffic_data_group7')
consumer.subscribe(topics=[topic])
for m in consumer:
    print("%d:%d: v=%s" % (m.partition, m.offset, m.value['test']))

0:7: v=traffic data test


In [9]:
# subscribing again replaces the previous subscription
consumer.subscribe(topics=['nyt_article_publishes'])
for m in consumer:
    print("%d:%d: v=%s" % (m.partition, m.offset, m.value['status']))

0:0: v=OK


KeyboardInterrupt: 
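
Iterating over a consumer blocks indefinitely, so the loops above only end when the cell is interrupted. One way to stop cleanly is to cap the number of messages read; the sketch below does that with `itertools.islice`. `take_messages` is a hypothetical helper, and the iterator of fake records replaces a live consumer. With kafka-python, passing `consumer_timeout_ms` to `KafkaConsumer` is another option, since it ends iteration after that many milliseconds without a new message.

```python
from itertools import islice
from types import SimpleNamespace


def take_messages(consumer, limit):
    """Read at most `limit` records and format them like the loops above."""
    lines = []
    for m in islice(consumer, limit):
        lines.append("%d:%d: v=%s" % (m.partition, m.offset, m.value))
    return lines


# Fake records imitating the attributes of kafka-python's ConsumerRecord
fake_consumer = iter([
    SimpleNamespace(partition=0, offset=7, value='traffic data test'),
    SimpleNamespace(partition=0, offset=8, value='second message'),
    SimpleNamespace(partition=0, offset=9, value='third message'),
])
lines = take_messages(fake_consumer, limit=2)
print(lines)
```

Note that a cap alone still blocks if the topic holds fewer messages than `limit`, so combining it with a timeout is usually the safer choice.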