TASK ONE: DATA INGESTION WITH APACHE KAFKA
Set up Apache Kafka for data ingestion and streaming. This project introduces
the basics of event streaming and data processing: create Kafka producers and
consumers in Java or Python (this notebook uses Python), then stream data from
the producers to the consumers and process it.

Step 1: Install Kafka and Dependencies


In [None]:
# Install Java

!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [None]:
# Download Kafka 2.8.0 (Scala 2.12 build), extract it, and move it to /usr/local/kafka
!wget -q https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
!tar -xzf kafka_2.12-2.8.0.tgz
!mv kafka_2.12-2.8.0 /usr/local/kafka
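Before starting any services, it can help to confirm that Java is on the PATH and that the Kafka scripts ended up where the later cells expect them. This is a minimal sketch assuming the `/usr/local/kafka` location used above; `check_installation` is a hypothetical helper name, not part of Kafka.

```python
import os
import shutil


def check_installation(kafka_home="/usr/local/kafka"):
    """Report whether Java and the Kafka start scripts are available.

    Hypothetical helper; kafka_home defaults to the directory that the
    `mv` command above moves Kafka into.
    """
    scripts = [
        "zookeeper-server-start.sh",
        "kafka-server-start.sh",
        "kafka-topics.sh",
    ]
    # shutil.which returns None when no `java` executable is on the PATH
    status = {"java": shutil.which("java") is not None}
    for script in scripts:
        path = os.path.join(kafka_home, "bin", script)
        # A script counts as present only if it exists and is executable
        status[script] = os.path.isfile(path) and os.access(path, os.X_OK)
    return status


for name, ok in check_installation().items():
    print(f"{name}: {'OK' if ok else 'MISSING'}")
```

Any `MISSING` entry points back to the install cell above.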


Step 2: Start ZooKeeper and Kafka


In [None]:
# Start ZooKeeper in the background (the Kafka broker connects to it on startup)
!nohup /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &


nohup: appending output to 'nohup.out'


In [None]:
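# Start the Kafka broker in the background (ZooKeeper must already be running)
!nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
# Give the broker a few seconds to start listening before creating topics
!sleep 10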


nohup: appending output to 'nohup.out'
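Both services run in the background, so the notebook moves on as soon as they are launched, possibly before the broker is accepting connections. A small polling helper can confirm that a port is open before Step 3. This is a sketch: `wait_for_port` is a hypothetical name, and an open TCP port only shows that the process is listening, not that the broker has fully finished starting up.

```python
import socket
import time


def wait_for_port(host="localhost", port=9092, timeout=30.0, interval=1.0):
    """Poll until a TCP connection to host:port succeeds or timeout expires.

    Returns True if the port accepted a connection, False otherwise.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

With the default config files, ZooKeeper listens on port 2181 (`clientPort` in `zookeeper.properties`) and the broker on port 9092, so `wait_for_port(port=2181)` and `wait_for_port(port=9092)` should both return `True` before topics are created.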


Step 3: Create a Kafka Topic


In [None]:

! /usr/local/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1


Created topic test-topic.
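Running the `--create` command a second time fails with `TopicExistsException` ("Topic 'test-topic' already exists."), which makes the cell awkward to re-run. A minimal sketch of an idempotent wrapper, assuming the `/usr/local/kafka` install path from Step 1: the hypothetical helper `ensure_topic` calls the same CLI through `subprocess` and treats the "already exists" message as success. The `run` parameter exists only so the logic can be tested without a broker.

```python
import subprocess

# Assumed install location from Step 1
KAFKA_TOPICS = "/usr/local/kafka/bin/kafka-topics.sh"


def ensure_topic(name, bootstrap="localhost:9092", partitions=1,
                 replication_factor=1, run=subprocess.run):
    """Create a topic, treating 'already exists' as success.

    Returns True if the topic was created, False if it already existed,
    and raises RuntimeError for any other failure.
    """
    cmd = [
        KAFKA_TOPICS, "--create",
        "--topic", name,
        "--bootstrap-server", bootstrap,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
    ]
    result = run(cmd, capture_output=True, text=True)
    output = (result.stdout or "") + (result.stderr or "")
    if "already exists" in output:
        # The CLI reports this case as TopicExistsException
        return False
    if result.returncode != 0:
        raise RuntimeError(f"kafka-topics.sh failed: {output.strip()}")
    return True
```

In the notebook, `ensure_topic("test-topic")` should return `True` on the first run and `False` on later runs instead of printing an error.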


Step 4: Install the kafka-python Library


In [None]:

!pip install kafka-python


Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl.metadata (7.8 kB)
Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2
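To confirm that the package is visible to the notebook's Python kernel, and which version `pip` installed, the standard library's `importlib.metadata` can be queried. `installed_version` is a hypothetical helper name used only for illustration.

```python
from importlib import metadata


def installed_version(distribution="kafka-python"):
    """Return the installed version string of a distribution, or None."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        # The distribution is not installed in this environment
        return None


print("kafka-python:", installed_version() or "not installed")
```

After the install above, this should print `kafka-python: 2.0.2`.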






Step 5: Create Kafka Producers and Consumers in Python


In [None]:
# Kafka Producer
from kafka import KafkaProducer
import json
import time

# Initialize the Kafka producer; values are serialized to UTF-8 encoded JSON
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send ten messages to the topic, one per second.
# send() is asynchronous: it queues the record and returns immediately.
for i in range(10):
    message = {'number': i}
    producer.send('test-topic', message)
    print(f'Sent: {message}')
    time.sleep(1)

# Block until all queued messages are delivered, then close the producer
producer.flush()
producer.close()


Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
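Because `producer.send()` only queues a record and returns a future, each `Sent:` line above is printed before the broker has confirmed anything. When delivery confirmation matters, calling `.get()` on that future blocks until the broker acknowledges the record (or raises if delivery fails) and returns metadata including the partition and offset. A minimal sketch; `send_and_confirm` is a hypothetical helper meant for a `KafkaProducer` like the one above, used before `close()` is called.

```python
def send_and_confirm(producer, topic, value, timeout=10):
    """Send one record and wait for the broker's acknowledgement.

    Hypothetical helper. producer.send() returns a future immediately;
    future.get() blocks until the record is acknowledged, or raises if
    delivery fails or the timeout (in seconds) expires.
    """
    future = producer.send(topic, value)
    record_metadata = future.get(timeout=timeout)
    # The metadata records the partition and offset the record was written to
    return record_metadata.partition, record_metadata.offset
```

For example, `send_and_confirm(producer, 'test-topic', {'number': 10})` would return a `(partition, offset)` pair; with the single-partition topic from Step 3, the partition is always 0. Waiting on every send trades throughput for certainty, so the fire-and-forget loop plus `flush()` above remains the faster option for bulk ingestion.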


Kafka Consumer


In [None]:
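from kafka import KafkaConsumer
import json

# Initialize the Kafka consumer
consumer = KafkaConsumer(
    'test-topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',   # start from the oldest message if the group has no committed offset
    enable_auto_commit=True,        # periodically commit consumed offsets for the group
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    consumer_timeout_ms=10000       # stop iterating after 10 s without new messages
)

# Read messages from the topic until no new message arrives within the timeout
for message in consumer:
    print(f'Received: {message.value}')

# Commit final offsets and leave the consumer group
consumer.close()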


Received: {'number': 0}
Received: {'number': 1}
Received: {'number': 2}
Received: {'number': 3}
Received: {'number': 4}
Received: {'number': 5}
Received: {'number': 6}
Received: {'number': 7}
Received: {'number': 8}
Received: {'number': 9}
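By default, iterating over a `KafkaConsumer` blocks while waiting for new messages. In a notebook it can be more convenient to pull a bounded batch with `poll()`, which returns a dict mapping each partition to a list of records, and then process the collected values. A minimal sketch: `drain` and `summarize` are hypothetical helpers, and the "processing" here (counting and summing the `number` field) only illustrates where real processing logic would go.

```python
def drain(consumer, max_messages=10, timeout_ms=1000):
    """Collect up to max_messages record values using bounded polls.

    Hypothetical helper. KafkaConsumer.poll() returns a dict that maps each
    TopicPartition to a list of ConsumerRecord objects; an empty dict means
    nothing arrived within timeout_ms, so the loop stops there.
    """
    values = []
    while len(values) < max_messages:
        # max_records caps each batch so no more than max_messages are consumed
        batch = consumer.poll(timeout_ms=timeout_ms,
                              max_records=max_messages - len(values))
        if not batch:
            break
        for records in batch.values():
            values.extend(record.value for record in records)
    return values


def summarize(values):
    """Illustrative processing step: count and sum the 'number' fields."""
    numbers = [value["number"] for value in values]
    return {"count": len(numbers), "sum": sum(numbers)}
```

Applied to the ten messages produced above, `summarize` would return `{'count': 10, 'sum': 45}`. Note that `my-group` has already committed offsets past these messages, so replaying them requires a consumer with a different `group_id` (together with `auto_offset_reset='earliest'`).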

