# E-Commerce Streaming Analytics - Local Development

This notebook provides tools for local development and testing of the streaming analytics pipeline.

## Environment Setup

Make sure all local services are running:
- Kafka (localhost:9092)
- MinIO (localhost:9000)
- Spark (localhost:8081; the connection test below uses the master at spark://localhost:7077)
- PostgreSQL (localhost:5432)

In [None]:
# Import required libraries
import pandas as pd
import numpy as np
from kafka import KafkaProducer, KafkaConsumer
import json
from datetime import datetime, timedelta
import boto3
from botocore.client import Config
import pyspark
from pyspark.sql import SparkSession
import time
import random

## Test Kafka Connection

In [None]:
# Test Kafka connection
def test_kafka_connection():
    """Check that the Kafka broker at localhost:9092 accepts a message.

    Creates a JSON-serializing producer, publishes one test event to the
    'raw-events' topic and waits for the broker to acknowledge it.

    Returns:
        bool: True if the test message was acknowledged, False if any
        exception occurred (the error is printed, not raised).
    """
    try:
        producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )

        try:
            # Send test message
            test_message = {
                'event_type': 'test',
                'timestamp': datetime.now().isoformat(),
                'message': 'Hello from Jupyter!'
            }

            # send() is asynchronous; block on the returned future so a
            # delivery failure is raised here instead of being reported
            # as a successful connection.
            producer.send('raw-events', value=test_message).get(timeout=10)
        finally:
            # Close the producer even when the send fails.
            producer.close()

        print("✅ Kafka connection successful!")
        return True
    except Exception as e:
        print(f"❌ Kafka connection failed: {e}")
        return False

# Run the Kafka check as soon as this cell executes.
test_kafka_connection()

## Test MinIO (S3) Connection

In [None]:
# Test MinIO connection
def test_minio_connection():
    """Verify that the MinIO endpoint at localhost:9000 answers an S3 request.

    Builds an S3 client with the local credentials and SigV4 signing, lists
    the buckets and prints their names.

    Returns:
        bool: True when the bucket listing succeeds, False if any exception
        is raised (the error is printed, not raised).
    """
    try:
        connection_settings = {
            'endpoint_url': 'http://localhost:9000',
            'aws_access_key_id': 'minioadmin',
            'aws_secret_access_key': 'minioadmin',
            'config': Config(signature_version='s3v4'),
            'region_name': 'us-east-1',
        }
        minio = boto3.client('s3', **connection_settings)

        # A successful listing is what proves the endpoint is reachable.
        listing = minio.list_buckets()
        buckets = [entry['Name'] for entry in listing['Buckets']]

        print("✅ MinIO connection successful!")
        print(f"Available buckets: {buckets}")
        return True
    except Exception as err:
        print(f"❌ MinIO connection failed: {err}")
        return False

# Run the MinIO check as soon as this cell executes.
test_minio_connection()

## Test Spark Connection

In [None]:
# Test Spark connection
def test_spark_connection():
    """Check that a Spark session can be created against the local master.

    Connects to spark://localhost:7077, builds a one-row DataFrame and
    displays it, then stops the session.

    Returns:
        bool: True if the DataFrame was created and shown, False if any
        exception occurred (the error is printed, not raised).
    """
    try:
        spark = (
            SparkSession.builder
            .appName("LocalTest")
            .master("spark://localhost:7077")
            .config("spark.sql.adaptive.enabled", "true")
            .getOrCreate()
        )

        try:
            # Create test DataFrame
            test_data = [(1, "test", datetime.now())]
            columns = ["id", "message", "timestamp"]
            df = spark.createDataFrame(test_data, columns)

            # Show the DataFrame before reporting success so a failure in
            # this step is not preceded by a success message.
            df.show()

            print("✅ Spark connection successful!")
            print(f"Spark version: {spark.version}")
        finally:
            # Stop the session even when the DataFrame step fails.
            spark.stop()

        return True
    except Exception as e:
        print(f"❌ Spark connection failed: {e}")
        return False

# Run the Spark check as soon as this cell executes.
test_spark_connection()

## Generate Sample Data

