In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs, unix_timestamp, row_number
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MergeStreamingData") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "/home/jovyan/jars/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# JDBC Configs
jdbc_url = "jdbc:mysql://starrocks-allin1:9030/industrial_warehouse"
jdbc_props = {"user": "root", "password": "", "driver": "com.mysql.cj.jdbc.Driver"}

# Read Tables
iot_df = spark.read.jdbc(jdbc_url, "staging_iot_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
mes_df = spark.read.jdbc(jdbc_url, "staging_mes_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
scada_df = spark.read.jdbc(jdbc_url, "staging_scada_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
machinestatus_df = spark.read.jdbc(jdbc_url, "MachineStatus", properties=jdbc_props)
alarmcodes_df = spark.read.jdbc(jdbc_url, "AlarmCodes", properties=jdbc_props)

# Step 1: Get best MES within ±1 hour
mes_join = iot_df.alias("i").join(mes_df.alias("m"), "Machine_ID") \
    .withColumn("time_diff_mes", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("m.Timestamp")))) \
    .filter(col("time_diff_mes") <= 3600)

window_mes = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff_mes")
best_mes = mes_join.withColumn("rn", row_number().over(window_mes)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp"),
        col("i.Machine_ID"),
        col("i.Temperature_C"),
        col("i.Vibration_mm_s"),
        col("i.Pressure_bar"),
        col("m.Operator_ID"),
        col("m.Units_Produced"),
        col("m.Defective_Units"),
        col("m.Production_Time_min")
    )

# Step 2: Enrich SCADA with StatusID and AlarmID
scada_enriched = scada_df.alias("s") \
    .join(machinestatus_df.alias("ms"), col("s.Machine_Status") == col("ms.StatusName"), "left") \
    .join(alarmcodes_df.alias("ac"), col("s.Alarm_Code") == col("ac.AlarmDescription"), "left") \
    .select(
        col("s.Timestamp").alias("scada_ts"),
        col("s.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("ms.StatusID").alias("Machine_Status_ID"),
        col("s.Machine_Status"),
        col("ac.AlarmID").alias("Alarm_ID"),
        col("s.Alarm_Code")
    )

# Step 3: Get best SCADA within ±15 minutes
scada_join = iot_df.alias("i").join(scada_enriched.alias("s"), "Machine_ID") \
    .withColumn("time_diff_scada", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("s.scada_ts")))) \
    .filter(col("time_diff_scada") <= 900)

window_scada = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff_scada")
best_scada = scada_join.withColumn("rn", row_number().over(window_scada)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp"),
        col("i.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("s.Machine_Status_ID"),
        col("s.Machine_Status"),
        col("s.Alarm_ID"),
        col("s.Alarm_Code")
    )

# Step 4: Final Join MES + SCADA
final_df = best_mes.alias("m").join(best_scada.alias("s"), ["Machine_ID", "Timestamp"], "left") \
    .select(
        col("Timestamp"),
        col("Machine_ID"),
        col("Temperature_C"),
        col("Vibration_mm_s"),
        col("Pressure_bar"),
        col("Operator_ID"),
        col("Units_Produced"),
        col("Defective_Units"),
        col("Production_Time_min"),
        col("Power_Consumption_kW"),
        col("Machine_Status_ID"),
        col("Machine_Status"),
        col("Alarm_ID"),
        col("Alarm_Code")
    )

# Optional: reduce to fewer partitions
final_df = final_df.repartition(1)

# Step 5: Write to machine_metrics
try:
    final_df.write.mode("append").format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "machine_metrics") \
        .option("user", "root") \
        .option("password", "") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .save()
    print("✅ Merge complete and data inserted into machine_metrics.")
except Exception as e:
    print("❌ Failed to write to StarRocks:", e)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs, unix_timestamp, row_number
from pyspark.sql.window import Window
 
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MergeStreamingData") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "/home/jovyan/jars/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()
 
spark.sparkContext.setLogLevel("WARN")
 
# JDBC Configs
jdbc_url = "jdbc:mysql://starrocks-allin1:9030/industrial_warehouse"
jdbc_props = {"user": "root", "password": "", "driver": "com.mysql.cj.jdbc.Driver"}
 
# Read Tables
iot_df = spark.read.jdbc(jdbc_url, "staging_iot_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
mes_df = spark.read.jdbc(jdbc_url, "staging_mes_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
scada_df = spark.read.jdbc(jdbc_url, "staging_scada_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
machinestatus_df = spark.read.jdbc(jdbc_url, "MachineStatus", properties=jdbc_props)
alarmcodes_df = spark.read.jdbc(jdbc_url, "AlarmCodes", properties=jdbc_props)
 
# Step 1: Get best MES within ±1 hour
mes_join = iot_df.alias("i").join(mes_df.alias("m"), "Machine_ID") \
    .withColumn("time_diff_mes", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("m.Timestamp")))) \
    .filter(col("time_diff_mes") <= 3600)
 
window_mes = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff_mes")
best_mes = mes_join.withColumn("rn", row_number().over(window_mes)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp"),
        col("i.Machine_ID"),
        col("i.Temperature_C"),
        col("i.Vibration_mm_s"),
        col("i.Pressure_bar"),
        col("m.Operator_ID"),
        col("m.Units_Produced"),
        col("m.Defective_Units"),
        col("m.Production_Time_min")
    )
 
# Step 2: Enrich SCADA with StatusID and AlarmID
scada_enriched = scada_df.alias("s") \
    .join(machinestatus_df.alias("ms"), col("s.Machine_Status") == col("ms.StatusName"), "left") \
    .join(alarmcodes_df.alias("ac"), col("s.Alarm_Code") == col("ac.AlarmDescription"), "left") \
    .select(
        col("s.Timestamp").alias("scada_ts"),
        col("s.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("ms.StatusID").alias("Machine_Status_ID"),
        col("s.Machine_Status"),
        col("ac.AlarmID").alias("Alarm_ID"),
        col("s.Alarm_Code")
    )
 
# Step 3: Get best SCADA within ±15 minutes
scada_join = iot_df.alias("i").join(scada_enriched.alias("s"), "Machine_ID") \
    .withColumn("time_diff_scada", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("s.scada_ts")))) \
    .filter(col("time_diff_scada") <= 900)
 
