In [4]:
from kafka.admin import KafkaAdminClient, NewTopic

def create_topic(topic_name, num_partitions, replication_factor,
                 bootstrap_servers=None, client_id='test_client'):
    """Create a single Kafka topic and print a confirmation.

    Args:
        topic_name: Name of the topic to create.
        num_partitions: Number of partitions for the new topic.
        replication_factor: Replication factor for the new topic.
        bootstrap_servers: Broker address(es) for the admin client; defaults to
            the local two-broker cluster ['localhost:29092', 'localhost:29093'].
        client_id: Client id reported to the brokers.

    Raises:
        Whatever ``KafkaAdminClient`` / ``create_topics`` raise; errors are not
        swallowed here. The admin client is closed in every case.
    """
    if bootstrap_servers is None:
        bootstrap_servers = ['localhost:29092', 'localhost:29093']

    new_topic = NewTopic(name=topic_name, num_partitions=num_partitions,
                         replication_factor=replication_factor)

    admin_client = KafkaAdminClient(
        bootstrap_servers=bootstrap_servers,
        client_id=client_id
    )
    try:
        admin_client.create_topics(new_topics=[new_topic], validate_only=False)
    finally:
        # Release the admin client's connections even when creation fails.
        admin_client.close()

    print(f"Topic '{topic_name}' created successfully.")

# Create "my_topic1" with one partition and replication factor 1 when this cell runs.
if __name__ == '__main__':
    create_topic(topic_name="my_topic1", num_partitions=1, replication_factor=1)

Topic 'my_topic1' created successfully.


In [10]:
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType
from kafka.errors import KafkaError

# Multi-broker cluster; for a single broker use 'localhost:9092' instead.
bootstrap_servers = ["localhost:29092", "localhost:29093"]


# Listing Topics
def list_topics(bootstrap_servers):
    """Return the topic names known to the cluster.

    Args:
        bootstrap_servers: Broker address(es) accepted by KafkaAdminClient.

    Returns:
        The result of ``KafkaAdminClient.list_topics()``, or ``[]`` if a
        KafkaError occurs (the error is printed, not raised).
    """
    admin_client = None
    try:
        admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
        return admin_client.list_topics()
    except KafkaError as e:
        print(f"Failed to list topics: {e}")
        return []
    finally:
        # Close on both the success and the failure path; previously a failing
        # list_topics() left the client's connections open.
        if admin_client is not None:
            admin_client.close()
   
# Last expression of the cell, so Jupyter displays the returned topic list.
list_topics(bootstrap_servers=bootstrap_servers)

['test',
 'test12345',
 'my_topic',
 '__consumer_offsets',
 'testjava',
 'test12',
 'csvdata']

In [9]:
# Deleting Topics
def delete_topic(bootstrap_servers, topic_name):
    """Request deletion of ``topic_name`` and print the outcome.

    Args:
        bootstrap_servers: Broker address(es) accepted by KafkaAdminClient.
        topic_name: Name of the topic to delete.

    A KafkaError is caught and printed rather than raised. The admin client
    is closed whether or not the deletion succeeds.
    """
    admin_client = None
    try:
        admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
        admin_client.delete_topics([topic_name])
        print(f"Topic '{topic_name}' deleted.")
    except KafkaError as e:
        print(f"Failed to delete topic: {e}")
    finally:
        # Previously close() was skipped whenever delete_topics() raised.
        if admin_client is not None:
            admin_client.close()


# Usage: delete the topic created earlier in this notebook.
# Multi-broker cluster; for a single broker use 'localhost:9092' instead.
bootstrap_servers = ["localhost:29092", "localhost:29093"]
topic_name = "my_topic1"

delete_topic(bootstrap_servers=bootstrap_servers, topic_name=topic_name)

Topic 'my_topic1' deleted.


In [12]:
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType
from kafka.errors import KafkaError


def modify_topic_config(bootstrap_servers, topic_name, config_updates):
    """Apply topic-level configuration overrides to an existing topic.

    Args:
        bootstrap_servers: Broker address(es) accepted by KafkaAdminClient.
        topic_name: Topic whose configuration should change.
        config_updates: Mapping of Kafka config name -> string value.

    Prints a message and returns without changes if the topic does not exist.
    A KafkaError is caught and printed rather than raised. The admin client
    is closed on every path, including the early "does not exist" return,
    which previously leaked it.

    NOTE(review): ``alter_configs`` uses the non-incremental AlterConfigs API;
    topic overrides not listed in ``config_updates`` may be reset to broker
    defaults. It may also report per-resource errors in its response instead
    of raising. Confirm both against the kafka-python docs before relying on
    the success message.
    """
    admin_client = None
    try:
        admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
        if topic_name not in admin_client.list_topics():
            print(f"Topic '{topic_name}' does not exist.")
            return

        config_resource = ConfigResource(ConfigResourceType.TOPIC, topic_name, configs=config_updates)
        admin_client.alter_configs([config_resource])
        print(f"Configuration for topic '{topic_name}' updated.")
    except KafkaError as e:
        print(f"Failed to modify topic configuration: {e}")
    finally:
        if admin_client is not None:
            admin_client.close()
       
