In [17]:
!pip uninstall kafka -y
!pip uninstall kafka-python -y
!conda remove kafka -y
!pip show kafka-python
!pip install kafka-python pandas


import kafka
print(kafka.__file__)

Found existing installation: kafka-python 2.3.0
Uninstalling kafka-python-2.3.0:
  Successfully uninstalled kafka-python-2.3.0

PackagesNotFoundError: The following packages are missing from the target environment:
  - kafka


Collecting kafka-python
  Using cached kafka_python-2.3.0-py2.py3-none-any.whl.metadata (10.0 kB)
Using cached kafka_python-2.3.0-py2.py3-none-any.whl (326 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.3.0


ImportError: cannot import name 'ConsumerProtocolMemberMetadata_v0' from 'kafka.coordinator.protocol' (/opt/anaconda3/lib/python3.10/site-packages/kafka/coordinator/protocol.py)

In [15]:
import json
import pandas as pd
from kafka import KafkaProducer, KafkaConsumer

ImportError: cannot import name 'ConsumerProtocolMemberMetadata_v0' from 'kafka.coordinator.protocol' (/opt/anaconda3/lib/python3.10/site-packages/kafka/coordinator/protocol.py)

In [None]:
# ------------------------------------------
# Kafka Producer — Full Code + Explanation
# ------------------------------------------

# Import KafkaProducer class from kafka-python library
# This class allows you to send messages to Kafka topics.
from kafka import KafkaProducer

# JSON library used to convert Python dict -> JSON string -> bytes
import json

# ---------------------------------------------------------
# CREATE A KAFKA PRODUCER
# ---------------------------------------------------------
producer = KafkaProducer(
    # Hostname & port of Kafka Broker.
    # Producer connects to this broker, discovers all brokers in cluster.
    bootstrap_servers='localhost:9092',

    # Convert Python dict → JSON string → bytes,
    # because Kafka messages must be bytes.
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),

    # Convert string keys into bytes (Kafka stores keys as bytes).
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# ---------------------------------------------------------
# SEND MESSAGES
# ---------------------------------------------------------
for i in range(5):

    # Create a Python dictionary to send as JSON
    msg = {"id": i, "value": f"message-{i}"}

    # Send to topic "test_topic"
    # Internally:
    # 1. Key decides which partition the message goes to
    # 2. Value is serialized into bytes
    # 3. Message sits in a local buffer (batch)
    producer.send("test_topic", key=str(i), value=msg)

    # Print for our reference
    print("Sent:", msg)

# ---------------------------------------------------------
# FLUSH AND CLOSE
# ---------------------------------------------------------

# flush(): forces all buffered (batched) messages to be delivered to Kafka
# KafkaProducer batches messages to improve throughput
producer.flush()

# close(): closes TCP connection + flushes leftover messages
producer.close()


Behind-the-scenes (Important Explanation)
1. KafkaProducer internal workflow
When you create:
KafkaProducer(...)
Kafka does the following in background:
Opens a TCP connection to the Kafka Broker (localhost:9092)
Gets metadata about:
All topics
All partitions
Leaders for each partition
Initializes a background I/O thread that:
Batches messages
Compresses (if configured)
Sends messages asynchronously
2. Serializers
Kafka messages must be bytes.
That means Python objects must be converted → bytes.
value_serializer
Converts dict → JSON → bytes
key_serializer
Converts string key → bytes
Kafka uses key to decide the partition:
Same key = same partition
Used for ordering guarantees
3. producer.send()
When you send:
producer.send("test_topic", key=str(i), value=msg)
Kafka does NOT immediately send it.
Instead:
Message is placed in an in-memory batch buffer
KafkaProducer background thread sends batches to broker
Broker writes message into:
Topic
Partition
Commit log (append-only file)
4. producer.flush()
Flush forces:
send all batched messages
wait for broker ACK
guaranteed delivery before closure

In [None]:
# ------------------------------------------
# Kafka Consumer — Full Code + Explanation
# ------------------------------------------

# Import KafkaConsumer class
from kafka import KafkaConsumer

# JSON for converting byte → JSON → Python dict
import json

# ---------------------------------------------------------
# CREATE CONSUMER
# ---------------------------------------------------------

consumer = KafkaConsumer(
    # Topic to subscribe to
    "test_topic",

    # Where Kafka Consumer should connect
    bootstrap_servers='localhost:9092',

    # If no previous offset is stored, start reading from:
    # 'earliest' -> beginning of topic
    # 'latest'   -> newest messages only
    auto_offset_reset='earliest',

    # Automatically commit offsets periodically
    # Kafka stores consumer's last read position in "__consumer_offsets"
    enable_auto_commit=True,

    # Consumers reading together share a GROUP ID
    # Kafka assigns partitions among consumers in the same group
    group_id="group1",

    # Convert bytes → JSON string → Python dict
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

print("Waiting for messages...")

# ---------------------------------------------------------
# READ MESSAGES (Infinite Loop)
# ---------------------------------------------------------

for message in consumer:

    # message.key is in bytes, message.value is Python dict (because we deserialized)
    print(f"Key={message.key}, Value={message.value}")


When you create KafkaConsumer()
Kafka does the following:
Connects to broker on localhost:9092
Joins consumer group group1
Broker decides which partition this consumer will read
Fetches offsets from internal Kafka topic __consumer_offsets
2. Consumption logic
For each new message:
Kafka stores messages in a commit log
Consumer polls (pull-based)
Consumer receives a batch of messages
Deserializer converts bytes → Python object
3. auto_offset_reset='earliest'
Used only when no committed offset exists.
Meaning:
First time consumer runs → starts from beginning
Next runs → continues from stored offset
4. enable_auto_commit=True
Consumer stores offsets automatically every 5 seconds.
Offsets are stored in Kafka topic:
__consumer_offsets
This ensures consumers can continue where they left off.

In [None]:
# ------------------------------------------
# INGESTING DATA FROM FILE (CSV → Kafka)
# ------------------------------------------

from kafka import KafkaProducer
import pandas as pd
import json

# ----------------------------------------------------
# Create Kafka Producer
# ----------------------------------------------------
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',                 # Connect to broker
    value_serializer=lambda v: json.dumps(v).encode()  # Convert dict -> bytes
)

# ----------------------------------------------------
# Load CSV File into a DataFrame
# ----------------------------------------------------
df = pd.read_csv("employees.csv")     # Example file: name, age, dept

# ----------------------------------------------------
# Iterate each row and send as JSON to Kafka
# ----------------------------------------------------
for _, row in df.iterrows():

    record = row.to_dict()            # Convert row -> dict format
    producer.send("file_topic", value=record)  
    # Behind the scenes:
    # 1. Row converted to JSON bytes
    # 2. Added to producer buffer
    # 3. Background thread batches & sends to Kafka
    print("Sent:", record)

# ----------------------------------------------------
# Make sure all messages are pushed
# ----------------------------------------------------
producer.flush()
producer.close()


# ------------------------------------------
# INGESTING DATA FROM API → Kafka
# ------------------------------------------

import requests
from kafka import KafkaProducer
import json
import time

# ----------------------------------------------------
# Create Kafka Producer
# ----------------------------------------------------
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode()
)

