In [None]:
pip install confluent-kafka


In [None]:
# Smoke test: confirm the Kafka client library can be imported in this kernel.
try:
    from confluent_kafka import Producer
except ImportError as import_error:
    print(f"Error importing confluent_kafka: {import_error}")
else:
    print("confluent_kafka is installed and imported successfully!")


## Producer Process

In [1]:
import pandas as pd
from confluent_kafka import Producer
import json

# Function to produce messages to Kafka
def kafka_producer(server, topic, data):
    """Publish every row of a DataFrame to a Kafka topic as a JSON message.

    Args:
        server: Value used for the producer's ``bootstrap.servers`` setting.
        topic: Name of the topic the messages are produced to.
        data: pandas DataFrame; each row is converted to a dict, serialized
            with ``json.dumps`` and sent as one message value.

    The outcome of every message is printed by the delivery callback. The
    function returns only after ``flush()`` has completed.
    """
    producer = Producer({'bootstrap.servers': server})

    def delivery_report(err, msg):
        # Per-message delivery callback registered with produce().
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

    for record in data.to_dict(orient='records'):
        payload = json.dumps(record)
        try:
            producer.produce(topic, value=payload, callback=delivery_report)
        except BufferError:
            # The local producer queue is full: wait for queued messages to be
            # delivered, then retry this record once instead of aborting the run.
            producer.poll(1.0)
            producer.produce(topic, value=payload, callback=delivery_report)
        # Serve pending delivery callbacks without blocking.
        producer.poll(0)

    # Block until all outstanding messages are delivered (or have failed).
    producer.flush()

# Kafka connection settings
server = '172.21.82.235:9092'  # bootstrap broker of the remote Kafka server (host:port)
topic = 'socialmedia'  # destination topic; replace with your topic name

# Three-row demo frame; each row is sent as one Kafka message
data = pd.DataFrame(
    [
        {'id': 1, 'name': 'John', 'age': 28},
        {'id': 2, 'name': 'Jane', 'age': 34},
        {'id': 3, 'name': 'Doe', 'age': 29},
    ],
    columns=['id', 'name', 'age'],
)

# Publish the demo frame to Kafka
kafka_producer(server=server, topic=topic, data=data)


Message delivered to socialmedia [0]
Message delivered to socialmedia [0]
Message delivered to socialmedia [0]


## Consumer Process

In [None]:
from confluent_kafka import Consumer, KafkaError
import json

# Function to consume messages from Kafka
def kafka_consumer(server, group_id, topics):
    """Consume JSON messages from Kafka topics and print each decoded record.

    Args:
        server: Value used for the consumer's ``bootstrap.servers`` setting.
        group_id: Value used for the consumer's ``group.id`` setting.
        topics: List of topic names passed to ``subscribe``.

    The loop runs until a non-EOF consumer error is received or an exception
    (for example a kernel interrupt) leaves it. The consumer is closed in
    every case.
    """
    consumer = Consumer({
        'bootstrap.servers': server,
        'group.id': group_id,
        'auto.offset.reset': 'earliest'
    })

    try:
        consumer.subscribe(topics)

        while True:
            msg = consumer.poll(1.0)  # Timeout in seconds

            if msg is None:
                # Nothing arrived within the timeout; keep polling.
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition is not a failure; keep polling.
                    continue
                print(msg.error())
                break

            payload = msg.value()
            if payload is None:
                # A message without a value cannot be decoded; skip it.
                print("Skipping message with empty payload")
                continue

            try:
                text = payload.decode('utf-8')
            except UnicodeDecodeError as e:
                print(f"UnicodeDecodeError: {e}")
                print(f"Message content: {payload!r}")
                continue

            try:
                # Deserialize the JSON data
                record = json.loads(text)
            except json.JSONDecodeError as e:
                # Report the malformed payload and keep consuming.
                print(f"JSONDecodeError: {e}")
                print(f"Message content: {text}")
            else:
                # Process the record (e.g., print it or store it)
                print(record)
    finally:
        # Always close the consumer, even when the loop is left by an exception.
        consumer.close()

# Kafka connection settings
server = '172.21.82.235:9092'  # bootstrap broker of the remote Kafka server (host:port)
group_id = 'my_consumer_group'  # consumer group ID; replace with your own
topics = ['socialmedia']  # topics to subscribe to; replace with your topic name

# Start consuming; this call loops until the consumer reports an error
kafka_consumer(server=server, group_id=group_id, topics=topics)


{'id': 1, 'name': 'John', 'age': 28}
{'id': 2, 'name': 'Jane', 'age': 34}
{'id': 3, 'name': 'Doe', 'age': 29}


In [None]:
pip install confluent-kafka pandas


In [None]:
## Producer: read a CSV file and publish its rows to Kafka (socialmedia1)

In [1]:
import pandas as pd
from confluent_kafka import Producer
import json

# Function to read CSV data
def read_csv(file_path):
    """Load the CSV at ``file_path`` into a pandas DataFrame.

    The argument is passed straight to ``pd.read_csv`` and the resulting
    frame is returned unchanged.
    """
    frame = pd.read_csv(file_path)
    return frame