In [None]:
# Generate sample e-commerce events
def generate_sample_events(num_events=10):
    """Build a list of randomized e-commerce event dicts.

    Every event has a sequential ``event_id`` plus a random event type,
    user, product and session identifier, a timestamp between now and
    1440 minutes in the past, and a value between 10 and 500 rounded to
    two decimals.

    Args:
        num_events: How many events to create (default 10).

    Returns:
        list: The generated event dicts, in creation order.
    """
    event_types = ['page_view', 'add_to_cart', 'purchase', 'user_signup']

    def _make_event(sequence):
        # Draw the random fields in the same order as the dict keys so a
        # seeded run produces the same sequence of values.
        kind = random.choice(event_types)
        user_no = random.randint(1, 1000)
        product_no = random.randint(1, 500)
        age = timedelta(minutes=random.randint(0, 1440))
        session_no = random.randint(1, 100)
        amount = round(random.uniform(10, 500), 2)
        return {
            'event_id': f'evt_{sequence:06d}',
            'event_type': kind,
            'user_id': f'user_{user_no:04d}',
            'product_id': f'prod_{product_no:04d}',
            'timestamp': (datetime.now() - age).isoformat(),
            'session_id': f'sess_{session_no:04d}',
            'value': amount,
        }

    return [_make_event(index) for index in range(num_events)]

# Build a batch of five events and pretty-print each one as indented JSON
sample_events = generate_sample_events(num_events=5)
print("Sample events:")
for event in sample_events:
    rendered = json.dumps(event, indent=2)
    print(rendered)

## Send Events to Kafka

In [None]:
# Send events to Kafka
def send_events_to_kafka(events, topic='raw-events'):
    """Publish each event to a Kafka topic as UTF-8 encoded JSON.

    Args:
        events: Sized collection of event dicts; each must contain the
            'event_type' and 'event_id' keys, which are echoed per send.
        topic: Name of the destination topic (default 'raw-events').

    Raises:
        Whatever the producer, the JSON serializer or a missing event key
        raises; the producer is closed before the exception propagates.
    """
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda x: json.dumps(x).encode('utf-8')
    )

    try:
        for event in events:
            producer.send(topic, value=event)
            print(f"Sent: {event['event_type']} - {event['event_id']}")

        # Wait for the buffered records to be transmitted before reporting.
        producer.flush()
    finally:
        # Close the producer even when a send or a malformed event raises.
        producer.close()

    print(f"\n✅ Sent {len(events)} events to topic '{topic}'")

# Publish the previously generated sample_events to the default 'raw-events' topic
send_events_to_kafka(sample_events)

## Monitor Kafka Topics

In [None]:
# Monitor Kafka topic
def monitor_kafka_topic(topic='raw-events', timeout=10):
    """Print the messages arriving on a Kafka topic for a limited time.

    Subscribes as consumer group 'jupyter-monitor' starting from the latest
    offset, prints every JSON-decoded message received and finally prints
    how many messages were seen.

    Args:
        topic: Name of the topic to watch (default 'raw-events').
        timeout: Maximum number of seconds to keep monitoring (default 10).
    """
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='latest',
        enable_auto_commit=True,
        group_id='jupyter-monitor',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    print(f"Monitoring topic '{topic}' for {timeout} seconds...")

    message_count = 0
    deadline = time.monotonic() + timeout

    try:
        # Poll with a bounded wait instead of iterating the consumer:
        # iteration blocks until a message arrives, so on an idle topic the
        # timeout would never be checked.
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break

            # Cap each wait at one second so the deadline is re-checked
            # regularly; never pass a wait below one millisecond.
            wait_ms = max(1, int(min(remaining, 1.0) * 1000))
            batches = consumer.poll(timeout_ms=wait_ms)

            for records in batches.values():
                for message in records:
                    message_count += 1
                    print(f"Received: {message.value}")
    finally:
        # Close the consumer even on a decode error or an interrupt.
        consumer.close()

    print(f"\n📊 Received {message_count} messages in {timeout} seconds")

# Uncomment to monitor (will block for 10 seconds)
# monitor_kafka_topic()

## Next Steps

1. **Check Kafka UI**: Open http://localhost:8080 to see topics and messages
2. **Check Dagster**: Open http://localhost:3000 to see the orchestration pipeline
3. **Check Grafana**: Open http://localhost:3001 to see the monitoring dashboards
4. **Run data generator**: Execute the data generation script to create a continuous stream of events
5. **Test Spark jobs**: Run streaming jobs to process the data