In [None]:
import os
import posixpath
import time
import uuid

import matplotlib.pyplot as plt
import mlflow
import mlflow.spark
import numpy as np
from hdfs import InsecureClient
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.classification import OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA, StringIndexer
from pyspark.sql import SparkSession
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix


# Spark session: getOrCreate() reuses the session already attached to this
# kernel, so re-running the cell is safe.
spark = (
    SparkSession.builder
    .appName("SparkML_MLflow_Test")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

print("Spark version:", spark.version)

# MLflow setup: every run started below is recorded under this experiment.
mlflow.set_tracking_uri("http://localhost:5001")
mlflow.set_experiment("02 - Spark MLlib")

# HDFS client (WebHDFS endpoint) used to keep a backup copy of the artifacts.
HDFS_ARTIFACT_ROOT = '/user/gadet/mlflow/artifacts'
client = InsecureClient('http://10.0.2.15:9870', user='gadet')

# On a fresh cluster the artifact directory does not exist yet and list()
# would raise; makedirs() is a no-op when the directory is already there.
client.makedirs(HDFS_ARTIFACT_ROOT)
print(client.list(HDFS_ARTIFACT_ROOT))


def log_artifact_and_backup(local_path, hdfs_dir, client):
    """Log a local file to the active MLflow run and copy it to HDFS.

    Args:
        local_path: Path of the file on the local filesystem.
        hdfs_dir: HDFS directory that receives the backup copy; the file
            keeps its local base name.
        client: HDFS client exposing
            ``upload(hdfs_path, local_path, overwrite=...)``.
    """
    mlflow.log_artifact(local_path)

    # HDFS paths always use "/" as separator, so join with posixpath rather
    # than os.path, which would insert "\" when the notebook runs on Windows.
    hdfs_path = posixpath.join(hdfs_dir, os.path.basename(local_path))

    client.upload(hdfs_path, local_path, overwrite=True)
    print(f"Uploaded backup to HDFS: {hdfs_path}")
    
def log_confusion_matrix(pred_df, run_name):
    """Plot the confusion matrix of a prediction DataFrame and log it.

    The figure is saved as ``<run_name>_confusion_matrix.png`` in the current
    working directory, logged to the active MLflow run and backed up to HDFS
    under ``/user/gadet/mlflow/artifacts/<run_name>`` using the module-level
    ``client``.

    Args:
        pred_df: Spark DataFrame with a ``label`` and a ``prediction`` column.
        run_name: Short name used in the plot title, the file name and the
            HDFS backup directory.
    """
    # Collect both columns in one action: a single Spark job instead of two,
    # and each label stays paired with the prediction from the same row.
    rows = pred_df.select("label", "prediction").collect()
    y_true = np.array([row["label"] for row in rows])
    y_pred = np.array([row["prediction"] for row in rows])

    cm = confusion_matrix(y_true, y_pred)

    artifact_path = f"{run_name}_confusion_matrix.png"
    fig, ax = plt.subplots()
    try:
        ConfusionMatrixDisplay(cm).plot(cmap="Blues", ax=ax)
        ax.set_title(f"Confusion Matrix: {run_name}")
        fig.savefig(artifact_path, bbox_inches="tight")
    finally:
        # Always release the figure, even if plotting or saving fails.
        plt.close(fig)

    log_artifact_and_backup(artifact_path, f"/user/gadet/mlflow/artifacts/{run_name}", client)
    


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/phoenix-hbase-2.5-5.1.3-bin/phoenix-client-hbase-2.5-5.1.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-3.3.2/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/apache-tez-0.9.1-bin/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2025-10-21 11:19:21,527 WARN util.Utils: Your hostname, node resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface enp0s3)
2025-10-21 11:19:21,534 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
2025-10-21 11:19:23,021 INFO spark.SparkContext: Running Spark version 3.4.1
2025-10-21 11:19:23,111 WARN util.NativeCodeLoader: Un

