# **Data Streaming with Kafka**

# **OVERVIEW**
The project aims to build a Kafka pipeline that can receive real-time data from
telecommunications mobile money transactions and process it for analysis. The pipeline should
be designed to handle high volumes of data and ensure that the data is processed efficiently.


# **OBJECTIVES**
1. Set up a Kafka cluster: You must set up a Kafka cluster that can handle high volumes
of data. You can use either a cloud-based or on-premises Kafka cluster.
2. Develop a Kafka producer: You must develop a Kafka producer that can ingest data
from telecommunications mobile money transactions and send it to the Kafka cluster.
The producer should be designed to handle high volumes of data and ensure that the
data is sent to the Kafka cluster efficiently.
3. Develop a Kafka consumer: You must develop a Kafka consumer to receive data from
the Kafka cluster and process it for analysis. The consumer should be designed to
handle high volumes of data and ensure that the data is processed efficiently.
4. Process the data: Once you have set up the Kafka pipeline, you must process the data
for analysis. This may involve cleaning and aggregating the data, performing
calculations, and creating visualizations.
5. Test the solution: You must test your solution using the provided dummy JSON file. The
file contains sample data that you can use to ensure that your Kafka pipeline is working
correctly.
Here’s the dummy JSON file that represents

In [15]:
!pip install confluent-kafka

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [16]:
pip install kafka-python

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# **Kafka cluster setup**

Create a Kafka Producer instance and configure it with your Confluent Cloud credentials:

In [30]:
import os
import random
import time

from confluent_kafka import Producer

# Set up Confluent Cloud credentials.
# The API key and secret are read from environment variables; the variable
# names are placeholders chosen for this notebook.
conf = {
    'bootstrap.servers': 'pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': os.environ['CONFLUENT_API_KEY'],
    'sasl.password': os.environ['CONFLUENT_API_SECRET'],
}

In [None]:

#test
# Create a Kafka producer instance
producer = Producer(conf)
# Publish dummy data to the topic every second
topic_name = 'moringa'

while True:
    # Generate a random number between 0 and 100 as the message
    message = str(random.randint(0, 100))
    producer.produce(topic_name, message.encode('utf-8'))
    # Block until the message is delivered (acceptable at one message per second)
    producer.flush()

    print(f"Published message '{message}' to topic '{topic_name}'")
    time.sleep(1)
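
Because `produce()` is asynchronous, a publish can still fail after the call returns, so a delivery callback is one way to see what actually reached the cluster. The sketch below is an illustration, not part of the original notebook: `DeliveryStats` and `_StubMessage` are hypothetical names, and the stub exists only so the example runs without a broker. It assumes the `(err, msg)` callback signature that confluent-kafka uses for delivery reports.

```python
class DeliveryStats:
    """Counts delivery outcomes reported through a producer callback."""

    def __init__(self):
        self.delivered = 0
        self.failed = 0

    def on_delivery(self, err, msg):
        # Called with (err, msg); err is None when delivery succeeded.
        if err is not None:
            self.failed += 1
            print(f"Delivery failed: {err}")
        else:
            self.delivered += 1
            print(f"Delivered to {msg.topic()} [partition {msg.partition()}]")


class _StubMessage:
    """Hypothetical stand-in for a Kafka message, used only in this demo."""

    def topic(self):
        return "moringa"

    def partition(self):
        return 0


stats = DeliveryStats()
stats.on_delivery(None, _StubMessage())        # simulated success
stats.on_delivery("broker unreachable", None)  # simulated failure
print(stats.delivered, stats.failed)
```

With a real producer, the callback would be passed as `producer.produce(topic_name, value, on_delivery=stats.on_delivery)` and is invoked from `producer.poll()` or `producer.flush()`.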

Create a Kafka Consumer instance and configure it with your Confluent Cloud credentials:

In [20]:
import os

from confluent_kafka import Consumer

# Set up Confluent Cloud credentials.
# The API key and secret are read from environment variables; the variable
# names are placeholders chosen for this notebook.
conf = {
    'bootstrap.servers': 'pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': os.environ['CONFLUENT_API_KEY'],
    'sasl.password': os.environ['CONFLUENT_API_SECRET'],
    'group.id': 'learning',
}

In [None]:
#test
# Create a Kafka consumer instance
consumer = Consumer(conf)
topic_name = 'moringa'

# Subscribe to your Confluent Cloud topic and consume messages
consumer.subscribe([topic_name])
try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    pass
finally:
    # Leave the consumer group cleanly
    consumer.close()
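
The consumer above decodes every payload as UTF-8 text, but the later cells expect JSON transaction records, while the test producer publishes bare numbers. A small decoding helper can keep malformed or unexpected payloads from crashing the loop. This is a minimal sketch; `decode_transaction` is a hypothetical helper name, and treating only JSON objects as valid is an assumption.

```python
import json


def decode_transaction(raw):
    """Return the payload as a dict, or None if it is not a JSON object."""
    if raw is None:
        return None
    try:
        payload = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    # Valid JSON that is not an object (e.g. a bare number) is rejected too.
    return payload if isinstance(payload, dict) else None


print(decode_transaction(b'{"transaction_id": "12345"}'))
print(decode_transaction(b"42"))
print(decode_transaction(b"\xff\xfe"))
```

Inside the poll loop, this could be called as `decode_transaction(msg.value())`, skipping the message when the result is `None`.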

# **Writing Data to Kafka Topics**

In [None]:
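import json
import os

from kafka import KafkaProducer

# Create Kafka producer. Confluent Cloud requires SASL_SSL authentication;
# the environment variable names are placeholders chosen for this notebook.
producer = KafkaProducer(
    bootstrap_servers=['pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092'],
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username=os.environ['CONFLUENT_API_KEY'],
    sasl_plain_password=os.environ['CONFLUENT_API_SECRET'],
)

# Define a sample mobile money transaction
data = {"transaction_id": "12345",
        "sender_phone_number": "256777123456",
        "receiver_phone_number": "256772987654",
        "transaction_amount": 100000,
        "transaction_time": "2023-04-19 12:00:00"
        }

# Serialize the transaction to JSON
serialized_transaction = json.dumps(data).encode('utf-8')

# Produce the transaction to the Kafka topic
producer.send('moringa', value=serialized_transaction)
# send() is asynchronous; flush() blocks until buffered records are sent
producer.flush()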
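
Objective 5 asks for testing against the dummy JSON file, and one way to approach that is to validate each record before it is sent. The sketch below checks a record against the five fields of the sample transaction above. The rules themselves (required fields, a positive amount, the `YYYY-MM-DD HH:MM:SS` timestamp format) are assumptions inferred from that one sample, and `validate_transaction` is a hypothetical helper name.

```python
from datetime import datetime

# Field names and types inferred from the sample record (an assumption).
REQUIRED_FIELDS = {
    "transaction_id": str,
    "sender_phone_number": str,
    "receiver_phone_number": str,
    "transaction_amount": (int, float),
    "transaction_time": str,
}


def validate_transaction(record):
    """Return a list of problems found in the record; empty means valid."""
    problems = []
    for field, expected_type in REQUIRED_FIELDS.items():
        if field not in record:
            problems.append(f"missing field: {field}")
        elif not isinstance(record[field], expected_type):
            problems.append(f"wrong type for {field}")

    amount = record.get("transaction_amount")
    if isinstance(amount, (int, float)) and amount <= 0:
        problems.append("transaction_amount must be positive")

    timestamp = record.get("transaction_time")
    if isinstance(timestamp, str):
        try:
            datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            problems.append("transaction_time is not YYYY-MM-DD HH:MM:SS")
    return problems


sample = {
    "transaction_id": "12345",
    "sender_phone_number": "256777123456",
    "receiver_phone_number": "256772987654",
    "transaction_amount": 100000,
    "transaction_time": "2023-04-19 12:00:00",
}
print(validate_transaction(sample))
print(validate_transaction({"transaction_id": "1", "transaction_amount": -5}))
```

Records that come back with a non-empty problem list could be logged or routed to a separate topic instead of being produced.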

In [None]:
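import os

from confluent_kafka import Consumer, KafkaError

consumer_conf = {
    'bootstrap.servers': 'pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    # Environment variable names are placeholders chosen for this notebook
    'sasl.username': os.environ['CONFLUENT_API_KEY'],
    'sasl.password': os.environ['CONFLUENT_API_SECRET'],
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Disable auto commit
    'enable.partition.eof': True  # Emit _PARTITION_EOF events (off by default)
}

consumer = Consumer(consumer_conf)
# Assumed topic: the one the producer cell writes to
consumer.subscribe(['moringa'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            print(f'Reached end of partition {msg.topic()}-{msg.partition()}')
        else:
            print(f'Error while consuming messages: {msg.error()}')
    else:
        print(f'Received message: {msg.value()}')
        # Commit this message's offset synchronously once it has been handled
        consumer.commit(message=msg, asynchronous=False)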

# **Data processing and analysis**

In [None]:
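import json
import os

from confluent_kafka import Consumer, Producer

# Shared Confluent Cloud connection settings
# (environment variable names are placeholders chosen for this notebook)
common_conf = {
    'bootstrap.servers': 'pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': os.environ['CONFLUENT_API_KEY'],
    'sasl.password': os.environ['CONFLUENT_API_SECRET'],
}

# Set up Kafka consumer
consumer = Consumer({
    **common_conf,
    'group.id': 'group1',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['mobile_money_transactions'])

# Set up Kafka producer
producer = Producer(common_conf)

# Define function to filter data
def filter_data(msg):
    """Return True if the transaction's region is Kampala."""
    try:
        data = json.loads(msg.value().decode('utf-8'))
    except (AttributeError, UnicodeDecodeError, json.JSONDecodeError):
        return False  # empty or malformed payload
    # .get() avoids a KeyError for records without a 'region' field
    return isinstance(data, dict) and data.get('region') == 'Kampala'

# Start streaming data and filter transactions
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        if filter_data(msg):
            producer.produce('kampala_transactions', msg.value())
        producer.poll(0)  # serve delivery callbacks without blocking
except KeyboardInterrupt:
    pass
finally:
    producer.flush()  # deliver anything still buffered
    consumer.close()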
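
The objectives also mention aggregating the data and performing calculations. A minimal sketch of one such aggregation, the total amount sent per sender, is shown here on already-decoded records. `total_sent_per_sender` is a hypothetical helper name, and the amounts in the demo list are invented for illustration, not taken from the dummy JSON file.

```python
from collections import defaultdict


def total_sent_per_sender(transactions):
    """Sum transaction_amount for each sender_phone_number."""
    totals = defaultdict(int)
    for tx in transactions:
        totals[tx["sender_phone_number"]] += tx["transaction_amount"]
    return dict(totals)


# Illustrative records only; amounts are made up for this example.
demo_transactions = [
    {"sender_phone_number": "256777123456", "transaction_amount": 100000},
    {"sender_phone_number": "256777123456", "transaction_amount": 25000},
    {"sender_phone_number": "256772987654", "transaction_amount": 5000},
]
print(total_sent_per_sender(demo_transactions))
```

In a streaming setting, the same running totals could be updated one message at a time inside the poll loop rather than over a list.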