window_scada = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff_scada")
best_scada = scada_join.withColumn("rn", row_number().over(window_scada)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp"),
        col("i.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("s.Machine_Status_ID"),
        col("s.Machine_Status"),
        col("s.Alarm_ID"),
        col("s.Alarm_Code")
    )
 
# Step 4: Final Join MES + SCADA
final_df = best_mes.alias("m").join(best_scada.alias("s"), ["Machine_ID", "Timestamp"], "left") \
    .select(
        col("Timestamp"),
        col("Machine_ID"),
        col("Temperature_C"),
        col("Vibration_mm_s"),
        col("Pressure_bar"),
        col("Operator_ID"),
        col("Units_Produced"),
        col("Defective_Units"),
        col("Production_Time_min"),
        col("Power_Consumption_kW"),
        col("Machine_Status_ID"),
        col("Machine_Status"),
        col("Alarm_ID"),
        col("Alarm_Code")
    )
 
# Optional: reduce to fewer partitions
final_df = final_df.repartition(1)
 
# Step 5: Write to machine_metrics
try:
    final_df.write.mode("append").format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "machine_metrics_test") \
        .option("user", "root") \
        .option("password", "") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .save()
    print("✅ Merge complete and data inserted into machine_metrics.")
except Exception as e:
    print("❌ Failed to write to StarRocks:", e)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs, unix_timestamp, row_number, trim, lower
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MergeStreamingData") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "/home/jovyan/jars/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# JDBC configuration
jdbc_url = "jdbc:mysql://starrocks-allin1:9030/industrial_warehouse"
jdbc_props = {"user": "root", "password": "", "driver": "com.mysql.cj.jdbc.Driver"}

# Read all tables
iot_df = spark.read.jdbc(jdbc_url, "staging_iot_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
mes_df = spark.read.jdbc(jdbc_url, "staging_mes_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
scada_df = spark.read.jdbc(jdbc_url, "staging_scada_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
machinestatus_df = spark.read.jdbc(jdbc_url, "MachineStatus", properties=jdbc_props)
alarmcodes_df = spark.read.jdbc(jdbc_url, "AlarmCodes", properties=jdbc_props)

# Best MES within ±1 hour
mes_join = iot_df.alias("i").join(mes_df.alias("m"), "Machine_ID") \
    .withColumn("time_diff_mes", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("m.Timestamp")))) \
    .filter(col("time_diff_mes") <= 3600)

window_mes = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff_mes")
best_mes = mes_join.withColumn("rn", row_number().over(window_mes)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("Timestamp"),
        col("i.Machine_ID"),
        col("i.Temperature_C"),
        col("i.Vibration_mm_s"),
        col("i.Pressure_bar"),
        col("m.Operator_ID"),
        col("m.Units_Produced"),
        col("m.Defective_Units"),
        col("m.Production_Time_min")
    )

# Best SCADA within ±15 minutes
scada_join = iot_df.alias("i").join(scada_df.alias("s"), "Machine_ID") \
    .withColumn("time_diff_scada", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("s.Timestamp")))) \
    .filter(col("time_diff_scada") <= 900)

