# Time Series Analysis with PySpark Window Functions

## Overview
This notebook demonstrates PySpark window functions for time series analysis: 
- **lag()**: Access previous row values
- **lead()**: Access next row values  
- **Moving averages**: Calculate rolling statistics

## Use Case
Analyzing traffic patterns over time to identify trends and anomalies. 

In [1]:
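import os

# Point PySpark at a Java 11 runtime. This path is machine-specific
# (Homebrew's openjdk@11 on Apple Silicon); adjust it for your environment.
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk@11'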

In [2]:
import sys
print(f"Python version: {sys.version}")

Python version: 3.11.14 (main, Oct  9 2025, 16:16:55) [Clang 17.0.0 (clang-1700.4.4.1)]


In [3]:
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("TimeSeriesAnalysis") \
    .getOrCreate()

print("Spark session initialized!")
print(f"Spark version: {spark.version}")

26/01/07 16:07:23 WARN Utils: Your hostname, Zipcoders-MacBook-Pro-8.local resolves to a loopback address: 127.0.0.1; using 192.168.200.39 instead (on interface en0)
26/01/07 16:07:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/07 16:07:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark session initialized!
Spark version: 3.4.0


In [5]:
# Window specification: partition rows by sensor and order each partition by
# time, so lag()/lead() never read across sensor boundaries
window_spec = Window.partitionBy("sensor_id").orderBy("timestamp")

# Create some sample data to test the window functions
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from datetime import datetime, timedelta

# Create sample traffic data
sample_data = [
    ("sensor_001", datetime(2024, 1, 1, 8, 0), 45),
    ("sensor_001", datetime(2024, 1, 1, 8, 5), 52),
    ("sensor_001", datetime(2024, 1, 1, 8, 10), 48),
    ("sensor_001", datetime(2024, 1, 1, 8, 15), 55),
    ("sensor_002", datetime(2024, 1, 1, 8, 0), 30),
    ("sensor_002", datetime(2024, 1, 1, 8, 5), 35),
]

schema = StructType([
    StructField("sensor_id", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("vehicle_count", IntegerType(), True)
])

df = spark.createDataFrame(sample_data, schema)

print("Sample data created:")
df.show()

Sample data created:
+----------+-------------------+-------------+
| sensor_id|          timestamp|vehicle_count|
+----------+-------------------+-------------+
|sensor_001|2024-01-01 08:00:00|           45|
|sensor_001|2024-01-01 08:05:00|           52|
|sensor_001|2024-01-01 08:10:00|           48|
|sensor_001|2024-01-01 08:15:00|           55|
|sensor_002|2024-01-01 08:00:00|           30|
|sensor_002|2024-01-01 08:05:00|           35|
+----------+-------------------+-------------+



In [7]:
# Test LAG - get previous value
df_with_lag = df.withColumn(
    "previous_count", 
    F.lag("vehicle_count", 1).over(window_spec)
)

print("Testing LAG function (previous value):")
df_with_lag.show()

Testing LAG function (previous value):
+----------+-------------------+-------------+--------------+
| sensor_id|          timestamp|vehicle_count|previous_count|
+----------+-------------------+-------------+--------------+
|sensor_001|2024-01-01 08:00:00|           45|          null|
|sensor_001|2024-01-01 08:05:00|           52|            45|
|sensor_001|2024-01-01 08:10:00|           48|            52|
|sensor_001|2024-01-01 08:15:00|           55|            48|
|sensor_002|2024-01-01 08:00:00|           30|          null|
|sensor_002|2024-01-01 08:05:00|           35|            30|
+----------+-------------------+-------------+--------------+
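
To make the partition-and-order semantics of `lag()` concrete, the plain-Python sketch below reproduces the `previous_count` column above without Spark and then derives a reading-to-reading change. It is only an illustration of what the window computes, not how Spark evaluates it; the helper name `lag_by_partition` and the derived `changes` list are hypothetical and do not appear in the notebook.

```python
from collections import defaultdict
from datetime import datetime

# Same sample rows as the notebook: (sensor_id, timestamp, vehicle_count)
rows = [
    ("sensor_001", datetime(2024, 1, 1, 8, 0), 45),
    ("sensor_001", datetime(2024, 1, 1, 8, 5), 52),
    ("sensor_001", datetime(2024, 1, 1, 8, 10), 48),
    ("sensor_001", datetime(2024, 1, 1, 8, 15), 55),
    ("sensor_002", datetime(2024, 1, 1, 8, 0), 30),
    ("sensor_002", datetime(2024, 1, 1, 8, 5), 35),
]


def lag_by_partition(rows, offset=1):
    """Append the count from `offset` rows earlier in the same sensor, or None."""
    partitions = defaultdict(list)
    for sensor_id, ts, count in rows:
        partitions[sensor_id].append((ts, count))

    result = []
    for sensor_id in sorted(partitions):
        ordered = sorted(partitions[sensor_id])  # order each partition by timestamp
        for i, (ts, count) in enumerate(ordered):
            # The first `offset` rows of a partition have no predecessor
            previous = ordered[i - offset][1] if i >= offset else None
            result.append((sensor_id, ts, count, previous))
    return result


lagged = lag_by_partition(rows)
previous_counts = [row[3] for row in lagged]
# Change since the previous reading; undefined (None) where there is no previous row
changes = [None if prev is None else count - prev for _, _, count, prev in lagged]

print(previous_counts)  # [None, 45, 52, 48, None, 30]
print(changes)          # [None, 7, -4, 7, None, 5]
```

In the notebook itself, a comparable column could be added with `df_with_lag.withColumn("change", F.col("vehicle_count") - F.col("previous_count"))`, which is null wherever `previous_count` is null.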



In [8]:
# Test LEAD - get next value
df_with_lead = df.withColumn(
    "next_count", 
    F.lead("vehicle_count", 1).over(window_spec)
)

print("Testing LEAD function (next value):")
df_with_lead.show()

Testing LEAD function (next value):
+----------+-------------------+-------------+----------+
| sensor_id|          timestamp|vehicle_count|next_count|
+----------+-------------------+-------------+----------+
|sensor_001|2024-01-01 08:00:00|           45|        52|
|sensor_001|2024-01-01 08:05:00|           52|        48|
|sensor_001|2024-01-01 08:10:00|           48|        55|
|sensor_001|2024-01-01 08:15:00|           55|      null|
|sensor_002|2024-01-01 08:00:00|           30|        35|
|sensor_002|2024-01-01 08:05:00|           35|      null|
+----------+-------------------+-------------+----------+
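
One common use of a look-ahead is measuring the gap until the next reading. The plain-Python sketch below shows that idea on the same sample rows: it pairs each row with the next row of the same sensor and computes the seconds between them. It is an illustration under assumptions, not part of the original notebook; the helper name `seconds_to_next` is hypothetical.

```python
from datetime import datetime
from itertools import groupby

# Same sample rows as the notebook: (sensor_id, timestamp, vehicle_count)
rows = [
    ("sensor_001", datetime(2024, 1, 1, 8, 0), 45),
    ("sensor_001", datetime(2024, 1, 1, 8, 5), 52),
    ("sensor_001", datetime(2024, 1, 1, 8, 10), 48),
    ("sensor_001", datetime(2024, 1, 1, 8, 15), 55),
    ("sensor_002", datetime(2024, 1, 1, 8, 0), 30),
    ("sensor_002", datetime(2024, 1, 1, 8, 5), 35),
]


def seconds_to_next(rows):
    """For each row, return seconds until the next reading of the same sensor."""
    # Sort by partition key, then by timestamp within each partition
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    gaps = []
    for _, group in groupby(ordered, key=lambda r: r[0]):
        group = list(group)
        # Pair each row with its successor; the last row has none
        for current, nxt in zip(group, group[1:] + [None]):
            gap = None if nxt is None else (nxt[1] - current[1]).total_seconds()
            gaps.append((current[0], current[1], gap))
    return gaps


gaps = [gap for _, _, gap in seconds_to_next(rows)]
print(gaps)  # [300.0, 300.0, 300.0, None, 300.0, None]
```

A hedged PySpark equivalent would subtract the epoch seconds of the current timestamp from those of the next one, for example `F.lead("timestamp", 1).over(window_spec).cast("long") - F.col("timestamp").cast("long")`.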



In [9]:
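# Test MOVING AVERAGE - 3-point rolling average
# Frame = the current row plus the two preceding rows within each sensor;
# the first rows of a partition average over fewer points
rolling_window = Window.partitionBy("sensor_id").orderBy("timestamp").rowsBetween(-2, Window.currentRow)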

df_with_avg = df.withColumn(
    "moving_avg_3", 
    F.avg("vehicle_count").over(rolling_window)
)

print("Testing MOVING AVERAGE (3-point window):")
df_with_avg.show()

Testing MOVING AVERAGE (3-point window):
+----------+-------------------+-------------+------------------+
| sensor_id|          timestamp|vehicle_count|      moving_avg_3|
+----------+-------------------+-------------+------------------+
|sensor_001|2024-01-01 08:00:00|           45|              45.0|
|sensor_001|2024-01-01 08:05:00|           52|              48.5|
|sensor_001|2024-01-01 08:10:00|           48|48.333333333333336|
|sensor_001|2024-01-01 08:15:00|           55|51.666666666666664|
|sensor_002|2024-01-01 08:00:00|           30|              30.0|
|sensor_002|2024-01-01 08:05:00|           35|              32.5|
+----------+-------------------+-------------+------------------+
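
The first two rows of each sensor above are averaged over one and two points rather than three, because a frame of `rowsBetween(-2, 0)` only includes rows that exist. The plain-Python sketch below reproduces the `moving_avg_3` values to make that frame behaviour explicit. It is an illustration only; the helper name `trailing_average` is hypothetical.

```python
def trailing_average(values, window=3):
    """Average of each value and up to `window - 1` preceding values."""
    averages = []
    for i in range(len(values)):
        # The frame shrinks at the start of the series, like rowsBetween(-2, 0)
        frame = values[max(0, i - window + 1): i + 1]
        averages.append(sum(frame) / len(frame))
    return averages


# vehicle_count per sensor, already ordered by timestamp
sensor_001 = [45, 52, 48, 55]
sensor_002 = [30, 35]

avg_001 = trailing_average(sensor_001)
avg_002 = trailing_average(sensor_002)

print([round(a, 2) for a in avg_001])  # [45.0, 48.5, 48.33, 51.67]
print([round(a, 2) for a in avg_002])  # [30.0, 32.5]
```

If partial windows are not wanted, one option in PySpark is to also compute `F.count("vehicle_count").over(rolling_window)` and keep the average only where that count equals 3, for instance with `F.when`.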



## Window Functions Tested:

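1. **LAG**: Gets the previous value within the same sensor's time series (null for the first row of each sensor)
   - Useful for: Comparing current vs previous readings
   
2. **LEAD**: Gets the next value within the same sensor's time series (null for the last row of each sensor)
   - Useful for: Looking ahead, for example pairing each reading with the next one as a prediction target
   
3. **MOVING AVERAGE**: Calculates the average over a rolling window (here, the current row and up to two preceding rows)
   - Useful for: Smoothing out noise in sensor data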
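
As one way these pieces could feed the anomaly detection mentioned in the use case, the plain-Python sketch below flags readings that deviate from their trailing average by more than a fixed amount. This is a minimal illustration, not the notebook's method: the helper names and the threshold of 3.0 vehicles are hypothetical and not tuned for real data.

```python
def trailing_average(values, window=3):
    """Average of each value and up to `window - 1` preceding values."""
    averages = []
    for i in range(len(values)):
        frame = values[max(0, i - window + 1): i + 1]
        averages.append(sum(frame) / len(frame))
    return averages


def flag_deviations(values, window=3, threshold=3.0):
    """True where a reading differs from its trailing average by more than `threshold`."""
    averages = trailing_average(values, window)
    return [abs(value - avg) > threshold for value, avg in zip(values, averages)]


# vehicle_count for sensor_001, ordered by timestamp
sensor_001 = [45, 52, 48, 55]
flags = flag_deviations(sensor_001)
print(flags)  # [False, True, False, True]
```

Because the trailing average includes the current reading, a spike partly pulls the average toward itself; comparing against an average of only the preceding rows is a stricter alternative.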

---
## Part 2: Apply to Real Sample Data

Now let's load Alan's actual traffic sensor sample data and apply the same window functions.

In [12]:
# Load Alan's sample traffic data (go up one directory first)
traffic_sample = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("../data/samples/traffic_sample.csv")

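# Ensure the timestamp column is a TimestampType; with Spark's default
# (non-ANSI) settings, unparseable values become null and are counted below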
traffic_sample = traffic_sample.withColumn(
    "timestamp", 
    F.to_timestamp(F.col("timestamp"))
)

print("✅ Loaded Alan's sample traffic data:")
print(f"Total rows: {traffic_sample.count()}")
print(f"Columns: {traffic_sample.columns}")
traffic_sample.show(10)

# Check for any null timestamps
null_count = traffic_sample.filter(F.col("timestamp").isNull()).count()
print(f"Rows with null timestamps: {null_count}")

✅ Loaded Alan's sample traffic data:
Total rows: 500
Columns: ['sensor_id', 'timestamp', 'location_lat', 'location_lon', 'vehicle_count', 'avg_speed', 'congestion_level', 'road_type']
+-----------+--------------------+------------------+------------------+-------------+---------+----------------+-----------+
|  sensor_id|           timestamp|      location_lat|      location_lon|vehicle_count|avg_speed|congestion_level|  road_type|
+-----------+--------------------+------------------+------------------+-------------+---------+----------------+-----------+
|TRAFFIC_003|2026-01-04 01:39:...|39.994546889470804|-75.15878485055464|           50|    49.22|        Critical|Main Street|
|TRAFFIC_006|2026-01-05 00:39:...| 39.94164075651462|-75.14205586940092|          406|    62.87|        Critical|Main Street|
|TRAFFIC_000|2026-01-02 19:39:...| 39.92581673937803|-75.11199193892674|          418|     60.9|            High|    Highway|
|TRAFFIC_003|2026-01-01 11:39:...| 39.94685511939174| -75.15