# Bridge Monitoring - Demo & Validation Notebook

## Setup

In [4]:
!apt-get install -y openjdk-11-jdk
!pip install -q pyspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"



Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
openjdk-11-jdk is already the newest version (11.0.28+6-1ubuntu1~22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 41 not upgraded.


In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, max as spark_max, min as spark_min, lit, current_timestamp, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, IntegerType
from datetime import datetime, timedelta
import random


# Build (or reuse) the notebook-wide Spark session.
# The shuffle partition count is kept small because the demo only handles a few hundred rows.
spark = (
    SparkSession.builder
    .appName("BridgeMonitoringDemo")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)

print("✓ Spark Session created successfully!")

✓ Spark Session created successfully!


In [6]:
print("\n" + "="*60)
print("GENERATING SAMPLE DATA")
print("="*60)

# Generate sample Bronze layer data
def generate_sample_bronze_data(sensor_type, num_records=100, seed=None):
    """Generate synthetic sensor events shaped like Bronze-layer rows.

    Args:
        sensor_type: "temperature" or "vibration"; any other value is
            treated as tilt.
        num_records: Number of events to create. Events are spaced 30 seconds
            apart, starting one hour before the call.
        seed: Optional seed for a private random generator, giving repeatable
            bridge ids and values. When None, the module-level ``random``
            state is used (so ``random.seed`` is honoured).

    Returns:
        A list of 8-tuples, in this order: event time (ISO string), bridge id,
        sensor type, value, ingest time (ISO string), event time (datetime),
        processing time (datetime), partition date (ISO date string).
    """
    # Value range per sensor type; anything unlisted falls back to the tilt range.
    value_ranges = {
        "temperature": (15.0, 35.0),
        "vibration": (0.5, 15.0),
    }
    low, high = value_ranges.get(sensor_type, (0.0, 5.0))

    rng = random if seed is None else random.Random(seed)
    base_time = datetime.now() - timedelta(hours=1)

    data = []
    for i in range(num_records):
        bridge_id = f"bridge_{rng.randint(1, 5):03d}"
        event_time = base_time + timedelta(seconds=i * 30)
        value = round(rng.uniform(low, high), 2)

        # One wall-clock read per record, shared by ingest and processing time.
        ingested_at = datetime.now()

        data.append((
            event_time.isoformat(),
            bridge_id,
            sensor_type,
            value,
            ingested_at.isoformat(),
            event_time,
            ingested_at,
            # Emitted as text because the Bronze schema declares partition_date
            # as a string column; a raw datetime.date is not a string.
            event_time.date().isoformat(),
        ))

    return data


GENERATING SAMPLE DATA


In [7]:
# Bronze schema: (column name, Spark type, nullable) in the same order as the
# tuples produced by generate_sample_bronze_data.
_bronze_columns = [
    ("event_time", StringType(), False),
    ("bridge_id", StringType(), False),
    ("sensor_type", StringType(), False),
    ("value", DoubleType(), False),
    ("ingest_time", StringType(), True),
    ("event_time_ts", TimestampType(), False),
    ("processing_time", TimestampType(), False),
    ("partition_date", StringType(), False),
]
schema = StructType([
    StructField(column_name, column_type, is_nullable)
    for column_name, column_type, is_nullable in _bronze_columns
])

In [8]:
print("\nGenerating Bronze layer data...")

# Seed the module-level generator so the sampled bridge ids and readings are
# the same on every Restart & Run All (timestamps still follow the wall clock).
random.seed(42)

# One Bronze DataFrame per sensor type, created in this order.
df_bronze_temp, df_bronze_vib, df_bronze_tilt = (
    spark.createDataFrame(generate_sample_bronze_data(sensor, 100), schema)
    for sensor in ("temperature", "vibration", "tilt")
)

for sensor_label, bronze_df in [
    ("Temperature", df_bronze_temp),
    ("Vibration", df_bronze_vib),
    ("Tilt", df_bronze_tilt),
]:
    print(f"✓ {sensor_label}: {bronze_df.count()} records")


Generating Bronze layer data...
✓ Temperature: 100 records
✓ Vibration: 100 records
✓ Tilt: 100 records


In [9]:
# ============================================
# BRONZE LAYER VALIDATION
# ============================================

print("\n" + "="*60)
print("BRONZE LAYER VALIDATION")
print("="*60)

print("\nBronze Temperature Sample:")
df_bronze_temp.select("event_time_ts", "bridge_id", "value").show(10, truncate=False)

print("\nBronze Layer Statistics:")
bronze_by_sensor = {
    "Temperature": df_bronze_temp,
    "Vibration": df_bronze_vib,
    "Tilt": df_bronze_tilt,
}
for name, df in bronze_by_sensor.items():
    # A single aggregation pass returns one row holding min, max and mean of "value".
    lowest, highest, mean = df.agg(
        spark_min("value"),
        spark_max("value"),
        avg("value"),
    ).first()
    print(f"{name:12} - Min: {lowest:6.2f}, Max: {highest:6.2f}, Avg: {mean:6.2f}")



BRONZE LAYER VALIDATION

Bronze Temperature Sample:
+--------------------------+----------+-----+
|event_time_ts             |bridge_id |value|
+--------------------------+----------+-----+
|2025-11-08 17:11:09.267101|bridge_002|26.79|
|2025-11-08 17:11:39.267101|bridge_003|17.51|
|2025-11-08 17:12:09.267101|bridge_003|15.84|
|2025-11-08 17:12:39.267101|bridge_005|24.26|
|2025-11-08 17:13:09.267101|bridge_001|31.35|
|2025-11-08 17:13:39.267101|bridge_003|28.13|
|2025-11-08 17:14:09.267101|bridge_002|25.63|
|2025-11-08 17:14:39.267101|bridge_004|31.03|
|2025-11-08 17:15:09.267101|bridge_001|25.51|
|2025-11-08 17:15:39.267101|bridge_003|17.7 |
+--------------------------+----------+-----+
only showing top 10 rows


Bronze Layer Statistics:
Temperature  - Min:  15.26, Max:  34.93, Avg:  24.45
Vibration    - Min:   0.76, Max:  14.78, Avg:   8.11
Tilt         - Min:   0.01, Max:   4.92, Avg:   2.35


In [13]:
# ============================================
# CREATE METADATA
# ============================================

print("\n" + "="*60)
print("BRIDGE METADATA")
print("="*60)

# Static reference rows, one per bridge id used by the sample sensor data.
# Field order: id, name, location, installation date, type, length (m), construction year.
metadata_data = [
    ("bridge_001", "Golden Gate Bridge", "San Francisco", "2020-01-15", "Suspension", 2737, 1937),
    ("bridge_002", "Brooklyn Bridge", "New York", "2019-06-22", "Suspension", 1825, 1883),
    ("bridge_003", "London Bridge", "London", "2021-03-10", "Arch", 283, 1973),
    ("bridge_004", "Sydney Harbour Bridge", "Sydney", "2020-11-05", "Arch", 1149, 1932),
    ("bridge_005", "Tower Bridge", "London", "2022-02-18", "Bascule", 244, 1894),
]

# Every metadata column is non-nullable: five text columns followed by two integers.
_text_columns = ("bridge_id", "bridge_name", "location", "installation_date", "bridge_type")
_integer_columns = ("length_meters", "construction_year")
metadata_schema = StructType(
    [StructField(column_name, StringType(), False) for column_name in _text_columns]
    + [StructField(column_name, IntegerType(), False) for column_name in _integer_columns]
)

df_metadata = spark.createDataFrame(metadata_data, metadata_schema)
print("\nBridge Metadata:")
df_metadata.show(truncate=False)


BRIDGE METADATA

Bridge Metadata:
+----------+---------------------+-------------+-----------------+-----------+-------------+-----------------+
|bridge_id |bridge_name          |location     |installation_date|bridge_type|length_meters|construction_year|
+----------+---------------------+-------------+-----------------+-----------+-------------+-----------------+
|bridge_001|Golden Gate Bridge   |San Francisco|2020-01-15       |Suspension |2737         |1937             |
|bridge_002|Brooklyn Bridge      |New York     |2019-06-22       |Suspension |1825         |1883             |
|bridge_003|London Bridge        |London       |2021-03-10       |Arch       |283          |1973             |
|bridge_004|Sydney Harbour Bridge|Sydney       |2020-11-05       |Arch       |1149         |1932             |
|bridge_005|Tower Bridge         |London       |2022-02-18       |Bascule    |244          |1894             |
+----------+---------------------+-------------+-----------------+-----------

In [14]:
# ============================================
# SILVER LAYER - ENRICHMENT
# ============================================

print("\n" + "="*60)
print("SILVER LAYER - ENRICHMENT & QUALITY CHECKS")
print("="*60)

def enrich_and_validate(df_bronze, metadata, sensor_type):
    """Attach bridge metadata to Bronze rows and split them by value range.

    Args:
        df_bronze: Bronze DataFrame with at least ``bridge_id`` and ``value``.
        metadata: DataFrame with ``bridge_id``, ``bridge_name``, ``location``,
            ``installation_date`` and ``bridge_type``.
        sensor_type: "temperature" or "vibration"; any other value is
            validated with the tilt range.

    Returns:
        ``(df_valid, df_rejected)``: rows whose value lies inside the
        inclusive range for the sensor type, and all remaining rows
        (out of range, or NULL value).
    """
    # Left join: every Bronze row is kept; unmatched bridge ids get NULL metadata.
    df_enriched = df_bronze.join(
        metadata,
        df_bronze.bridge_id == metadata.bridge_id,
        "left"
    ).select(
        df_bronze["*"],
        metadata["bridge_name"],
        metadata["location"],
        metadata["installation_date"],
        metadata["bridge_type"]
    )

    # Inclusive (lower, upper) bounds per sensor type; tilt is the fallback.
    valid_ranges = {
        "temperature": (-40, 80),
        "vibration": (0, 100),
    }
    lower, upper = valid_ranges.get(sensor_type, (0, 90))

    in_range = (col("value") >= lower) & (col("value") <= upper)
    df_valid = df_enriched.filter(in_range)
    # A NULL value makes in_range evaluate to NULL, which filter() treats as
    # false; the explicit isNull() keeps such rows in the rejected set.
    df_rejected = df_enriched.filter(~in_range | col("value").isNull())

    return df_valid, df_rejected

print("\nEnriching Bronze data with metadata...")

df_silver_temp, df_rejected_temp = enrich_and_validate(df_bronze_temp, df_metadata, "temperature")
df_silver_vib, df_rejected_vib = enrich_and_validate(df_bronze_vib, df_metadata, "vibration")
df_silver_tilt, df_rejected_tilt = enrich_and_validate(df_bronze_tilt, df_metadata, "tilt")

for sensor_label, valid_df, rejected_df in [
    ("Temperature", df_silver_temp, df_rejected_temp),
    ("Vibration", df_silver_vib, df_rejected_vib),
    ("Tilt", df_silver_tilt, df_rejected_tilt),
]:
    print(f"✓ {sensor_label}: {valid_df.count()} valid, {rejected_df.count()} rejected")

print("\nSilver Layer Sample (Enriched):")
df_silver_temp.select("bridge_id", "bridge_name", "location", "value", "event_time_ts").show(10, truncate=False)

# Calculate join success rate
# The enrichment is a left join, so it never drops rows; a row count alone
# cannot reveal missing metadata. A temperature record counts as joined only
# when its bridge_name was filled in, whether it was later accepted or rejected.
bronze_count = df_bronze_temp.count()
silver_count = df_silver_temp.count()
has_metadata = col("bridge_name").isNotNull()
matched_count = (
    df_silver_temp.filter(has_metadata).count()
    + df_rejected_temp.filter(has_metadata).count()
)
join_success = (matched_count / bronze_count * 100) if bronze_count > 0 else 0
print(f"\nJoin Success Rate: {join_success:.2f}%")


SILVER LAYER - ENRICHMENT & QUALITY CHECKS

Enriching Bronze data with metadata...
✓ Temperature: 100 valid, 0 rejected
✓ Vibration: 100 valid, 0 rejected
✓ Tilt: 100 valid, 0 rejected

Silver Layer Sample (Enriched):
+----------+---------------------+-------------+-----+--------------------------+
|bridge_id |bridge_name          |location     |value|event_time_ts             |
+----------+---------------------+-------------+-----+--------------------------+
|bridge_005|Tower Bridge         |London       |24.26|2025-11-08 17:12:39.267101|
|bridge_001|Golden Gate Bridge   |San Francisco|31.35|2025-11-08 17:13:09.267101|
|bridge_001|Golden Gate Bridge   |San Francisco|25.51|2025-11-08 17:15:09.267101|
|bridge_001|Golden Gate Bridge   |San Francisco|20.74|2025-11-08 17:16:09.267101|
|bridge_002|Brooklyn Bridge      |New York     |26.79|2025-11-08 17:11:09.267101|
|bridge_002|Brooklyn Bridge      |New York     |25.63|2025-11-08 17:14:09.267101|
|bridge_004|Sydney Harbour Bridge|Syd

In [15]:
# ============================================
# GOLD LAYER - AGGREGATIONS
# ============================================

print("\n" + "="*60)
print("GOLD LAYER - WINDOWED AGGREGATIONS")
print("="*60)

from pyspark.sql.functions import window


def aggregate_by_window(df_silver, agg_func, metric_name, window_duration="1 minute"):
    """Aggregate ``value`` per bridge and tumbling event-time window.

    Args:
        df_silver: DataFrame with ``bridge_id``, ``bridge_name``,
            ``event_time_ts`` and ``value`` columns.
        agg_func: Column aggregate applied to ``value`` (e.g. ``avg``).
        metric_name: Name of the resulting metric column.
        window_duration: Window length passed to ``window``.

    Returns:
        DataFrame with ``bridge_id``, ``bridge_name``, ``window_start``,
        ``window_end`` and the metric column.
    """
    return (
        df_silver
        .groupBy(
            col("bridge_id"),
            col("bridge_name"),
            window(col("event_time_ts"), window_duration),
        )
        .agg(agg_func("value").alias(metric_name))
        .select(
            col("bridge_id"),
            col("bridge_name"),
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            col(metric_name),
        )
    )


print("\nComputing 1-minute windowed aggregations...")

# Temperature uses the window average; vibration and tilt use the window maximum.
df_temp_agg = aggregate_by_window(df_silver_temp, avg, "avg_temperature")
df_vib_agg = aggregate_by_window(df_silver_vib, spark_max, "max_vibration")
df_tilt_agg = aggregate_by_window(df_silver_tilt, spark_max, "max_tilt_angle")

print(f"✓ Temperature windows: {df_temp_agg.count()}")
print(f"✓ Vibration windows: {df_vib_agg.count()}")
print(f"✓ Tilt windows: {df_tilt_agg.count()}")

# Join all metrics
# Both joins are inner joins on (bridge_id, window_start): a window appears in
# the result only if all three sensor aggregates have a row for it.
print("\nJoining metrics streams...")
df_temp_vib = df_temp_agg.join(
    df_vib_agg,
    (df_temp_agg.bridge_id == df_vib_agg.bridge_id) &
    (df_temp_agg.window_start == df_vib_agg.window_start),
    "inner"
).select(
    df_temp_agg.bridge_id,
    df_temp_agg.bridge_name,
    df_temp_agg.window_start,
    df_temp_agg.window_end,
    df_temp_agg.avg_temperature,
    df_vib_agg.max_vibration
)

# Built from a separately named intermediate so that re-running this cell
# never joins an already-joined df_metrics again.
df_metrics = df_temp_vib.join(
    df_tilt_agg,
    (df_temp_vib.bridge_id == df_tilt_agg.bridge_id) &
    (df_temp_vib.window_start == df_tilt_agg.window_start),
    "inner"
).select(
    df_temp_vib.bridge_id,
    df_temp_vib.bridge_name,
    df_temp_vib.window_start,
    df_temp_vib.window_end,
    df_temp_vib.avg_temperature,
    df_temp_vib.max_vibration,
    df_tilt_agg.max_tilt_angle
)

print(f"✓ Final joined metrics: {df_metrics.count()} windows")

print("\nGold Layer - Bridge Metrics:")
df_metrics.orderBy(col("window_start").desc()).show(20, truncate=False)



GOLD LAYER - WINDOWED AGGREGATIONS

Computing 1-minute windowed aggregations...
✓ Temperature windows: 89
✓ Vibration windows: 94
✓ Tilt windows: 93

Joining metrics streams...
✓ Final joined metrics: 14 windows

Gold Layer - Bridge Metrics:
+----------+---------------------+-------------------+-------------------+------------------+-------------+--------------+
|bridge_id |bridge_name          |window_start       |window_end         |avg_temperature   |max_vibration|max_tilt_angle|
+----------+---------------------+-------------------+-------------------+------------------+-------------+--------------+
|bridge_004|Sydney Harbour Bridge|2025-11-08 18:00:00|2025-11-08 18:01:00|23.07             |12.98        |1.38          |
|bridge_005|Tower Bridge         |2025-11-08 17:59:00|2025-11-08 18:00:00|27.43             |10.05        |2.32          |
|bridge_003|London Bridge        |2025-11-08 17:53:00|2025-11-08 17:54:00|23.03             |5.39         |0.49          |
|bridge_004

In [16]:
# ============================================
# ANALYTICS & INSIGHTS
# ============================================

print("\n" + "="*60)
print("ANALYTICS & INSIGHTS")
print("="*60)

# Statistics by bridge
print("\nStatistics by Bridge:")
bridge_stats = df_metrics.groupBy("bridge_id", "bridge_name").agg(
    count("*").alias("num_windows"),
    avg("avg_temperature").alias("mean_temp"),
    spark_max("max_vibration").alias("peak_vibration"),
    spark_max("max_tilt_angle").alias("peak_tilt")
)
bridge_stats.show(truncate=False)

# Find anomalies
# Single source of truth for the threshold, so the heading and the filter cannot drift apart.
VIBRATION_ANOMALY_THRESHOLD = 10.0

print(f"\nAnomalies Detected (High Vibration > {VIBRATION_ANOMALY_THRESHOLD:g} Hz):")
df_anomalies = df_metrics.filter(col("max_vibration") > VIBRATION_ANOMALY_THRESHOLD)
anomaly_count = df_anomalies.count()
if anomaly_count > 0:
    print(f"⚠ Found {anomaly_count} anomalous windows")
    df_anomalies.select("bridge_name", "window_start", "max_vibration", "max_tilt_angle").show(truncate=False)
else:
    print("✓ No anomalies detected")



ANALYTICS & INSIGHTS

Statistics by Bridge:
+----------+---------------------+-----------+------------------+--------------+---------+
|bridge_id |bridge_name          |num_windows|mean_temp         |peak_vibration|peak_tilt|
+----------+---------------------+-----------+------------------+--------------+---------+
|bridge_005|Tower Bridge         |3          |26.451666666666664|10.05         |2.32     |
|bridge_001|Golden Gate Bridge   |2          |22.54             |11.2          |2.54     |
|bridge_004|Sydney Harbour Bridge|4          |25.526249999999997|12.98         |2.83     |
|bridge_002|Brooklyn Bridge      |2          |20.27             |12.87         |2.45     |
|bridge_003|London Bridge        |3          |25.02333333333333 |7.14          |3.93     |
+----------+---------------------+-----------+------------------+--------------+---------+


Anomalies Detected (High Vibration > 10 Hz):
⚠ Found 6 anomalous windows
+---------------------+-------------------+-------------+--

In [17]:
# ============================================
# DATA QUALITY DASHBOARD
# ============================================

print("\n" + "="*60)
print("DATA QUALITY DASHBOARD")
print("="*60)

# Record counts per layer, summed over the three sensor types.
print("\n📊 Pipeline Summary:")
print(f"   Bronze Records:  {df_bronze_temp.count() + df_bronze_vib.count() + df_bronze_tilt.count()}")
print(f"   Silver Records:  {df_silver_temp.count() + df_silver_vib.count() + df_silver_tilt.count()}")
print(f"   Gold Windows:    {df_metrics.count()}")
print(f"   Join Success:    {join_success:.2f}%")

print("\n📈 Sensor Value Ranges:")
for name, df in [("Temperature", df_silver_temp), ("Vibration", df_silver_vib), ("Tilt", df_silver_tilt)]:
    # One aggregation row per sensor: min, max and mean of the validated values.
    stats = df.select(
        spark_min("value").alias("min"),
        spark_max("value").alias("max"),
        avg("value").alias("avg")
    ).first()
    print(f"   {name:12} - [{stats['min']:6.2f}, {stats['max']:6.2f}] avg: {stats['avg']:6.2f}")

print("\n✅ Demo completed successfully!")
print("\nThis notebook demonstrated:")
print("   ✓ Bronze layer ingestion")
print("   ✓ Silver layer enrichment with stream-static joins")
print("   ✓ Data quality validation")
print("   ✓ Gold layer windowed aggregations")
print("   ✓ Stream-stream joins")
print("   ✓ Analytics and anomaly detection")

# Stop Spark
# After this call the session is gone; earlier cells need a fresh session to run again.
spark.stop()


DATA QUALITY DASHBOARD

📊 Pipeline Summary:
   Bronze Records:  300
   Silver Records:  300
   Gold Windows:    14
   Join Success:    100.00%

📈 Sensor Value Ranges:
   Temperature  - [ 15.26,  34.93] avg:  24.45
   Vibration    - [  0.76,  14.78] avg:   8.11
   Tilt         - [  0.01,   4.92] avg:   2.35

✅ Demo completed successfully!

This notebook demonstrated:
   ✓ Bronze layer ingestion
   ✓ Silver layer enrichment with stream-static joins
   ✓ Data quality validation
   ✓ Gold layer windowed aggregations
   ✓ Stream-stream joins
   ✓ Analytics and anomaly detection
