# Consumer Notebook - Visualizing Real-Time Weather Data from Kafka

## Prerequisites

In [None]:
# Install the Kafka client, Plotly and pandas into the environment of the running kernel.
%pip install confluent-kafka plotly pandas

## Configuration

In [None]:
# Import necessary libraries
import json
import time
import pandas as pd
from confluent_kafka import Consumer, KafkaException, KafkaError
import plotly.graph_objs as go
from plotly.subplots import make_subplots
from IPython.display import display, clear_output

# ---- Configuration Parameters ----

# Kafka connection settings
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'  # host:port of the Kafka broker
KAFKA_TOPIC = 'weather'                      # topic this notebook subscribes to
KAFKA_GROUP_ID = 'weather_consumer_group'    # consumer group that owns the offsets

# Dashboard settings
UPDATE_INTERVAL = 5  # pause, in seconds, between poll/redraw cycles

# Accumulator for every processed reading; starts empty with the expected columns.
weather_df = pd.DataFrame(
    columns=['timestamp', 'temperature', 'humidity', 'weather_description']
)


## Setting Up the Kafka Consumer

In [None]:
# Consumer settings: broker address, the group that tracks our offsets,
# where to begin when the group has no usable committed offset, and
# background offset commits.
consumer_config = {
    'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS,
    'group.id': KAFKA_GROUP_ID,
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
}

# Build the consumer and register it on the weather topic.
consumer = Consumer(consumer_config)
consumer.subscribe([KAFKA_TOPIC])

## Consuming Messages from Kafka

In [None]:
def consume_messages(consumer, timeout=1.0, max_messages=1000):
    """
    Consumes messages from Kafka and returns a list of weather data dictionaries.

    Polls repeatedly until one of these happens:
    - no message arrives within ``timeout`` seconds;
    - a non-EOF Kafka error is reported;
    - ``max_messages`` payloads have been decoded.

    The cap matters on a busy topic. Without it, a steady stream of
    messages would keep this loop running forever, and the caller would
    never get control back to redraw.

    Payloads that are null, or are not valid UTF-8 JSON, are reported and
    skipped. They do not abort the rest of the batch.

    :param consumer: Kafka Consumer instance
    :param timeout: Maximum time (in seconds) to wait in each poll
    :param max_messages: Upper bound on decoded messages returned per call;
        ``None`` removes the bound.
    :return: List of decoded weather data dictionaries (possibly empty)
    """
    messages = []
    while max_messages is None or len(messages) < max_messages:
        try:
            msg = consumer.poll(timeout=timeout)
        except Exception as e:
            # Best-effort: report and hand back whatever was collected so far.
            print(f"[ERROR] Failed to consume messages: {e}")
            break
        if msg is None:
            break  # Nothing arrived within `timeout`: this batch is complete.

        error = msg.error()
        if error:
            if error.code() == KafkaError._PARTITION_EOF:
                # End of partition event; keep polling in case other partitions have data.
                print(f"[INFO] End of partition reached {msg.topic()} [{msg.partition()}]")
                continue
            print(f"[ERROR] Failed to consume messages: {error}")
            break

        payload = msg.value()
        if payload is None:
            # A null-valued record (e.g. a tombstone) has nothing to decode.
            continue
        try:
            messages.append(json.loads(payload.decode('utf-8')))
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            # Skip only the bad record instead of dropping the remainder of the batch.
            print(f"[ERROR] Skipping undecodable message from {msg.topic()} [{msg.partition()}]: {e}")
    return messages

## Data Processing

In [None]:
def process_messages(messages, df):
    """
    Processes a list of weather data messages and appends them to the DataFrame.

    Each message must provide ``timestamp`` (seconds since the Unix epoch),
    ``temperature``, ``humidity`` and ``weather_description``. A message
    that is missing a key, or whose timestamp cannot be converted, is
    reported and skipped.

    Valid rows are collected first and concatenated in a single step.
    ``DataFrame.append`` was removed in pandas 2.0, and it also copied the
    whole frame for every row.

    :param messages: List of weather data dictionaries
    :param df: Pandas DataFrame to append data to (not modified in place)
    :return: Updated DataFrame (``df`` itself if no message was valid)
    """
    rows = []
    for msg in messages:
        try:
            rows.append({
                'timestamp': pd.to_datetime(msg['timestamp'], unit='s'),
                'temperature': msg['temperature'],
                'humidity': msg['humidity'],
                'weather_description': msg['weather_description'],
            })
        except KeyError as e:
            print(f"[ERROR] Missing key in message: {e}")
        except Exception as e:
            print(f"[ERROR] Failed to process message: {e}")

    if not rows:
        return df

    new_rows = pd.DataFrame(
        rows, columns=['timestamp', 'temperature', 'humidity', 'weather_description']
    )
    if df.empty:
        # Concatenating with an empty frame is deprecated in recent pandas and
        # can degrade dtypes to object. Return the new rows directly, but keep
        # any extra columns that df declared.
        return new_rows.reindex(columns=df.columns.union(new_rows.columns, sort=False))
    return pd.concat([df, new_rows], ignore_index=True)