# Function to produce messages to Kafka
def kafka_producer(server, topic, data):
    """Publish every row of a DataFrame to a Kafka topic as UTF-8 JSON.

    Args:
        server: Value used for the producer's ``bootstrap.servers`` setting.
        topic: Name of the topic the messages are produced to.
        data: pandas DataFrame; each row is serialized with ``Series.to_json``
            and sent as one UTF-8 encoded message value.

    The outcome of every message is printed by the delivery callback. The
    function returns only after ``flush()`` has completed.
    """
    producer = Producer({'bootstrap.servers': server})

    def delivery_report(err, msg):
        # Per-message delivery callback registered with produce().
        if err is not None:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

    for _, row in data.iterrows():
        # Convert each row to JSON and produce to Kafka
        payload = row.to_json().encode('utf-8')
        try:
            producer.produce(topic, value=payload, callback=delivery_report)
        except BufferError:
            # The local producer queue is full: wait for queued messages to be
            # delivered, then retry this row once instead of aborting the run.
            producer.poll(1.0)
            producer.produce(topic, value=payload, callback=delivery_report)
        # Serve pending delivery callbacks without blocking.
        producer.poll(0)

    # Block until all outstanding messages are delivered (or have failed).
    producer.flush()

# Kafka connection settings
server = '172.21.82.235:9092'  # bootstrap broker address (host:port); replace with your own
topic = 'socialmedia1'  # topic that receives the CSV rows; replace with your topic name

# Input CSV, given as a path relative to the current working directory
csv_file = './Processed.csv'

# Load the CSV into a DataFrame
data = read_csv(csv_file)

# Echo the loaded frame for a quick visual check (optional)
print(f"Read data from CSV:\n{data}")

# Send every row of the frame to Kafka
kafka_producer(server=server, topic=topic, data=data)


Read data from CSV:
    User_ID   Age      Gender  Platform  Daily_Usage_Time (minutes)  \
0     500.0  27.0      Female  Snapchat                       120.0   
1     488.0  21.0  Non-binary  Snapchat                        60.0   
2     776.0  28.0  Non-binary  Snapchat                       115.0   
3     869.0  27.0        Male  Telegram                       105.0   
4     573.0  21.0  Non-binary  Facebook                        55.0   
..      ...   ...         ...       ...                         ...   
91    418.0  27.0        Male   Twitter                        90.0   
92    184.0  22.0        Male   Twitter                        70.0   
93    896.0  33.0        Male  LinkedIn                        85.0   
94    708.0  24.0  Non-binary  Facebook                       110.0   
95    993.0  24.0        Male  Telegram                        75.0   

    Posts_Per_Day  Likes_Received_Per_Day  Comments_Received_Per_Day  \
0             4.0                    40.0              

## Consumer: read the CSV records back from Kafka (socialmedia1)

In [None]:
from confluent_kafka import Consumer, KafkaError
import json

# Function to consume messages from Kafka
def kafka_consumer(server, group_id, topics):
    """Consume JSON messages from Kafka topics and print each decoded record.

    Args:
        server: Value used for the consumer's ``bootstrap.servers`` setting.
        group_id: Value used for the consumer's ``group.id`` setting.
        topics: List of topic names passed to ``subscribe``.

    The loop runs until a non-EOF consumer error is received or an exception
    (for example a kernel interrupt) leaves it. The consumer is closed in
    every case.
    """
    consumer = Consumer({
        'bootstrap.servers': server,
        'group.id': group_id,
        'auto.offset.reset': 'earliest'
    })

    try:
        consumer.subscribe(topics)

        while True:
            msg = consumer.poll(1.0)  # Timeout in seconds

            if msg is None:
                # Nothing arrived within the timeout; keep polling.
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition is not a failure; keep polling.
                    continue
                print(f"Consumer error: {msg.error()}")
                break

            payload = msg.value()
            if payload is None:
                # A message without a value cannot be decoded; skip it.
                print("Skipping message with empty payload")
                continue

            # Deserialize the JSON data
            try:
                record = json.loads(payload.decode('utf-8'))
            except (UnicodeDecodeError, json.JSONDecodeError) as e:
                # Invalid UTF-8 and invalid JSON are both reported and skipped.
                print(f"Error decoding message: {e}")
            else:
                print(f"Consumed message: {record}")
    finally:
        # Always close the consumer, even when the loop is left by an exception.
        consumer.close()

# Kafka connection settings
server = '172.21.82.235:9092'  # bootstrap broker address (host:port); replace with your own
group_id = 'my_consumer_group'  # consumer group ID; replace with your own
topics = ['socialmedia1']  # topics to subscribe to; replace with your topic name

# Start consuming; this call loops until the consumer reports an error
kafka_consumer(server=server, group_id=group_id, topics=topics)


Consumed message: {'User_ID': 500.0, 'Age': 27.0, 'Gender': 'Female', 'Platform': 'Snapchat', 'Daily_Usage_Time (minutes)': 120.0, 'Posts_Per_Day': 4.0, 'Likes_Received_Per_Day': 40.0, 'Comments_Received_Per_Day': 18.0, 'Messages_Sent_Per_Day': 22.0, 'Dominant_Emotion': 'Neutral'}
Consumed message: {'User_ID': 488.0, 'Age': 21.0, 'Gender': 'Non-binary', 'Platform': 'Snapchat', 'Daily_Usage_Time (minutes)': 60.0, 'Posts_Per_Day': 1.0, 'Likes_Received_Per_Day': 18.0, 'Comments_Received_Per_Day': 7.0, 'Messages_Sent_Per_Day': 12.0, 'Dominant_Emotion': 'Neutral'}
Consumed message: {'User_ID': 776.0, 'Age': 28.0, 'Gender': 'Non-binary', 'Platform': 'Snapchat', 'Daily_Usage_Time (minutes)': 115.0, 'Posts_Per_Day': 3.0, 'Likes_Received_Per_Day': 38.0, 'Comments_Received_Per_Day': 18.0, 'Messages_Sent_Per_Day': 27.0, 'Dominant_Emotion': 'Anxiety'}
Consumed message: {'User_ID': 869.0, 'Age': 27.0, 'Gender': 'Male', 'Platform': 'Telegram', 'Daily_Usage_Time (minutes)': 105.0, 'Posts_Per_Day': 3.