# In [6]:
import json
import os
import time

import pandas as pd
from kafka import KafkaConsumer

# Consumer function to process data from Kafka topic
def consumer_function(bootstrap_servers, topic, duration_minutes,
                      output_path='consumer_data.csv'):
    """Consume JSON messages from a Kafka topic for a fixed time, then save them to CSV.

    Each message value is decoded as UTF-8 JSON and reduced to four fields
    (gender, "title first last" name, city, email). Once ``duration_minutes``
    have elapsed, the collected rows are written to ``output_path``.

    Args:
        bootstrap_servers: Kafka bootstrap server(s), passed to ``KafkaConsumer``.
        topic: Name of the topic to subscribe to.
        duration_minutes: How long to consume, in minutes. A value <= 0 writes
            an empty CSV (header only).
        output_path: Destination CSV file. Defaults to ``'consumer_data.csv'``.

    Returns:
        The ``pandas.DataFrame`` that was written to ``output_path``.

    Raises:
        KeyError / TypeError: if a message lacks one of the expected fields.
            The consumer is still closed, and no CSV is written.
    """
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                             value_deserializer=lambda v: json.loads(v.decode('utf-8')))
    consumer.subscribe([topic])

    # Collect plain dicts and build the DataFrame once at the end.
    # DataFrame.append was removed in pandas 2.0, and it also copied the
    # whole frame on every call, which made the loop quadratic.
    rows = []

    # Use a monotonic clock so wall-clock adjustments cannot stretch or cut the window.
    end_time = time.monotonic() + (duration_minutes * 60)

    try:
        while True:
            remaining_ms = int((end_time - time.monotonic()) * 1000)
            if remaining_ms <= 0:
                break
            # poll() waits at most `remaining_ms` for data, so the deadline is
            # honoured. Iterating the consumer directly (`for message in
            # consumer`) blocks indefinitely waiting for the next message,
            # so the time limit was never re-checked.
            batches = consumer.poll(timeout_ms=remaining_ms)
            for records in batches.values():
                for message in records:
                    rows.append(_extract_record(message.value))
    finally:
        # Close the consumer even if a message fails to parse.
        consumer.close()

    data_df = pd.DataFrame(rows, columns=['gender', 'name', 'location', 'email'])
    data_df.to_csv(output_path, index=False)
    return data_df


def _extract_record(data):
    """Flatten one decoded message into a row dict keyed by the CSV column names."""
    # NOTE(review): assumes a nested payload with name.{title,first,last} and
    # location.city, as read below. Confirm against the producer's schema.
    return {
        'gender': data['gender'],
        'name': f"{data['name']['title']} {data['name']['first']} {data['name']['last']}",
        'location': data['location']['city'],
        'email': data['email'],
    }

# Kafka configuration. The defaults are the original hard-coded values.
# Set KAFKA_BOOTSTRAP_SERVERS / KAFKA_TOPIC in the environment to target a
# different cluster or topic without editing the code.
bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', '10.1.1.109:9092')
topic = os.environ.get('KAFKA_TOPIC', 'api_data_topic')

# How long to consume, in minutes.
duration_minutes = 5

# Start consuming only when executed directly (script or notebook kernel),
# not when this module is imported. Otherwise the import would block for
# `duration_minutes` while talking to Kafka.
if __name__ == '__main__':
    consumer_function(bootstrap_servers, topic, duration_minutes)