2025-10-21 11:19:48,387 INFO yarn.Client: Application report for application_1761038015217_0001 (state: ACCEPTED)
2025-10-21 11:19:49,395 INFO yarn.Client: Application report for application_1761038015217_0001 (state: ACCEPTED)
2025-10-21 11:19:50,401 INFO yarn.Client: Application report for application_1761038015217_0001 (state: ACCEPTED)
2025-10-21 11:19:51,408 INFO yarn.Client: Application report for application_1761038015217_0001 (state: ACCEPTED)
2025-10-21 11:19:52,413 INFO yarn.Client: Application report for application_1761038015217_0001 (state: ACCEPTED)
2025-10-21 11:19:53,418 INFO yarn.Client: Application report for application_1761038015217_0001 (state: ACCEPTED)
2025-10-21 11:19:54,427 INFO yarn.Client: Application report for application_1761038015217_0001 (state: ACCEPTED)
2025-10-21 11:19:55,431 INFO yarn.Client: Application report for application_1761038015217_0001 (state: ACCEPTED)
2025-10-21 11:19:56,435 INFO yarn.Client: Application report for application_17610380152

2025-10-21 11:20:00,544 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@612240d0{/executors/threadDump/json,null,AVAILABLE,@Spark}
2025-10-21 11:20:00,549 INFO ui.ServerInfo: Adding filter to /static: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
2025-10-21 11:20:00,665 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2c6f63b7{/static,null,AVAILABLE,@Spark}
2025-10-21 11:20:00,666 INFO ui.ServerInfo: Adding filter to /: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
2025-10-21 11:20:00,687 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@71b770a5{/,null,AVAILABLE,@Spark}
2025-10-21 11:20:00,688 INFO ui.ServerInfo: Adding filter to /api: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
2025-10-21 11:20:00,726 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4f45a683{/api,null,AVAILABLE,@Spark}
2025-10-21 11:20:00,727 INFO ui.ServerInfo: Adding filter to /jobs/job/kill: org.apache.

Spark version: 3.4.1


2025-10-21 11:20:02,192 INFO internal.SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir.
2025-10-21 11:20:02,196 INFO internal.SharedState: Warehouse path is 'hdfs://10.0.2.15:9000/user/hive/warehouse'.
2025-10-21 11:20:02,242 INFO ui.ServerInfo: Adding filter to /SQL: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
2025-10-21 11:20:02,245 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5c8632f5{/SQL,null,AVAILABLE,@Spark}
2025-10-21 11:20:02,245 INFO ui.ServerInfo: Adding filter to /SQL/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
2025-10-21 11:20:02,246 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3464aa34{/SQL/json,null,AVAILABLE,@Spark}
2025-10-21 11:20:02,246 INFO ui.ServerInfo: Adding filter to /SQL/execution: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
2025-10-21 11:20:02,247 IN

In [None]:
from sklearn.datasets import load_digits
import pandas as pd

# Load the scikit-learn digits dataset into a pandas frame with one
# feature_<i> column per input dimension plus the integer class label.
digits = load_digits()
X, y = digits.data, digits.target
df = pd.DataFrame(X, columns=[f"feature_{i}" for i in range(X.shape[1])])
df["label"] = y

spark_df = spark.createDataFrame(df)

# Both splits are reused by every model cell below (fit, transform, count).
# Caching them avoids recomputing the random split from the source each time.
train_df, test_df = (
    split.cache() for split in spark_df.randomSplit([0.8, 0.2], seed=42)
)
print("Train size:", train_df.count(), "Test size:", test_df.count())


