In [0]:
# Step 1: Initialize Spark session and load data

from pyspark.sql import SparkSession

# Create (or reuse) the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName("HR Employee Churn")
    .getOrCreate()
)

# Source file and the reader format used to parse it
file_type = "csv"
file_location = "/FileStore/tables/hr_employee_churn_data.csv"

# Read the file as comma-separated text, taking column names from the
# header row and letting Spark infer each column's type
df_spark = (
    spark.read.format(file_type)
    .option("sep", ",")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(file_location)
)

# Print a preview of the loaded DataFrame
df_spark.show()


In [0]:
# Step 2: Initial data exploration with Spark SQL

# Register the DataFrame as a temporary view so it can be queried with SQL
df_spark.createOrReplaceTempView("hr_employee_churn")

# Summary statistics over the whole table: row count, feature averages,
# and the number of employees who left.
# NOTE: `left` is also a SQL keyword, so the column is backtick-quoted to
# keep the query valid when reserved keywords are enforced (ANSI mode).
query = """
SELECT COUNT(*) AS total_records,
       AVG(satisfaction_level) AS avg_satisfaction_level,
       AVG(last_evaluation) AS avg_last_evaluation,
       AVG(number_project) AS avg_number_project,
       AVG(average_montly_hours) AS avg_monthly_hours,
       AVG(time_spend_company) AS avg_time_spent,
       SUM(CASE WHEN `left` = 1 THEN 1 ELSE 0 END) AS total_churn
FROM hr_employee_churn
"""

# Run the query and show the results
df_summary = spark.sql(query)
df_summary.show()


In [0]:
# Step 3: Data preprocessing with PySpark

from pyspark.sql.functions import mean, col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# NOTE: this cell reassigns df_spark and finally drops "salary", which the
# StringIndexer below needs. Re-running it therefore requires re-running the
# load cell first.

# Impute missing satisfaction_level values with the column mean
mean_row = df_spark.select(mean(col('satisfaction_level'))).collect()[0]
mean_satisfaction_level = mean_row[0]
df_spark = df_spark.na.fill({'satisfaction_level': mean_satisfaction_level})

# Encode the categorical salary column: string -> index -> one-hot vector
indexer = StringIndexer(inputCol="salary", outputCol="salaryIndex")
df_spark = indexer.fit(df_spark).transform(df_spark)

encoder = OneHotEncoder(inputCols=["salaryIndex"], outputCols=["salaryVec"])
df_spark = encoder.fit(df_spark).transform(df_spark)

# Columns that are packed, in this order, into the single "features" vector
assembler_inputs = [
    'satisfaction_level',
    'last_evaluation',
    'number_project',
    'average_montly_hours',
    'time_spend_company',
    'Work_accident',
    'promotion_last_5years',
    'salaryVec',
]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
df_spark = assembler.transform(df_spark)

# The salary columns are now represented inside "features"; remove them
intermediate_columns = ["salary", "salaryIndex", "salaryVec"]
df_spark = df_spark.drop(*intermediate_columns)

# Print a preview of the processed Spark DataFrame
df_spark.show()


In [0]:
# Step 4: Convert Spark DataFrame to Pandas for further processing
import pandas as pd
import numpy as np 

# Bring only the ID, the assembled feature vector, and the label into a
# Pandas DataFrame
export_columns = ["empid", "features", "left"]
df_pandas = df_spark.select(*export_columns).toPandas()

# Helper that unpacks a single feature vector into an array of values
def extract_features(features_vector):
    """Return the contents of ``features_vector`` as an array.

    The work is delegated to the vector's own ``toArray()`` method, so any
    object exposing that method is accepted.

    Args:
        features_vector: Object with a ``toArray()`` method (presumably a
            Spark ML vector from the "features" column -- confirm at the
            call site).

    Returns:
        Whatever ``features_vector.toArray()`` returns.
    """
    unpacked_values = features_vector.toArray()
    return unpacked_values

# Unpack each row's feature vector with the extract_features helper
# (previously duplicated as an inline lambda) into a 2-D array:
# one row per employee, one column per assembled feature.
features_array = np.array(df_pandas['features'].apply(extract_features).tolist())

# Names of the scalar inputs, in the order they were given to the assembler
base_feature_names = ['satisfaction_level', 'last_evaluation', 'number_project',
                      'average_montly_hours', 'time_spend_company', 'Work_accident',
                      'promotion_last_5years']

# Fail early with a clear message instead of an opaque shape-mismatch error
if features_array.ndim != 2 or features_array.shape[1] < len(base_feature_names):
    raise ValueError(
        "Unexpected feature array shape "
        f"{features_array.shape}; expected 2-D with at least "
        f"{len(base_feature_names)} columns"
    )

# Every remaining column belongs to the one-hot encoded salary vector. Its
# width depends on how many salary categories the data contains, so the names
# are derived from the array instead of being hard-coded to two slots.
n_salary_slots = features_array.shape[1] - len(base_feature_names)
feature_names = base_feature_names + [f'salaryVec_{i}' for i in range(n_salary_slots)]

# Create a DataFrame from the features array, sharing df_pandas's index so the
# column-wise concat below lines rows up exactly
features_df = pd.DataFrame(features_array, columns=feature_names, index=df_pandas.index)

# Combine the features DataFrame with the target and ID columns
df_pandas = pd.concat([df_pandas[['empid', 'left']], features_df], axis=1)

# Show the first few rows of the Pandas DataFrame
print(df_pandas.head())


In [0]:
# Fixed seed so the same five rows are shown on every Restart & Run All
df_pandas.sample(5, random_state=42)