# API endpoint to get data from
API_URL = "https://api.example.com/users"  
# Replace with real API

# ----------------------------------------------------
# Poll API and send each item to Kafka
# ----------------------------------------------------
while True:       # Keep running every interval

    response = requests.get(API_URL)
    data = response.json()            # Convert API response -> Python object

    for item in data:
        producer.send("api_topic", value=item)
        print("Sent:", item)

    # Wait 5 seconds before next API call
    time.sleep(5)

# ------------------------------------------
# INGESTING FROM DATABASE → Kafka
# ------------------------------------------

import mysql.connector
from kafka import KafkaProducer
import json
import time

# ----------------------------------------------------
# Create Kafka Producer
# ----------------------------------------------------
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode()
)

# ----------------------------------------------------
# MySQL Database Connection
# ----------------------------------------------------
conn = mysql.connector.connect(
    host="localhost",
    user="root",
    password="password",
    database="company"
)

cursor = conn.cursor(dictionary=True)    
# dictionary=True -> returns rows as dict (easier to convert to JSON)

# ----------------------------------------------------
# Periodically read table and push new rows
# ----------------------------------------------------
while True:
    cursor.execute("SELECT * FROM employees")   # Fetch table rows
    rows = cursor.fetchall()

    for row in rows:
        producer.send("db_topic", value=row)
        print("Sent:", row)

    time.sleep(10)   # Poll DB every 10 sec (simple incremental ingestion)


# ------------------------------------------
# INGESTING DATA FROM CLOUD STORAGE (AWS S3 → Kafka)
# ------------------------------------------

import boto3
import json
from kafka import KafkaProducer

# ----------------------------------------------------
# Create Kafka Producer
# ----------------------------------------------------
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode()
)

# ----------------------------------------------------
# Connect to AWS S3
# ----------------------------------------------------
s3 = boto3.client(
    's3',
    aws_access_key_id="YOUR_KEY",
    aws_secret_access_key="YOUR_SECRET"
)

bucket = "my-bucket"
file_name = "data.json"

# ----------------------------------------------------
# Download file content from S3
# ----------------------------------------------------
obj = s3.get_object(Bucket=bucket, Key=file_name)
content = obj["Body"].read().decode("utf-8")     # Read file as string
data = json.loads(content)                        # Convert -> Python list/dict

# ----------------------------------------------------
# Send each record to Kafka
# ----------------------------------------------------
for record in data:
    producer.send("cloud_topic", value=record)
    print("Sent:", record)

producer.flush()
