In [24]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Manufacturing-FE").getOrCreate()

df = spark.read.csv(
    "D:\Study\Centillion\Pyspark\Datasets\hybrid_manufacturing_categorical.csv",
    header=True,
    inferSchema=True
)

df.show(10)


+------+----------+--------------+-------------+---------------+------------------+--------------------+-------------------+-------------------+-------------------+-------------------+----------+---------------------+
|Job_ID|Machine_ID|Operation_Type|Material_Used|Processing_Time|Energy_Consumption|Machine_Availability|    Scheduled_Start|      Scheduled_End|       Actual_Start|         Actual_End|Job_Status|Optimization_Category|
+------+----------+--------------+-------------+---------------+------------------+--------------------+-------------------+-------------------+-------------------+-------------------+----------+---------------------+
|  J001|       M01|      Grinding|         3.17|             76|             11.42|                  96|2023-03-18 08:00:00|2023-03-18 09:16:00|2023-03-18 08:05:00|2023-03-18 09:21:00| Completed|  Moderate Efficiency|
|  J002|       M01|      Grinding|         3.35|             79|              6.61|                  84|2023-03-18 08:10:00|2023

In [25]:
from pyspark.sql.functions import col

df = df.withColumn(
    "Planned_Duration_Min",
    (col("Scheduled_End").cast("long") - col("Scheduled_Start").cast("long")) / 60
)

df = df.withColumn(
    "Actual_Duration_Min",
    (col("Actual_End").cast("long") - col("Actual_Start").cast("long")) / 60
)

df = df.withColumn(
    "Delay_Min",
    col("Actual_Duration_Min") - col("Planned_Duration_Min")
)


df.show(10)

+------+----------+--------------+-------------+---------------+------------------+--------------------+-------------------+-------------------+-------------------+-------------------+----------+---------------------+--------------------+-------------------+---------+
|Job_ID|Machine_ID|Operation_Type|Material_Used|Processing_Time|Energy_Consumption|Machine_Availability|    Scheduled_Start|      Scheduled_End|       Actual_Start|         Actual_End|Job_Status|Optimization_Category|Planned_Duration_Min|Actual_Duration_Min|Delay_Min|
+------+----------+--------------+-------------+---------------+------------------+--------------------+-------------------+-------------------+-------------------+-------------------+----------+---------------------+--------------------+-------------------+---------+
|  J001|       M01|      Grinding|         3.17|             76|             11.42|                  96|2023-03-18 08:00:00|2023-03-18 09:16:00|2023-03-18 08:05:00|2023-03-18 09:21:00| Complete

In [26]:
from pyspark.sql.functions import when

df = df.withColumn(
    "Is_Delayed",
    when(col("Delay_Min") > 0, 1).otherwise(0)
)

df = df.withColumn(
    "Energy_per_Min",
    col("Energy_Consumption") / col("Processing_Time")
)

df = df.withColumn(
    "High_Machine_Load",
    when(col("Machine_Availability") < 85, 1).otherwise(0)
)

df = df.fillna({
    "Actual_Duration_Min": 0,
    "Delay_Min": 0,
    "Energy_per_Min": 0
})


df.show(10)

+------+----------+--------------+-------------+---------------+------------------+--------------------+-------------------+-------------------+-------------------+-------------------+----------+---------------------+--------------------+-------------------+---------+----------+-------------------+-----------------+
|Job_ID|Machine_ID|Operation_Type|Material_Used|Processing_Time|Energy_Consumption|Machine_Availability|    Scheduled_Start|      Scheduled_End|       Actual_Start|         Actual_End|Job_Status|Optimization_Category|Planned_Duration_Min|Actual_Duration_Min|Delay_Min|Is_Delayed|     Energy_per_Min|High_Machine_Load|
+------+----------+--------------+-------------+---------------+------------------+--------------------+-------------------+-------------------+-------------------+-------------------+----------+---------------------+--------------------+-------------------+---------+----------+-------------------+-----------------+
|  J001|       M01|      Grinding|         3.1

In [27]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

categorical_cols = [
    "Machine_ID",
    "Operation_Type",
    "Job_Status",
    "Optimization_Category"
]

indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_Index", handleInvalid="keep")
    for c in categorical_cols
]


feature_cols = [
    "Processing_Time",
    "Energy_Consumption",
    "Machine_Availability",
    "Planned_Duration_Min",
    "Actual_Duration_Min",
    "Delay_Min",
    "Energy_per_Min",
    "High_Machine_Load",
    "Machine_ID_Index",
    "Operation_Type_Index"
]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)



pipeline = Pipeline(stages=indexers + [assembler])

fe_model = pipeline.fit(df)
final_df = fe_model.transform(df)


final_df.select(
    "features",
    "Job_Status_Index"
).show(truncate=False)



+------------------------------------------------------------------+----------------+
|features                                                          |Job_Status_Index|
+------------------------------------------------------------------+----------------+
|[76.0,11.42,96.0,76.0,76.0,0.0,0.15026315789473685,0.0,0.0,1.0]   |0.0             |
|[79.0,6.61,84.0,79.0,79.0,0.0,0.08367088607594937,1.0,0.0,1.0]    |1.0             |
|[56.0,11.11,92.0,56.0,0.0,0.0,0.19839285714285712,0.0,2.0,3.0]    |2.0             |
|[106.0,12.5,95.0,106.0,106.0,0.0,0.1179245283018868,0.0,2.0,1.0]  |0.0             |
|[46.0,8.13,88.0,46.0,46.0,0.0,0.17673913043478262,0.0,0.0,0.0]    |0.0             |
|[100.0,13.83,86.0,100.0,100.0,0.0,0.1383,0.0,1.0,3.0]             |0.0             |
|[22.0,14.2,87.0,22.0,22.0,0.0,0.6454545454545454,0.0,2.0,2.0]     |0.0             |
|[79.0,13.86,91.0,79.0,79.0,0.0,0.17544303797468352,0.0,3.0,1.0]   |0.0             |
|[42.0,8.97,81.0,42.0,42.0,0.0,0.21357142857142858,1.0