In [43]:
# installs PySpark + common libs
!pip install -q pyspark pandas pyarrow kafka-python
print("Installed pyspark, pandas, pyarrow, kafka-python (if needed).")


Installed pyspark, pandas, pyarrow, kafka-python (if needed).


In [44]:
# Python cell
!pip install -q pyspark pandas pyarrow
print("Dependencies installed.")


Dependencies installed.


In [6]:
%%bash
cat > /content/smart_city_run/streaming_job_fixed.py << 'PY'
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("SmartCityStreamingFixed").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

poll_schema = StructType([
    StructField("timestamp", LongType(), True),
    StructField("location", StringType(), True),
    StructField("pm2_5", DoubleType(), True),
])

traf_schema = StructType([
    StructField("timestamp", LongType(), True),
    StructField("road", StringType(), True),
    StructField("vehicles", IntegerType(), True),
])

poll = spark.readStream.format("csv").option("header","true").schema(poll_schema).load("/content/smart_city_run/kafka_mock/pollution")
traf = spark.readStream.format("csv").option("header","true").schema(traf_schema).load("/content/smart_city_run/kafka_mock/traffic")

def handle_poll(batch_df, batch_id):
    print("\n=== POLLUTION BATCH", batch_id, "===")
    if batch_df.rdd.isEmpty():
        print("No new pollution data.")
        return
    agg = batch_df.groupBy("location").avg("pm2_5").withColumnRenamed("avg(pm2_5)","avg_pm2_5")
    agg.show(truncate=False)

def handle_traf(batch_df, batch_id):
    print("\n=== TRAFFIC BATCH", batch_id, "===")
    if batch_df.rdd.isEmpty():
        print("No new traffic data.")
        return
    agg = batch_df.groupBy("road").avg("vehicles").withColumnRenamed("avg(vehicles)","avg_vehicles")
    agg.show(truncate=False)

p_query = poll.writeStream.foreachBatch(handle_poll).trigger(processingTime="3 seconds").start()
t_query = traf.writeStream.foreachBatch(handle_traf).trigger(processingTime="3 seconds").start()

print("Streaming job started (foreachBatch). Waiting for batches...")
p_query.awaitTermination()
t_query.awaitTermination()
PY

echo "Streaming job created: /content/smart_city_run/streaming_job_fixed.py"


Streaming job created: /content/smart_city_run/streaming_job_fixed.py


In [7]:
%%bash
WORK=/content/smart_city_run

cat > $WORK/pollution_producer_colab.py <<'PY'
#!/usr/bin/env python3
import time, csv, os, random, uuid
OUT = os.path.join(os.path.dirname(__file__),"kafka_mock/pollution")
os.makedirs(OUT, exist_ok=True)
for i in range(40):
    msg = {"timestamp":int(time.time()), "location":random.choice(["locA","locB","locC"]), "pm2_5":round(random.uniform(5,250),2)}
    tmp = os.path.join(OUT, "tmp_"+uuid.uuid4().hex + ".csv")
    final = os.path.join(OUT, "msg_"+uuid.uuid4().hex + ".csv")
    with open(tmp,"w",newline="") as f:
        w=csv.DictWriter(f, fieldnames=["timestamp","location","pm2_5"]); w.writeheader(); w.writerow(msg)
    os.replace(tmp, final)
    print("Wrote", final)
    time.sleep(0.5)
PY

cat > $WORK/traffic_producer_colab.py <<'PY'
#!/usr/bin/env python3
import time, csv, os, random, uuid
OUT = os.path.join(os.path.dirname(__file__),"kafka_mock/traffic")
os.makedirs(OUT, exist_ok=True)
for i in range(40):
    msg = {"timestamp":int(time.time()), "road":random.choice(["R1","R2","R3"]), "vehicles":random.randint(0,200)}
    tmp = os.path.join(OUT, "tmp_"+uuid.uuid4().hex + ".csv")
    final = os.path.join(OUT, "msg_"+uuid.uuid4().hex + ".csv")
    with open(tmp,"w",newline="") as f:
        w=csv.DictWriter(f, fieldnames=["timestamp","road","vehicles"]); w.writeheader(); w.writerow(msg)
    os.replace(tmp, final)
    print("Wrote", final)
    time.sleep(0.5)
