# PySpark Streaming Demo

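This notebook is a warm-up for PySpark Structured Streaming. It generates synthetic streaming-style events and explores a sample of them with PySpark DataFrame operations: event-type and page distributions, purchase statistics, and per-session activity. The analysis runs in batch mode over a static DataFrame; no `readStream`/`writeStream` query is started here.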

In [None]:
import sys
import os
sys.path.append('../shared')

from spark_utils import SparkSessionManager
from data_generator import DataGenerator
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Create the Spark session for this demo via the shared SparkSessionManager helper.
# NOTE(review): what the 'streaming' profile configures lives in spark_utils — confirm there.
spark = SparkSessionManager.get_session('streaming', 'StreamingDemo')
print(f"Spark version: {spark.version}")

# Report the UI address Spark actually bound to instead of assuming port 4040:
# if another Spark application already holds 4040, Spark binds to 4041, 4042, ...
# This is best-effort display only. The lookup can fail or yield None when the UI
# is disabled, and a Spark Connect session has no sparkContext at all.
try:
    spark_ui_url = spark.sparkContext.uiWebUrl
except Exception:
    spark_ui_url = None
print(f"Spark UI: {spark_ui_url or 'unavailable'}")

In [None]:
# Produce synthetic events with the shared DataGenerator and preview the first one.
NUM_EVENTS = 1000  # how many events to request from the generator

streaming_events = DataGenerator.generate_streaming_data(NUM_EVENTS)
event_count = len(streaming_events)
print(f"Generated {event_count} events")
print("Sample event:")
print(streaming_events[0])

In [None]:
# Turn a slice of the generated events into a static (batch) DataFrame to explore.
import json
from datetime import datetime, timedelta  # NOTE(review): not used by any visible cell

SAMPLE_SIZE = 100  # only the first SAMPLE_SIZE generated events are analysed below

# Each generated event is decoded with json.loads, so events are presumably JSON strings.
events_data = list(map(json.loads, streaming_events[:SAMPLE_SIZE]))
events_df = spark.createDataFrame(events_data)

print("Event schema:")
events_df.printSchema()
print("\nSample events:")
events_df.show(5, truncate=False)

In [None]:
# Frequency tables for event types and pages, most frequent first.
# (Both frames are lazy; nothing executes until .show() is called.)
event_type_counts = events_df.groupBy('event_type').count()
page_counts = events_df.groupBy('page').count()

print("Event type distribution:")
event_type_counts.orderBy(col('count').desc()).show()

print("\nTop pages:")
page_counts.orderBy(col('count').desc()).show(10)

In [None]:
# Purchase analysis: revenue statistics restricted to 'purchase' events.
purchase_events = events_df.filter(col('event_type') == 'purchase')

# head(1) fetches at most one row; count() would scan every partition just to
# answer "is there anything here?".
if purchase_events.head(1):
    print("Purchase analytics:")
    # count/sum/avg/min/max are the PySpark aggregate functions here: the
    # wildcard import of pyspark.sql.functions shadows Python's builtins.
    purchase_events.agg(
        count('*').alias('total_purchases'),
        sum('amount').alias('total_revenue'),
        avg('amount').alias('avg_purchase'),
        min('amount').alias('min_purchase'),
        max('amount').alias('max_purchase')
    ).show()
else:
    print("No purchase events found in this sample")

In [None]:
# User session analysis: activity and revenue per (user_id, session_id) pair.
print("User session analysis:")
session_stats = events_df.groupBy('user_id', 'session_id').agg(
    count('*').alias('events_per_session'),
    countDistinct('page').alias('unique_pages'),
    # sum() over a group whose 'amount' values are all NULL yields NULL, and
    # avg() below skips NULLs. Without coalesce, "avg revenue per session" would
    # silently cover only sessions that had revenue. A session with no amounts
    # earns 0. (Presumably amount is only set on purchase events; verify.)
    coalesce(sum('amount'), lit(0)).alias('session_revenue')
)

session_stats.show(10)

print("\nSession summary statistics:")
session_stats.agg(
    avg('events_per_session').alias('avg_events_per_session'),
    avg('unique_pages').alias('avg_pages_per_session'),
    avg('session_revenue').alias('avg_revenue_per_session')
).show()

In [None]:
# Stop Spark session
# Release the session obtained from SparkSessionManager.get_session at the top of the notebook.
# NOTE(review): stop_session() is called without the 'streaming' key used at creation;
# confirm in spark_utils which session(s) it stops.
SparkSessionManager.stop_session()
print("Spark session stopped")