In [1]:
!pip -q install pyspark==3.5.1


In [3]:
# Refresh the apt package index, then install a headless JDK 11.
# NOTE(review): presumably this JDK is what PySpark's JVM runs on — confirm.
!apt -y update -qq
!apt -y install openjdk-11-jdk-headless -qq

45 packages can be upgraded. Run 'apt list --upgradable' to see them.
[1;33mW: [0mSkipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)[0m
The following additional packages will be installed:
  ca-certificates-java java-common libpcsclite1 openjdk-11-jre-headless
Suggested packages:
  default-jre pcscd openjdk-11-demo openjdk-11-source libnss-mdns
  fonts-dejavu-extra fonts-ipafont-gothic fonts-ipafont-mincho
  fonts-wqy-microhei | fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  ca-certificates-java java-common libpcsclite1 openjdk-11-jdk-headless
  openjdk-11-jre-headless
0 upgraded, 5 newly installed, 0 to remove and 45 not upgraded.
Need to get 116 MB of archives.
After this operation, 258 MB of additional disk space will be used.
Selecting previously unselected package java-common.
(Reading database ... 125082 files and 

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

!java -version

openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)


In [5]:
!pip -q install --force-reinstall pyspark==3.5.1

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m17.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [6]:
from pyspark.sql import SparkSession

# Assemble the session configuration step by step, then create (or reuse) it.
_builder = SparkSession.builder.appName("BridgeMonitoringColab").master("local[*]")
_builder = _builder.config("spark.sql.shuffle.partitions", "4")
spark = _builder.getOrCreate()

# Only log warnings and above.
spark.sparkContext.setLogLevel("WARN")
print("Spark version:", spark.version)

Spark version: 3.5.1


In [9]:
import os

from google.colab import drive

drive.mount('/content/drive')

# Root folder on Google Drive under which every pipeline directory lives.
BASE = '/content/drive/MyDrive/bridge-monitoring'

# Full directory layout: raw streams, bronze/silver (plus their rejected
# sinks), gold, checkpoints and metadata.
dirs = [
    # raw landing zones, one per sensor stream
    f'{BASE}/streams/bridge_temperature',
    f'{BASE}/streams/bridge_vibration',
    f'{BASE}/streams/bridge_tilt',
    # bronze layer
    f'{BASE}/bronze/temperature',
    f'{BASE}/bronze/vibration',
    f'{BASE}/bronze/tilt',
    f'{BASE}/bronze/rejected',
    # silver layer
    f'{BASE}/silver/temperature',
    f'{BASE}/silver/vibration',
    f'{BASE}/silver/tilt',
    f'{BASE}/silver/rejected',
    # gold layer and bookkeeping
    f'{BASE}/gold/bridge_metrics',
    f'{BASE}/checkpoints',
    f'{BASE}/metadata',
]
for layout_dir in dirs:
    # exist_ok makes this cell safe to re-run.
    os.makedirs(layout_dir, exist_ok=True)
print('BASE =', BASE)


Mounted at /content/drive
BASE = /content/drive/MyDrive/bridge-monitoring


In [10]:
import os

import pandas as pd

# Static reference data: one row per bridge, written out as a CSV.
_BRIDGE_COLUMNS = ['bridge_id', 'name', 'location', 'installation_date']
_BRIDGE_ROWS = [
    (1, 'Korangi', 'Karachi', '2020-01-10'),
    (2, 'Jhelum', 'Punjab', '2019-05-21'),
    (3, 'Sukkur', 'Sindh', '2021-03-15'),
    (4, 'Attock', 'Punjab', '2018-11-02'),
    (5, 'Mangla', 'AJK', '2022-07-01'),
]
bridges = pd.DataFrame(_BRIDGE_ROWS, columns=_BRIDGE_COLUMNS)

# Make sure the target folder exists, then persist without the pandas index.
os.makedirs(f'{BASE}/metadata', exist_ok=True)
bridges.to_csv(f'{BASE}/metadata/bridges.csv', index=False)
bridges.head()


Unnamed: 0,bridge_id,name,location,installation_date
0,1,Korangi,Karachi,2020-01-10
1,2,Jhelum,Punjab,2019-05-21
2,3,Sukkur,Sindh,2021-03-15
3,4,Attock,Punjab,2018-11-02
4,5,Mangla,AJK,2022-07-01


In [13]:
import json, time, random, threading, os
from datetime import datetime, timedelta, timezone

# Shared on/off switch polled by generator_loop; set RUN_FLAG['on'] = False to
# make the loop exit after its current iteration.
RUN_FLAG = {'on': True}

def write_event(path, bridge_id, sensor_type):
    """Write one synthetic sensor reading as a JSON file under ``path``.

    The record holds ``event_time``, ``bridge_id``, ``sensor_type``, ``value``
    and ``ingest_time``. ``event_time`` is the current UTC time minus a random
    lag of 0-60 seconds; ``ingest_time`` is the current UTC time. Both are
    serialized with ``isoformat()``.

    The file lands in ``<path>/dt=<YYYY-MM-DD of event_time>/``. It is first
    written under a dot-prefixed temporary name and then renamed into place,
    so a directory-watching reader never observes a half-written JSON file.
    The final name carries a random suffix so two events for the same sensor
    and bridge within the same millisecond do not overwrite each other.

    Args:
        path: Root directory of the target stream.
        bridge_id: Identifier stored in the record and in the file name.
        sensor_type: ``'temperature'`` (value in 10-45, 2 decimals),
            ``'vibration'`` (value in 0-5, 3 decimals); any other string
            yields a value in 0-20 with 2 decimals.
    """
    now = datetime.now(timezone.utc)
    # Simulate late-arriving data: the reading is dated up to 60 s in the past.
    lag = random.randint(0, 60)
    event_time = now - timedelta(seconds=lag)
    ingest_time = datetime.now(timezone.utc)

    if sensor_type == 'temperature':
        value = round(random.uniform(10, 45), 2)
    elif sensor_type == 'vibration':
        value = round(random.uniform(0, 5), 3)
    else:
        value = round(random.uniform(0, 20), 2)

    rec = {
        'event_time': event_time.isoformat(),
        'bridge_id': bridge_id,
        'sensor_type': sensor_type,
        'value': value,
        'ingest_time': ingest_time.isoformat()
    }

    # Partition-style folder keyed by the event date (not the ingest date).
    date_dir = event_time.strftime('%Y-%m-%d')
    out_dir = os.path.join(path, f'dt={date_dir}')
    os.makedirs(out_dir, exist_ok=True)

    # A millisecond timestamp alone can repeat for the same sensor/bridge;
    # the random suffix keeps file names unique.
    stamp_ms = int(time.time() * 1000)
    suffix = f'{random.getrandbits(32):08x}'
    fname = f'{sensor_type}_{bridge_id}_{stamp_ms}_{suffix}.json'
    final_path = os.path.join(out_dir, fname)
    tmp_path = os.path.join(out_dir, '.' + fname + '.tmp')

    # Write fully, then publish with an atomic rename within the same directory.
    with open(tmp_path, 'w') as f:
        json.dump(rec, f)
    os.replace(tmp_path, final_path)

def generator_loop(base, rate_per_5s=3):
    """Emit synthetic readings until ``RUN_FLAG['on']`` becomes falsy.

    Each cycle performs ``rate_per_5s`` rounds. A round picks one random
    bridge id from 1-5 and writes one temperature, one vibration and one tilt
    event for it, in that order. The loop then sleeps for 5 seconds.

    Args:
        base: Root directory containing the ``streams/`` folders.
        rate_per_5s: Number of rounds written per cycle.
    """
    # (target directory, sensor type) pairs, in write order.
    targets = (
        (f'{base}/streams/bridge_temperature', 'temperature'),
        (f'{base}/streams/bridge_vibration', 'vibration'),
        (f'{base}/streams/bridge_tilt', 'tilt'),
    )
    bridge_ids = [1, 2, 3, 4, 5]

    while RUN_FLAG['on']:
        for _ in range(rate_per_5s):
            # One bridge per round; all three sensors report for it.
            chosen = random.choice(bridge_ids)
            for stream_dir, sensor in targets:
                write_event(stream_dir, chosen, sensor)
        time.sleep(5)

# Re-running this cell must not stack a second generator on top of a live one:
# generator_loop looks up the global RUN_FLAG on every iteration, so a thread
# started by a previous run keeps going once the flag is re-created above.
_existing = globals().get('t')
if isinstance(_existing, threading.Thread) and _existing.is_alive():
    print('Generator already running. Stop with RUN_FLAG["on"]=False')
else:
    # Daemon thread, so it does not keep the interpreter alive on shutdown.
    t = threading.Thread(target=generator_loop, args=(BASE, 3), daemon=True)
    t.start()
    print('Generator started. Stop with RUN_FLAG["on"]=False')

Generator started. Stop with RUN_FLAG["on"]=False


In [16]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
# Explicit schema for the raw JSON events. Every field is nullable, and both
# timestamps are kept as strings at this stage.
schema = StructType([
    StructField('event_time', StringType()),
    StructField('bridge_id', IntegerType()),
    StructField('sensor_type', StringType()),
    StructField('value', DoubleType()),
    StructField('ingest_time', StringType()),
])


In [18]:
# Stop every active streaming query.
for q in spark.streams.active:
    try:
        q.stop()
    except Exception as e:
        # Best-effort shutdown: report the failure and keep stopping the rest.
        # A bare `except:` would also swallow KeyboardInterrupt/SystemExit.
        print('Stop error:', e)

In [19]:
from pyspark.sql.functions import col, to_timestamp, lit, when

def bronze_stream(src_dir, out_dir, ckpt_dir, rejected_dir=None):
    """Start the bronze queries for one raw JSON stream.

    Reads JSON files from ``src_dir`` with the module-level ``schema`` (at most
    50 files per trigger). It parses ``event_time`` and ``ingest_time`` into
    the timestamp columns ``event_time_ts`` and ``ingest_time_ts``, then splits
    the rows:

    * valid: ``event_time_ts``, ``value`` and ``bridge_id`` are all non-null;
      appended as parquet to ``out_dir``.
    * rejected: everything else, tagged with ``reject_reason``; appended as
      parquet to ``rejected_dir``.

    Args:
        src_dir: Directory of raw JSON event files.
        out_dir: Parquet sink for valid rows.
        ckpt_dir: Checkpoint root; ``/ok`` and ``/rej`` are used beneath it.
        rejected_dir: Parquet sink for rejected rows. Defaults to
            ``{BASE}/bronze/rejected/<last path component of out_dir>``.

    Returns:
        ``[valid_query, rejected_query]``, both already started.
    """
    if rejected_dir is None:
        # Each query needs its own sink directory. File-sink queries that share
        # an output path also share its `_spark_metadata` commit log, so one
        # query's batches can be skipped as "already committed" by another.
        sink_name = out_dir.rstrip("/").rsplit("/", 1)[-1]
        rejected_dir = f"{BASE}/bronze/rejected/{sink_name}"

    df = (spark.readStream.format("json").schema(schema)
          .option("maxFilesPerTrigger", 50)
          .load(src_dir))

    # Keep the original string columns and add parsed timestamp columns.
    bronze = (df
              .withColumn("event_time_ts", to_timestamp("event_time"))
              .withColumn("ingest_time_ts", to_timestamp("ingest_time")))

    # Built only from isNotNull checks, so this is never NULL itself.
    cond = (col("event_time_ts").isNotNull() &
            col("value").isNotNull() &
            col("bridge_id").isNotNull())

    valid = bronze.filter(cond)

    # The first matching reason wins.
    rejected = (bronze.filter(~cond)
                .withColumn(
                    "reject_reason",
                    when(col("event_time_ts").isNull(), "bad event_time")
                    .when(col("value").isNull(), "null value")
                    .when(col("bridge_id").isNull(), "null bridge_id")
                    .otherwise("missing/invalid fields")
                ))

    q_ok = (valid.writeStream.format("parquet")
            .option("checkpointLocation", f"{ckpt_dir}/ok")
            .outputMode("append").start(out_dir))

    q_rj = (rejected.writeStream.format("parquet")
            .option("checkpointLocation", f"{ckpt_dir}/rej")
            .outputMode("append").start(rejected_dir))

    return [q_ok, q_rj]

# (source dir, bronze sink dir, checkpoint root) for each sensor stream.
_bronze_specs = [
    (f"{BASE}/streams/bridge_temperature", f"{BASE}/bronze/temperature", f"{BASE}/checkpoints/bronze_temp"),
    (f"{BASE}/streams/bridge_vibration", f"{BASE}/bronze/vibration", f"{BASE}/checkpoints/bronze_vib"),
    (f"{BASE}/streams/bridge_tilt", f"{BASE}/bronze/tilt", f"{BASE}/checkpoints/bronze_tilt"),
]

# Each call returns two started queries (valid + rejected).
bronze_qs = []
for _spec in _bronze_specs:
    bronze_qs.extend(bronze_stream(*_spec))

print("Bronze streams restarted:", len(bronze_qs))


Bronze streams restarted: 6


In [23]:
# Stop every active streaming query.
for q in spark.streams.active:
    try:
        q.stop()
    except Exception as e:
        # Best-effort shutdown: report the failure and keep stopping the rest.
        # A bare `except:` would also swallow KeyboardInterrupt/SystemExit.
        print('Stop error:', e)

In [24]:
from pyspark.sql.functions import when, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# Schema of the bronze parquet files: the five raw fields plus the two parsed
# timestamp columns. All fields are nullable.
bronze_parquet_schema = (StructType()
                         .add("event_time", StringType(), True)
                         .add("bridge_id", IntegerType(), True)
                         .add("sensor_type", StringType(), True)
                         .add("value", DoubleType(), True)
                         .add("ingest_time", StringType(), True)
                         .add("event_time_ts", TimestampType(), True)
                         .add("ingest_time_ts", TimestampType(), True))

# Static bridge lookup table. The id is cast to int and renamed so it does not
# share a column name with the stream's bridge_id.
_bridges_csv = (spark.read.format("csv")
                .option("header", True)
                .load(f"{BASE}/metadata/bridges.csv"))
metadata_df = _bridges_csv.select(
    col("bridge_id").cast("int").alias("b_bridge_id"),
    "name",
    "location",
    "installation_date",
)

def silver_stream(bronze_dir, out_dir, ckpt_dir, sensor, rejected_dir=None):
    """Start the silver queries for one sensor type.

    Streams the bronze parquet in ``bronze_dir`` and keeps rows whose
    ``sensor_type`` equals ``sensor``. These are left-joined with the static
    ``metadata_df`` on bridge id and then split by data-quality rules:

    * valid: non-null ``event_time_ts``, a metadata match (``name`` not null),
      and ``value`` inside the sensor's range: temperature -40..80,
      vibration >= 0, anything else 0..90.
    * rejected: every other row, tagged with ``reject_reason``.

    Args:
        bronze_dir: Bronze parquet directory to stream from.
        out_dir: Parquet sink for valid rows.
        ckpt_dir: Checkpoint root; ``/ok`` and ``/rej`` are used beneath it.
        sensor: Sensor type to keep; it also selects the range rule.
        rejected_dir: Parquet sink for rejected rows. Defaults to
            ``{BASE}/silver/rejected/<sensor>``.

    Returns:
        ``[valid_query, rejected_query]``, both already started.
    """
    if rejected_dir is None:
        # Each query needs its own sink directory. File-sink queries that share
        # an output path also share its `_spark_metadata` commit log, so one
        # query's batches can be skipped as "already committed" by another.
        rejected_dir = f"{BASE}/silver/rejected/{sensor}"

    sdf = (spark.readStream.format("parquet")
           .schema(bronze_parquet_schema)
           .load(bronze_dir)
           .filter(col("sensor_type") == sensor))

    # Stream-static left join; unmatched bridges keep NULL metadata columns.
    enriched = (sdf.join(metadata_df, sdf.bridge_id == col("b_bridge_id"), "left")
                .drop("b_bridge_id"))

    if sensor == "temperature":
        rule = col("value").between(-40, 80)
    elif sensor == "vibration":
        rule = (col("value") >= 0)
    else:
        rule = col("value").between(0, 90)

    cond = col("event_time_ts").isNotNull() & rule & col("name").isNotNull()

    # `rule` is NULL when `value` is NULL, which can make `cond` NULL. filter()
    # drops NULL predicates, so such rows would vanish from BOTH outputs.
    # Collapse NULL to False so every row lands in exactly one sink.
    passes = cond.isNotNull() & cond

    valid = enriched.filter(passes)

    # The first matching reason wins.
    rejected = (enriched.filter(~passes)
                .withColumn(
                    "reject_reason",
                    when(col("name").isNull(), "no metadata match")
                    .when(col("value").isNull(), "null value")
                    .when(~rule, "range violation")
                    .when(col("event_time_ts").isNull(), "bad timestamp")
                    .otherwise("dq fail")
                ))

    q_ok = (valid.writeStream.format("parquet")
            .option("checkpointLocation", f"{ckpt_dir}/ok")
            .outputMode("append").start(out_dir))

    q_rj = (rejected.writeStream.format("parquet")
            .option("checkpointLocation", f"{ckpt_dir}/rej")
            .outputMode("append").start(rejected_dir))

    return [q_ok, q_rj]

# (bronze source dir, silver sink dir, checkpoint root, sensor type) per stream.
_silver_specs = [
    (f"{BASE}/bronze/temperature", f"{BASE}/silver/temperature", f"{BASE}/checkpoints/silver_temp", "temperature"),
    (f"{BASE}/bronze/vibration", f"{BASE}/silver/vibration", f"{BASE}/checkpoints/silver_vib", "vibration"),
    (f"{BASE}/bronze/tilt", f"{BASE}/silver/tilt", f"{BASE}/checkpoints/silver_tilt", "tilt"),
]

# Each call returns two started queries (valid + rejected).
silver_qs = []
for _spec in _silver_specs:
    silver_qs.extend(silver_stream(*_spec))

print("Silver streams started:", len(silver_qs))

Silver streams started: 6


In [26]:
# Stop every active streaming query.
for q in spark.streams.active:
    try:
        q.stop()
    except Exception as e:
        # Best-effort shutdown: report the failure and keep stopping the rest.
        # A bare `except:` would also swallow KeyboardInterrupt/SystemExit.
        print('Stop error:', e)

In [27]:
from pyspark.sql.functions import window, avg, max as smax, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# Schema of the silver parquet files: the bronze columns followed by the three
# bridge metadata columns. All fields are nullable.
silver_parquet_schema = (StructType()
                         .add("event_time", StringType(), True)
                         .add("bridge_id", IntegerType(), True)
                         .add("sensor_type", StringType(), True)
                         .add("value", DoubleType(), True)
                         .add("ingest_time", StringType(), True)
                         .add("event_time_ts", TimestampType(), True)
                         .add("ingest_time_ts", TimestampType(), True)
                         .add("name", StringType(), True)
                         .add("location", StringType(), True)
                         .add("installation_date", StringType(), True))

def agg_stream(silver_dir, metric_colname, agg_expr):
    """Build a per-bridge, 1-minute windowed aggregate over a silver stream.

    Args:
        silver_dir: Silver parquet directory to stream from.
        metric_colname: Name given to the aggregated output column.
        agg_expr: Aggregate column expression to compute per group.

    Returns:
        A streaming DataFrame with columns ``bridge_id``, ``window_start``,
        ``window_end`` and ``metric_colname``. No query is started here.
    """
    reader = spark.readStream.format('parquet').schema(silver_parquet_schema)
    # 2-minute watermark on event time.
    events = reader.load(silver_dir).withWatermark('event_time_ts', '2 minutes')

    # Group by 1-minute event-time window and bridge.
    minute_window = window(col('event_time_ts'), '1 minute')
    grouped = events.groupBy(minute_window, col('bridge_id'))
    per_window = grouped.agg(agg_expr.alias(metric_colname))

    # Flatten the window struct into explicit start/end columns.
    return per_window.select(
        col('bridge_id'),
        col('window.start').alias('window_start'),
        col('window.end').alias('window_end'),
        col(metric_colname),
    )

# One windowed aggregate per sensor: mean temperature, peak vibration, peak tilt.
temp_agg = agg_stream(f'{BASE}/silver/temperature', 'avg_temperature', avg('value'))
vib_agg = agg_stream(f'{BASE}/silver/vibration', 'max_vibration', smax('value'))
tilt_agg = agg_stream(f'{BASE}/silver/tilt', 'max_tilt', smax('value'))

# Inner-join the three aggregates on bridge and window, so a gold row needs
# all three sensors to have data for that bridge/window.
_gold_keys = ['bridge_id','window_start','window_end']
tv = temp_agg.join(vib_agg, on=_gold_keys, how='inner')
tvt = tv.join(tilt_agg, on=_gold_keys, how='inner')

_gold_writer = tvt.writeStream.format('parquet')
_gold_writer = _gold_writer.option('checkpointLocation', f'{BASE}/checkpoints/gold_metrics')
gold_q = _gold_writer.outputMode('append').start(f'{BASE}/gold/bridge_metrics')

print('Gold writer started.')

Gold writer started.


In [28]:
import time

# Give the streaming queries time to produce output before reading it.
print('Warming ~90s...')
time.sleep(90)

# Batch-read whatever the gold sink has written so far.
df_gold = spark.read.format('parquet').load(f'{BASE}/gold/bridge_metrics')
_gold_rows = df_gold.count()
print('Gold count:', _gold_rows)

# Show the first 20 rows in window/bridge order, without truncating values.
_gold_sorted = df_gold.orderBy('window_start','bridge_id')
_gold_sorted.show(20, truncate=False)


Warming ~90s...
Gold count: 46
+---------+-------------------+-------------------+------------------+-------------+--------+
|bridge_id|window_start       |window_end         |avg_temperature   |max_vibration|max_tilt|
+---------+-------------------+-------------------+------------------+-------------+--------+
|4        |2025-11-08 13:48:00|2025-11-08 13:49:00|33.1              |4.346        |10.47   |
|1        |2025-11-08 13:49:00|2025-11-08 13:50:00|30.53             |4.274        |18.41   |
|2        |2025-11-08 13:49:00|2025-11-08 13:50:00|27.777272727272724|4.987        |14.22   |
|3        |2025-11-08 13:49:00|2025-11-08 13:50:00|28.56833333333333 |4.913        |19.05   |
|4        |2025-11-08 13:49:00|2025-11-08 13:50:00|29.335            |4.857        |18.88   |
|5        |2025-11-08 13:49:00|2025-11-08 13:50:00|28.00833333333333 |4.808        |18.74   |
|1        |2025-11-08 13:50:00|2025-11-08 13:51:00|26.848333333333333|4.727        |19.79   |
|2        |2025-11-08 13:50:0

In [29]:
# Tell the generator loop to exit, then stop every active streaming query.
RUN_FLAG['on'] = False
for query in spark.streams.active:
    try:
        query.stop()
    except Exception as err:
        # Report the failure and carry on with the remaining queries.
        print('Stop error:', err)
print('All stopped.')


All stopped.
