In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, split, trim
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import split, from_json, to_timestamp, avg, window
from pyspark.ml import PipelineModel

In [2]:
# Spark session for the Kafka -> ML models -> Elasticsearch streaming job.
# spark.jars.packages resolves the Kafka source and the Elasticsearch sink
# connectors from Maven coordinates when the session is created.
# NOTE(review): the spark-sql-kafka artifact version (3.3.2, Scala 2.12) should
# match the cluster's Spark version — confirm.
# The stateful-operator correctness check is switched off. No stateful operator
# is active in this notebook right now (the aggregation/join cell is commented
# out), so the flag presumably exists for that code — revisit if it is re-enabled.
spark = (
    SparkSession.builder
    .appName("sparkKafkaESStatefulStreamingApplication")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2,org.elasticsearch:elasticsearch-spark-30_2.12:8.10.1",
    )
    .config("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")
    .getOrCreate()
)

In [3]:
# Bare last expression: Jupyter renders the SparkSession's rich repr (session summary).
spark

In [4]:
# Schema of the JSON document carried in each Kafka message value.
# The three identity fields are strings (timestamp is converted later);
# every sensor reading is a nullable double. Field order is significant:
# it is the column order produced by `select("data.*")`.
schema = StructType(
    [StructField(name, StringType()) for name in ("systemId", "username", "timestamp")]
    + [
        StructField(name, DoubleType(), True)
        for name in (
            # CPU sensors
            "Core_VIDs_avg_V",
            "Core_Clocks_avg_MHz",
            "Ring_LLC_Clock_MHz",
            "Core_Usage_avg_percent",
            "Core_Temperatures_avg_C",
            "Core_Distance_to_TjMAX_avg_C",
            "CPU_Package_C",
            "CPU_Package_Power_W",
            "PL1_Power_Limit_Static_W",
            "PL1_Power_Limit_Dynamic_W",
            "PL2_Power_Limit_Static_W",
            "PL2_Power_Limit_Dynamic_W",
            # fans
            "CPU_FAN_RPM",
            "GPU_FAN_RPM",
            # GPU sensors
            "GPU_Temperature",
            "GPU_Thermal_Limit",
            "GPU_Core_Voltage",
            "GPU_Power",
            "GPU_Clock",
            "GPU_Core_Load",
            "GPU_Memory_Usage",
        )
    ]
)

In [5]:
# Kafka source: each streaming row is one message from the metrics topic,
# with the payload in the binary `value` column.
# No startingOffsets option is given, so the Kafka source default applies to a
# brand-new query; a restarted query resumes from its checkpointed offsets.
bootstrap_server = "kafka:9092"
kafka_topic = "metricsIngestion"

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_server)
    .option("subscribe", kafka_topic)
    .load()
)

In [6]:
# Parse each Kafka message value as JSON against `schema`, flatten the struct
# into top-level columns, then derive:
#   * GPU_Temperature_Delta = GPU_Thermal_Limit - GPU_Temperature
#     (null whenever either input is null)
#   * timestamp converted from string to a Spark timestamp.
# from_json / col / to_timestamp come from the imports in the first cell.
json_df = df.selectExpr("CAST(value AS STRING) as json_string")

parsed_df = json_df.select(from_json(col("json_string"), schema).alias("data")).select("data.*")

# NOTE(review): to_timestamp without an explicit pattern relies on Spark's default
# timestamp parsing; with ANSI mode off, unparseable strings become null.
# Confirm the producer's timestamp format, or pass a pattern.
df_with_delta = (
    parsed_df
    .withColumn("GPU_Temperature_Delta", col("GPU_Thermal_Limit") - col("GPU_Temperature"))
    .withColumn("timestamp", to_timestamp("timestamp"))
)


In [7]:
# query = df_with_delta.writeStream.outputMode("append").format("console").option("truncate", False).start()

In [8]:
# query.stop()

In [9]:
# Input feature columns each model needs; rows missing any of them are dropped
# before scoring.
cpu_required_cols = [
    "Core_VIDs_avg_V", "Core_Clocks_avg_MHz", "Ring_LLC_Clock_MHz",
    "Core_Usage_avg_percent", "Core_Temperatures_avg_C", "Core_Distance_to_TjMAX_avg_C",
    "CPU_Package_C", "CPU_Package_Power_W",
    "PL1_Power_Limit_Static_W", "PL1_Power_Limit_Dynamic_W",
    "PL2_Power_Limit_Static_W", "PL2_Power_Limit_Dynamic_W"
]