## Real-Time Data Visualization

In [None]:
def create_live_visualization(df):
    """
    Creates live-updating visualizations for temperature and humidity.

    Clears the previous cell output, then shows two stacked subplots
    (temperature on top, humidity below) that share and link one x-axis.

    :param df: Pandas DataFrame with ``timestamp``, ``temperature`` and
        ``humidity`` columns
    """
    # wait=True defers the clear until new output arrives, reducing flicker.
    clear_output(wait=True)

    # Create subplots: one for temperature, one for humidity
    fig = make_subplots(rows=2, cols=1, shared_xaxes=True,
                        subplot_titles=("Temperature Over Time", "Humidity Over Time"))

    # Temperature Trace
    fig.add_trace(
        go.Scatter(
            x=df['timestamp'],
            y=df['temperature'],
            mode='lines+markers',
            name='Temperature (°C)'
        ),
        row=1, col=1
    )

    # Humidity Trace
    fig.add_trace(
        go.Scatter(
            x=df['timestamp'],
            y=df['humidity'],
            mode='lines+markers',
            name='Humidity (%)',
            line=dict(color='orange')
        ),
        row=2, col=1
    )

    # Update layout for better visuals
    fig.update_layout(
        height=600,
        width=800,
        title_text="Live Weather Data Visualization",
        showlegend=True
    )

    # The x-axes are shared, so one set of controls drives both subplots.
    # Without row/col, update_xaxes would put a selector and a slider on
    # *every* x-axis. That duplicates the controls and draws the top slider
    # into the lower subplot. So the buttons go above the top plot and the
    # slider goes under the bottom plot.
    fig.update_xaxes(
        rangeselector=dict(
            buttons=[
                dict(count=1, label="1h", step="hour", stepmode="backward"),
                dict(count=6, label="6h", step="hour", stepmode="backward"),
                dict(count=12, label="12h", step="hour", stepmode="backward"),
                dict(count=1, label="1d", step="day", stepmode="backward"),
                dict(step="all")
            ]
        ),
        row=1, col=1
    )
    fig.update_xaxes(rangeslider_visible=True, row=2, col=1)

    # Display the figure
    fig.show()


## Logging and Monitoring

In [None]:
import logging

# Send INFO-and-above records through a single StreamHandler (stderr by
# default), each prefixed with a timestamp and the level name.
# Note: basicConfig is a no-op if the root logger already has handlers.
logging.basicConfig(
    format='%(asctime)s %(levelname)s %(message)s',
    level=logging.INFO,
    handlers=[logging.StreamHandler()],
)

# Logger used by the main execution loop.
logger = logging.getLogger(name=__name__)

## Main Execution Loop

In [None]:
def main():
    """
    Run the consume -> process -> visualize loop until interrupted.

    Each cycle does the following:
    - polls Kafka for a batch of messages;
    - appends the valid readings to the global ``weather_df``;
    - redraws the dashboard if needed;
    - sleeps ``UPDATE_INTERVAL`` seconds.

    The figure is redrawn only when new messages arrived, or on the first
    cycle that has data. Redrawing an unchanged frame would just reset the
    user's zoom/range selection every interval. The consumer is always
    closed on exit.
    """
    global weather_df

    logger.info(f"Starting Kafka consumer for topic '{KAFKA_TOPIC}'")
    logger.info(f"Visualizations will update every {UPDATE_INTERVAL} seconds.")

    rendered = False
    try:
        while True:
            # Consume messages
            messages = consume_messages(consumer, timeout=1.0)

            if messages:
                # Process and append messages to DataFrame
                weather_df = process_messages(messages, weather_df)
                logger.info(f"Consumed {len(messages)} messages.")

            # Redraw only when something changed (or nothing has been shown yet).
            if not weather_df.empty and (messages or not rendered):
                create_live_visualization(weather_df)
                rendered = True

            # Wait before next consumption cycle
            time.sleep(UPDATE_INTERVAL)

    except KeyboardInterrupt:
        logger.info("Consumer interrupted by user. Shutting down...")

    except Exception:
        # Top-level boundary: log with the full traceback instead of just str(e).
        logger.exception("An unexpected error occurred")

    finally:
        # Close consumer gracefully
        consumer.close()
        logger.info("Consumer has been shut down.")

## Entry Point

In [None]:
if __name__ == "__main__":
    # Start the consume/visualize loop when this code runs as the top-level
    # module; a notebook kernel also executes cells under "__main__".
    main()