In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from sklearn.metrics import accuracy_score
import mlflow
import mlflow.spark

# Load gold dataset
df_gold = spark.read.format("delta").load("/mnt/attrition/gold/hr_features/")

# Encode categorical features
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in [
    "BusinessTravel", "Department", "EducationField", "Gender", "JobRole", "MaritalStatus", "OverTime"
]]

# Assemble features
assembler = VectorAssembler(
    inputCols=[
        "Age", "DistanceFromHome", "Education", "EnvironmentSatisfaction", "MonthlyIncome",
        "NumCompaniesWorked", "PercentSalaryHike", "TotalWorkingYears", "TrainingTimesLastYear",
        "WorkLifeBalance", "YearsAtCompany", "YearsInCurrentRole",
        "BusinessTravel_index", "Department_index", "EducationField_index",
        "Gender_index", "JobRole_index", "MaritalStatus_index", "OverTime_index"
    ],
    outputCol="features"
)

# Define model
lr = LogisticRegression(labelCol="Attrition", featuresCol="features")

# Create pipeline
pipeline = Pipeline(stages=indexers + [assembler, lr])

# Train model
with mlflow.start_run():
    model = pipeline.fit(df_gold)
    mlflow.spark.log_model(model, "attrition_model")
    mlflow.log_param("model_type", "LogisticRegression")
    print("Model logged to MLflow.")

2025/07/16 12:42:53 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Model logged to MLflow.


In [0]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import mlflow

# Step 1: Convert 'Attrition' (Yes/No) to numeric label (0/1)
label_indexer = StringIndexer(inputCol="Attrition", outputCol="label")
df_gold_indexed = label_indexer.fit(df_gold).transform(df_gold)

# Step 2: Make predictions again using transformed data
predictions = model.transform(df_gold_indexed)

# Step 3: Define evaluators
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)
precision_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedPrecision"
)
recall_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedRecall"
)
roc_evaluator = BinaryClassificationEvaluator(
    labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)

# Step 4: Evaluate and log metrics
accuracy = accuracy_evaluator.evaluate(predictions)
f1_score = f1_evaluator.evaluate(predictions)
precision = precision_evaluator.evaluate(predictions)
recall = recall_evaluator.evaluate(predictions)
roc_auc = roc_evaluator.evaluate(predictions)

# Step 5: Log to MLflow
mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("f1_score", f1_score)
mlflow.log_metric("precision", precision)
mlflow.log_metric("recall", recall)
mlflow.log_metric("roc_auc", roc_auc)

# Step 6: Print for report
print("Model Evaluation Metrics:")
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1_score:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"ROC AUC: {roc_auc:.4f}")

Model Evaluation Metrics:
Accuracy: 0.8544
F1 Score: 0.8184
Precision: 0.8323
Recall: 0.8544
ROC AUC: 0.7882