gpu_required_cols = [
    "GPU_Temperature", "GPU_Core_Voltage", "GPU_Power", "GPU_Clock",
    "GPU_Core_Load", "GPU_Memory_Usage", "GPU_Temperature_Delta"
]

# Union of both lists, de-duplicated while keeping first-seen order.
# (list(set(...)) also de-duplicates, but its order depends on string-hash
# randomisation and changes from one interpreter run to the next.)
all_required_cols = list(dict.fromkeys(cpu_required_cols + gpu_required_cols))

# Discard every row in which any model input is null, so neither pipeline
# is fed missing features (how="any" is also the default).
df_cleaned = df_with_delta.dropna(how="any", subset=all_required_cols)


In [10]:
# stateful 5 minute aggregations
# stateful_df = parsed_df \
# .withWatermark("timestamp", "6 minutes") \
# .groupBy(
#     col("systemId"),
#     window(col("timestamp"), "5 minutes", "1 minute")
# ) \
# .agg(
#     avg("cpucoreUsage").alias("5_min_core_usage_avg"),
#     avg("cpupackageTemperature").alias("5_min_core_temp_avg")
# ) \
# .withColumn("high_core_utility", col("5_min_core_usage_avg") > 80)

# result_df = stateful_df.selectExpr(
#     "systemId", 
#     "username",
#     "timestamp",
#     "cpucoreVid", 
#     "cpucoreClocks", 
#     "cpucoreUsage", 
#     "cpupackageTemperature", 
#     "cpupackagePower"
# )


# Compute stateful 5-min aggregations per system
# agg_df = parsed_df \
#     .withWatermark("timestamp", "6 minutes") \
#     .groupBy(
#         col("systemId"),
#         window(col("timestamp"), "5 minutes", "1 minute")
#     ) \
#     .agg(
#         avg("cpucoreUsage").alias("5_min_core_usage_avg"),
#         avg("cpupackageTemperature").alias("5_min_core_temp_avg")
#     )

# # Flatten for join
# flattened_agg = agg_df.selectExpr(
#     "systemId",
#     "window.start as window_start",
#     "window.end as window_end",
#     "5_min_core_usage_avg",
#     "5_min_core_temp_avg"
# )


# # Join with original parsed_df to get full row + state
# joined = parsed_df \
#     .join(flattened_agg,
#           (parsed_df["systemId"] == flattened_agg["systemId"]) &
#           (parsed_df["timestamp"] >= flattened_agg["window_start"]) &
#           (parsed_df["timestamp"] < flattened_agg["window_end"])
#          ) \
#     .drop("systemId", "window_start", "window_end")

# # Add final flag column
# final_df = joined.withColumn("high_core_utility", col("5_min_core_usage_avg") > 80)

In [11]:
# model = PipelineModel.load("model1")

# feature_columns = [
#     "Core_VIDs_avg_V", "Core_Clocks_avg_MHz", "Ring_LLC_Clock_MHz",
#     "Core_Usage_avg_percent", "Core_Temperatures_avg_C", "Core_Distance_to_TjMAX_avg_C",
#     "CPU_Package_C", "CPU_Package_Power_W", "PL1_Power_Limit_Static_W",
#     "PL1_Power_Limit_Dynamic_W", "PL2_Power_Limit_Static_W", "PL2_Power_Limit_Dynamic_W"
# ]

# df_for_model = parsed_df.select(*feature_columns)

# df_with_prediction = model.transform(parsed_df) \
#     .withColumnRenamed("prediction", "Core_Thermal_Throttling") \
#     .select(
#         "systemId", "username", "timestamp",
#         "Core_VIDs_avg_V", "Core_Clocks_avg_MHz", "Ring_LLC_Clock_MHz",
#         "Core_Usage_avg_percent", "Core_Temperatures_avg_C", "Core_Distance_to_TjMAX_avg_C",
#         "CPU_Package_C", "CPU_Package_Power_W", "PL1_Power_Limit_Static_W",
#         "PL1_Power_Limit_Dynamic_W", "PL2_Power_Limit_Static_W", "PL2_Power_Limit_Dynamic_W",
#         "CPU_FAN_RPM", "GPU_FAN_RPM",
#         "Core_Thermal_Throttling"
#     )

