In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install pyspark




In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("BridgeMonitoring") \
    .getOrCreate()


Create bridges metadata

In [9]:
bridges_content = """bridge_id,name,location,installation_date
1,Bridge A,City X,2010-01-01
2,Bridge B,City Y,2012-05-23
3,Bridge C,City Z,2015-09-15
4,Bridge D,City W,2018-07-12
5,Bridge E,City V,2020-03-30
"""

with open("bridges.csv", "w") as f:
    f.write(bridges_content)

print("bridges.csv created successfully!")

bridges.csv created successfully!


Generate sample sensor data

In [10]:
import json
from datetime import datetime, timedelta
import random

def generate_sensor_events(sensor_type, num_bridges=5, num_events=5):
    events = []
    for i in range(1, num_bridges+1):
        for _ in range(num_events):
            event_time = datetime.now() - timedelta(seconds=random.randint(0,60))
            events.append({
                "event_time": event_time.isoformat(),
                "bridge_id": str(i),
                "sensor_type": sensor_type,
                "value": round(random.uniform(0, 50), 2) if sensor_type=="temperature" else \
                         round(random.uniform(0, 10), 2) if sensor_type=="vibration" else \
                         round(random.uniform(0, 90), 2),
                "ingest_time": datetime.now().isoformat()
            })
    return events

for sensor in ["temperature","vibration","tilt"]:
    events = generate_sensor_events(sensor)
    with open(f"{sensor}.json", "w") as f:
        for e in events:
            f.write(json.dumps(e) + "\n")

print("Sample sensor JSON files created!")

Sample sensor JSON files created!


Bronze Layer

In [11]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import to_timestamp, current_timestamp

schema = StructType([
    StructField("event_time", StringType(), True),
    StructField("bridge_id", StringType(), True),
    StructField("sensor_type", StringType(), True),
    StructField("value", DoubleType(), True),
    StructField("ingest_time", StringType(), True)
])

def read_bronze(sensor_file):
    df = spark.read.json(sensor_file, schema=schema) \
        .withColumn("event_time", to_timestamp("event_time")) \
        .withColumn("ingest_time", current_timestamp())
    return df

bronze_temp = read_bronze("temperature.json")
bronze_vib = read_bronze("vibration.json")
bronze_tilt = read_bronze("tilt.json")

print("Bronze Temperature:")
bronze_temp.show()

Bronze Temperature:
+--------------------+---------+-----------+-----+--------------------+
|          event_time|bridge_id|sensor_type|value|         ingest_time|
+--------------------+---------+-----------+-----+--------------------+
|2025-11-11 18:09:...|        1|temperature|34.38|2025-11-11 18:11:...|
|2025-11-11 18:10:...|        1|temperature|  0.1|2025-11-11 18:11:...|
|2025-11-11 18:10:...|        1|temperature|  3.2|2025-11-11 18:11:...|
|2025-11-11 18:09:...|        1|temperature| 16.5|2025-11-11 18:11:...|
|2025-11-11 18:10:...|        1|temperature|18.97|2025-11-11 18:11:...|
|2025-11-11 18:10:...|        2|temperature| 1.49|2025-11-11 18:11:...|
|2025-11-11 18:10:...|        2|temperature|10.64|2025-11-11 18:11:...|
|2025-11-11 18:10:...|        2|temperature| 2.93|2025-11-11 18:11:...|
|2025-11-11 18:10:...|        2|temperature|38.12|2025-11-11 18:11:...|
|2025-11-11 18:10:...|        2|temperature|15.66|2025-11-11 18:11:...|
|2025-11-11 18:09:...|        3|temperature|

Silver Layer (enrichment + QA)

In [12]:
bridges_df = spark.read.csv("bridges.csv", header=True, inferSchema=True)

def silver_enrich(bronze_df, sensor_type):
    df = bronze_df.join(bridges_df, on="bridge_id", how="left")
    if sensor_type=="temperature":
        df = df.filter((df.value >= -40) & (df.value <= 80))
    elif sensor_type=="vibration":
        df = df.filter(df.value >= 0)
    elif sensor_type=="tilt":
        df = df.filter((df.value >= 0) & (df.value <= 90))
    return df

silver_temp = silver_enrich(bronze_temp, "temperature")
silver_vib = silver_enrich(bronze_vib, "vibration")
silver_tilt = silver_enrich(bronze_tilt, "tilt")

print("Silver Temperature:")
silver_temp.show()


