In [0]:
# Path of the Parquet dataset this notebook starts from.
parquet_file_path = "/FileStore/tables/silver_emp_df.parquet"

# Load the dataset. `df` is a second name for the same DataFrame and is the
# one the following cells operate on.
silver_emp_df = (
    spark.read
    .format("parquet")
    .load(parquet_file_path)
)
df = silver_emp_df

# Print one row as a quick look at the loaded data.
df.show(1)

+-----------+----+------+----------------+----------+--------------+-----------------+----------------+------------------+--------------------+--------+------------------+-----------------+--------------+--------------------+---------+------------+--------------+-----------+------------------------+------------------------+------------------+--------------------+---------+
|Employee ID| Age|Gender|Years at Company|  Job Role|Monthly Income|Work-Life Balance|Job Satisfaction|Performance Rating|Number of Promotions|Overtime|Distance from Home|  Education Level|Marital Status|Number of Dependents|Job Level|Company Size|Company Tenure|Remote Work|Leadership Opportunities|Innovation Opportunities|Company Reputation|Employee Recognition|Attrition|
+-----------+----+------+----------------+----------+--------------+-----------------+----------------+------------------+--------------------+--------+------------------+-----------------+--------------+--------------------+---------+------------+

In [0]:
# Feature engineering for model training

# Columns that require ordinal encoding
ordinal_encoding_cols = [
    "Work-Life Balance",
    "Job Satisfaction",
    "Performance Rating",
    "Education Level",
    "Job Level",
    "Company Size",
    "Company Reputation",
    "Employee Recognition",
]

# Columns that require nominal encoding
nominal_encoding_cols = [
    "Gender",
    "Job Role",
    "Overtime",
    "Marital Status",
    "Remote Work",
    "Leadership Opportunities",
    "Innovation Opportunities",
]



In [0]:
# Keep the names of columns whose Spark dtype string is exactly 'int' or
# 'double'; every other dtype is skipped. Column order follows df.dtypes.
numeric_cols = [
    column_name
    for column_name, spark_type in df.dtypes
    if spark_type in {'int', 'double'}
]

In [0]:
# Split the data for model training: roughly 70% train / 30% test.
#
# A fixed seed is passed so the split is repeatable. Without it, every run of
# the notebook draws a different random partition, which makes the fitted
# scaler/indexers and any downstream metrics impossible to reproduce.
train, test = df.randomSplit([0.7, 0.3], seed=42)

In [0]:
# Exclude 'Employee ID' from the numeric feature list (presumably because it
# is an identifier, not a predictor - confirm).
#
# Filtering instead of calling list.remove() keeps this cell safe to re-run:
# list.remove() raises ValueError on a second execution because the entry has
# already been removed, whereas the filter is simply a no-op.
numeric_cols = [name for name in numeric_cols if name != 'Employee ID']
numeric_cols

Out[6]: ['Age',
 'Years at Company',
 'Monthly Income',
 'Number of Promotions',
 'Distance from Home',
 'Number of Dependents',
 'Company Tenure']

In [0]:
from pyspark.ml.feature import VectorAssembler

# Combine the columns listed in numeric_cols into a single vector column
# named 'numerical_feature_vector'.
numerical_vector_assembler = VectorAssembler(
    outputCol='numerical_feature_vector',
    inputCols=numeric_cols,
)

# Apply the same assembler to both splits so they get the same new column.
train, test = (
    numerical_vector_assembler.transform(train),
    numerical_vector_assembler.transform(test),
)

# Preview two rows of the training split with the new vector column.
train.show(2)

+-----------+----+------+----------------+----------+--------------+-----------------+----------------+------------------+--------------------+--------+------------------+---------------+--------------+--------------------+---------+------------+--------------+-----------+------------------------+------------------------+------------------+--------------------+---------+------------------------+
|Employee ID| Age|Gender|Years at Company|  Job Role|Monthly Income|Work-Life Balance|Job Satisfaction|Performance Rating|Number of Promotions|Overtime|Distance from Home|Education Level|Marital Status|Number of Dependents|Job Level|Company Size|Company Tenure|Remote Work|Leadership Opportunities|Innovation Opportunities|Company Reputation|Employee Recognition|Attrition|numerical_feature_vector|
+-----------+----+------+----------------+----------+--------------+-----------------+----------------+------------------+--------------------+--------+------------------+---------------+--------------+

In [0]:
from pyspark.ml.feature import StandardScaler

# Standardise the numeric feature vector (centre on the mean and scale by the
# standard deviation). The scaler is fitted on the training split only; after
# this statement `scaler` refers to the fitted model, not the estimator.
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol='numerical_feature_vector',
    outputCol='scaled_numerical_feature_vector',
).fit(train)

