# <span style="color:#1f77b4">**Machine Learning 01 - Training a Model**</span>


### <span style="color:#1f77b4">**Unity Catalog configuration**</span>

Set up widgets for `CATALOG`, `SCHEMA`, and `VOLUME`, resolve the active catalog, and build a reusable `BASE` path for storage.


In [None]:
# Configure Unity Catalog widgets and resolve the active catalog.
dbutils.widgets.removeAll()
dbutils.widgets.text("CATALOG", "")
dbutils.widgets.text("SCHEMA", "default")
dbutils.widgets.text("VOLUME", "ml_lab")

# Catalogs that cannot hold this project's schema and volume
EXCLUDED_CATALOGS = ("system", "samples", "hive_metastore")

catalog_widget = dbutils.widgets.get("CATALOG")
if catalog_widget:
    CATALOG = catalog_widget
else:
    # Prefer the current catalog; otherwise pick the first usable catalog
    current = spark.sql("SELECT current_catalog()").first()[0]
    catalogs = [r.catalog for r in spark.sql("SHOW CATALOGS").collect()]
    usable = [c for c in catalogs if c not in EXCLUDED_CATALOGS]
    if current not in EXCLUDED_CATALOGS:
        CATALOG = current
    elif usable:
        CATALOG = usable[0]
    else:
        raise ValueError("No usable catalog found; set the CATALOG widget explicitly.")

SCHEMA = dbutils.widgets.get("SCHEMA")
VOLUME = dbutils.widgets.get("VOLUME")
BASE = f"dbfs:/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}"
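You can check the catalog fallback outside Databricks with a plain-Python sketch. `resolve_catalog` and `volume_base` are hypothetical helpers, not Databricks APIs. The sketch excludes `system`, as the notebook does. It also treats `samples` and `hive_metastore` as unusable, which is an assumption about which catalogs cannot hold a volume.

```python
# Hypothetical helpers mirroring the catalog fallback in the cell above.
# Assumption: "samples" and "hive_metastore" cannot hold this project's volume.
EXCLUDED = ("system", "samples", "hive_metastore")


def resolve_catalog(widget_value, current, catalogs, excluded=EXCLUDED):
    """Pick the widget value, else the current catalog, else the first usable catalog."""
    if widget_value:
        return widget_value
    if current not in excluded:
        return current
    for name in catalogs:
        if name not in excluded:
            return name
    raise ValueError("No usable catalog found; set the CATALOG widget explicitly.")


def volume_base(catalog, schema, volume):
    """Build the volume path stored in BASE."""
    return f"dbfs:/Volumes/{catalog}/{schema}/{volume}"


# Widget left empty and current catalog is "system": fall back to the first usable one
catalog = resolve_catalog("", "system", ["system", "samples", "main"])
print(volume_base(catalog, "default", "ml_lab"))  # dbfs:/Volumes/main/default/ml_lab
```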


### <span style="color:#1f77b4">**Create schema and volume**</span>

Ensure the schema and volume exist in Unity Catalog so read/write operations succeed.


In [None]:
# Create the schema and volume if they do not exist.
# Backticks quote each name part so names containing characters such as "-" still parse.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS `{CATALOG}`.`{SCHEMA}`")
spark.sql(f"CREATE VOLUME IF NOT EXISTS `{CATALOG}`.`{SCHEMA}`.`{VOLUME}`")
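Unity Catalog names a volume with three levels: `catalog.schema.volume`. The sketch below is a hypothetical helper that builds the two DDL statements as strings. It assumes backtick quoting, where a literal backtick inside a name is written as two backticks. This keeps names that contain characters such as `-` valid as SQL identifiers.

```python
# Hypothetical helpers: build the setup DDL from widget values with quoted identifiers.
def quote_ident(name: str) -> str:
    """Quote one identifier part with backticks, doubling any backtick inside it."""
    return "`" + name.replace("`", "``") + "`"


def setup_ddl(catalog: str, schema: str, volume: str):
    """Return the CREATE statements for a three-level catalog.schema.volume name."""
    cat, sch, vol = (quote_ident(p) for p in (catalog, schema, volume))
    return [
        f"CREATE SCHEMA IF NOT EXISTS {cat}.{sch}",
        f"CREATE VOLUME IF NOT EXISTS {cat}.{sch}.{vol}",
    ]


for stmt in setup_ddl("my-catalog", "default", "ml_lab"):
    print(stmt)
```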


### <span style="color:#1f77b4">**Load data into the UC volume**</span>

Check whether the diabetes CSV already exists in the volume and copy it from GitHub only if it is missing. This way, repeated job runs do not overwrite the file.


In [None]:
# Only copy the dataset when it is missing to avoid job conflicts.
DATA_URL = "https://raw.githubusercontent.com/Ch3rry-Pi3-Azure/DataBricks-Machine-Learning/refs/heads/main/data/diabetes.csv"
data_dir = f"{BASE}/diabetes"
data_file = f"{data_dir}/diabetes.csv"

try:
    dbutils.fs.ls(data_file)  # raises if the file does not exist
    file_exists = True
except Exception:
    file_exists = False

if not file_exists:
    dbutils.fs.mkdirs(data_dir)
    dbutils.fs.cp(DATA_URL, data_file)
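The check-then-copy pattern can be sketched with the local filesystem standing in for the volume. The sketch assumes `os.path.exists` plays the role of a successful `dbutils.fs.ls`, `os.makedirs` the role of `dbutils.fs.mkdirs`, and `shutil.copyfile` the role of `dbutils.fs.cp`. `copy_if_missing` is a hypothetical helper.

```python
import os
import shutil
import tempfile


def copy_if_missing(src, dest):
    """Copy src to dest only when dest does not exist yet; return True if a copy happened."""
    if os.path.exists(dest):  # stands in for dbutils.fs.ls succeeding
        return False
    os.makedirs(os.path.dirname(dest), exist_ok=True)  # stands in for dbutils.fs.mkdirs
    shutil.copyfile(src, dest)  # stands in for dbutils.fs.cp
    return True


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "source.csv")
    dest = os.path.join(tmp, "diabetes", "diabetes.csv")
    with open(src, "w") as f:
        f.write("Pregnancies,Outcome\n1,0\n")
    first = copy_if_missing(src, dest)   # file missing: copied
    second = copy_if_missing(src, dest)  # file present: skipped
    print(first, second)  # True False
```

The check and the copy are separate steps. Two jobs that start at the same moment could both see the file as missing, so the pattern prevents repeated overwrites but not every race.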


### <span style="color:#1f77b4">**Preview the raw dataset**</span>

Read the CSV into a Spark DataFrame and display a sample to confirm the data loaded correctly.


In [None]:
# Load the CSV and quickly inspect a sample of records.
# Without inferSchema, every column is read as a string; the next step casts them.
df = spark.read.format("csv").option("header", "true").load(data_file)
display(df)
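With `header` set and no `inferSchema` option, Spark's CSV reader types every column as a string, much as Python's `csv` module does. The stdlib sketch below reads two made-up rows in the same column layout. It shows the kind of values the cleaning step has to cast.

```python
import csv
import io

# Two illustrative rows in the diabetes column layout (values made up for the example)
sample = io.StringIO(
    "Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome\n"
    "2,120,70,30,0,28.5,0.35,29,0\n"
    "5,160,80,0,0,33.1,0.60,51,1\n"
)
rows = list(csv.DictReader(sample))
print(rows[0]["Glucose"], type(rows[0]["Glucose"]).__name__)  # 120 str
```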


### <span style="color:#1f77b4">**Clean and cast columns**</span>

Cast each column to its numeric type with `col(...).cast(...)` from `pyspark.sql.functions`, then drop any row that contains a null. Spark ML then receives only complete numeric records.


In [None]:
# Cast fields into numeric types for ML and remove incomplete rows.

# Import only what this notebook uses: a wildcard import of
# pyspark.sql.functions would shadow Python built-ins such as min, max, and sum.
from pyspark.sql.functions import col

data = df.select(col("Pregnancies").cast("int"),
                 col("Glucose").cast("int"),
                 col("BloodPressure").cast("int"),
                 col("SkinThickness").cast("int"),
                 col("Insulin").cast("int"),
                 col("BMI").cast("float"),
                 col("DiabetesPedigreeFunction").cast("float"),
                 col("Age").cast("int"),
                 col("Outcome").cast("int")
                 ).dropna()  # with ANSI mode off, failed casts become null and are dropped here too
display(data)
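Here is a plain-Python sketch of the cleaning step: cast each column first, then keep only complete rows. In the sketch, a value that fails to parse becomes `None`. This is similar to the null that Spark's `cast` produces when ANSI mode is off; with ANSI mode on, Spark raises an error instead. The helper names and the two input rows are illustrative assumptions.

```python
# Hypothetical plain-Python version of "cast, then drop incomplete rows".
def to_int(value):
    """Parse an integer, returning None (like a null) when parsing fails."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def to_float(value):
    """Parse a float, returning None (like a null) when parsing fails."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


# Target parser per column, matching the casts in the notebook
CASTS = {
    "Pregnancies": to_int, "Glucose": to_int, "BloodPressure": to_int,
    "SkinThickness": to_int, "Insulin": to_int, "BMI": to_float,
    "DiabetesPedigreeFunction": to_float, "Age": to_int, "Outcome": to_int,
}


def clean(rows):
    """Cast every column, then keep only rows where no value is missing."""
    cleaned = []
    for row in rows:
        typed = {name: parse(row.get(name)) for name, parse in CASTS.items()}
        if all(v is not None for v in typed.values()):
            cleaned.append(typed)
    return cleaned


raw = [
    {"Pregnancies": "2", "Glucose": "120", "BloodPressure": "70", "SkinThickness": "30", "Insulin": "0",
     "BMI": "28.5", "DiabetesPedigreeFunction": "0.35", "Age": "29", "Outcome": "0"},
    {"Pregnancies": "5", "Glucose": "", "BloodPressure": "80", "SkinThickness": "0", "Insulin": "0",
     "BMI": "n/a", "DiabetesPedigreeFunction": "0.60", "Age": "51", "Outcome": "1"},
]
print(clean(raw))  # only the first row survives, with int and float values
```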


### <span style="color:#1f77b4">**Train/test split**</span>

Randomly split the cleaned dataset into roughly 70% training rows and 30% testing rows. A fixed seed makes the split repeatable on the same input.


In [None]:
# Split the dataset for training and evaluation.

# Split data into training and testing sets (fixed seed for repeatability)
train, test = data.randomSplit([0.7, 0.3], seed=42)
print("Training Rows:", train.count(), " Testing Rows:", test.count())
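`randomSplit` normalizes the weights and assigns each row to a split by drawing a random number. The split sizes are therefore only approximately 70/30. Without a `seed`, PySpark also picks a new random seed on every call. The sketch below reproduces that idea in plain Python with a hypothetical `random_split` helper. Spark's per-partition sampler differs in detail.

```python
import random
from builtins import sum  # keep the built-in even if pyspark.sql.functions was wildcard-imported


def random_split(rows, weights, seed=None):
    """Send each row to one split, chosen at random with probability proportional to its weight."""
    total = float(sum(weights))
    bounds, cumulative = [], 0.0
    for w in weights:
        cumulative += w / total
        bounds.append(cumulative)  # upper edge of this split's slice of [0, 1)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            if x < upper:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against rounding leaving the last edge just below 1.0
    return splits


train_rows, test_rows = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(train_rows), len(test_rows))  # close to, but usually not exactly, 700 and 300
```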


### <span style="color:#1f77b4">**Assemble and scale features**</span>

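Use `VectorAssembler` to combine the numeric columns into a single feature vector. Then use `MinMaxScaler` to rescale each feature to the range [0, 1], so that features with large ranges (such as `Glucose`) do not dominate features with small ranges (such as `DiabetesPedigreeFunction`).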


In [None]:
# Assemble features and normalize them for model stability.

# Import required libraries
from pyspark.ml.feature import VectorAssembler, MinMaxScaler

# The same eight features the pipeline below uses (Age included)
numericFeatures = ["Pregnancies", "Glucose", "BloodPressure", "SkinThickness", "Insulin", "BMI", "DiabetesPedigreeFunction", "Age"]
numericColVector = VectorAssembler(inputCols=numericFeatures, outputCol="numericFeatures")
vectorizedData = numericColVector.transform(train)

# Fit the scaler on the training vectors and rescale each feature to [0, 1]
minMax = MinMaxScaler(inputCol=numericColVector.getOutputCol(), outputCol="normalizedFeatures")
scaledData = minMax.fit(vectorizedData).transform(vectorizedData)

compareNumerics = scaledData.select("numericFeatures", "normalizedFeatures")
display(compareNumerics)
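`VectorAssembler` concatenates the input columns, in the listed order, into one vector. `MinMaxScaler` rescales each feature with (x - min) / (max - min), using the minimum and maximum learned during `fit` and the default output range [0, 1]. A constant feature maps to the midpoint of that range. The plain-Python sketch below uses hypothetical helper names and three illustrative rows.

```python
from builtins import max, min  # keep the built-ins even if pyspark.sql.functions was wildcard-imported


def assemble(row, input_cols):
    """Like VectorAssembler: concatenate the input columns, in order, into one vector."""
    return [float(row[c]) for c in input_cols]


def fit_min_max(vectors):
    """Like MinMaxScaler.fit: learn the per-feature minimum and maximum."""
    columns = list(zip(*vectors))
    return [min(c) for c in columns], [max(c) for c in columns]


def transform_min_max(vector, mins, maxs, lo=0.0, hi=1.0):
    """Rescale each feature into [lo, hi]; a constant feature maps to the midpoint."""
    out = []
    for x, e_min, e_max in zip(vector, mins, maxs):
        if e_max == e_min:
            out.append(0.5 * (hi + lo))
        else:
            out.append((x - e_min) / (e_max - e_min) * (hi - lo) + lo)
    return out


rows = [{"Glucose": 80, "BMI": 20.0}, {"Glucose": 120, "BMI": 30.0}, {"Glucose": 160, "BMI": 25.0}]
vectors = [assemble(r, ["Glucose", "BMI"]) for r in rows]
mins, maxs = fit_min_max(vectors)
scaled = [transform_min_max(v, mins, maxs) for v in vectors]
print(scaled)  # [[0.0, 0.0], [0.5, 1.0], [1.0, 0.5]]
```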


### <span style="color:#1f77b4">**Prepare features and labels**</span>

Select the normalized feature vector as `features` and the outcome column as `label` for Spark ML.


In [None]:
# Create the feature vector and label columns expected by Spark ML.

preppedData = scaledData.select(col("normalizedFeatures").alias("features"), col("Outcome").alias("label"))
display(preppedData)


### <span style="color:#1f77b4">**Train logistic regression**</span>

Fit a logistic regression classifier as a baseline model. `regParam=0.3` sets the regularization strength. With the default `elasticNetParam=0.0`, the penalty is pure L2.


In [None]:
# Train the classifier with regularization.

# Import required libraries
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3)
model = lr.fit(preppedData)
print("Model trained!")
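Here is a plain-Python sketch of what `regParam` controls. It assumes Spark's documented objective for `elasticNetParam=0`: mean log loss plus (regParam / 2)·‖w‖², with the intercept unpenalized. The helper names are hypothetical, and the sketch differs from Spark in two ways:

- It uses plain gradient descent rather than Spark's quasi-Newton solver.
- It skips the internal feature standardization that Spark applies by default (`standardization=True`).

Its weights will therefore not match the notebook's model. Predictions use the default threshold of 0.5 on the probability.

```python
import math
from builtins import sum  # keep the built-in even if pyspark.sql.functions was wildcard-imported


def sigmoid(z):
    """Numerically stable logistic function: 1 / (1 + exp(-z))."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def fit_logistic(X, y, reg_param=0.3, lr=0.5, iters=500):
    """Gradient descent on mean log loss + (reg_param / 2) * ||w||^2; the intercept is not penalized."""
    n, d = len(X), len(X[0])
    w, b = [0.0] * d, 0.0
    for _ in range(iters):
        grad_w, grad_b = [0.0] * d, 0.0
        for xi, yi in zip(X, y):
            err = sigmoid(sum(wj * xj for wj, xj in zip(w, xi)) + b) - yi  # p - y
            for j in range(d):
                grad_w[j] += err * xi[j] / n
            grad_b += err / n
        # The L2 penalty adds reg_param * w to the weight gradient
        w = [wj - lr * (gj + reg_param * wj) for wj, gj in zip(w, grad_w)]
        b -= lr * grad_b
    return w, b


def predict(w, b, x, threshold=0.5):
    """Return (probability of label 1, predicted label)."""
    p = sigmoid(sum(wj * xj for wj, xj in zip(w, x)) + b)
    return p, int(p > threshold)


X = [[0.0], [0.2], [0.8], [1.0]]  # one already-scaled feature (illustrative)
y = [0, 0, 1, 1]
w, b = fit_logistic(X, y, reg_param=0.3)
print([predict(w, b, x)[1] for x in X])  # [0, 0, 1, 1]
```

Raising `reg_param` shrinks the weights toward zero, which pulls the predicted probabilities toward 0.5.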


### <span style="color:#1f77b4">**Generate predictions**</span>

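Prepare the test data with the same `VectorAssembler` and with a scaler fitted on the training data. Then generate predicted labels and probabilities for evaluation.


In [None]:
# Transform the test data and compute predictions.

# Prepare the test data. The scaler is fitted on the training vectors only,
# so test rows are rescaled with the training minimum and maximum
# (fitting it on the test data would leak test statistics into preprocessing).
trainScalerModel = minMax.fit(vectorizedData)
vectorizedTestData = numericColVector.transform(test)
scaledTestData = trainScalerModel.transform(vectorizedTestData)
preppedTestData = scaledTestData.select(col("normalizedFeatures").alias("features"), col("Outcome").alias("label"))

# Get predictions
prediction = model.transform(preppedTestData)
predicted = prediction.select("features", "probability",
                              col("prediction").cast("int").alias("prediction"),
                              col("label").alias("trueLabel"))
display(predicted)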
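The scaler's `fit` step learns statistics, so it should see only training data. Test rows are then rescaled with the training minimum and maximum, which can legitimately produce values outside [0, 1]. This plain-Python sketch uses hypothetical helpers and illustrative `Glucose` values. It contrasts that approach with refitting on the test set, which would rescale the same inputs differently from what the model saw during training.

```python
from builtins import max, min  # keep the built-ins even if pyspark.sql.functions was wildcard-imported


def fit_min_max(vectors):
    """Learn per-feature minimum and maximum from the vectors passed in."""
    columns = list(zip(*vectors))
    return [min(c) for c in columns], [max(c) for c in columns]


def apply_min_max(vector, mins, maxs):
    """Rescale to [0, 1] using stored statistics; constant features map to 0.5."""
    return [0.5 if hi == lo else (x - lo) / (hi - lo) for x, lo, hi in zip(vector, mins, maxs)]


train_vectors = [[80.0], [120.0], [160.0]]  # illustrative Glucose values
test_vectors = [[100.0], [180.0]]

mins, maxs = fit_min_max(train_vectors)  # training statistics only
with_train_stats = [apply_min_max(v, mins, maxs) for v in test_vectors]
refit_on_test = [apply_min_max(v, *fit_min_max(test_vectors)) for v in test_vectors]
print(with_train_stats)  # [[0.25], [1.25]]
print(refit_on_test)     # [[0.0], [1.0]]
```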


### <span style="color:#1f77b4">**Evaluate the model**</span>

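Use `MulticlassClassificationEvaluator` to compute overall accuracy, per-class precision, recall, and F1 score, and the averages of those three metrics weighted by how often each class occurs in the true labels.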


In [None]:
# Compute multiple classification metrics.

# Import required libraries
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Simple accuracy
accuracy = evaluator.evaluate(prediction, {evaluator.metricName: "accuracy"})
print("Accuracy:", accuracy)

# Individual class metrics
labels = [0, 1]
print("\nIndividual class metrics:")
for label in labels:
    print(f"Class {label}")

    # Precision
    precision = evaluator.evaluate(prediction, {evaluator.metricLabel: label,
                                                evaluator.metricName: "precisionByLabel"})
    print("\tPrecision:", precision)

    # Recall
    recall = evaluator.evaluate(prediction, {evaluator.metricLabel: label,
                                             evaluator.metricName: "recallByLabel"})
    print("\tRecall:", recall)

    # F1 score
    f1 = evaluator.evaluate(prediction, {evaluator.metricLabel: label,
                                         evaluator.metricName: "fMeasureByLabel"})
    print("\tF1 Score:", f1)

# Weighted (overall) metrics: per-class values averaged by class frequency in the true labels
overallPrecision = evaluator.evaluate(prediction, {evaluator.metricName: "weightedPrecision"})
print("Overall Precision:", overallPrecision)
overallRecall = evaluator.evaluate(prediction, {evaluator.metricName: "weightedRecall"})
print("Overall Recall:", overallRecall)
overallF1 = evaluator.evaluate(prediction, {evaluator.metricName: "weightedFMeasure"})
print("Overall F1 Score:", overallF1)
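These metrics can be computed by hand from counts of true positives (tp), false positives (fp), and false negatives (fn):

- precision = tp / (tp + fp)
- recall = tp / (tp + fn)
- F1 is the harmonic mean of precision and recall.

The weighted metrics average the per-class values, weighting each class by its share of the true labels. `classification_metrics` is a hypothetical plain-Python helper. Returning 0.0 when a denominator is zero is an assumption about that edge case.

```python
from builtins import sum  # keep the built-in even if pyspark.sql.functions was wildcard-imported


def classification_metrics(y_true, y_pred, labels=(0, 1)):
    """Accuracy, per-class (precision, recall, F1), and label-weighted averages."""
    n = len(y_true)
    pairs = list(zip(y_true, y_pred))
    accuracy = sum(t == p for t, p in pairs) / n
    per_class = {}
    for c in labels:
        tp = sum(t == c and p == c for t, p in pairs)
        fp = sum(t != c and p == c for t, p in pairs)
        fn = sum(t == c and p != c for t, p in pairs)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        per_class[c] = (precision, recall, f1)
    # Weight each class by its share of the true labels
    share = {c: sum(t == c for t in y_true) / n for c in labels}
    weighted = tuple(sum(share[c] * per_class[c][i] for c in labels) for i in range(3))
    return accuracy, per_class, weighted


y_true = [0, 0, 0, 0, 1, 1, 1, 1]  # illustrative labels
y_pred = [0, 0, 0, 0, 0, 0, 1, 1]
toy_accuracy, per_class, weighted = classification_metrics(y_true, y_pred)
print(toy_accuracy, per_class[1][:2])  # 0.75 (1.0, 0.5)
```

Weighted recall always equals accuracy. Weighting each class's recall by that class's frequency leaves the sum of true positives over all classes, divided by the number of rows.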


### <span style="color:#1f77b4">**Build a reusable pipeline**</span>

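Bundle the feature engineering steps and the classifier into a `Pipeline` so that training and inference apply exactly the same steps. The pipeline uses all eight clinical features, including `Age`. Its fitted `MinMaxScaler` stage keeps the statistics learned from the training data.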


In [None]:
# Create a pipeline to keep feature steps and model together.

# Import required libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.classification import LogisticRegression

numFeatures = ["Pregnancies", "Glucose", "BloodPressure", "SkinThickness", "Insulin", "BMI", "DiabetesPedigreeFunction", "Age"]

# Define the feature engineering and model training algorithm steps
numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
numScaler = MinMaxScaler(inputCol=numVector.getOutputCol(), outputCol="normalizedFeatures")
# Single-input assembler: copies the scaled vector into the "Features" column
featureVector = VectorAssembler(inputCols=["normalizedFeatures"], outputCol="Features")
algo = LogisticRegression(labelCol="Outcome", featuresCol="Features", maxIter=10, regParam=0.3)

# Chain the steps as stages in a pipeline
pipeline = Pipeline(stages=[numVector, numScaler, featureVector, algo])

# Use the pipeline to prepare data and fit the model algorithm
model = pipeline.fit(train)
print("Model trained!")
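A `Pipeline` runs its stages in order. Transformers such as `VectorAssembler` only transform the data. Estimators such as `MinMaxScaler` and `LogisticRegression` are fitted on the output of the earlier stages and replaced by their fitted models. The plain-Python sketch below mimics that contract with hypothetical classes (`Assembler`, `MinMax`, `SketchPipeline`). It is a simplified model of Spark's behavior, not its implementation, and it omits the classifier stage to stay short.

```python
from builtins import max, min  # keep the built-ins even if pyspark.sql.functions was wildcard-imported


class Assembler:
    """Transformer: copy the named columns, in order, into one list-valued column."""

    def __init__(self, input_cols, output_col):
        self.input_cols, self.output_col = input_cols, output_col

    def transform(self, rows):
        return [{**r, self.output_col: [float(r[c]) for c in self.input_cols]} for r in rows]


class MinMaxModel:
    """Fitted transformer: rescales with the minimum and maximum learned by MinMax.fit."""

    def __init__(self, input_col, output_col, mins, maxs):
        self.input_col, self.output_col = input_col, output_col
        self.mins, self.maxs = mins, maxs

    def transform(self, rows):
        def scale(v):
            return [0.5 if hi == lo else (x - lo) / (hi - lo)
                    for x, lo, hi in zip(v, self.mins, self.maxs)]
        return [{**r, self.output_col: scale(r[self.input_col])} for r in rows]


class MinMax:
    """Estimator: fit() learns per-feature statistics and returns a MinMaxModel."""

    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        columns = list(zip(*(r[self.input_col] for r in rows)))
        return MinMaxModel(self.input_col, self.output_col,
                           [min(c) for c in columns], [max(c) for c in columns])


class SketchPipeline:
    """Fits stages in order; each fitted stage transforms the data for the next one."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for i, stage in enumerate(self.stages):
            model = stage.fit(rows) if hasattr(stage, "fit") else stage  # estimators get fitted
            fitted.append(model)
            if i < len(self.stages) - 1:
                rows = model.transform(rows)
        return SketchPipelineModel(fitted)


class SketchPipelineModel:
    """Applies every fitted stage in order."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


pipeline_train = [{"Glucose": 80, "BMI": 20.0}, {"Glucose": 160, "BMI": 30.0}]
pipeline_test = [{"Glucose": 120, "BMI": 35.0}]
sketch_model = SketchPipeline([Assembler(["Glucose", "BMI"], "numericFeatures"),
                               MinMax("numericFeatures", "normalizedFeatures")]).fit(pipeline_train)
print(sketch_model.transform(pipeline_test)[0]["normalizedFeatures"])  # [0.5, 1.5]
```

The test row is scaled with statistics from the training rows only, which is why `BMI` lands above 1.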


### <span style="color:#1f77b4">**Pipeline inference**</span>

Apply the fitted `PipelineModel` to the test data. It assembles, scales, and scores the raw columns in one call, reusing the scaling statistics learned from the training data.


In [None]:
# Run inference using the pipeline output.

prediction = model.transform(test)
predicted = prediction.select("Features", "probability",
                              col("prediction").cast("int").alias("prediction"),
                              col("Outcome").alias("trueLabel"))
display(predicted)


### <span style="color:#1f77b4">**Save the trained model**</span>

Persist the pipeline model to the Unity Catalog volume so it can be loaded later.


In [None]:
# Write the model to UC storage for reuse.

model.write().overwrite().save(BASE + "/models/diabetes.model")


### <span style="color:#1f77b4">**Load and infer with the saved model**</span>

Load the saved `PipelineModel` and run inference on a new sample record.


In [None]:
# Load the model back and score a new row.

# Import required libraries
from pyspark.ml.pipeline import PipelineModel

persistedModel = PipelineModel.load(BASE + "/models/diabetes.model")

# One new record with the same raw feature columns the pipeline was trained on
newData = spark.createDataFrame([{"Pregnancies": 8,
                                  "Glucose": 85,
                                  "BloodPressure": 65,
                                  "SkinThickness": 29,
                                  "Insulin": 0,
                                  "BMI": 26.6,
                                  "DiabetesPedigreeFunction": 0.672,
                                  "Age": 34}])

predictions = persistedModel.transform(newData)
display(predictions.select("Pregnancies", "Glucose", "BloodPressure", "SkinThickness", "Insulin", "BMI",
                           "DiabetesPedigreeFunction", "Age",
                           col("prediction").alias("PredictedOutcome")))
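Spark's `MLWriter.save` fails when the target path already exists unless `overwrite()` is called, which is why the model is saved with `write().overwrite()`. The plain-Python sketch below mirrors that save, load, and score round trip with JSON. The helper names and the parameter values are made up for illustration and are not the notebook's trained model.

```python
import json
import os
import tempfile
from builtins import sum  # keep the built-in even if pyspark.sql.functions was wildcard-imported


def save_params(params, path, overwrite=False):
    """Write fitted parameters as JSON, refusing to replace an existing file unless overwrite=True."""
    if os.path.exists(path) and not overwrite:
        raise FileExistsError(path)
    with open(path, "w") as f:
        json.dump(params, f)


def load_params(path):
    """Read parameters written by save_params."""
    with open(path) as f:
        return json.load(f)


def score(params, record):
    """Assemble, min-max scale with stored statistics, then apply the logistic model."""
    x = [float(record[c]) for c in params["features"]]
    scaled = [0.5 if hi == lo else (v - lo) / (hi - lo)
              for v, lo, hi in zip(x, params["mins"], params["maxs"])]
    margin = sum(w * s for w, s in zip(params["weights"], scaled)) + params["intercept"]
    return int(margin > 0)  # margin > 0 is the same as probability > 0.5


# Made-up parameters for illustration, not the values learned by the notebook
params = {"features": ["Glucose", "BMI"], "mins": [50.0, 20.0], "maxs": [200.0, 50.0],
          "weights": [2.0, 1.0], "intercept": -1.5}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "diabetes_params.json")
    save_params(params, path)
    try:
        save_params(params, path)  # a second write without overwrite fails
        refused = False
    except FileExistsError:
        refused = True
    save_params(params, path, overwrite=True)  # like write().overwrite().save(...)
    loaded = load_params(path)

print(refused, score(loaded, {"Glucose": 85, "BMI": 26.6}))  # True 0
```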