In [0]:
import seaborn as sns
import matplotlib.pyplot as plt

# Plot the distribution of the target variable 'left'
sns.countplot(x='left', data=df_pandas)
plt.title('Distribution of Employee Churn')
plt.show()

# Plot the correlation matrix.
# empid is a row identifier, not a feature: correlating it with anything is
# meaningless, and a non-numeric ID would make corr() fail on recent pandas,
# so it is excluded here.
corr_matrix = df_pandas.drop(columns=['empid']).corr()
plt.figure(figsize=(12, 8))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm')
plt.title('Correlation Matrix')
plt.show()

# Plot distributions of numerical features (one figure per feature)
numerical_features = ['satisfaction_level', 'last_evaluation', 'number_project', 'average_montly_hours', 'time_spend_company']
for feature in numerical_features:
    plt.figure(figsize=(8, 4))
    sns.histplot(df_pandas[feature], kde=True)
    plt.title(f'Distribution of {feature}')
    plt.show()


In [0]:
# Step 6: Machine Learning Model Building with Scikit-Learn

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import classification_report, roc_auc_score, confusion_matrix

# Feature matrix: everything except the identifier and the label
X = df_pandas.drop(['empid', 'left'], axis='columns')
# Target vector: the churn label
y = df_pandas.loc[:, 'left']

In [0]:
# Split the data into training (80%) and testing (20%) sets.
# stratify=y keeps the churn/non-churn ratio the same in both parts, so the
# test metrics are not skewed by an unlucky draw of the minority class.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

In [0]:
# Standardize the features. The scaler learns its statistics from the
# training split only and is then applied unchanged to both splits, so no
# information from the test set leaks into the fit.
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

In [0]:
# Candidate classifiers, keyed by the display name used in reports.
# The tree-based models are randomized; fixing random_state makes their
# metrics reproducible across notebook runs.
models = {
    'Logistic Regression': LogisticRegression(max_iter=1000),
    'Decision Tree': DecisionTreeClassifier(random_state=42),
    'Random Forest': RandomForestClassifier(random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(random_state=42)
}

In [0]:
# Train and evaluate models.
# Each model is fit on the scaled training split and scored on the scaled
# test split; one (name, accuracy, ROC-AUC) row is collected per model.
results = []
for model_name, model in models.items():
    model.fit(X_train_scaled, y_train)
    y_pred = model.predict(X_test_scaled)
    # ROC-AUC measures how well the model *ranks* samples, so it needs
    # continuous scores. Feeding it hard 0/1 predictions collapses the curve
    # to a single threshold and misreports the metric. Column 1 of
    # predict_proba is the probability of the second class in model.classes_
    # (assumes a binary 0/1 target, so that class is "left == 1").
    y_score = model.predict_proba(X_test_scaled)[:, 1]
    accuracy = model.score(X_test_scaled, y_test)
    roc_auc = roc_auc_score(y_test, y_score)

    results.append((model_name, accuracy, roc_auc))
    print(f'{model_name} - Accuracy: {accuracy:.4f}, ROC-AUC: {roc_auc:.4f}')
    print(classification_report(y_test, y_pred))

    # Plot confusion matrix (built from the hard class predictions)
    cm = confusion_matrix(y_test, y_pred)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(f'Confusion Matrix for {model_name}')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.show()

# Display results
results_df = pd.DataFrame(results, columns=['Model', 'Accuracy', 'ROC-AUC'])
print(results_df)

In [0]:
from sklearn.model_selection import GridSearchCV
import mlflow

# Define parameter grid for RandomForestClassifier
# (3 * 4 * 3 * 3 = 108 combinations, each evaluated with 5-fold CV)
param_grid = {
    'n_estimators': [100, 200, 300],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4]
}

# Set up GridSearchCV: candidates are ranked by cross-validated ROC-AUC
grid_search = GridSearchCV(estimator=models['Random Forest'], param_grid=param_grid, 
                           cv=5, scoring='roc_auc', n_jobs=-1, verbose=2)

# Perform grid search with MLflow logging
with mlflow.start_run(run_name="Random Forest - Grid Search"):
    grid_search.fit(X_train_scaled, y_train)
    
    best_rf_model = grid_search.best_estimator_
    y_pred = best_rf_model.predict(X_test_scaled)
    # Score the test set with class probabilities, not hard labels, so the
    # reported ROC-AUC is a true ranking metric and is comparable with the
    # 'roc_auc' scoring used during the search. Column 1 is the probability
    # of the second class in classes_ (assumes a binary 0/1 target).
    y_score = best_rf_model.predict_proba(X_test_scaled)[:, 1]
    accuracy = best_rf_model.score(X_test_scaled, y_test)
    roc_auc = roc_auc_score(y_test, y_score)
    
    # Log parameters, metrics, and best model
    mlflow.log_param("model_name", "Random Forest - Grid Search")
    mlflow.log_params(grid_search.best_params_)
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("roc_auc", roc_auc)
    # Also record the cross-validated score that selected this model, so the
    # held-out result can be compared against it
    mlflow.log_metric("best_cv_roc_auc", grid_search.best_score_)
    mlflow.sklearn.log_model(best_rf_model, "Random Forest - Grid Search")
    
    print(f'Random Forest - Grid Search - Accuracy: {accuracy:.4f}, ROC-AUC: {roc_auc:.4f}')
    print(classification_report(y_test, y_pred))
    
    # Plot confusion matrix (built from the hard class predictions)
    cm = confusion_matrix(y_test, y_pred)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('Confusion Matrix for Random Forest - Grid Search')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.show()