# Apply the training-set statistics to both splits.
train, test = scaler.transform(train), scaler.transform(test)

# Preview one row of the training split with the scaled vector column.
train.show(1)

+-----------+----+------+----------------+----------+--------------+-----------------+----------------+------------------+--------------------+--------+------------------+---------------+--------------+--------------------+---------+------------+--------------+-----------+------------------------+------------------------+------------------+--------------------+---------+------------------------+-------------------------------+
|Employee ID| Age|Gender|Years at Company|  Job Role|Monthly Income|Work-Life Balance|Job Satisfaction|Performance Rating|Number of Promotions|Overtime|Distance from Home|Education Level|Marital Status|Number of Dependents|Job Level|Company Size|Company Tenure|Remote Work|Leadership Opportunities|Innovation Opportunities|Company Reputation|Employee Recognition|Attrition|numerical_feature_vector|scaled_numerical_feature_vector|
+-----------+----+------+----------------+----------+--------------+-----------------+----------------+------------------+----------------

In [0]:
from pyspark.ml.feature import StringIndexer

# Columns that require ordinal encoding.
# NOTE(review): StringIndexer assigns indices by label frequency by default,
# not by the categories' natural rank - confirm the resulting codes are
# acceptable as "ordinal" values for these columns.
ordinal_encoding_cols = [
    "Work-Life Balance",
    "Job Satisfaction",
    "Performance Rating",
    "Education Level",
    "Job Level",
    "Company Size",
    "Company Reputation",
    "Employee Recognition",
]

# Each output column is the input column name with an "_index" suffix.
ordinal_encoding_cols_index = [f"{name}_index" for name in ordinal_encoding_cols]

# Fit one multi-column indexer on the training split only; after this
# statement `indexer` refers to the fitted model.
indexer = StringIndexer(
    inputCols=ordinal_encoding_cols,
    outputCols=ordinal_encoding_cols_index,
).fit(train)

# Apply the mappings learned from the training split to both splits.
train, test = indexer.transform(train), indexer.transform(test)

# Preview one row of the training split with the new index columns.
train.show(1)

+-----------+----+------+----------------+----------+--------------+-----------------+----------------+------------------+--------------------+--------+------------------+---------------+--------------+--------------------+---------+------------+--------------+-----------+------------------------+------------------------+------------------+--------------------+---------+------------------------+-------------------------------+-----------------------+----------------------+------------------------+---------------------+---------------+------------------+------------------------+--------------------------+
|Employee ID| Age|Gender|Years at Company|  Job Role|Monthly Income|Work-Life Balance|Job Satisfaction|Performance Rating|Number of Promotions|Overtime|Distance from Home|Education Level|Marital Status|Number of Dependents|Job Level|Company Size|Company Tenure|Remote Work|Leadership Opportunities|Innovation Opportunities|Company Reputation|Employee Recognition|Attrition|numerical_featu

In [0]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline

# Columns that require nominal encoding
nominal_encoding_cols = [
    "Gender",
    "Job Role",
    "Overtime",
    "Marital Status",
    "Remote Work",
    "Leadership Opportunities",
    "Innovation Opportunities",
]

# One StringIndexer per nominal column, writing "<column>_index".
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index")
    for name in nominal_encoding_cols
]

# One OneHotEncoder per indexed column, reading "<column>_index" and
# writing "<column>_onehot".
one_hot_encoders = [
    OneHotEncoder(inputCol=f"{name}_index", outputCol=f"{name}_onehot")
    for name in nominal_encoding_cols
]

# All indexers run first, followed by all encoders, as a single pipeline.
pipeline = Pipeline(stages=[*indexers, *one_hot_encoders])

# Fit on the training split only, then apply the fitted pipeline to both splits.
pipeline_model = pipeline.fit(train)
train, test = pipeline_model.transform(train), pipeline_model.transform(test)

# Show the original, indexed and one-hot Gender columns for three training rows.
train.select("Gender", "Gender_index", "Gender_onehot").show(3)

+------+------------+-------------+
|Gender|Gender_index|Gender_onehot|
+------+------------+-------------+
|Female|         1.0|    (1,[],[])|
|  Male|         0.0|(1,[0],[1.0])|
|Female|         1.0|    (1,[],[])|
+------+------------+-------------+
only showing top 3 rows



In [0]:
# Index the target column.
#
# The indexer is fitted once, on the training split only, and that fitted
# model is reused for the test split. Fitting a second indexer on the test
# data could assign different numeric codes to the same label (StringIndexer
# orders labels by frequency by default), which would make the train and test
# labels inconsistent.
target_indexer = StringIndexer(inputCol="Attrition", outputCol="Attrition_index")
target_indexer_model = target_indexer.fit(train)
train = target_indexer_model.transform(train)
test = target_indexer_model.transform(test)