window_scada = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff_scada")
best_scada = scada_join.withColumn("rn", row_number().over(window_scada)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("Timestamp"),
        col("i.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("s.Machine_Status"),
        col("s.Alarm_Code")
    )

# Enrich SCADA with Machine_Status_ID and Alarm_ID
scada_enriched = best_scada.alias("s") \
    .join(machinestatus_df.alias("ms"), trim(lower(col("s.Machine_Status"))) == trim(lower(col("ms.StatusName"))), "left") \
    .join(alarmcodes_df.alias("ac"), trim(lower(col("s.Alarm_Code"))) == trim(lower(col("ac.AlarmDescription"))), "left") \
    .select(
        col("s.Timestamp"),
        col("s.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("ms.StatusID").alias("Machine_Status_ID"),
        col("s.Machine_Status"),
        col("ac.AlarmID").alias("Alarm_ID"),
        col("s.Alarm_Code")
    )

# Final join
final_df = best_mes.alias("m").join(scada_enriched.alias("s"), ["Machine_ID", "Timestamp"], "left") \
    .select(
        col("Timestamp"),
        col("Machine_ID"),
        col("Temperature_C"),
        col("Vibration_mm_s"),
        col("Pressure_bar"),
        col("Operator_ID"),
        col("Units_Produced"),
        col("Defective_Units"),
        col("Production_Time_min"),
        col("Power_Consumption_kW"),
        col("Machine_Status_ID"),
        col("Machine_Status"),
        col("Alarm_ID"),
        col("Alarm_Code")
    )

# Optional: Reduce Spark write concurrency to avoid StarRocks tablet version overflow
final_df = final_df.repartition(1)

# Write to StarRocks
try:
    final_df.write.mode("append").format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "machine_metrics") \
        .option("user", "root") \
        .option("password", "") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .save()
    print("✅ Merge complete and data inserted into machine_metrics.")
except Exception as e:
    print("❌ Failed to write to StarRocks:", e)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs, unix_timestamp, row_number, lower, trim
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MergeStreamingData") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "/home/jovyan/jars/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# JDBC configurations
jdbc_url = "jdbc:mysql://starrocks-allin1:9030/industrial_warehouse"
jdbc_props = {"user": "root", "password": "", "driver": "com.mysql.cj.jdbc.Driver"}

# Read staging tables
iot_df = spark.read.jdbc(jdbc_url, "staging_iot_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
mes_df = spark.read.jdbc(jdbc_url, "staging_mes_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
scada_df = spark.read.jdbc(jdbc_url, "staging_scada_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
machinestatus_df = spark.read.jdbc(jdbc_url, "MachineStatus", properties=jdbc_props)
alarmcodes_df = spark.read.jdbc(jdbc_url, "AlarmCodes", properties=jdbc_props)

# MES join
mes_join = iot_df.alias("i").join(mes_df.alias("m"), "Machine_ID") \
    .withColumn("diff", abs(unix_timestamp("i.Timestamp") - unix_timestamp("m.Timestamp"))) \
    .filter(col("diff") <= 3600)

mes_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("diff")
best_mes = mes_join.withColumn("rn", row_number().over(mes_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("Timestamp"),
        col("i.Machine_ID"),
        col("i.Temperature_C"),
        col("i.Vibration_mm_s"),
        col("i.Pressure_bar"),
        col("m.Operator_ID"),
        col("m.Units_Produced"),
        col("m.Defective_Units"),
        col("m.Production_Time_min")
    )

# SCADA join
scada_join = iot_df.alias("i").join(scada_df.alias("s"), "Machine_ID") \
    .withColumn("diff", abs(unix_timestamp("i.Timestamp") - unix_timestamp("s.Timestamp"))) \
    .filter(col("diff") <= 900)

scada_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("diff")
best_scada = scada_join.withColumn("rn", row_number().over(scada_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("Timestamp"),
        col("i.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("s.Machine_Status"),
        col("s.Alarm_Code")
    )

# FIXED: Join using trimmed/lowercased text to get proper ID matches
scada_enriched = best_scada.alias("s") \
    .join(machinestatus_df.alias("ms"), trim(lower(col("s.Machine_Status"))) == trim(lower(col("ms.StatusName"))), "left") \
    .join(alarmcodes_df.alias("ac"), trim(lower(col("s.Alarm_Code"))) == trim(lower(col("ac.AlarmDescription"))), "left") \
    .select(
        "s.Timestamp", "s.Machine_ID", "Power_Consumption_kW",
        col("ms.StatusID").alias("Machine_Status_ID"), "s.Machine_Status",
        col("ac.AlarmID").alias("Alarm_ID"), "s.Alarm_Code"
    )

# Final merge
final_df = best_mes.alias("m").join(scada_enriched.alias("s"),
    (col("m.Machine_ID") == col("s.Machine_ID")) & (col("m.Timestamp") == col("s.Timestamp")),
    how="left"
).select(
    col("m.Timestamp"), col("m.Machine_ID"),
    "Temperature_C", "Vibration_mm_s", "Pressure_bar",
    "Operator_ID", "Units_Produced", "Defective_Units", "Production_Time_min",
    "Power_Consumption_kW", "Machine_Status_ID", "Machine_Status",
    "Alarm_ID", "Alarm_Code"
)

# Show or write to table
# final_df.show()

final_df.write.mode("append").format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "machine_metrics") \
    .option("user", "root") \
    .option("password", "") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .save()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs, unix_timestamp, row_number
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MergeStreamingData") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "/home/jovyan/jars/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# JDBC configurations
jdbc_url = "jdbc:mysql://starrocks-allin1:9030/industrial_warehouse"
jdbc_props = {"user": "root", "password": "", "driver": "com.mysql.cj.jdbc.Driver"}

# Load tables
iot_df = spark.read.jdbc(jdbc_url, "staging_iot_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
mes_df = spark.read.jdbc(jdbc_url, "staging_mes_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
scada_df = spark.read.jdbc(jdbc_url, "staging_scada_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
machinestatus_df = spark.read.jdbc(jdbc_url, "MachineStatus", properties=jdbc_props)
alarmcodes_df = spark.read.jdbc(jdbc_url, "AlarmCodes", properties=jdbc_props)

# MES join
mes_join = iot_df.alias("i").join(mes_df.alias("m"), "Machine_ID") \
    .withColumn("diff", abs(unix_timestamp("i.Timestamp") - unix_timestamp("m.Timestamp"))) \
    .filter(col("diff") <= 3600)

mes_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("diff")
best_mes = mes_join.withColumn("rn", row_number().over(mes_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("Timestamp"),
        col("i.Machine_ID"),
        col("i.Temperature_C"),
        col("i.Vibration_mm_s"),
        col("i.Pressure_bar"),
        col("m.Operator_ID"),
        col("m.Units_Produced"),
        col("m.Defective_Units"),
        col("m.Production_Time_min")
    )

# SCADA join
scada_join = iot_df.alias("i").join(scada_df.alias("s"), "Machine_ID") \
    .withColumn("diff", abs(unix_timestamp("i.Timestamp") - unix_timestamp("s.Timestamp"))) \
    .filter(col("diff") <= 900)

scada_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("diff")
best_scada = scada_join.withColumn("rn", row_number().over(scada_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("Timestamp"),
        col("i.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("s.Machine_Status"),
        col("s.Alarm_Code")
    )

# Enrich with status and alarm IDs
scada_enriched = best_scada.alias("s") \
    .join(machinestatus_df.alias("ms"), col("s.Machine_Status") == col("ms.StatusName"), "left") \
    .join(alarmcodes_df.alias("ac"), col("s.Alarm_Code") == col("ac.AlarmDescription"), "left") \
    .select(
        "s.Timestamp", "s.Machine_ID", "Power_Consumption_kW",
        col("ms.StatusID").alias("Machine_Status_ID"), "s.Machine_Status",
        col("ac.AlarmID").alias("Alarm_ID"), "s.Alarm_Code"
    )

# Final merge
final_df = best_mes.alias("m").join(scada_enriched.alias("s"),
    (col("m.Machine_ID") == col("s.Machine_ID")) & (col("m.Timestamp") == col("s.Timestamp")),
    how="left"
).select(
    col("m.Timestamp"), col("m.Machine_ID"),
    "Temperature_C", "Vibration_mm_s", "Pressure_bar",
    "Operator_ID", "Units_Produced", "Defective_Units", "Production_Time_min",
    "Power_Consumption_kW", "Machine_Status_ID", "Machine_Status",
    "Alarm_ID", "Alarm_Code"
)

final_df.write.mode("append").format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "machine_metrics") \
    .option("user", "root") \
    .option("password", "") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .save()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs, unix_timestamp, row_number
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MergeToMachineMetrics") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "/home/jovyan/jars/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# JDBC configs
jdbc_url = "jdbc:mysql://starrocks-allin1:9030/industrial_warehouse"
jdbc_props = {"user": "root", "password": "", "driver": "com.mysql.cj.jdbc.Driver"}

# Load staging and dimension tables
iot_df = spark.read.jdbc(jdbc_url, "staging_iot_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
mes_df = spark.read.jdbc(jdbc_url, "staging_mes_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
scada_df = spark.read.jdbc(jdbc_url, "staging_scada_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
status_df = spark.read.jdbc(jdbc_url, "MachineStatus", properties=jdbc_props)
alarm_df = spark.read.jdbc(jdbc_url, "AlarmCodes", properties=jdbc_props)

# Best MES within ±1 hour
mes_join = iot_df.alias("i").join(mes_df.alias("m"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("m.Timestamp")))) \
    .filter(col("time_diff") <= 3600)

mes_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
mes_best = mes_join.withColumn("rn", row_number().over(mes_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("i.Temperature_C"),
        col("i.Vibration_mm_s"),
        col("i.Pressure_bar"),
        col("m.Operator_ID"),
        col("m.Units_Produced"),
        col("m.Defective_Units"),
        col("m.Production_Time_min")
    )

# Best SCADA within ±15 minutes
scada_join = iot_df.alias("i").join(scada_df.alias("s"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("s.Timestamp")))) \
    .filter(col("time_diff") <= 900)

scada_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
scada_best = scada_join.withColumn("rn", row_number().over(scada_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("s.Machine_Status").alias("Machine_Status_Name"),
        col("s.Alarm_Code").alias("Alarm_Code_Desc")
    )

# Join all and enrich with IDs
final_df = mes_best.alias("m").join(scada_best.alias("s"),
    (col("m.Machine_ID") == col("s.Machine_ID")) & (col("m.iot_Timestamp") == col("s.iot_Timestamp")),
    how="left"
).join(status_df.alias("ms"), col("s.Machine_Status_Name") == col("ms.StatusName"), "left") \
 .join(alarm_df.alias("ac"), col("s.Alarm_Code_Desc") == col("ac.AlarmDescription"), "left") \
 .select(
    col("m.iot_Timestamp").alias("Timestamp"),
    col("m.Machine_ID"),
    col("m.Temperature_C"),
    col("m.Vibration_mm_s"),
    col("m.Pressure_bar"),
    col("m.Operator_ID"),
    col("m.Units_Produced"),
    col("m.Defective_Units"),
    col("m.Production_Time_min"),
    col("s.Power_Consumption_kW"),
    col("ms.StatusID").alias("Machine_Status_ID"),
    col("ms.StatusName").alias("Machine_Status"),
    col("ac.AlarmID").alias("Alarm_ID"),
    col("ac.AlarmDescription").alias("Alarm_Code")
)

# Optional: Write to StarRocks
try:
    final_df.write.mode("append").format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "machine_metrics") \
        .option("user", "root") \
        .option("password", "") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .save()
    print("✅ Data inserted into machine_metrics.")
except Exception as e:
    print("❌ Failed to write to StarRocks:", e)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs, unix_timestamp, row_number
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MergeToMachineMetrics") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "/home/jovyan/jars/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# JDBC configs
jdbc_url = "jdbc:mysql://starrocks-allin1:9030/industrial_warehouse"
jdbc_props = {"user": "root", "password": "", "driver": "com.mysql.cj.jdbc.Driver"}

# Load staging and dimension tables
iot_df = spark.read.jdbc(jdbc_url, "staging_iot_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
mes_df = spark.read.jdbc(jdbc_url, "staging_mes_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
scada_df = spark.read.jdbc(jdbc_url, "staging_scada_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
status_df = spark.read.jdbc(jdbc_url, "MachineStatus", properties=jdbc_props)
alarm_df = spark.read.jdbc(jdbc_url, "AlarmCodes", properties=jdbc_props)

# Best MES within ±1 hour
mes_join = iot_df.alias("i").join(mes_df.alias("m"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("m.Timestamp")))) \
    .filter(col("time_diff") <= 3600)

mes_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
mes_best = mes_join.withColumn("rn", row_number().over(mes_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("i.Temperature_C"),
        col("i.Vibration_mm_s"),
        col("i.Pressure_bar"),
        col("m.Operator_ID"),
        col("m.Units_Produced"),
        col("m.Defective_Units"),
        col("m.Production_Time_min")
    )

# Best SCADA within ±15 minutes
scada_join = iot_df.alias("i").join(scada_df.alias("s"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("s.Timestamp")))) \
    .filter(col("time_diff") <= 900)

scada_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
scada_best = scada_join.withColumn("rn", row_number().over(scada_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("s.Machine_Status").alias("Machine_Status_Name"),
        col("s.Alarm_Code").alias("Alarm_Code_Desc")
    )

# Join all and enrich with IDs
final_df = mes_best.alias("m").join(scada_best.alias("s"),
    (col("m.Machine_ID") == col("s.Machine_ID")) & (col("m.iot_Timestamp") == col("s.iot_Timestamp")),
    how="left"
).join(status_df.alias("ms"), col("s.Machine_Status_Name") == col("ms.StatusName"), "left") \
 .join(alarm_df.alias("ac"), col("s.Alarm_Code_Desc") == col("ac.AlarmDescription"), "left") \
 .select(
    col("m.iot_Timestamp").alias("Timestamp"),
    col("m.Machine_ID"),
    col("m.Temperature_C"),
    col("m.Vibration_mm_s"),
    col("m.Pressure_bar"),
    col("m.Operator_ID"),
    col("m.Units_Produced"),
    col("m.Defective_Units"),
    col("m.Production_Time_min"),
    col("s.Power_Consumption_kW"),
    col("ms.StatusID").alias("Machine_Status_ID"),
    col("ms.StatusName").alias("Machine_Status"),
    col("ac.AlarmID").alias("Alarm_ID"),
    col("ac.AlarmDescription").alias("Alarm_Code")
)

# Optional: Write to StarRocks
try:
    final_df.write.mode("append").format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "machine_metrics") \
        .option("user", "root") \
        .option("password", "") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .save()
    print("✅ Data inserted into machine_metrics.")
except Exception as e:
    print("❌ Failed to write to StarRocks:", e)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs, unix_timestamp, row_number
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MergeToMachineMetrics") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "/home/jovyan/jars/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# JDBC configs
jdbc_url = "jdbc:mysql://starrocks-allin1:9030/industrial_warehouse"
jdbc_props = {"user": "root", "password": "", "driver": "com.mysql.cj.jdbc.Driver"}

# Load staging and dimension tables
iot_df = spark.read.jdbc(jdbc_url, "staging_iot_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
mes_df = spark.read.jdbc(jdbc_url, "staging_mes_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
scada_df = spark.read.jdbc(jdbc_url, "staging_scada_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
status_df = spark.read.jdbc(jdbc_url, "MachineStatus", properties=jdbc_props)
alarm_df = spark.read.jdbc(jdbc_url, "AlarmCodes", properties=jdbc_props)

# Best MES within ±1 hour
mes_join = iot_df.alias("i").join(mes_df.alias("m"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("m.Timestamp")))) \
    .filter(col("time_diff") <= 3600)

mes_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
mes_best = mes_join.withColumn("rn", row_number().over(mes_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("i.Temperature_C"),
        col("i.Vibration_mm_s"),
        col("i.Pressure_bar"),
        col("m.Operator_ID"),
        col("m.Units_Produced"),
        col("m.Defective_Units"),
        col("m.Production_Time_min")
    )

# Best SCADA within ±15 minutes
scada_join = iot_df.alias("i").join(scada_df.alias("s"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("s.Timestamp")))) \
    .filter(col("time_diff") <= 900)

scada_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
scada_best = scada_join.withColumn("rn", row_number().over(scada_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("s.Machine_Status").alias("Machine_Status_Name"),
        col("s.Alarm_Code").alias("Alarm_Code_Desc")
    )

# Join all and enrich with IDs
final_df = mes_best.alias("m").join(scada_best.alias("s"),
    (col("m.Machine_ID") == col("s.Machine_ID")) & (col("m.iot_Timestamp") == col("s.iot_Timestamp")),
    how="left"
).join(status_df.alias("ms"), col("s.Machine_Status_Name") == col("ms.StatusName"), "left") \
 .join(alarm_df.alias("ac"),
      col("s.Alarm_Code_Desc").isNull() & (col("ac.AlarmDescription") == "None") |
      (col("s.Alarm_Code_Desc") == col("ac.AlarmDescription")),
      "left").select(
    col("m.iot_Timestamp").alias("Timestamp"),
    col("m.Machine_ID"),
    col("m.Temperature_C"),
    col("m.Vibration_mm_s"),
    col("m.Pressure_bar"),
    col("m.Operator_ID"),
    col("m.Units_Produced"),
    col("m.Defective_Units"),
    col("m.Production_Time_min"),
    col("s.Power_Consumption_kW"),
    col("ms.StatusID").alias("Machine_Status_ID"),
    col("ms.StatusName").alias("Machine_Status"),
    col("ac.AlarmID").alias("Alarm_ID"),
    col("ac.AlarmDescription").alias("Alarm_Code")
)

# Optional: Write to StarRocks
try:
    final_df.write.mode("append").format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "machine_metrics") \
        .option("user", "root") \
        .option("password", "") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .save()
    print("✅ Data inserted into machine_metrics.")
except Exception as e:
    print("❌ Failed to write to StarRocks:", e)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs, unix_timestamp, row_number
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MergeToMachineMetrics") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "/home/jovyan/jars/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# JDBC configs
jdbc_url = "jdbc:mysql://starrocks-allin1:9030/industrial_warehouse"
jdbc_props = {"user": "root", "password": "", "driver": "com.mysql.cj.jdbc.Driver"}

# Load staging and dimension tables
iot_df = spark.read.jdbc(jdbc_url, "staging_iot_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
mes_df = spark.read.jdbc(jdbc_url, "staging_mes_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
scada_df = spark.read.jdbc(jdbc_url, "staging_scada_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
status_df = spark.read.jdbc(jdbc_url, "MachineStatus", properties=jdbc_props)
alarm_df = spark.read.jdbc(jdbc_url, "AlarmCodes", properties=jdbc_props)

# Best MES within ±1 hour
mes_join = iot_df.alias("i").join(mes_df.alias("m"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("m.Timestamp")))) \
    .filter(col("time_diff") <= 3600)

mes_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
mes_best = mes_join.withColumn("rn", row_number().over(mes_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("i.Temperature_C"),
        col("i.Vibration_mm_s"),
        col("i.Pressure_bar"),
        col("m.Operator_ID"),
        col("m.Units_Produced"),
        col("m.Defective_Units"),
        col("m.Production_Time_min")
    )

# Best SCADA within ±15 minutes
scada_join = iot_df.alias("i").join(scada_df.alias("s"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("s.Timestamp")))) \
    .filter(col("time_diff") <= 900)

scada_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
scada_best = scada_join.withColumn("rn", row_number().over(scada_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("s.Machine_Status").alias("Machine_Status_Name"),
        col("s.Alarm_Code").alias("Alarm_Code_Desc")
    )
scada_best = scada_best.fillna({"Alarm_Code_Desc": "None"})

# Join all and enrich with IDs
final_df = mes_best.alias("m").join(scada_best.alias("s"),
    (col("m.Machine_ID") == col("s.Machine_ID")) & (col("m.iot_Timestamp") == col("s.iot_Timestamp")),
    how="left"
).join(status_df.alias("ms"), col("s.Machine_Status_Name") == col("ms.StatusName"), "left") \
 .join(alarm_df.alias("ac"), col("s.Alarm_Code_Desc") == col("ac.AlarmDescription"), "left").select(
    col("m.iot_Timestamp").alias("Timestamp"),
    col("m.Machine_ID"),
    col("m.Temperature_C"),
    col("m.Vibration_mm_s"),
    col("m.Pressure_bar"),
    col("m.Operator_ID"),
    col("m.Units_Produced"),
    col("m.Defective_Units"),
    col("m.Production_Time_min"),
    col("s.Power_Consumption_kW"),
    col("ms.StatusID").alias("Machine_Status_ID"),
    col("ms.StatusName").alias("Machine_Status"),
    col("ac.AlarmID").alias("Alarm_ID"),
    col("ac.AlarmDescription").alias("Alarm_Code")
)

# Optional: Write to StarRocks
try:
    final_df.write.mode("append").format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "machine_metrics") \
        .option("user", "root") \
        .option("password", "") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .save()
    print("✅ Data inserted into machine_metrics.")
except Exception as e:
    print("❌ Failed to write to StarRocks:", e)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs, unix_timestamp, row_number, when, lit
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MergeToMachineMetrics") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "/home/jovyan/jars/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# JDBC configs
jdbc_url = "jdbc:mysql://starrocks-allin1:9030/industrial_warehouse"
jdbc_props = {"user": "root", "password": "", "driver": "com.mysql.cj.jdbc.Driver"}

# Load staging and dimension tables
iot_df = spark.read.jdbc(jdbc_url, "staging_iot_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
mes_df = spark.read.jdbc(jdbc_url, "staging_mes_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
scada_df = spark.read.jdbc(jdbc_url, "staging_scada_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
status_df = spark.read.jdbc(jdbc_url, "MachineStatus", properties=jdbc_props)
alarm_df = spark.read.jdbc(jdbc_url, "AlarmCodes", properties=jdbc_props)

# Best MES within ±1 hour
mes_join = iot_df.alias("i").join(mes_df.alias("m"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("m.Timestamp")))) \
    .filter(col("time_diff") <= 3600)

mes_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
mes_best = mes_join.withColumn("rn", row_number().over(mes_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("i.Temperature_C"),
        col("i.Vibration_mm_s"),
        col("i.Pressure_bar"),
        col("m.Operator_ID"),
        col("m.Units_Produced"),
        col("m.Defective_Units"),
        col("m.Production_Time_min")
    )

# Best SCADA within ±15 minutes
scada_join = iot_df.alias("i").join(scada_df.alias("s"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("s.Timestamp")))) \
    .filter(col("time_diff") <= 900)

scada_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
scada_best = scada_join.withColumn("rn", row_number().over(scada_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("s.Machine_Status").alias("Machine_Status_Name"),
        col("s.Alarm_Code").alias("Alarm_Code_Desc")
    )
scada_best = scada_best.fillna({"Alarm_Code_Desc": "None"})
scada_best = scada_best.withColumn(
    "Alarm_Code_Desc",
    when(col("Alarm_Code_Desc").isNull() | (col("Alarm_Code_Desc") == ""), lit("None"))
    .otherwise(col("Alarm_Code_Desc"))
)

# Join all and enrich with IDs
final_df = mes_best.alias("m").join(scada_best.alias("s"),
    (col("m.Machine_ID") == col("s.Machine_ID")) & (col("m.iot_Timestamp") == col("s.iot_Timestamp")),
    how="left"
).join(status_df.alias("ms"), col("s.Machine_Status_Name") == col("ms.StatusName"), "left") \
 .join(alarm_df.alias("ac"), col("s.Alarm_Code_Desc") == col("ac.AlarmDescription"), "left").select(
    col("m.iot_Timestamp").alias("Timestamp"),
    col("m.Machine_ID"),
    col("m.Temperature_C"),
    col("m.Vibration_mm_s"),
    col("m.Pressure_bar"),
    col("m.Operator_ID"),
    col("m.Units_Produced"),
    col("m.Defective_Units"),
    col("m.Production_Time_min"),
    col("s.Power_Consumption_kW"),
    col("ms.StatusID").alias("Machine_Status_ID"),
    col("ms.StatusName").alias("Machine_Status"),
    col("ac.AlarmID").alias("Alarm_ID"),
    col("ac.AlarmDescription").alias("Alarm_Code")
)

# Optional: Write to StarRocks
try:
    final_df.write.mode("append").format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "machine_metrics") \
        .option("user", "root") \
        .option("password", "") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .save()
    print("✅ Data inserted into machine_metrics.")
except Exception as e:
    print("❌ Failed to write to StarRocks:", e)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs, unix_timestamp, row_number
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MergeToMachineMetrics") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "/home/jovyan/jars/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# JDBC configs
jdbc_url = "jdbc:mysql://starrocks-allin1:9030/industrial_warehouse"
jdbc_props = {"user": "root", "password": "", "driver": "com.mysql.cj.jdbc.Driver"}

# Load staging and dimension tables
iot_df = spark.read.jdbc(jdbc_url, "staging_iot_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
mes_df = spark.read.jdbc(jdbc_url, "staging_mes_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
scada_df = spark.read.jdbc(jdbc_url, "staging_scada_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
status_df = spark.read.jdbc(jdbc_url, "MachineStatus", properties=jdbc_props)
alarm_df = spark.read.jdbc(jdbc_url, "AlarmCodes", properties=jdbc_props)

# Best MES within ±1 hour
mes_join = iot_df.alias("i").join(mes_df.alias("m"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("m.Timestamp")))) \
    .filter(col("time_diff") <= 3600)

mes_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
mes_best = mes_join.withColumn("rn", row_number().over(mes_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("i.Temperature_C"),
        col("i.Vibration_mm_s"),
        col("i.Pressure_bar"),
        col("m.Operator_ID"),
        col("m.Units_Produced"),
        col("m.Defective_Units"),
        col("m.Production_Time_min")
    )

# Best SCADA within ±15 minutes
scada_join = iot_df.alias("i").join(scada_df.alias("s"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("s.Timestamp")))) \
    .filter(col("time_diff") <= 900)

scada_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
scada_best = scada_join.withColumn("rn", row_number().over(scada_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("s.Machine_Status").alias("Machine_Status_Name"),
        col("s.Alarm_Code").alias("Alarm_Code_Desc")
    )

# Join all and enrich with IDs
final_df = mes_best.alias("m").join(scada_best.alias("s"),
    (col("m.Machine_ID") == col("s.Machine_ID")) & (col("m.iot_Timestamp") == col("s.iot_Timestamp")),
    how="left"
).join(status_df.alias("ms"), col("s.Machine_Status_Name") == col("ms.StatusName"), "left") \
 .join(alarm_df.alias("ac"), col("s.Alarm_Code_Desc") == col("ac.AlarmDescription"), "left") \
 .select(
    col("m.iot_Timestamp").alias("Timestamp"),
    col("m.Machine_ID"),
    col("m.Temperature_C"),
    col("m.Vibration_mm_s"),
    col("m.Pressure_bar"),
    col("m.Operator_ID"),
    col("m.Units_Produced"),
    col("m.Defective_Units"),
    col("m.Production_Time_min"),
    col("s.Power_Consumption_kW"),
    col("ms.StatusID").alias("Machine_Status_ID"),
    col("ms.StatusName").alias("Machine_Status"),
    col("ac.AlarmID").alias("Alarm_ID"),
    col("ac.AlarmDescription").alias("Alarm_Code")
)

# Optional: Write to StarRocks
try:
    final_df.write.mode("append").format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "machine_metrics") \
        .option("user", "root") \
        .option("password", "") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .save()
    print("✅ Data inserted into machine_metrics.")
except Exception as e:
    print("❌ Failed to write to StarRocks:", e)

In [None]:
from pyspark.sql.functions import lit, when
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs, unix_timestamp, row_number
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MergeToMachineMetrics") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "/home/jovyan/jars/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# JDBC configs
jdbc_url = "jdbc:mysql://starrocks-allin1:9030/industrial_warehouse"
jdbc_props = {"user": "root", "password": "", "driver": "com.mysql.cj.jdbc.Driver"}

# Load staging and dimension tables
iot_df = spark.read.jdbc(jdbc_url, "staging_iot_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
mes_df = spark.read.jdbc(jdbc_url, "staging_mes_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
scada_df = spark.read.jdbc(jdbc_url, "staging_scada_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
status_df = spark.read.jdbc(jdbc_url, "MachineStatus", properties=jdbc_props)
alarm_df = spark.read.jdbc(jdbc_url, "AlarmCodes", properties=jdbc_props)

# Best MES within ±1 hour
mes_join = iot_df.alias("i").join(mes_df.alias("m"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("m.Timestamp")))) \
    .filter(col("time_diff") <= 3600)

mes_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
mes_best = mes_join.withColumn("rn", row_number().over(mes_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("i.Temperature_C"),
        col("i.Vibration_mm_s"),
        col("i.Pressure_bar"),
        col("m.Operator_ID"),
        col("m.Units_Produced"),
        col("m.Defective_Units"),
        col("m.Production_Time_min")
    )

# Best SCADA within ±15 minutes
scada_join = iot_df.alias("i").join(scada_df.alias("s"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("s.Timestamp")))) \
    .filter(col("time_diff") <= 900)

scada_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
scada_best = scada_join.withColumn("rn", row_number().over(scada_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("s.Machine_Status").alias("Machine_Status_Name"),
        col("s.Alarm_Code").alias("Alarm_Code_Desc")
    )
scada_best = scada_best.fillna({"Alarm_Code_Desc": "None"})

scada_best = scada_best.withColumn(
    "Alarm_Code_Desc",
    when(col("Alarm_Code_Desc").isNull() | (col("Alarm_Code_Desc") == ""), lit("None"))
    .otherwise(col("Alarm_Code_Desc"))
)

# Join all and enrich with IDs
final_df = mes_best.alias("m").join(scada_best.alias("s"),
    (col("m.Machine_ID") == col("s.Machine_ID")) & (col("m.iot_Timestamp") == col("s.iot_Timestamp")),
    how="left"
).join(status_df.alias("ms"), col("s.Machine_Status_Name") == col("ms.StatusName"), "left") \
 .join(alarm_df.alias("ac"), col("s.Alarm_Code_Desc") == col("ac.AlarmDescription"), "left") \
 .select(
    col("m.iot_Timestamp").alias("Timestamp"),
    col("m.Machine_ID"),
    col("m.Temperature_C"),
    col("m.Vibration_mm_s"),
    col("m.Pressure_bar"),
    col("m.Operator_ID"),
    col("m.Units_Produced"),
    col("m.Defective_Units"),
    col("m.Production_Time_min"),
    col("s.Power_Consumption_kW"),
    col("ms.StatusID").alias("Machine_Status_ID"),
    col("ms.StatusName").alias("Machine_Status"),
    col("ac.AlarmID").alias("Alarm_ID"),
    col("ac.AlarmDescription").alias("Alarm_Code")
)

# Optional: Write to StarRocks
try:
    final_df.write.mode("append").format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "machine_metrics") \
        .option("user", "root") \
        .option("password", "") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .save()
    print("✅ Data inserted into machine_metrics.")
except Exception as e:
    print("❌ Failed to write to StarRocks:", e)

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs, unix_timestamp, row_number
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MergeToMachineMetrics") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "/home/jovyan/jars/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# JDBC configs
jdbc_url = "jdbc:mysql://starrocks-allin1:9030/industrial_warehouse"
jdbc_props = {"user": "root", "password": "", "driver": "com.mysql.cj.jdbc.Driver"}

# Load staging and dimension tables
iot_df = spark.read.jdbc(jdbc_url, "staging_iot_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
mes_df = spark.read.jdbc(jdbc_url, "staging_mes_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
scada_df = spark.read.jdbc(jdbc_url, "staging_scada_stream", properties=jdbc_props).withColumn("Timestamp", col("Timestamp").cast("timestamp"))
status_df = spark.read.jdbc(jdbc_url, "MachineStatus", properties=jdbc_props)
alarm_df = spark.read.jdbc(jdbc_url, "AlarmCodes", properties=jdbc_props)

# Best MES within ±1 hour
mes_join = iot_df.alias("i").join(mes_df.alias("m"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("m.Timestamp")))) \
    .filter(col("time_diff") <= 3600)

mes_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
mes_best = mes_join.withColumn("rn", row_number().over(mes_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("i.Temperature_C"),
        col("i.Vibration_mm_s"),
        col("i.Pressure_bar"),
        col("m.Operator_ID"),
        col("m.Units_Produced"),
        col("m.Defective_Units"),
        col("m.Production_Time_min")
    )

# Best SCADA within ±15 minutes
scada_join = iot_df.alias("i").join(scada_df.alias("s"), "Machine_ID") \
    .withColumn("time_diff", abs(unix_timestamp(col("i.Timestamp")) - unix_timestamp(col("s.Timestamp")))) \
    .filter(col("time_diff") <= 900)

scada_window = Window.partitionBy("i.Machine_ID", "i.Timestamp").orderBy("time_diff")
scada_best = scada_join.withColumn("rn", row_number().over(scada_window)).filter(col("rn") == 1) \
    .select(
        col("i.Timestamp").alias("iot_Timestamp"),
        col("i.Machine_ID"),
        col("s.Power_Consumption_kW"),
        col("s.Machine_Status").alias("Machine_Status_Name"),
        col("s.Alarm_Code").alias("Alarm_Code_Desc")
    )

# Join all and enrich with IDs
final_df = mes_best.alias("m").join(scada_best.alias("s"),
    (col("m.Machine_ID") == col("s.Machine_ID")) & (col("m.iot_Timestamp") == col("s.iot_Timestamp")),
    how="left"
).join(status_df.alias("ms"), col("s.Machine_Status_Name") == col("ms.StatusName"), "left") \
 .join(alarm_df.alias("ac"), col("s.Alarm_Code_Desc") == col("ac.AlarmDescription"), "left") \
 .select(
    col("m.iot_Timestamp").alias("Timestamp"),
    col("m.Machine_ID"),
    col("m.Temperature_C"),
    col("m.Vibration_mm_s"),
    col("m.Pressure_bar"),
    col("m.Operator_ID"),
    col("m.Units_Produced"),
    col("m.Defective_Units"),
    col("m.Production_Time_min"),
    col("s.Power_Consumption_kW"),
    col("ms.StatusID").alias("Machine_Status_ID"),
    col("ms.StatusName").alias("Machine_Status"),
    col("ac.AlarmID").alias("Alarm_ID"),
    col("ac.AlarmDescription").alias("Alarm_Code")
)

# Optional: Write to StarRocks
try:
    final_df.write.mode("append").format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "machine_metrics") \
        .option("user", "root") \
        .option("password", "") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .save()
    print("✅ Data inserted into machine_metrics.")
except Exception as e:
    print("❌ Failed to write to StarRocks:", e)

❌ Failed to write to StarRocks: An error occurred while calling o511.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 47.0 failed 4 times, most recent failure: Lost task 1.3 in stage 47.0 (TID 41) (172.18.0.11 executor 0): java.sql.BatchUpdateException: Failed to load data into tablet 400271, because of too many versions, current/limit: 1002/1000. You can reduce the loading job concurrency, or increase loading data batch size. If you are loading data with Routine Load, you can increase FE configs routine_load_task_consume_second and max_routine_load_batch_size,:  be:127.0.0.1
	at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:816)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:418)
	at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:795)
	at org.apac