# Naolib Streaming Analysis

This notebook runs streaming analyses on real-time Naolib (Nantes public transport) arrival data consumed from Kafka. It implements two streaming analyses over event-time windows, plus a bonus third analysis in Section 7:

1. Real-time average wait times per line over sliding windows
2. Detection of unexpected delays and service disruptions

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import col, explode, when, expr, to_timestamp, regexp_extract, window, current_timestamp
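# sum is required by the delayed/abnormal counts below.
# Note: max, min and sum shadow the Python builtins of the same name in this notebook.
from pyspark.sql.functions import avg, max, min, sum, count, stddev, from_json, struct, lit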
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, ArrayType, BooleanType
import json

# Configure Spark
conf = SparkConf() \
    .setAppName('NaolibStreamingAnalysis') \
    .setMaster('spark://spark:7077') \
    .set("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3") \
    .set("spark.sql.shuffle.partitions", "10")

sc = SparkContext.getOrCreate(conf=conf)
sql_context = SQLContext(sc)

## 1. Define the Schemas

First, let's define schemas for the data we'll be reading from Kafka:

In [None]:
# Schema for the ligne (line) nested structure
ligne_schema = StructType([
    StructField("numLigne", StringType(), True),
    StructField("typeLigne", StringType(), True)
])

# Schema for the arret (stop) nested structure
arret_schema = StructType([
    StructField("codeArret", StringType(), True)
])

# Schema for each arrival in the arrivals array
arrival_schema = StructType([
    StructField("sens", StringType(), True),
    StructField("terminus", StringType(), True), 
    StructField("infotrafic", BooleanType(), True),
    StructField("temps", StringType(), True),
    StructField("dernierDepart", StringType(), True),
    StructField("tempsReel", StringType(), True),
    StructField("ligne", ligne_schema, True),
    StructField("arret", arret_schema, True)
])

# Schema for the overall message
message_schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("stop_code", StringType(), True),
    StructField("stop_name", StringType(), True),
    StructField("arrivals", ArrayType(arrival_schema), True)
])
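
To make the schema concrete, the sketch below builds a hypothetical message of the shape `message_schema` expects and checks its keys against the schema's field names. Every value in it (stop code, stop name, line number, timestamp) is invented for illustration rather than taken from the Naolib feed, and the `"3 min"` format of `temps` is an assumption chosen to match the parsing rules used later. Spark's `from_json` is more lenient than this check: missing fields become null and unknown keys are ignored.

```python
import json

# Hypothetical message: every value below is invented for illustration.
sample_message = {
    "timestamp": "2024-05-14 08:15:00",  # matches the "yyyy-MM-dd HH:mm:ss" pattern used later
    "stop_code": "EXMP",
    "stop_name": "Example Stop",
    "arrivals": [
        {
            "sens": "1",
            "terminus": "Example Terminus",
            "infotrafic": False,
            "temps": "3 min",
            "dernierDepart": "false",
            "tempsReel": "true",
            "ligne": {"numLigne": "2", "typeLigne": "1"},
            "arret": {"codeArret": "EXMP1"},
        }
    ],
}

# Field names declared in message_schema and arrival_schema.
MESSAGE_FIELDS = {"timestamp", "stop_code", "stop_name", "arrivals"}
ARRIVAL_FIELDS = {"sens", "terminus", "infotrafic", "temps",
                  "dernierDepart", "tempsReel", "ligne", "arret"}


def matches_schema(message: dict) -> bool:
    """Return True if the top-level and per-arrival keys equal the schema's field names."""
    if set(message) != MESSAGE_FIELDS:
        return False
    return all(set(arrival) == ARRIVAL_FIELDS for arrival in message["arrivals"])


# A producer would send the message as a JSON string in the Kafka record value.
payload = json.dumps(sample_message)
print(matches_schema(json.loads(payload)))  # True
```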

## 2. Set Up Kafka Connection

Now, let's set up the connection to Kafka to stream our real-time transportation data:

In [None]:
# Kafka configuration
kafka_bootstrap_servers = "kafka:9092"
kafka_topic = "naolib_realtime"

# Create a streaming DataFrame from Kafka
df_kafka = sql_context.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "latest") \
    .load()

# Convert the binary value to string
df_string = df_kafka.selectExpr("CAST(value AS STRING)")

# Parse the JSON string into structured data
df_parsed = df_string \
    .select(from_json(col("value"), message_schema).alias("data")) \
    .select("data.*")

# Explode the arrivals array
df_arrivals = df_parsed \
    .select(
        to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss").alias("collection_timestamp"),
        col("stop_code"),
        col("stop_name"),
        explode(col("arrivals")).alias("arrival")
    )

# Extract fields from the arrival structure
df_clean = df_arrivals \
    .select(
        col("collection_timestamp"),
        col("stop_code"),
        col("stop_name"),
        col("arrival.sens").alias("direction"),
        col("arrival.terminus").alias("terminus"),
        col("arrival.temps").alias("wait_time_text"),
        col("arrival.tempsReel").alias("is_real_time"),
        col("arrival.ligne.numLigne").alias("line_number")
    ) \
    .withColumn(
        "wait_time_minutes",
        when(col("wait_time_text") == "proche", 0)
        .when(col("wait_time_text").rlike("^\\d+$"), col("wait_time_text").cast("int"))
        .when(col("wait_time_text").rlike("^\\d+ min$"), regexp_extract(col("wait_time_text"), "(\\d+)", 1).cast("int"))
        .otherwise(None)
    ) \
    .withColumn("processing_time", current_timestamp())
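
The `wait_time_minutes` expression is easiest to check outside Spark. The function below is a plain-Python mirror of the same `when`/`rlike` chain, written only for illustration and unit testing; it is not part of the streaming job, and the name `parse_wait_time` is hypothetical. The match is case-sensitive, so a value such as `"Proche"` would become null, and any format outside the three patterns (for example a hypothetical `"1h05"`) is also mapped to null and later dropped by the `isNotNull()` filter.

```python
import re
from typing import Optional


def parse_wait_time(text: Optional[str]) -> Optional[int]:
    """Plain-Python mirror of the wait_time_minutes expression (illustration only)."""
    if text is None:
        return None                         # null input stays null
    if text == "proche":
        return 0                            # "proche" ("near") counts as 0 minutes
    if re.fullmatch(r"\d+", text):
        return int(text)                    # bare number, e.g. "7"
    if re.fullmatch(r"\d+ min", text):
        return int(re.search(r"(\d+)", text).group(1))  # e.g. "12 min"
    return None                             # any other format becomes null


print([parse_wait_time(t) for t in ["proche", "3", "15 min", "?"]])  # [0, 3, 15, None]
```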

## 3. Streaming Analysis 1: Real-time Average Wait Times

Our first streaming analysis calculates real-time average wait times per line using sliding time windows:

In [None]:
# Filter out rows with null wait times
df_valid_waits = df_clean.filter(col("wait_time_minutes").isNotNull())

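# Add a watermark to tolerate up to 10 minutes of late data.
# Note: with outputMode("complete") below, Spark retains all aggregation state, so the
# watermark does not bound state size; that requires "update" or "append" mode.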
df_watermarked = df_valid_waits.withWatermark("collection_timestamp", "10 minutes")

# Calculate average wait times over 10-minute windows, sliding every 2 minutes
avg_wait_times = df_watermarked \
    .groupBy(
        window(col("collection_timestamp"), "10 minutes", "2 minutes"),
        col("line_number")
    ) \
    .agg(
        avg("wait_time_minutes").alias("avg_wait_time"),
        count("*").alias("observation_count"),
        stddev("wait_time_minutes").alias("stddev_wait_time")
    ) \
    .filter(col("observation_count") >= 3)  # Only include results with enough observations

# Extract window start and end times for better readability
avg_wait_times_readable = avg_wait_times \
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("line_number"),
        col("avg_wait_time"),
        col("observation_count"),
        col("stddev_wait_time")
    ) \
    .orderBy(col("window_start").desc(), col("avg_wait_time").desc())

# Output the results to the console
query1 = avg_wait_times_readable \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .option("numRows", 20) \
    .start()
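
With a 10-minute window sliding every 2 minutes, each observation belongs to 10 / 2 = 5 overlapping windows. The plain-Python sketch below enumerates those windows for a single timestamp. It assumes Spark's default alignment of windows to the Unix epoch (no `startTime` offset) and treats naive datetimes as UTC; it illustrates the windowing arithmetic only and is not how Spark implements `window()` internally.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumed alignment point (Spark's default, no startTime offset)


def sliding_windows(ts: datetime, size: timedelta, slide: timedelta):
    """Return the [start, end) windows of length `size`, sliding by `slide`, that contain ts.

    Naive datetimes are treated as UTC; this illustrates the arithmetic only.
    """
    latest_start = ts - (ts - EPOCH) % slide   # most recent window start at or before ts
    windows = []
    start = latest_start
    while start + size > ts:                   # window still covers ts
        windows.append((start, start + size))
        start -= slide
    return list(reversed(windows))             # oldest window first


for start, end in sliding_windows(datetime(2024, 5, 14, 12, 7),
                                  timedelta(minutes=10), timedelta(minutes=2)):
    print(start.strftime("%H:%M"), "-", end.strftime("%H:%M"))
```

For a 12:07 observation this prints five windows, from 11:58-12:08 to 12:06-12:16. Passing the same value for `size` and `slide` gives a tumbling window, as in Section 7: the same observation then falls only into 12:00-12:30. Because each row is counted in five windows here, `observation_count` values in consecutive output rows overlap and are not independent samples.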

## 4. Streaming Analysis 2: Real-time Delay Detection

Our second streaming analysis detects unusual delays and potential service disruptions using time windows:

In [None]:
# Baseline wait time used to judge delays.
# In a real system, per-line baselines would come from historical data (e.g. a historical database);
# for simplicity we use a single fixed value for all lines.
typical_wait_time = 10  # minutes

# Detect unusual delays (wait times significantly above normal)
df_delay_detection = df_valid_waits \
    .withColumn(
        "is_delayed",
        col("wait_time_minutes") > (typical_wait_time * 1.5)  # 50% above normal is considered unusual
    ) \
    .withColumn(
        "delay_minutes",
        when(col("is_delayed"), col("wait_time_minutes") - typical_wait_time).otherwise(0)
    ) \
    .withWatermark("collection_timestamp", "10 minutes")

# Aggregate delays by line and time window
delay_alerts = df_delay_detection \
    .groupBy(
        window(col("collection_timestamp"), "15 minutes", "5 minutes"),
        col("line_number"),
        col("stop_name")
    ) \
    .agg(
        max("wait_time_minutes").alias("max_wait_time"),
        avg("wait_time_minutes").alias("avg_wait_time"),
        count("*").alias("observation_count"),
        sum(when(col("is_delayed"), 1).otherwise(0)).alias("delayed_count"),
        avg("delay_minutes").alias("avg_delay_minutes")
    ) \
    .filter(col("delayed_count") >= 2)  # At least 2 observations must show delays
    
# Calculate delay severity
delay_alerts_with_severity = delay_alerts \
    .withColumn(
        "delay_severity",
        when(col("avg_delay_minutes") > 20, "SEVERE")
        .when(col("avg_delay_minutes") > 10, "MODERATE")
        .otherwise("MINOR")
    ) \
    .withColumn(
        "alert_message",
        expr("concat('DELAY ALERT: Line ', line_number, ' at ', stop_name, ' - ', delay_severity, ' delay of ', round(avg_delay_minutes, 1), ' minutes')")
    ) \
    .select(
        col("window.start").alias("alert_window_start"),
        col("window.end").alias("alert_window_end"),
        col("line_number"),
        col("stop_name"),
        col("avg_wait_time"),
        col("max_wait_time"),
        col("delayed_count"),
        col("observation_count"),
        col("avg_delay_minutes"),
        col("delay_severity"),
        col("alert_message")
    ) \
    .orderBy(col("alert_window_start").desc(), col("avg_delay_minutes").desc())

# Output the delay alerts to the console
query2 = delay_alerts_with_severity \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .option("numRows", 10) \
    .start()
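
The delay rules can likewise be checked on a single (window, line, stop) group in plain Python. This sketch mirrors the aggregation and severity logic above for a list of wait times already assigned to one group; the function name `delay_alert` and the example values are hypothetical. It calls `builtins.max` explicitly because the notebook's `pyspark.sql.functions` import shadows the builtin.

```python
import builtins
from statistics import mean
from typing import List, Optional

TYPICAL_WAIT_TIME = 10  # minutes, same fixed baseline as typical_wait_time


def delay_alert(wait_times: List[int], typical: int = TYPICAL_WAIT_TIME) -> Optional[dict]:
    """Apply the delay rules to the wait times of one (window, line, stop) group."""
    threshold = typical * 1.5                       # waits above 15 min count as delayed
    delays = [w - typical if w > threshold else 0 for w in wait_times]
    delayed_count = len([w for w in wait_times if w > threshold])
    if delayed_count < 2:
        return None                                 # group is filtered out: no alert
    avg_delay = mean(delays)                        # averaged over ALL observations
    if avg_delay > 20:
        severity = "SEVERE"
    elif avg_delay > 10:
        severity = "MODERATE"
    else:
        severity = "MINOR"
    return {
        # builtins.max: the notebook's pyspark import shadows the builtin max
        "max_wait_time": builtins.max(wait_times),
        "delayed_count": delayed_count,
        "observation_count": len(wait_times),
        "avg_delay_minutes": round(avg_delay, 1),
        "delay_severity": severity,
    }


print(delay_alert([30, 40, 8]))
```

One consequence worth knowing: `avg_delay_minutes` averages `delay_minutes` over all observations in the group, including the zeros recorded for non-delayed arrivals. In the example `[30, 40, 8]`, the two delayed arrivals are 20 and 30 minutes late, but the reported average is 16.7 minutes (MODERATE rather than SEVERE). Averaging only over delayed rows would be an alternative design; the notebook does not do that.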

## 5. Wait for Query Termination

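Let the streaming queries run until you interrupt the kernel. `awaitTermination()` blocks, so the second cell starts waiting only after the first query has terminated; both queries keep running in the background in the meantime. Interrupting the kernel ends the wait but does not stop the queries (Section 6 does that). To wait on whichever query ends first, `sql_context.streams.awaitAnyTermination()` can be used instead: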

In [None]:
# Wait for the first query
try:
    query1.awaitTermination()
except KeyboardInterrupt:
    print("Stopped waiting for query 1; it keeps running until query1.stop() is called.")

In [None]:
# Wait for the second query
try:
    query2.awaitTermination()
except KeyboardInterrupt:
    print("Stopped waiting for query 2; it keeps running until query2.stop() is called.")

## 6. Stop the Queries (Run this when you want to stop the streaming)

In [None]:
# Stop the queries
query1.stop()
query2.stop()
print("All streaming queries stopped.")

## 7. Service Disruption Pattern Detection (Bonus)

This bonus analysis flags potential service disruptions: each wait time is scored against the fixed baseline, and each line is then classified by the share of abnormal wait times within 30-minute tumbling windows:
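
Concretely, with the fixed baseline $\mu = 10$ minutes (`typical_wait_time`) and the assumed standard deviation $\sigma = 5$ minutes used in the code, the abnormality test reduces to a plain threshold on the wait time $w$:

$$
z = \frac{w - \mu}{\sigma}, \qquad z > 2 \iff w > \mu + 2\sigma = 10 + 2 \cdot 5 = 20 \text{ minutes}
$$

Because $\sigma$ is a fixed approximation rather than a standard deviation measured from the data, this behaves as a simple 20-minute cut-off rather than a statistical z-score; a wait of exactly 20 minutes is not flagged.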

In [None]:
# Flag potential service disruptions from the share of abnormal wait times
# observed for each line within a time window (the order of observations is not considered)

# Prepare the data with anomaly scores
df_anomaly_detection = df_valid_waits \
    .withColumn(
        "wait_time_zscore",  # Simplified z-score calculation with fixed standard deviation
        (col("wait_time_minutes") - typical_wait_time) / 5  # Using 5 min as a standard deviation approximation
    ) \
    .withColumn(
        "is_abnormal",
        col("wait_time_zscore") > 2  # More than 2 standard deviations is considered abnormal
    ) \
    .withWatermark("collection_timestamp", "15 minutes")

# Group by line and 30-minute tumbling windows to detect disruption patterns
disruption_detection = df_anomaly_detection \
    .groupBy(
        window(col("collection_timestamp"), "30 minutes"),
        col("line_number")
    ) \
    .agg(
        count("*").alias("total_observations"),
        sum(when(col("is_abnormal"), 1).otherwise(0)).alias("abnormal_count"),
        avg("wait_time_minutes").alias("avg_wait_time"),
        max("wait_time_minutes").alias("max_wait_time")
    ) \
    .filter(col("total_observations") >= 5)  # Need enough observations for reliable detection

# Classify service status
service_status = disruption_detection \
    .withColumn(
        "abnormal_percentage",
        (col("abnormal_count") / col("total_observations")) * 100
    ) \
    .withColumn(
        "service_status",
        when(col("abnormal_percentage") >= 70, "MAJOR DISRUPTION")
        .when(col("abnormal_percentage") >= 40, "MINOR DISRUPTION")
        .when(col("abnormal_percentage") >= 20, "DEGRADED SERVICE")
        .otherwise("NORMAL SERVICE")
    ) \
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("line_number"),
        col("total_observations"),
        col("abnormal_count"),
        col("abnormal_percentage"),
        col("avg_wait_time"),
        col("max_wait_time"),
        col("service_status")
    ) \
    .orderBy(col("window_start").desc(), col("abnormal_percentage").desc())

# Output the service status to the console
query3 = service_status \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .option("numRows", 10) \
    .start()
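
As with the delay rules, the classification can be mirrored in plain Python for one line and one 30-minute window. The sketch below reuses the notebook's fixed baseline (10 minutes) and assumed standard deviation (5 minutes); the function name `service_status` is hypothetical and the example inputs are made up. It counts with `len()` of a filtered list to avoid the builtin `sum`, which the notebook's imports shadow.

```python
TYPICAL_WAIT_TIME = 10  # minutes, fixed baseline as in the notebook
ASSUMED_STDDEV = 5      # minutes, fixed approximation as in the notebook


def service_status(wait_times: list, min_observations: int = 5):
    """Classify one (30-minute window, line) group; None means the group is filtered out."""
    if len(wait_times) < min_observations:
        return None
    # Abnormal means a simplified z-score above 2, i.e. a wait longer than 20 minutes.
    abnormal = len([w for w in wait_times
                    if (w - TYPICAL_WAIT_TIME) / ASSUMED_STDDEV > 2])
    abnormal_percentage = (abnormal / len(wait_times)) * 100
    if abnormal_percentage >= 70:
        return "MAJOR DISRUPTION"
    if abnormal_percentage >= 40:
        return "MINOR DISRUPTION"
    if abnormal_percentage >= 20:
        return "DEGRADED SERVICE"
    return "NORMAL SERVICE"


print(service_status([25, 30, 12, 9, 10]))  # MINOR DISRUPTION
```

In that example, two of the five waits exceed 20 minutes, so 40% of observations are abnormal, which lands exactly on the MINOR DISRUPTION threshold.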

In [None]:
# Wait for the third query
try:
    query3.awaitTermination()
except KeyboardInterrupt:
    print("Query 3 terminated by user.")
    query3.stop()
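
All three queries use `outputMode("complete")`, so Spark keeps every window's state and the watermarks never finalize anything. If a query is switched to `update` or `append` mode (dropping its `orderBy`, since sorting a stream is only supported in complete mode), the watermark starts to matter. The sketch below approximates that behavior in plain Python for Section 7's 30-minute windows with a 15-minute watermark. The function names and the exact boundary (`>=`) are assumptions made for illustration; Spark also only advances the watermark between micro-batches, so in practice finalization lags by one trigger. Spark guarantees that data arriving within the watermark delay is aggregated, while older data may or may not be dropped.

```python
from datetime import datetime, timedelta

WATERMARK_DELAY = timedelta(minutes=15)  # matches withWatermark(..., "15 minutes")


def current_watermark(max_event_time: datetime, delay: timedelta = WATERMARK_DELAY) -> datetime:
    """Watermark = latest event time seen so far minus the allowed lateness."""
    return max_event_time - delay


def window_is_final(window_end: datetime, max_event_time: datetime,
                    delay: timedelta = WATERMARK_DELAY) -> bool:
    """Assumed rule: a window can be emitted and its state dropped once the watermark reaches its end."""
    return current_watermark(max_event_time, delay) >= window_end


def may_be_dropped(event_time: datetime, max_event_time: datetime,
                   delay: timedelta = WATERMARK_DELAY) -> bool:
    """Events older than the watermark are no longer guaranteed to be aggregated."""
    return event_time < current_watermark(max_event_time, delay)


# Window [12:00, 12:30) on an arbitrary example day.
end = datetime(2024, 5, 14, 12, 30)
print(window_is_final(end, datetime(2024, 5, 14, 12, 44)))  # False: watermark is 12:29
print(window_is_final(end, datetime(2024, 5, 14, 12, 45)))  # True: watermark is 12:30
```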

## 8. Summary of Streaming Analyses

In this notebook, we've implemented three streaming analyses:

1. **Real-time Average Wait Times**: Calculates and updates average wait times per line using 10-minute sliding windows with 2-minute slides. This provides a continuous view of the current state of the transportation system.

2. **Real-time Delay Detection**: Identifies unusual delays at specific stops and lines using 15-minute windows sliding every 5 minutes. It applies severity classification (MINOR, MODERATE, SEVERE) based on the magnitude of delays.

3. **Service Disruption Detection**: Flags wait times more than two (assumed) standard deviations above the baseline as abnormal and, over 30-minute tumbling windows, classifies each line's service status from NORMAL SERVICE to MAJOR DISRUPTION based on the percentage of abnormal observations.

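Together, these analyses give operators and passengers a continuously updated view of wait times, delays, and disruptions on the Nantes public transport network. All thresholds used here (the 10-minute baseline, the 5-minute standard deviation, and the severity and percentage cut-offs) are fixed values; in a real system they would be derived from historical data.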