1. Spark Session Setup

In [1]:
from pyspark.sql import SparkSession

# Reuse an already-running session if one exists; otherwise start a new one
# under this application name.
spark = (
    SparkSession.builder
    .appName("Comcast Digital Twin - Data Processing")
    .getOrCreate()
)

print("✅ Spark session started")

✅ Spark session started


 2. Load the CSV Data

In [2]:
# Read the raw telemetry CSV: the first line is the header, and column types
# are inferred from the data rather than declared up front.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("../data/simulated_network_telemetry.csv")
)

# Quick sanity check of the inferred schema and the first few rows.
df.printSchema()
df.show(5)

root
 |-- node_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- cpu_usage: double (nullable = true)
 |-- latency: double (nullable = true)
 |-- throughput: double (nullable = true)
 |-- packet_loss: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- error_rate: double (nullable = true)
 |-- failure_label: integer (nullable = true)

+---------+--------------------+------------------+------------------+------------------+-------------------+-----------------+-------------------+-------------+
|  node_id|           timestamp|         cpu_usage|           latency|        throughput|        packet_loss|      temperature|         error_rate|failure_label|
+---------+--------------------+------------------+------------------+------------------+-------------------+-----------------+-------------------+-------------+
|NODE-1000|2025-04-12 23:59:...| 21.65024273930277| 22.50989903031263| 128.8664631227477| 1.2525283013552362|59.45818753855583| 0

3. Basic Cleaning

In [3]:
from pyspark.sql.functions import col

from pyspark.sql.functions import to_timestamp

# Parse the timestamp column FIRST, then drop incomplete rows.
# Order matters: if the column arrives as a string, to_timestamp can produce
# null for values it cannot parse (with ANSI mode off). Running dropna()
# before the parse would let those rows slip into later time-based steps.
# If the column is already a timestamp, the conversion is a no-op.
df_clean = (
    df.withColumn("timestamp", to_timestamp(col("timestamp")))
    .dropna()  # removes rows with a null in any column, including failed parses
)

df_clean.show(3)

+---------+--------------------+------------------+------------------+-----------------+-------------------+-----------------+------------------+-------------+
|  node_id|           timestamp|         cpu_usage|           latency|       throughput|        packet_loss|      temperature|        error_rate|failure_label|
+---------+--------------------+------------------+------------------+-----------------+-------------------+-----------------+------------------+-------------+
|NODE-1000|2025-04-12 23:59:...| 21.65024273930277| 22.50989903031263|128.8664631227477| 1.2525283013552362|59.45818753855583|0.5433606347984982|            0|
|NODE-1000|2025-04-13 00:09:...|28.460248694267477|  22.1450268348712|94.98258009738434| 2.5133706222367214|66.26767150969349|0.4615166620379736|            0|
|NODE-1000|2025-04-13 00:19:...|31.629683971304825|27.125946586721945| 98.5162914546391|0.11731107744023583|65.85668799361488| 1.108900089737807|            0|
+---------+--------------------+--------

4. Feature Engineering: Add Hour and Day-of-Week Features (lag features are not computed in this notebook)

In [4]:
from pyspark.sql.functions import hour, dayofweek

# Derive calendar features from the event time. The sample output of this
# cell shows dayofweek as 7 for 2025-04-12 (a Saturday) and 1 for 2025-04-13.
event_time = df_clean["timestamp"]

df_feat = (
    df_clean
    .withColumn("hour", hour(event_time))
    .withColumn("dayofweek", dayofweek(event_time))
)

df_feat.select(["timestamp", "hour", "dayofweek"]).show(5)

+--------------------+----+---------+
|           timestamp|hour|dayofweek|
+--------------------+----+---------+
|2025-04-12 23:59:...|  23|        7|
|2025-04-13 00:09:...|   0|        1|
|2025-04-13 00:19:...|   0|        1|
|2025-04-13 00:29:...|   0|        1|
|2025-04-13 00:39:...|   0|        1|
+--------------------+----+---------+
only showing top 5 rows



5. Rolling Aggregations

In [5]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

# Trailing window per node: the current reading plus up to three readings
# before it, ordered by time. Early rows simply average fewer values.
windowSpec = (
    Window.partitionBy("node_id")
    .orderBy("timestamp")
    .rowsBetween(-3, Window.currentRow)
)

rolling_cpu = avg("cpu_usage").over(windowSpec)
df_feat = df_feat.withColumn("rolling_avg_cpu", rolling_cpu)

df_feat.select(["node_id", "timestamp", "cpu_usage", "rolling_avg_cpu"]).show(5)

+---------+--------------------+------------------+------------------+
|  node_id|           timestamp|         cpu_usage|   rolling_avg_cpu|
+---------+--------------------+------------------+------------------+
|NODE-1000|2025-04-12 23:59:...| 21.65024273930277| 21.65024273930277|
|NODE-1000|2025-04-13 00:09:...|28.460248694267477|25.055245716785123|
|NODE-1000|2025-04-13 00:19:...|31.629683971304825|27.246725134958357|
|NODE-1000|2025-04-13 00:29:...|40.988281266370436|30.682114167811378|
|NODE-1000|2025-04-13 00:39:...| 34.99904017572465| 34.01931352691685|
+---------+--------------------+------------------+------------------+
only showing top 5 rows



6. Save Cleaned & Enriched Data

In [7]:
# Collect the enriched Spark DataFrame to the driver as a pandas DataFrame.
# NOTE: toPandas() loads every row into driver memory, so this only suits
# datasets that fit on a single machine.
df_feat_pd = df_feat.toPandas()

# Write one plain CSV file with pandas, leaving out the pandas index column.
df_feat_pd.to_csv(
    "../data/processed_network_data.csv",
    index=False,
)

print("✅ Saved using Pandas instead of Spark.")

  from pandas.core import (


✅ Saved using Pandas instead of Spark.