# Display the encoded target column
train.select("Attrition", "Attrition_index").show(3)

+---------+---------------+
|Attrition|Attrition_index|
+---------+---------------+
|   Stayed|            0.0|
|   Stayed|            0.0|
|   Stayed|            0.0|
+---------+---------------+
only showing top 3 rows



In [0]:
# Re-declare the categorical column lists used by this cell and the next ones.
nominal_encoding_cols = [
    "Gender",
    "Job Role",
    "Overtime",
    "Marital Status",
    "Remote Work",
    "Leadership Opportunities",
    "Innovation Opportunities",
]
ordinal_encoding_cols = [
    "Work-Life Balance",
    "Job Satisfaction",
    "Performance Rating",
    "Education Level",
    "Job Level",
    "Company Size",
    "Company Reputation",
    "Employee Recognition",
]

# Drop the original categorical columns from both splits; only their
# "_index" / "_onehot" encodings are kept.
train = train.drop(*nominal_encoding_cols, *ordinal_encoding_cols)
test = test.drop(*nominal_encoding_cols, *ordinal_encoding_cols)

In [0]:
# Inputs to the final feature vector, in this order: the scaled numeric
# vector, the "_index" columns of the ordinal features, then the "_onehot"
# columns of the nominal features.
feature_columns = [
    "scaled_numerical_feature_vector",
    *(f"{name}_index" for name in ordinal_encoding_cols),
    *(f"{name}_onehot" for name in nominal_encoding_cols),
]

# Assemble all feature columns into a single "features" vector column.
assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)

# Add the assembled column to both splits.
train, test = assembler.transform(train), assembler.transform(test)

# Show the assembled vectors for three training rows, without truncation.
train.select("features").show(3, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                               |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[-0.18840823627322334,0.3214253831457459,0.7805121667929313,1.3482364629590387,-0.3807465138326364,-0.4246079015579666,-0.6532820212350585,0.0,2.0,1.0,2.0,1.0,2.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,1.0,1.0,1.0,1.0]|
|[1.299403533588313,1.6800714373568721,-0.5131381211985538,0.2256848169165007,1.0474725467227224,0.8692105010248888,1.3233707135

In [0]:
# Rebuild the list of assembler input columns and echo it as the cell output.
feature_columns = (
    ["scaled_numerical_feature_vector"]
    + [f"{name}_index" for name in ordinal_encoding_cols]
    + [f"{name}_onehot" for name in nominal_encoding_cols]
)
feature_columns

Out[14]: ['scaled_numerical_feature_vector',
 'Work-Life Balance_index',
 'Job Satisfaction_index',
 'Performance Rating_index',
 'Education Level_index',
 'Job Level_index',
 'Company Size_index',
 'Company Reputation_index',
 'Employee Recognition_index',
 'Gender_onehot',
 'Job Role_onehot',
 'Overtime_onehot',
 'Marital Status_onehot',
 'Remote Work_onehot',
 'Leadership Opportunities_onehot',
 'Innovation Opportunities_onehot']

In [0]:
# Bind the transformed splits to the names used when writing them out below.
train_silver, test_silver = train, test

# Preview one row of the transformed test split.
test_silver.show(1)


+-----------+----+----------------+--------------+--------------------+------------------+--------------------+--------------+---------+------------------------+-------------------------------+-----------------------+----------------------+------------------------+---------------------+---------------+------------------+------------------------+--------------------------+------------+--------------+--------------+--------------------+-----------------+------------------------------+------------------------------+-------------+---------------+---------------+---------------------+------------------+-------------------------------+-------------------------------+---------------+--------------------+
|Employee ID| Age|Years at Company|Monthly Income|Number of Promotions|Distance from Home|Number of Dependents|Company Tenure|Attrition|numerical_feature_vector|scaled_numerical_feature_vector|Work-Life Balance_index|Job Satisfaction_index|Performance Rating_index|Education Level_index|Job Le

In [0]:
# Output locations for the transformed training and test splits.
train_file_path = "/FileStore/tables/gold_train.parquet"
test_file_path = "/FileStore/tables/gold_test.parquet"

# Write both splits as Parquet, using "overwrite" mode for each path.
(
    train_silver.write
    .format("parquet")
    .mode("overwrite")
    .save(train_file_path)
)

(
    test_silver.write
    .format("parquet")
    .mode("overwrite")
    .save(test_file_path)
)
