In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, DoubleType, StringType, IntegerType, StructField
from pyspark.sql.functions import col, split, trim
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import StructType, DoubleType, StringType, IntegerType, StructField
from pyspark.sql.functions import col, split, trim
from pyspark.sql.functions import regexp_replace
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import split

In [2]:
# Create (or reuse, via getOrCreate) the Spark session for this pipeline.
# The extra Maven packages are the Kafka structured-streaming connector and the
# Elasticsearch Spark connector used by the source and sinks below.
spark_jar_packages = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2,org.elasticsearch:elasticsearch-spark-30_2.12:8.10.1"
spark = (
    SparkSession.builder
    .appName("KafkaToElasticsearchHwinfoLogs")
    .config("spark.jars.packages", spark_jar_packages)
    .getOrCreate()
)

In [3]:
# Bare last expression: Jupyter renders the active SparkSession as this cell's output.
spark

In [4]:
# Streaming source: the "hwinfo" Kafka topic.
# With no checkpoint to resume from, reading starts at the latest offsets;
# failOnDataLoss=false keeps the query running if offsets are no longer available.
kafka_source_options = {
    "kafka.bootstrap.servers": "kafka:9092",
    "subscribe": "hwinfo",
    "startingOffsets": "latest",
    "failOnDataLoss": "false",
}
df_kafka = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)


In [5]:
# Schema of one hwinfo record as (field name, Spark type) pairs; every field is nullable.
_hwinfo_fields = [
    ("Date", StringType()),
    ("Time", StringType()),
    ("Core_VIDs_avg_V", DoubleType()),
    ("Core_Clocks_avg_MHz", IntegerType()),
    ("Ring_LLC_Clock_MHz", DoubleType()),
    ("Core_Usage_avg_percent", DoubleType()),
    ("Core_Temperatures_avg_C", DoubleType()),
    ("Core_Distance_to_TjMAX_avg_C", DoubleType()),
    ("CPU_Package_C", IntegerType()),
    ("CPU_Package_Power_W", DoubleType()),
    ("PL1_Power_Limit_Static_W", DoubleType()),
    ("PL1_Power_Limit_Dynamic_W", DoubleType()),
    ("PL2_Power_Limit_Static_W", DoubleType()),
    ("PL2_Power_Limit_Dynamic_W", DoubleType()),
]
hwinfo_schema = StructType(
    [StructField(field_name, field_type, nullable=True) for field_name, field_type in _hwinfo_fields]
)

In [6]:
# Kafka delivers the message payload as binary; cast it to a string column that keeps the name "value".
json_data = df_kafka.select(col("value").cast("string").alias("value")).alias("strings")

In [7]:
# Parse each Kafka record into typed columns.
# NOTE(review): the payload appears to be a stringified list such as
# "['2024-01-01', '12:00:00', 1.05, ...]": brackets and single quotes are stripped,
# then the remainder is split on commas. Confirm against the producer.
df_parsed = json_data.selectExpr("CAST(value AS STRING)")  # "value" is already a string; defensive no-op

# Remove list punctuation: square brackets and single quotes.
df_final = df_parsed.withColumn("value", regexp_replace(col("value"), "[\\[\\]']", ""))

# Column names in positional order, taken from the declared schema so names and
# types live in exactly one place.
columns = [field.name for field in hwinfo_schema.fields]

df_split = df_final.withColumn("value", split(col("value"), ","))

# A single projection (rather than one withColumn per field): take item i, trim
# the whitespace a ", " separator leaves behind (otherwise string columns such as
# Time keep a leading space), and cast to the type declared in hwinfo_schema.
df_cleaned = df_split.select(*[
    trim(col("value").getItem(i)).cast(field.dataType).alias(field.name)
    for i, field in enumerate(hwinfo_schema.fields)
])

In [8]:
# Load the saved Spark ML pipeline from ./model1 and score the parsed stream.
model = PipelineModel.load("model1")

# Feature columns the model is expected to consume.
# NOTE(review): this list is not passed to the model; presumably the saved
# pipeline assembles these itself. Confirm against the training notebook.
feature_columns = [
    "Core_VIDs_avg_V", "Core_Clocks_avg_MHz", "Ring_LLC_Clock_MHz", "Core_Usage_avg_percent",
    "Core_Temperatures_avg_C", "Core_Distance_to_TjMAX_avg_C", "CPU_Package_C",
    "CPU_Package_Power_W", "PL1_Power_Limit_Static_W", "PL1_Power_Limit_Dynamic_W",
    "PL2_Power_Limit_Static_W", "PL2_Power_Limit_Dynamic_W"
]

# Fail fast with a readable message if parsing stops producing an expected feature.
missing_features = [name for name in feature_columns if name not in df_cleaned.columns]
assert not missing_features, f"df_cleaned is missing feature columns: {missing_features}"

# Keep every parsed input column, expose the model output under a descriptive
# name, and drop any other columns the pipeline adds.
df_final_with_prediction = (
    model.transform(df_cleaned)
    .withColumnRenamed("prediction", "Core_Thermal_Throttling")
    .select(*df_cleaned.columns, "Core_Thermal_Throttling")
)

In [9]:
# Start the two streaming sinks below.

In [14]:
# Stream sink 1: Elasticsearch, for later querying and analysis.
# Set ES_RESOURCE = "hwinfo" for production and "hwinfo_test" for testing.
# NOTE(review): both settings share the checkpoint directory below, so switching
# ES_RESOURCE resumes from offsets committed for the other index. Consider a
# per-index checkpoint path. /tmp may also not survive container restarts.
ES_RESOURCE = "hwinfo_test"
ES_QUERY_NAME = "hwinfo_es_sink"
KAFKA_RT_QUERY_NAME = "hwinfo_kafka_rt_sink"

# Make the cell safe to re-run: stop earlier runs of these sinks first, so a
# second query is not started on the same checkpoint while the first is active.
for active_query in spark.streams.active:
    if active_query.name in (ES_QUERY_NAME, KAFKA_RT_QUERY_NAME):
        active_query.stop()

es_query = (
    df_final_with_prediction.writeStream
    .queryName(ES_QUERY_NAME)
    .outputMode("append")
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", "elasticsearch")
    .option("es.port", "9200")
    .option("es.resource", ES_RESOURCE)
    .option("es.net.ssl", "false")
    .option("checkpointLocation", "/tmp/spark-checkpoints")
    .start()
)

# Stream sink 2: feed for the real-time dashboard via the "hwinfo_logs_RT" topic.
# The Kafka sink needs a "value" column, so each row is serialized to one JSON string.
kafka_query = (
    df_final_with_prediction
    .selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .queryName(KAFKA_RT_QUERY_NAME)
    .outputMode("append")
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "hwinfo_logs_RT")
    .option("checkpointLocation", "/tmp/spark-checkpoints-RT")
    .start()
)

In [15]:
# Shut down both streaming sinks (Elasticsearch first, then the Kafka RT feed).
for running_query in (es_query, kafka_query):
    running_query.stop()