In [13]:
# Prerequisites: set up and start the Kafka server, and install kafka-python.
# The commands can be found in 'kakfa commands.txt'.

from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError

# Admin client used by later cells to create, inspect and delete topics.
admin_client = KafkaAdminClient(
    bootstrap_servers="localhost:9092",  # Update with your Kafka broker(s)
    client_id='my_client'
)

topic_name = 'my_first_topic'

num_partitions = 2
replication_factor = 1
topic_configs = {
    # Messages older than 60 000 ms (1 minute) become eligible for deletion.
    "retention.ms": str(60_000),
    # When a partition's log grows beyond 100 MB, the oldest messages become eligible for deletion.
    "retention.bytes": str(int(100e6)),
    # Largest message (record batch) the broker accepts for this topic: 5e6 bytes (~5 MB).
    "max.message.bytes": str(int(5e6)),
}

my_first_topic = NewTopic(name=topic_name, num_partitions=num_partitions,
                          replication_factor=replication_factor,
                          topic_configs=topic_configs)

try:
    admin_client.create_topics(new_topics=[my_first_topic])
    print(f'created topic: {topic_name}')
except TopicAlreadyExistsError:
    # Only "already exists" is expected on re-runs; any other failure (broker unreachable,
    # invalid config, ...) is raised instead of being misreported.
    # Note: the existing topic keeps its original settings; topic_configs is NOT re-applied.
    print(f'topic: {topic_name} already existed')

# List all topics visible to this client.
topics = admin_client.list_topics()
print("Topics:", topics)




topic: my_first_topic already existed
Topics: ['my_first_topic']


In [5]:
from kafka.admin import ConfigResource, ConfigResourceType

# Ask the broker for the effective configuration of the topic.
config_resource = ConfigResource(ConfigResourceType.TOPIC, topic_name)
configs = admin_client.describe_configs([config_resource])

# Display the raw 'resources' field of the first response. Each entry appears to be
# (error_code, error_message, resource_type, resource_name, [config entries]).
# NOTE(review): in the entries, the 4th field looks like the config source: values set at
# topic creation (retention.ms, max.message.bytes) show 1, broker defaults show 5.
# Confirm against the Kafka DescribeConfigs response schema.
configs[0].get_item('resources')


[(0,
  '',
  2,
  'my_first_topic',
  [('compression.type', 'producer', False, 5, False, []),
   ('leader.replication.throttled.replicas', '', False, 5, False, []),
   ('message.downconversion.enable', 'true', False, 5, False, []),
   ('min.insync.replicas', '1', False, 5, False, []),
   ('segment.jitter.ms', '0', False, 5, False, []),
   ('cleanup.policy', 'delete', False, 5, False, []),
   ('flush.ms', '9223372036854775807', False, 5, False, []),
   ('follower.replication.throttled.replicas', '', False, 5, False, []),
   ('segment.bytes', '1073741824', False, 5, False, []),
   ('retention.ms', '60000', False, 1, False, []),
   ('flush.messages', '9223372036854775807', False, 5, False, []),
   ('message.format.version', '3.0-IV1', False, 5, False, []),
   ('max.compaction.lag.ms', '9223372036854775807', False, 5, False, []),
   ('file.delete.delay.ms', '60000', False, 5, False, []),
   ('max.message.bytes', '5000000', False, 1, False, []),
   ('min.compaction.lag.ms', '0', False, 5, F

In [6]:
from kafka import KafkaProducer
import json

# Values are dicts, serialized to UTF-8 JSON bytes before sending.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))


def _report_send_error(exc):
    """Errback for producer.send(): print delivery failures instead of dropping them silently."""
    print(f"Send failed: {exc!r}")


# Write the same three messages to every partition explicitly, one partition after another.
# Assumes the topic already exists with `num_partitions` partitions (see the first cell);
# an auto-created topic may have fewer partitions, making the explicit partition invalid.
for partition in range(num_partitions):
    for i in range(3):
        data = {'key': f'value{i}', 'key2': f'value{i}'}
        # send() is asynchronous: it only enqueues the record and returns a future.
        future = producer.send(topic_name, value=data, partition=partition)
        future.add_errback(_report_send_error)
        print(f"Sent: data = {data}, partition = {partition}")

# Block until every queued record is delivered (or has failed), then release connections.
producer.flush()
producer.close()

Sent: data = {'key': 'value0', 'key2': 'value0'}, partition = 0
Sent: data = {'key': 'value1', 'key2': 'value1'}, partition = 0
Sent: data = {'key': 'value2', 'key2': 'value2'}, partition = 0
Sent: data = {'key': 'value0', 'key2': 'value0'}, partition = 1
Sent: data = {'key': 'value1', 'key2': 'value1'}, partition = 1
Sent: data = {'key': 'value2', 'key2': 'value2'}, partition = 1


In [9]:
from kafka import KafkaConsumer, TopicPartition

import json

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         # With no committed offset, start from the oldest retained message.
                         auto_offset_reset='earliest',
                         value_deserializer=lambda x: json.loads(x.decode("utf-8")),
                         # Stop iterating after 1000 ms without a new message.
                         consumer_timeout_ms=1000
                         )

# Manually assign every partition of the topic (no group_id is set, so no group subscription).
partitions = [TopicPartition(topic_name, p) for p in range(num_partitions)]
consumer.assign(partitions)

try:
    # The for-loop yields all retained messages, then ends cleanly once consumer_timeout_ms
    # elapses with nothing new (a bare next(consumer) would raise StopIteration instead).
    for message in consumer:
        print(f"Received Value: {message.value}, Key:{message.key}, offset: {message.offset}, partition:{message.partition}")
finally:
    # Release broker connections even if deserialization or printing fails;
    # otherwise every re-run of this cell leaks another open consumer.
    consumer.close()



Received Value: {'key': 'value0', 'key2': 'value0'}, Key:None, offset: 0, partition:0
Received Value: {'key': 'value1', 'key2': 'value1'}, Key:None, offset: 1, partition:0
Received Value: {'key': 'value2', 'key2': 'value2'}, Key:None, offset: 2, partition:0
Received Value: {'key': 'value0', 'key2': 'value0'}, Key:None, offset: 0, partition:1
Received Value: {'key': 'value1', 'key2': 'value1'}, Key:None, offset: 1, partition:1
Received Value: {'key': 'value2', 'key2': 'value2'}, Key:None, offset: 2, partition:1


In [11]:
# Delete the demo topic. The broker may finish the deletion asynchronously, so re-creating
# the topic immediately afterwards can briefly fail.
delete_response = admin_client.delete_topics(topics=[topic_name])
# The notebook is done with the admin client; release its connections.
admin_client.close()
delete_response

DeleteTopicsResponse_v3(throttle_time_ms=0, topic_error_codes=[(topic='my_first_topic', error_code=0)])