In [4]:
from kafka import KafkaConsumer
import json
import pandas as pd
import os
import time

In [5]:
# --- Kafka consumer setup -------------------------------------------------
def _decode_json(raw_bytes):
    """Turn a raw Kafka message payload (UTF-8 bytes) into a Python object."""
    return json.loads(raw_bytes.decode("utf-8"))


# Keyword settings handed to KafkaConsumer.
# NOTE(review): no group_id is supplied; per the kafka-python docs, offset
# commits are disabled in that case, so enable_auto_commit=True likely has
# no effect here — confirm whether a consumer group was intended.
consumer_settings = {
    "bootstrap_servers": "localhost:9092",
    "auto_offset_reset": "latest",
    "enable_auto_commit": True,
    "value_deserializer": _decode_json,
}

# Subscribe to the "weather_data" topic with the settings above.
consumer = KafkaConsumer("weather_data", **consumer_settings)

# CSV file that received records are appended to (relative to the working dir).
file_path = "weather_data.csv"

print("✅ Consumer started listening for weather data...")
# Show the resolved location so it is clear where the data ends up.
resolved_path = os.path.abspath(file_path)
print("Saving to:", resolved_path)

✅ Consumer started listening for weather data...
Saving to: /Users/architgupta280/Desktop/Spark-DDP/Project/weather_data.csv


In [6]:
batch_size = 10  # Number of records collected before each write to the CSV
pause_seconds = 30  # Pause after each full batch before reading more messages
buffer = []  # Records received but not yet written to disk


def _flush_buffer(records, path):
    """Append ``records`` to the CSV file at ``path`` and empty the list.

    Parameters
    ----------
    records : list
        Buffered records (the deserialized message values). The list is
        cleared in place once the rows have been written.
    path : str
        Destination CSV file; created if it does not exist.

    Returns
    -------
    int
        Number of rows written (0 when ``records`` is empty).
    """
    if not records:
        return 0

    # A header row is needed for a brand-new file and also for a file that
    # exists but is still empty; otherwise the CSV would never get one.
    needs_header = not os.path.isfile(path) or os.path.getsize(path) == 0

    pd.DataFrame(records).to_csv(path, mode="a", header=needs_header, index=False)

    written = len(records)
    records.clear()
    return written


# Receive weather records until the cell is interrupted. The try/finally makes
# sure a partially filled batch is still written and the consumer is closed
# when the loop stops. After this cell has stopped, re-run the consumer
# configuration cell before running this one again.
try:
    for message in consumer:
        weather_record = message.value
        print(f"Received: {weather_record}")

        buffer.append(weather_record)

        # Write to disk once a full batch has been collected.
        if len(buffer) >= batch_size:
            saved = _flush_buffer(buffer, file_path)
            print(f"✅ Saved {saved} records to file.")
            print(f"Waiting {pause_seconds} seconds before the next round...")
            time.sleep(pause_seconds)
finally:
    try:
        # Do not drop records that were received but not yet saved.
        leftover = _flush_buffer(buffer, file_path)
        if leftover:
            print(f"✅ Saved {leftover} records to file.")
    finally:
        # Close the consumer even if the final write fails.
        consumer.close()

Received: {'timestamp': 1744199679, 'city': 'New York', 'temperature': -0.55, 'humidity': 41, 'wind_speed': 6.29, 'weather_condition': 'clear sky'}
Received: {'timestamp': 1744199918, 'city': 'London', 'temperature': 12.65, 'humidity': 58, 'wind_speed': 4.45, 'weather_condition': 'few clouds'}
Received: {'timestamp': 1744199719, 'city': 'Tokyo', 'temperature': 15.08, 'humidity': 49, 'wind_speed': 6.38, 'weather_condition': 'broken clouds'}
Received: {'timestamp': 1744199674, 'city': 'Delhi', 'temperature': 40.64, 'humidity': 10, 'wind_speed': 1.25, 'weather_condition': 'clear sky'}
Received: {'timestamp': 1744200021, 'city': 'Paris', 'temperature': 16.43, 'humidity': 35, 'wind_speed': 4.13, 'weather_condition': 'clear sky'}
Received: {'timestamp': 1744200239, 'city': 'Berlin', 'temperature': 13.8, 'humidity': 43, 'wind_speed': 3.42, 'weather_condition': 'broken clouds'}
Received: {'timestamp': 1744199945, 'city': 'Sydney', 'temperature': 18.53, 'humidity': 78, 'wind_speed': 4.67, 'weat

KeyboardInterrupt: 