In [12]:
# Load the two fitted Spark ML pipelines: the CPU model (whose prediction becomes
# Core_Thermal_Throttling) and the GPU model (-> GPU_Optimal_Performance).
# NOTE(review): both paths are relative, so they resolve against the driver's
# default filesystem / working directory — confirm.
cpu_model, gpu_model = (
    PipelineModel.load(model_path)
    for model_path in ("model1", "gpu_optimal_pipeline_LR_V2")
)

In [13]:
# Score the cleaned stream with the CPU pipeline, exposing its `prediction`
# column as Core_Thermal_Throttling.
# NOTE(review): any other columns the CPU pipeline appends stay on the frame and
# are visible to the GPU pipeline; verify they do not collide with the GPU
# pipeline's output column names.
df_with_cpu = (
    cpu_model
    .transform(df_cleaned)
    .withColumnRenamed("prediction", "Core_Thermal_Throttling")
)

In [14]:
# Score the CPU-scored frame with the GPU pipeline; its `gpu_prediction` column
# becomes GPU_Optimal_Performance. withColumnRenamed is a silent no-op if that
# column is absent, so a later select of GPU_Optimal_Performance would then fail.
df_with_gpu = (
    gpu_model
    .transform(df_with_cpu)
    .withColumnRenamed("gpu_prediction", "GPU_Optimal_Performance")
)

In [15]:
# Project the document shape sent to the sinks: identity fields, raw sensor
# readings, the derived GPU delta, and both model outputs (in this order).
df_final = df_with_gpu.select(
    # identity
    "systemId",
    "username",
    "timestamp",
    # CPU sensors
    "Core_VIDs_avg_V",
    "Core_Clocks_avg_MHz",
    "Ring_LLC_Clock_MHz",
    "Core_Usage_avg_percent",
    "Core_Temperatures_avg_C",
    "Core_Distance_to_TjMAX_avg_C",
    "CPU_Package_C",
    "CPU_Package_Power_W",
    "PL1_Power_Limit_Static_W",
    "PL1_Power_Limit_Dynamic_W",
    "PL2_Power_Limit_Static_W",
    "PL2_Power_Limit_Dynamic_W",
    # fans
    "CPU_FAN_RPM",
    "GPU_FAN_RPM",
    # GPU sensors + derived headroom
    "GPU_Temperature",
    "GPU_Thermal_Limit",
    "GPU_Core_Voltage",
    "GPU_Power",
    "GPU_Clock",
    "GPU_Core_Load",
    "GPU_Memory_Usage",
    "GPU_Temperature_Delta",
    # model outputs
    "Core_Thermal_Throttling",
    "GPU_Optimal_Performance",
)

In [16]:
# Smoke test: print each micro-batch of the final frame to the console
# (untruncated) before sending it to the final destination in Elasticsearch.
# The query is stopped in the next cell.
query = (
    df_final.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", False)
    .start()
)

In [18]:
# Stop the console smoke-test streaming query.
query.stop()

In [16]:
# Stream the final frame into an Elasticsearch index in append mode.
resource = "hwinfo_dev_v2" # elasticsearch dev testing index
port = 9200

# A streaming checkpoint records the Kafka offsets this query has already written,
# so each sink needs its own directory. Keying it on the target index means that
# changing `resource` (e.g. dev -> prod) starts a fresh query. Otherwise it would
# silently resume from the offsets committed for the old index and skip earlier data.
# NOTE(review): /tmp is not durable across container/host restarts; use a
# persistent, HDFS-compatible path for anything beyond local testing.
checkpoint_location = f"/tmp/spark-checkpoints/{resource}"

es_writestream = (
    df_final.writeStream
    .outputMode("append")
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", "elasticsearch")
    .option("es.port", port)
    .option("es.resource", resource)
    .option("es.net.ssl", "false")
    .option("checkpointLocation", checkpoint_location)
    .start()
)

In [17]:
# Stop the Elasticsearch streaming query.
es_writestream.stop()