PY

echo "Producer scripts created at $WORK/"
ls -la /content/smart_city_run | sed -n '1,200p'


Producer scripts created at /content/smart_city_run/
total 44
drwxr-xr-x 3 root root 4096 Nov 21 16:54 .
drwxr-xr-x 1 root root 4096 Nov 21 16:29 ..
drwxr-xr-x 4 root root 4096 Nov 21 16:29 kafka_mock
-rw-r--r-- 1 root root  656 Nov 21 16:54 pollution_producer_colab.py
-rw-r--r-- 1 root root  824 Nov 21 16:31 pollution_producer_mock.py
-rw-r--r-- 1 root root 2833 Nov 21 16:44 streaming_job_colab.py
-rw-r--r-- 1 root root 1308 Nov 21 16:31 streaming_job_filesource.py
-rw-r--r-- 1 root root 1763 Nov 21 16:54 streaming_job_fixed.py
-rw-r--r-- 1 root root  637 Nov 21 16:54 traffic_producer_colab.py
-rw-r--r-- 1 root root  803 Nov 21 16:31 traffic_producer_mock.py


In [None]:
# Python cell
!python3 /content/smart_city_run/streaming_job_colab.py


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/21 16:55:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/21 16:55:05 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/11/21 16:55:05 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
No JSON stream_input files found; continuing to watch producer CSV folders.
25/11/21 16:55:11 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-d966ece1-d545-4de3-9e37-82906fa4fee7. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/11/21 16:55:11 WARN ResolveWriteToStream: spark.sql.adaptive.enab

In [None]:
!python3 /content/smart_city_run/pollution_producer_colab.py


In [2]:
!python3 /content/smart_city_run/traffic_producer_colab.py


Wrote /content/smart_city_run/kafka_mock/traffic/msg_dc3f038ef1444bf5b0af1a3ce7b126a8.csv
Wrote /content/smart_city_run/kafka_mock/traffic/msg_dd0f8072dc294a3a903214c5b2cd066a.csv
Wrote /content/smart_city_run/kafka_mock/traffic/msg_cc7665d7df0247d2b4884735efa76daf.csv
Wrote /content/smart_city_run/kafka_mock/traffic/msg_9db6b6fe497841e8893110dd334db959.csv
Wrote /content/smart_city_run/kafka_mock/traffic/msg_4f1c7ca5960341508b7a10451ad082fc.csv
Wrote /content/smart_city_run/kafka_mock/traffic/msg_476ad2f42a53445e991b4e5c9735a663.csv
Wrote /content/smart_city_run/kafka_mock/traffic/msg_d02865a0c7814687b00383853e031392.csv
Wrote /content/smart_city_run/kafka_mock/traffic/msg_bb2a6f12e3da45fea9240da736703b1c.csv
Wrote /content/smart_city_run/kafka_mock/traffic/msg_5afa4cb04ffe4bf3849c851503461ecd.csv
Wrote /content/smart_city_run/kafka_mock/traffic/msg_eb81df53130e4475a0051e700140794a.csv
Wrote /content/smart_city_run/kafka_mock/traffic/msg_d28334c538cc4efabd64860b1e43cb06.csv
Wrote /con

In [3]:
# Run this in a Python cell
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, IntegerType
spark = SparkSession.builder.appName("BatchCheck").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

poll_schema = StructType([
    StructField("timestamp", LongType(), True),
    StructField("location", StringType(), True),
    StructField("pm2_5", DoubleType(), True),
])
traf_schema = StructType([
    StructField("timestamp", LongType(), True),
    StructField("road", StringType(), True),
    StructField("vehicles", IntegerType(), True),
])

poll_df = spark.read.format("csv").option("header","true").schema(poll_schema).load("/content/smart_city_run/kafka_mock/pollution")
traf_df = spark.read.format("csv").option("header","true").schema(traf_schema).load("/content/smart_city_run/kafka_mock/traffic")