# Usage
if __name__ == "__main__":
    bootstrap_servers = ['localhost:29092', 'localhost:29093']
    topic_name = 'csvdata'
    config_updates = {
        # NOTE(review): with cleanup.policy='compact' alone, Kafka does not delete
        # old segments by age, so retention.ms has no deleting effect here. Use
        # 'compact,delete' if the 1-day retention is actually intended.
        'retention.ms': '86400000',  # 1 day
        'cleanup.policy': 'compact',  # Enable log compaction
        'segment.ms': '604800000',  # Roll log segments every 1 week
        'segment.bytes': '1073741824',  # Roll log segments at 1 GiB
        'max.message.bytes': '2000000'  # Set max message size to ~2 MB
    }

    # The call must stay inside the guard: its arguments are only defined here.
    # Previously it sat at module level and raised NameError whenever the guard was false.
    modify_topic_config(bootstrap_servers, topic_name, config_updates)


Configuration for topic 'csvdata' updated.


Streaming: produce CSV rows to Kafka, then consume them and insert them into MySQL/TiDB

In [13]:
from kafka import KafkaProducer
import csv
import json
from kafka.errors import KafkaError

def read_csv(file_path, encoding=None):
    """Yield each data row of a CSV file as a mapping keyed by the header row.

    Args:
        file_path: Path to a CSV file whose first line holds the column names.
        encoding: Text encoding passed to ``open``. ``None`` keeps the platform
            default, as before.

    Yields:
        One mapping of column name -> string value per data row.
    """
    # The csv module requires newline=''. Without it, quoted fields that
    # contain line breaks (e.g. CRLF) are silently altered by newline translation.
    with open(file_path, mode='r', newline='', encoding=encoding) as file:
        yield from csv.DictReader(file)

def on_send_success(record_metadata):
    """Producer success callback: print the topic, partition and offset written."""
    topic = record_metadata.topic
    partition = record_metadata.partition
    offset = record_metadata.offset
    print('Message successfully sent to topic {} partition {} offset {}'.format(topic, partition, offset))

def on_send_error(excp):
    """Producer error callback: print the failure.

    The error is only reported, not retried or re-raised.
    TODO: decide how failed sends should actually be handled.
    """
    print('I am an error callback: {}'.format(excp))

def main(csv_path='sample_data.csv', topic='my_topic', bootstrap_servers=None):
    """Publish every row of a CSV file to a Kafka topic as a JSON message.

    Args:
        csv_path: CSV file to read; each row is sent as one JSON-encoded dict.
        topic: Destination topic.
        bootstrap_servers: Broker address(es); defaults to the local two-broker
            cluster. For a single broker use 'localhost:9092'.

    Per-message delivery results are reported by ``on_send_success`` /
    ``on_send_error``. The producer is closed even if reading or sending
    fails; previously an error such as a missing CSV file left it open.

    NOTE(review): this name is redefined by the consumer cell below. Whichever
    cell ran last wins, so call the intended one immediately after defining it.
    """
    if bootstrap_servers is None:
        bootstrap_servers = ['localhost:29092', 'localhost:29093']

    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    try:
        for data in read_csv(csv_path):
            producer.send(topic, value=data).add_callback(on_send_success).add_errback(on_send_error)
        # Block until every buffered record has been sent.
        producer.flush()
    finally:
        producer.close()

# Inside a Jupyter kernel __name__ is "__main__", so the producer runs when this cell executes.
if __name__ == "__main__":
    main()


Message successfully sent to topic my_topic partition 0 offset 0
Message successfully sent to topic my_topic partition 0 offset 1
Message successfully sent to topic my_topic partition 0 offset 2
Message successfully sent to topic my_topic partition 0 offset 3
Message successfully sent to topic my_topic partition 0 offset 4
Message successfully sent to topic my_topic partition 0 offset 5
Message successfully sent to topic my_topic partition 0 offset 6
Message successfully sent to topic my_topic partition 0 offset 7
Message successfully sent to topic my_topic partition 0 offset 8
Message successfully sent to topic my_topic partition 0 offset 9
Message successfully sent to topic my_topic partition 0 offset 10
Message successfully sent to topic my_topic partition 0 offset 11
Message successfully sent to topic my_topic partition 0 offset 12
Message successfully sent to topic my_topic partition 0 offset 13
Message successfully sent to topic my_topic partition 0 offset 14
Message successfully

In [16]:
from kafka import KafkaConsumer
import json
import mysql.connector
from mysql.connector import Error