Silver Temperature:
+---------+--------------------+-----------+-----+--------------------+--------+--------+-----------------+
|bridge_id|          event_time|sensor_type|value|         ingest_time|    name|location|installation_date|
+---------+--------------------+-----------+-----+--------------------+--------+--------+-----------------+
|        1|2025-11-11 18:09:...|temperature|34.38|2025-11-11 18:11:...|Bridge A|  City X|       2010-01-01|
|        1|2025-11-11 18:10:...|temperature|  0.1|2025-11-11 18:11:...|Bridge A|  City X|       2010-01-01|
|        1|2025-11-11 18:10:...|temperature|  3.2|2025-11-11 18:11:...|Bridge A|  City X|       2010-01-01|
|        1|2025-11-11 18:09:...|temperature| 16.5|2025-11-11 18:11:...|Bridge A|  City X|       2010-01-01|
|        1|2025-11-11 18:10:...|temperature|18.97|2025-11-11 18:11:...|Bridge A|  City X|       2010-01-01|
|        2|2025-11-11 18:10:...|temperature| 1.49|2025-11-11 18:11:...|Bridge B|  City Y|       2012-05-23|
|       

 Gold Layer (aggregations)

In [13]:
from pyspark.sql.functions import avg, max

gold_temp = silver_temp.groupBy("bridge_id").agg(avg("value").alias("avg_temperature"))
gold_vib = silver_vib.groupBy("bridge_id").agg(max("value").alias("max_vibration"))
gold_tilt = silver_tilt.groupBy("bridge_id").agg(max("value").alias("max_tilt"))


gold_metrics = gold_temp.join(gold_vib, on="bridge_id").join(gold_tilt, on="bridge_id")
print("Gold Metrics:")
gold_metrics.show()


gold_metrics.coalesce(1).write.csv("gold_metrics.csv", header=True, mode="overwrite")
print("Gold metrics saved to gold_metrics.csv")

Gold Metrics:
+---------+------------------+-------------+--------+
|bridge_id|   avg_temperature|max_vibration|max_tilt|
+---------+------------------+-------------+--------+
|        3|22.554000000000002|         9.84|   88.04|
|        5|26.381999999999998|         9.96|   88.31|
|        1|             14.63|         9.22|   75.94|
|        4|            19.166|         9.17|   59.74|
|        2|            13.768|         9.19|   82.65|
+---------+------------------+-------------+--------+

Gold metrics saved to gold_metrics.csv


In [14]:
import time

def generate_batch(sensor_type, num_bridges=5, batch_size=2):
    batch = []
    for i in range(1, num_bridges+1):
        for _ in range(batch_size):
            batch.append({
                "event_time": (datetime.now() - timedelta(seconds=random.randint(0,60))).isoformat(),
                "bridge_id": str(i),
                "sensor_type": sensor_type,
                "value": round(random.uniform(0, 50), 2) if sensor_type=="temperature" else \
                         round(random.uniform(0, 10), 2) if sensor_type=="vibration" else \
                         round(random.uniform(0, 90), 2),
                "ingest_time": datetime.now().isoformat()
            })
    return batch


In [15]:
num_batches = 5
for batch_num in range(num_batches):
    print(f"\n=== Batch {batch_num+1} ===")


    for sensor in ["temperature", "vibration", "tilt"]:
        batch = generate_batch(sensor)
        filename = f"{sensor}_batch.json"
        with open(filename, "w") as f:
            for e in batch:
                f.write(json.dumps(e) + "\n")


    bronze_temp = read_bronze("temperature_batch.json")
    bronze_vib = read_bronze("vibration_batch.json")
    bronze_tilt = read_bronze("tilt_batch.json")


    silver_temp = silver_enrich(bronze_temp, "temperature")
    silver_vib = silver_enrich(bronze_vib, "vibration")
    silver_tilt = silver_enrich(bronze_tilt, "tilt")


    gold_temp = silver_temp.groupBy("bridge_id").agg(avg("value").alias("avg_temperature"))
    gold_vib = silver_vib.groupBy("bridge_id").agg(max("value").alias("max_vibration"))
    gold_tilt = silver_tilt.groupBy("bridge_id").agg(max("value").alias("max_tilt"))

    gold_metrics = gold_temp.join(gold_vib, on="bridge_id").join(gold_tilt, on="bridge_id")


    gold_metrics.show()


    time.sleep(2)



=== Batch 1 ===
+---------+------------------+-------------+--------+
|bridge_id|   avg_temperature|max_vibration|max_tilt|
+---------+------------------+-------------+--------+
|        3|            36.725|         2.99|   39.11|
|        5|              4.67|         7.75|   32.82|
|        1|             34.21|         9.88|   65.85|
|        4|29.615000000000002|         9.99|   71.19|
|        2|             24.13|         5.12|   66.06|
+---------+------------------+-------------+--------+


=== Batch 2 ===
+---------+------------------+-------------+--------+
|bridge_id|   avg_temperature|max_vibration|max_tilt|
+---------+------------------+-------------+--------+
|        3|             29.55|         7.25|   80.21|
|        5|41.239999999999995|         5.54|   64.43|
|        1|             23.18|          4.5|   84.38|
|        4|             6.105|          6.1|   84.85|
|        2|             40.44|         7.55|   61.14|
+---------+------------------+-------------+---