print("Pollution count:", poll_df.count())
print("Traffic count:", traf_df.count())

print("Pollution aggregation:")
poll_df.groupBy("location").avg("pm2_5").withColumnRenamed("avg(pm2_5)","avg_pm2_5").show(truncate=False)

print("Traffic aggregation:")
traf_df.groupBy("road").avg("vehicles").withColumnRenamed("avg(vehicles)","avg_vehicles").show(truncate=False)


Pollution count: 40
Traffic count: 40
Pollution aggregation:
+--------+------------------+
|location|avg_pm2_5         |
+--------+------------------+
|locC    |112.44199999999998|
|locB    |126.22            |
|locA    |104.06687500000001|
+--------+------------------+

Traffic aggregation:
+----+-----------------+
|road|avg_vehicles     |
+----+-----------------+
|R3  |96.33333333333333|
|R2  |105.1875         |
|R1  |93.55555555555556|
+----+-----------------+



In [None]:
!pip install -q dash plotly pandas pyarrow fastparquet pyngrok
print("Installed dash, plotly, pandas, pyngrok ...")


In [None]:
# OPTIONAL: set your ngrok auth token (recommended to avoid rate limits)
NGROK_AUTH_TOKEN = "35lsQR8RxpBHi6QzRStxL5C6lvr_AKiwW9i7j2YVqsQhh4Nm"   # <--- replace with your token or leave as ""
from pyngrok import ngrok
if NGROK_AUTH_TOKEN:
    ngrok.set_auth_token(NGROK_AUTH_TOKEN)
    print("ngrok auth token set.")
else:
    print("No ngrok token set; a temporary tunnel will be created.")


In [None]:
# Create Dash app file
app_code = r'''
from dash import Dash, dcc, html, Input, Output
import pandas as pd, plotly.express as px, glob, os, time

app = Dash(__name__)

app.layout = html.Div([
    html.H3("Smart City â€” Live Stream Dashboard"),
    dcc.Interval(id="interval", interval=4000, n_intervals=0),
    dcc.Graph(id="pm25-chart"),
    dcc.Graph(id="traffic-chart"),
    html.Div(id="last-update")
])

def load_pollution():
    path = "/content/smart_city_run/kafka_mock/pollution"
    files = sorted(glob.glob(os.path.join(path,"*.csv")))
    if not files:
        return pd.DataFrame(columns=["timestamp","location","pm2_5"])
    df = pd.concat([pd.read_csv(f) for f in files], ignore_index=True)
    return df

def load_traffic():
    path = "/content/smart_city_run/kafka_mock/traffic"
    files = sorted(glob.glob(os.path.join(path,"*.csv")))
    if not files:
        return pd.DataFrame(columns=["timestamp","road","vehicles"])
    df = pd.concat([pd.read_csv(f) for f in files], ignore_index=True)
    return df

@app.callback(
    Output("pm25-chart","figure"),
    Output("traffic-chart","figure"),
    Output("last-update","children"),
    Input("interval","n_intervals")
)
def update(n):
    p = load_pollution()
    t = load_traffic()
    if p.empty:
        fig1 = px.bar(title="No pollution data yet")
    else:
        agg = p.groupby("location", as_index=False)["pm2_5"].mean()
        fig1 = px.bar(agg, x="location", y="pm2_5", title="Average PM2.5 by Location")
    if t.empty:
        fig2 = px.bar(title="No traffic data yet")
    else:
        agg2 = t.groupby("road", as_index=False)["vehicles"].mean()
        fig2 = px.bar(agg2, x="road", y="vehicles", title="Average Vehicles by Road")
    return fig1, fig2, f"Last update: {time.ctime()}"

# Expose the app object so the runner thread can import it
app.server  # ensure server exists
with open("/content/smart_city_dash.py", "w") as f:
    f.write(app_code)
print("Dash app written to /content/smart_city_dash.py")
'''
print("Writing Dash app...")
exec(app_code, {})