In [None]:
with mlflow.start_run(run_name="Spark_LogisticRegression"):

    # Assemble features: every column except the label is an input feature,
    # so the list follows the DataFrame instead of a hard-coded column count.
    feature_cols = [c for c in train_df.columns if c != "label"]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

    # Single source of truth for the hyper-parameters: the same dict
    # configures the estimator and is logged to MLflow, so they cannot drift.
    lr_params = {"maxIter": 50, "regParam": 0.01}
    lr = LogisticRegression(labelCol="label", featuresCol="features", **lr_params)

    pipeline = Pipeline(stages=[assembler, lr])

    # perf_counter() is monotonic, unlike time.time(), so the measured
    # interval is not affected by system clock adjustments.
    start = time.perf_counter()
    model = pipeline.fit(train_df)
    end = time.perf_counter()

    preds = model.transform(test_df)

    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    acc = evaluator.evaluate(preds)
    f1_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
    f1 = f1_eval.evaluate(preds)

    mlflow.log_param("model", "LogisticRegression")
    mlflow.log_params(lr_params)
    mlflow.log_metric("accuracy", acc)
    mlflow.log_metric("f1", f1)
    mlflow.log_metric("train_time_sec", end - start)

    # Log model
    mlflow.spark.log_model(model, "model")

    log_confusion_matrix(preds, "Spark_LogReg")

print("Spark_LogisticRegression -> acc:", acc, "f1:", f1)

In [None]:
with mlflow.start_run(run_name="Spark_RandomForest"):

    # Every column except the label is an input feature.
    feature_cols = [c for c in train_df.columns if c != "label"]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

    # The same dict configures the estimator and is logged to MLflow.
    # The seed makes the forest reproducible across kernel restarts.
    rf_params = {"numTrees": 100, "maxDepth": 10, "seed": 42}
    rf = RandomForestClassifier(labelCol="label", featuresCol="features", **rf_params)

    pipeline = Pipeline(stages=[assembler, rf])

    start = time.perf_counter()
    model = pipeline.fit(train_df)
    end = time.perf_counter()
    preds = model.transform(test_df)

    # Build the evaluators here instead of relying on the ones left behind by
    # the logistic-regression cell, so this cell also runs on its own.
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    f1_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
    acc = evaluator.evaluate(preds)
    f1 = f1_eval.evaluate(preds)

    mlflow.log_param("model", "RandomForest")
    mlflow.log_params(rf_params)
    mlflow.log_metric("accuracy", acc)
    mlflow.log_metric("f1", f1)
    mlflow.log_metric("train_time_sec", end - start)

    mlflow.spark.log_model(model, "model")

    log_confusion_matrix(preds, "Spark_RandomForest")

print("Spark_RandomForest -> acc:", acc, "f1:", f1)
    

In [None]:
with mlflow.start_run(run_name="Spark_GradientBoosting"):

    # Every column except the label is an input feature.
    feature_cols = [c for c in train_df.columns if c != "label"]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

    # The same dict configures the estimator and is logged to MLflow.
    # The seed makes the boosted trees reproducible.
    gbt_params = {"maxIter": 100, "maxDepth": 5, "seed": 42}
    gbt = GBTClassifier(labelCol="label", featuresCol="features", **gbt_params)

    # Spark's GBTClassifier only supports binary labels and fails at fit time
    # on a multiclass label. OneVsRest trains one binary GBT per class and
    # predicts the class whose model is most confident.
    ovr = OneVsRest(classifier=gbt, labelCol="label", featuresCol="features")

    pipeline = Pipeline(stages=[assembler, ovr])

    start = time.perf_counter()
    model = pipeline.fit(train_df)
    end = time.perf_counter()

    preds = model.transform(test_df)

    # Build the evaluators here instead of relying on the ones left behind by
    # an earlier cell, so this cell also runs on its own.
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    f1_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
    acc = evaluator.evaluate(preds)
    f1 = f1_eval.evaluate(preds)

    mlflow.log_param("model", "GradientBoosting")
    mlflow.log_param("multiclass_strategy", "OneVsRest")
    mlflow.log_params(gbt_params)
    mlflow.log_metric("accuracy", acc)
    mlflow.log_metric("f1", f1)
    mlflow.log_metric("train_time_sec", end - start)

    mlflow.spark.log_model(model, "model")

    log_confusion_matrix(preds, "Spark_GradientBoosting")

print("Spark_GradientBoosting -> acc:", acc, "f1:", f1)