# Imports

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=fe411525d5c060d814648c79371d2c44bdef10bdd8e60178293c5e449eba92a9
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
import pandas as pd

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DoubleType
from pyspark.sql.functions import col, expr, round, mean, when
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, Word2Vec, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create Spark Session

In [None]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Classification") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Read data

In [10]:
# Read dataframe from connection because notebook was developed in Google Collab and not locally
df_pandas = pd.read_csv("df_pyspark.csv")
df_pandas = df_pandas.drop(["Unnamed: 0"], axis=1)
df_pandas.columns

  df_pandas = pd.read_csv("df_pyspark.csv")


Index(['row.names', 'MM.code', 'Mesc.Code', 'Short.description', 'Deleted.',
       'Stock.Non.stock', 'Material.type.description', 'ABC.indicator',
       'Criticality', 'Repairable.Not.Repairable', 'BOM.Linkage',
       'Total.Installed.Qty', 'Criticality.based.on.Location', 'Column1',
       'Installed.QTY.under.Critical.Equipment',
       'Plant.section.of.installed.equipment', 'PM.task.list', 'PSGC.Descrp.1',
       'PSGC.Descrp.2', 'Discipline', 'Category', 'MM.Group',
       'Disciplines.for.Stock.changes', 'OSR', 'PDT', 'GR', 'Sap.lead.time',
       'AVG.historical.LT', 'MAX.historical.LT', 'MIN.historical.LT',
       'Last.PO.number', 'Last.PO.Date', 'PO.number', 'LT.accuracy',
       'Manufacturer.number', 'Vendor', 'OA', 'OA.start.date', 'OA.end.date',
       'OA.validity', 'Consumption..blank..2010.2021',
       'Consumed.under.High.critical.WO.share', 'Emergency',
       'Schedule.Breaker', 'DISP.Qty.', 'DISP.Value.USD.', 'SOLD.Qty.',
       'SOLD.Value.USD.', 'Total.Dispo

In [11]:
# Define the schema for PySpark DataFrame
schema = StructType([
    StructField("row_names", StringType(), True),
    StructField("MM_code", IntegerType(), True),
    StructField("Mesc_Code", StringType(), True),
    StructField("Short_description", StringType(), True),
    StructField("Deleted", StringType(), True),
    StructField("Stock_Non_stock", StringType(), True),
    StructField("Material_type_description", StringType(), True),
    StructField("ABC_indicator", StringType(), True),
    StructField("Criticality", StringType(), True),
    StructField("Repairable_Not_Repairable", StringType(), True),
    StructField("BOM_Linkage", StringType(), True),
    StructField("Total_Installed_Qty", FloatType(), True),
    StructField("Criticality_based_on_Location", StringType(), True),
    StructField("Column1", StringType(), True),
    StructField("Installed_QTY_under_Critical_Equipment", FloatType(), True),
    StructField("Plant_section_of_installed_equipment", StringType(), True),
    StructField("PM_task_list", StringType(), True),
    StructField("PSGC_Descrp_1", StringType(), True),
    StructField("PSGC_Descrp_2", StringType(), True),
    StructField("Discipline", StringType(), True),
    StructField("Category", StringType(), True),
    StructField("MM_Group", StringType(), True),
    StructField("Disciplines_for_Stock_changes", StringType(), True),
    StructField("OSR", StringType(), True),
    StructField("PDT", IntegerType(), True),
    StructField("GR", IntegerType(), True),
    StructField("Sap_lead_time", IntegerType(), True),
    StructField("AVG_historical_LT", FloatType(), True),
    StructField("MAX_historical_LT", FloatType(), True),
    StructField("MIN_historical_LT", FloatType(), True),
    StructField("Last_PO_number", FloatType(), True),
    StructField("Last_PO_Date", StringType(), True),
    StructField("PO_number", FloatType(), True),
    StructField("LT_accuracy", StringType(), True),
    StructField("Manufacturer_number", FloatType(), True),
    StructField("Vendor", StringType(), True),
    StructField("OA", FloatType(), True),
    StructField("OA_start_date", StringType(), True),
    StructField("OA_end_date", StringType(), True),
    StructField("OA_validity", StringType(), True),
    StructField("Consumption_blank_2010_2021", FloatType(), True),
    StructField("Consumed_under_High_critical_WO_share", StringType(), True),
    StructField("Emergency", FloatType(), True),
    StructField("Schedule_Breaker", FloatType(), True),
    StructField("DISP_Qty", StringType(), True),
    StructField("DISP_Value_USD", StringType(), True),
    StructField("SOLD_Qty", StringType(), True),
    StructField("SOLD_Value_USD", StringType(), True),
    StructField("Total_Disposal_Qty", StringType(), True),
    StructField("Final_Disposal_Value_USD", StringType(), True),
    StructField("Dormant_Review_History", StringType(), True),
    StructField("Number_of_reviews_in_tracker", FloatType(), True),
    StructField("Latest_review_Date", FloatType(), True)
])

# Rename columns in pandas DataFrame
df_pandas.columns = [col.replace(".", "_") for col in df_pandas.columns]

# Create PySpark DataFrame
df = spark.createDataFrame(df_pandas, schema=schema)
df.printSchema()


root
 |-- row_names: string (nullable = true)
 |-- MM_code: integer (nullable = true)
 |-- Mesc_Code: string (nullable = true)
 |-- Short_description: string (nullable = true)
 |-- Deleted: string (nullable = true)
 |-- Stock_Non_stock: string (nullable = true)
 |-- Material_type_description: string (nullable = true)
 |-- ABC_indicator: string (nullable = true)
 |-- Criticality: string (nullable = true)
 |-- Repairable_Not_Repairable: string (nullable = true)
 |-- BOM_Linkage: string (nullable = true)
 |-- Total_Installed_Qty: float (nullable = true)
 |-- Criticality_based_on_Location: string (nullable = true)
 |-- Column1: string (nullable = true)
 |-- Installed_QTY_under_Critical_Equipment: float (nullable = true)
 |-- Plant_section_of_installed_equipment: string (nullable = true)
 |-- PM_task_list: string (nullable = true)
 |-- PSGC_Descrp_1: string (nullable = true)
 |-- PSGC_Descrp_2: string (nullable = true)
 |-- Discipline: string (nullable = true)
 |-- Category: string (nullabl

In [12]:
df.show()

+---------+--------+---------+--------------------+-------+---------------+-------------------------+-------------+-------------------+-------------------------+-----------------+-------------------+-----------------------------+-------+--------------------------------------+------------------------------------+---------------+--------------------+--------------------+---------------+---------------+---------------+-----------------------------+---------+---+---+-------------+-----------------+-----------------+-----------------+--------------+------------+---------+--------------------+-------------------+---------+-----------+-------------------+-------------------+-----------+---------------------------+-------------------------------------+---------+----------------+--------+--------------+--------+--------------+------------------+------------------------+----------------------+----------------------------+------------------+
|row_names| MM_code|Mesc_Code|   Short_description|Dele

# Data Transformation

## Clean and filter data

In [13]:
df2 = df.drop(*['Mesc_code', 'PM_task_list', 'LT_accuracy', 'Schedule_Breaker', 'SOLD_Qty', 'SOLD_Value_USD', 'row_names', 'Dormant_Review_History', 'Number_of_reviews_in_tracker', 'Latest_review_Date', 'Column1'])
df2 = df2.filter(F.col('Deleted') == 'No')
df2 = df2.withColumn('DISP_Qty', expr("CAST(REPLACE(`DISP_Qty`, ',', '.') AS FLOAT)")) \
                         .withColumn('Consumed_under_High_critical_WO_share', F.expr("CAST(REPLACE(`Consumed_under_High_critical_WO_share`, ',', '.') AS FLOAT)")) \
                         .withColumn('DISP_Value_USD', F.expr("CAST(REPLACE(`DISP_Value_USD`, ',', '.') AS FLOAT)")) \
                         .withColumn('Total_Disposal_Qty', F.expr("CAST(REPLACE(`Total_Disposal_Qty`, ',', '.') AS FLOAT)")) \
                         .withColumn('Final_Disposal_Value_USD', F.expr("CAST(REPLACE(`Final_Disposal_Value_USD`, ',', '.') AS FLOAT)")) \
                         .withColumn('OA', F.col('OA').cast(StringType())) \
                         .withColumn('MM_code', F.col('MM_code').cast(StringType())) \
                         .withColumn('Last_PO_number', F.col('Last_PO_number').cast(StringType())) \
                         .withColumn('Manufacturer_number', F.col('Manufacturer_number').cast(StringType()))
df2 = df2.filter(F.col('MM_Group') != '')

cols_to_round = [col_name for col_name, data_type in df2.dtypes if data_type in ['int', 'double']]
df2 = df2.select([round(col(col_name), 2).alias(col_name) if col_name in cols_to_round else col_name for col_name in df2.columns])


df2 = (
    df2
    .withColumn('ABC_indicator', F.when(df2['ABC_indicator'] == '', 'D').otherwise(df2['ABC_indicator']))
    .withColumn('Criticality_based_on_Location', F.when(df2['Criticality_based_on_Location'] == '', 'Not Specified').otherwise(df2['Criticality_based_on_Location']))
    )

# Define a window specification
window_spec = Window.partitionBy('Vendor', 'Category', 'Material_type_description')
# Fill missing values with mean within groups
df2 = df2.withColumn('AVG_historical_LT_filled', F.when((F.col('AVG_historical_LT').isNull()) | (F.col("AVG_historical_LT")=='NaN') | (F.col("AVG_historical_LT")==''), F.avg('AVG_historical_LT').over(window_spec)).otherwise(F.col('AVG_historical_LT')))
# Fill remaining missing values with overall mean
df2 = df2.withColumn('AVG_historical_LT_filled', F.coalesce(F.col('AVG_historical_LT_filled'), F.avg('AVG_historical_LT').over(Window.partitionBy())))
df2 = df2.drop("AVG_historical_LT")
df2 = df2.withColumnRenamed("AVG_historical_LT_filled", "AVG_historical_LT")

df2 = (
    df2
    .withColumn('BOM_Linkage', F.when(df2['BOM_Linkage'] == 'Linked to BOM', 1).otherwise(0).cast(IntegerType()))
    .withColumn('Stock_Non_stock', F.when(df2['Stock_Non_stock'] == 'Stock', 1).otherwise(0).cast(IntegerType()))
    )

df2 = df2.withColumn('Vendor', F.expr("dense_rank() OVER (ORDER BY Vendor)").cast(DoubleType()))



## Encode categorical columns

In [14]:
# Extract categorical columns
cat_cols = [col_name for col_name, dtype in df2.dtypes if dtype == 'string']

# Define columns to drop
drop_cat_cols = ['MM_code', 'Short_description', 'Deleted.', 'Column1', 'Plant_section_of_installed_equipment',
                 'PSGC_Descrp_1', 'PSGC_Descrp_2', 'Disciplines_for_Stock_changes', 'OSR', 'Last_PO_number',
                 'Last_PO_Date', 'Manufacturer_.number', 'OA', 'OA_start_date', 'OA_end_date', 'OA_validity',
                 'Discipline', 'MM_Group']

# Remove columns to drop from categorical columns
cat_cols = list(set(cat_cols) - set(drop_cat_cols))

# Print the resulting categorical columns
print(cat_cols)


['Material_type_description', 'Category', 'Criticality', 'Manufacturer_number', 'ABC_indicator', 'Repairable_Not_Repairable', 'Deleted', 'Criticality_based_on_Location']


In [15]:
# Apply StringIndexer for categorical columns
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index", handleInvalid="keep") for col in cat_cols]
indexer_pipeline = Pipeline(stages=indexers)
df_indexed = indexer_pipeline.fit(df2).transform(df2)
df_indexed = df_indexed.drop(*cat_cols)
df_indexed.show(5)

+--------+--------------------+---------------+-----------+-------------------+--------------------------------------+------------------------------------+--------------------+--------------------+----------+--------+-----------------------------+---+---+---+-------------+-----------------+-----------------+--------------+------------+---------+------+---+-------------+-----------+-----------+---------------------------+-------------------------------------+---------+--------+--------------+------------------+------------------------+-----------------+-------------------------------+--------------+-----------------+-------------------------+-------------------+-------------------------------+-------------+-----------------------------------+
| MM_code|   Short_description|Stock_Non_stock|BOM_Linkage|Total_Installed_Qty|Installed_QTY_under_Critical_Equipment|Plant_section_of_installed_equipment|       PSGC_Descrp_1|       PSGC_Descrp_2|Discipline|MM_Group|Disciplines_for_Stock_changes|O

## Create target column

In [16]:
df_indexed = df_indexed.withColumn('Time_Difference', F.col('Sap_lead_time') - F.col('AVG_historical_LT'))
med = df_indexed.approxQuantile('Time_Difference', [0.5], 0.05)[0]
df_indexed = df_indexed.withColumn('target', F.when(F.col('Time_Difference') >= med, 1).otherwise(0))
df_indexed.show(5)

+--------+--------------------+---------------+-----------+-------------------+--------------------------------------+------------------------------------+--------------------+--------------------+----------+--------+-----------------------------+---+---+---+-------------+-----------------+-----------------+--------------+------------+---------+------+---+-------------+-----------+-----------+---------------------------+-------------------------------------+---------+--------+--------------+------------------+------------------------+-----------------+-------------------------------+--------------+-----------------+-------------------------+-------------------+-------------------------------+-------------+-----------------------------------+---------------+------+
| MM_code|   Short_description|Stock_Non_stock|BOM_Linkage|Total_Installed_Qty|Installed_QTY_under_Critical_Equipment|Plant_section_of_installed_equipment|       PSGC_Descrp_1|       PSGC_Descrp_2|Discipline|MM_Group|Discipli

## Tokenize text data

In [17]:
# Drop unnecessary columns
drop_columns = ['Time_Difference', 'Sap_lead_time', 'PDT', 'GR', 'AVG_historical_LT']
df_indexed = df_indexed.drop(*drop_columns)

# Tokenize text column
tokenizer = Tokenizer(inputCol="PSGC_Descrp_2", outputCol="Tokens")
df_tokenized = tokenizer.transform(df_indexed)
df_tokenized.show(5)

+--------+--------------------+---------------+-----------+-------------------+--------------------------------------+------------------------------------+--------------------+--------------------+----------+--------+-----------------------------+---+-----------------+-----------------+--------------+------------+---------+------+---+-------------+-----------+-----------+---------------------------+-------------------------------------+---------+--------+--------------+------------------+------------------------+-------------------------------+--------------+-----------------+-------------------------+-------------------+-------------------------------+-------------+-----------------------------------+------+--------------------+
| MM_code|   Short_description|Stock_Non_stock|BOM_Linkage|Total_Installed_Qty|Installed_QTY_under_Critical_Equipment|Plant_section_of_installed_equipment|       PSGC_Descrp_1|       PSGC_Descrp_2|Discipline|MM_Group|Disciplines_for_Stock_changes|OSR|MAX_histo

## Word2Vec model

In [18]:
# Train Word2Vec model
word2vec = Word2Vec(vectorSize=100, windowSize=5, minCount=1, inputCol="Tokens", outputCol="Word2VecEmbeddings")
word2vec_model = word2vec.fit(df_tokenized)
df_embedded = word2vec_model.transform(df_tokenized)
df_embedded.show(5)

+--------+--------------------+---------------+-----------+-------------------+--------------------------------------+------------------------------------+--------------------+--------------------+----------+--------+-----------------------------+---+-----------------+-----------------+--------------+------------+---------+------+---+-------------+-----------+-----------+---------------------------+-------------------------------------+---------+--------+--------------+------------------+------------------------+-------------------------------+--------------+-----------------+-------------------------+-------------------+-------------------------------+-------------+-----------------------------------+------+--------------------+--------------------+
| MM_code|   Short_description|Stock_Non_stock|BOM_Linkage|Total_Installed_Qty|Installed_QTY_under_Critical_Equipment|Plant_section_of_installed_equipment|       PSGC_Descrp_1|       PSGC_Descrp_2|Discipline|MM_Group|Disciplines_for_Stock_

In [19]:
df_embedded = df_embedded.drop('Short_description', 'Deleted',
       'Plant_section_of_installed_equipment', 'PSGC_Descrp_1',
       'PSGC_Descrp_2', 'Discipline', 'MM_Group',
       'Disciplines_for_Stock_changes', 'OSR', 'Last_PO_number',
       'Last_PO_Date', 'Manufacturer_number', 'OA', 'OA_start_date',
       'OA_end_date', 'OA_validity', 'MAX_historical_LT', 'MIN_historical_LT', 'MM_code', 'Tokens')
df_embedded.show(5)

+---------------+-----------+-------------------+--------------------------------------+---------+------+---------------------------+-------------------------------------+---------+--------+--------------+------------------+------------------------+-------------------------------+--------------+-----------------+-------------------------+-------------------+-------------------------------+-------------+-----------------------------------+------+--------------------+
|Stock_Non_stock|BOM_Linkage|Total_Installed_Qty|Installed_QTY_under_Critical_Equipment|PO_number|Vendor|Consumption_blank_2010_2021|Consumed_under_High_critical_WO_share|Emergency|DISP_Qty|DISP_Value_USD|Total_Disposal_Qty|Final_Disposal_Value_USD|Material_type_description_index|Category_index|Criticality_index|Manufacturer_number_index|ABC_indicator_index|Repairable_Not_Repairable_index|Deleted_index|Criticality_based_on_Location_index|target|  Word2VecEmbeddings|
+---------------+-----------+-------------------+---------

In [20]:
df_embedded = df_embedded.na.drop()

## Vectorize data

In [21]:
# Assemble features into a single vector
feature_cols = df_embedded.columns
feature_cols.remove('target')
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_assembled = assembler.transform(df_embedded)
df_assembled.show(5)

+---------------+-----------+-------------------+--------------------------------------+---------+------+---------------------------+-------------------------------------+---------+--------+--------------+------------------+------------------------+-------------------------------+--------------+-----------------+-------------------------+-------------------+-------------------------------+-------------+-----------------------------------+------+--------------------+--------------------+
|Stock_Non_stock|BOM_Linkage|Total_Installed_Qty|Installed_QTY_under_Critical_Equipment|PO_number|Vendor|Consumption_blank_2010_2021|Consumed_under_High_critical_WO_share|Emergency|DISP_Qty|DISP_Value_USD|Total_Disposal_Qty|Final_Disposal_Value_USD|Material_type_description_index|Category_index|Criticality_index|Manufacturer_number_index|ABC_indicator_index|Repairable_Not_Repairable_index|Deleted_index|Criticality_based_on_Location_index|target|  Word2VecEmbeddings|            features|
+---------------

# Split data into train and test sets

In [22]:
# Split data into train and test sets
train_df, test_df = df_assembled.randomSplit([0.8, 0.2], seed=42)

# Train Random Forest model

In [23]:
# Train RandomForestClassifier
rf = RandomForestClassifier(featuresCol="features", labelCol="target", maxBins=3000, seed=42)
rf_model = rf.fit(train_df)

# Make predictions

In [24]:
# Make predictions
predictions = rf_model.transform(test_df)

# Evaluate model

In [25]:
# Evaluate
evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator.evaluate(predictions)

evaluator.setMetricName("weightedRecall")
recall = evaluator.evaluate(predictions)

evaluator.setMetricName("weightedFMeasure")
f1_score = evaluator.evaluate(predictions)

evaluator.setMetricName("accuracy")
accuracy = evaluator.evaluate(predictions)

print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)
print("Accuracy:", accuracy)

evaluator = BinaryClassificationEvaluator(labelCol="target")
roc_auc = evaluator.evaluate(predictions)
print("ROC AUC:", roc_auc)


Precision: 0.6244967542398665
Recall: 0.622602023020579
F1 Score: 0.6221010893390084
Accuracy: 0.622602023020579
ROC AUC: 0.6681836681836685


In [26]:
spark.stop()