def _db_connection_params():
    """Build mysql.connector.connect() keyword arguments from environment variables.

    TIDB_USER and TIDB_PASSWORD are required. TIDB_HOST, TIDB_PORT and
    TIDB_DATABASE fall back to the values this notebook previously hard-coded.

    Raises:
        RuntimeError: if a required variable is missing or empty.
    """
    import os

    # NOTE: credentials used to be hard-coded in this notebook. Treat that
    # password as leaked and rotate it.
    missing = [name for name in ("TIDB_USER", "TIDB_PASSWORD") if not os.environ.get(name)]
    if missing:
        raise RuntimeError(f"Set the environment variable(s) {', '.join(missing)} before inserting rows.")
    return {
        "host": os.environ.get("TIDB_HOST", "gateway01.ap-southeast-1.prod.aws.tidbcloud.com"),
        "port": int(os.environ.get("TIDB_PORT", "4000")),
        "user": os.environ["TIDB_USER"],
        "password": os.environ["TIDB_PASSWORD"],
        "database": os.environ.get("TIDB_DATABASE", "kafka"),
    }


def insert_into_db(data):
    """Insert one record into ``your_table`` and commit.

    Args:
        data: Mapping with keys 'id', 'name', 'age' and 'city'. A missing key
            raises KeyError, which is not caught here.

    Database errors (``mysql.connector.Error``) are printed, not raised. The
    cursor and connection are closed on every path. They start as None, so a
    failed connect no longer triggers UnboundLocalError in ``finally``.

    Raises:
        RuntimeError: if the database credentials are not configured.
    """
    params = _db_connection_params()
    conn = None
    cursor = None
    try:
        # A new connection per record; reuse one if throughput matters.
        conn = mysql.connector.connect(**params)
        cursor = conn.cursor()

        add_data = ("INSERT INTO your_table "
                    "(id, name, age, city) "
                    "VALUES (%s, %s, %s, %s)")

        # Parameterised query: values are passed separately, never string-formatted.
        data_tuple = (data['id'], data['name'], data['age'], data['city'])
        cursor.execute(add_data, data_tuple)

        conn.commit()
        print(f"Inserted data: {data}")
    except Error as e:
        print(f"Error: {e}")
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

def main(topic='my_topic', group_id='my_group', bootstrap_servers=None, consumer_timeout_ms=10000):
    """Consume JSON messages from Kafka and insert each one into the database.

    Args:
        topic: Topic to subscribe to.
        group_id: Consumer group id.
        bootstrap_servers: Broker address(es); defaults to the local two-broker cluster.
        consumer_timeout_ms: Iteration stops after this long with no new message.

    The consumer starts from the earliest offset when the group has no
    committed position (auto_offset_reset='earliest'). It is closed even if
    processing a message raises.

    NOTE(review): enable_auto_commit=True commits offsets independently of the
    insert result, and insert_into_db prints and swallows DB errors, so a
    failed row is not retried. Use manual commits for at-least-once delivery.
    NOTE(review): this name shadows the producer ``main`` defined earlier.
    """
    if bootstrap_servers is None:
        bootstrap_servers = ['localhost:29092', 'localhost:29093']

    print("Starting Kafka consumer...")
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id=group_id,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        consumer_timeout_ms=consumer_timeout_ms
    )
    try:
        for message in consumer:
            data = message.value
            print(f"Received data: {data}")
            insert_into_db(data)

        print("No more messages in the topic or consumer timed out.")
    finally:
        # Previously skipped if insert_into_db raised (e.g. KeyError on a malformed record).
        consumer.close()

# Inside a Jupyter kernel __name__ is "__main__", so the consumer runs when this cell executes.
if __name__ == "__main__":
    main()


Starting Kafka consumer...
Received data: {'id': '1', 'name': 'Person 1', 'age': '21', 'city': 'City 1'}
Inserted data: {'id': '1', 'name': 'Person 1', 'age': '21', 'city': 'City 1'}
Received data: {'id': '2', 'name': 'Person 2', 'age': '22', 'city': 'City 2'}
Inserted data: {'id': '2', 'name': 'Person 2', 'age': '22', 'city': 'City 2'}
Received data: {'id': '3', 'name': 'Person 3', 'age': '23', 'city': 'City 3'}
Inserted data: {'id': '3', 'name': 'Person 3', 'age': '23', 'city': 'City 3'}
Received data: {'id': '4', 'name': 'Person 4', 'age': '24', 'city': 'City 4'}
Inserted data: {'id': '4', 'name': 'Person 4', 'age': '24', 'city': 'City 4'}
Received data: {'id': '5', 'name': 'Person 5', 'age': '25', 'city': 'City 5'}
Inserted data: {'id': '5', 'name': 'Person 5', 'age': '25', 'city': 'City 5'}
Received data: {'id': '6', 'name': 'Person 6', 'age': '26', 'city': 'City 6'}
Inserted data: {'id': '6', 'name': 'Person 6', 'age': '26', 'city': 'City 6'}
Received data: {'id': '7', 'name': 'P