In [0]:
import os
import json
import subprocess

# --- Third-party ---
import numpy as np
import pandas as pd

# --- PySpark ---
from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window

# --- scikit-learn ---
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, mean_absolute_error

# --- MLflow ---
import mlflow
import mlflow.sklearn
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec

In [0]:
%pip install kaggle

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Create a hidden text input box at the top of the notebook
dbutils.widgets.text("kaggle_key_input", "", "Enter Kaggle Key")

In [0]:
# Get the value from the widget
secret_key = dbutils.widgets.get("kaggle_key_input")
os.environ['KAGGLE_KEY'] = secret_key
print("Kaggle is settled!")

INFO:py4j.clientserver:Received command c on object id p0


Kaggle is settled!


In [0]:
subprocess.run(['kaggle', 'datasets', 'download', '-d', 'sumanthvrao/daily-climate-time-series-data', '--unzip'])

Dataset URL: https://www.kaggle.com/datasets/sumanthvrao/daily-climate-time-series-data
License(s): CC0-1.0
Downloading daily-climate-time-series-data.zip to /Workspace/Users/n.najmehakbari@gmail.com/Drafts


  0%|          | 0.00/22.0k [00:00<?, ?B/s]100%|██████████| 22.0k/22.0k [00:00<00:00, 5.07MB/s]





CompletedProcess(args=['kaggle', 'datasets', 'download', '-d', 'sumanthvrao/daily-climate-time-series-data', '--unzip'], returncode=0)

In [0]:

local_path = f"{os.getcwd()}/DailyDelhiClimateTrain.csv"
pdf = pd.read_csv(local_path)

# Pandas DataFrame to Spark DataFrame
df_all = spark.createDataFrame(pdf)
df_all = df_all.orderBy("date")

print("Data successfully loaded into Spark via Pandas!")
display(df_all.limit(5))

Data successfully loaded into Spark via Pandas!


date,meantemp,humidity,wind_speed,meanpressure
2013-01-01,10.0,84.5,0.0,1015.6666666666666
2013-01-02,7.4,92.0,2.98,1017.8
2013-01-03,7.166666666666667,87.0,4.633333333333334,1018.6666666666666
2013-01-04,8.666666666666666,71.33333333333333,1.2333333333333334,1017.1666666666666
2013-01-05,6.0,86.83333333333333,3.7,1016.5


In [0]:
total_rows = df_all.count()
chunk_size = total_rows // 5

for i in range(5):
    start = i * chunk_size
    limit_val = (i + 1) * chunk_size if i < 4 else total_rows
    
    # Create a Spark DataFrame for the current batch
    current_batch = df_all.limit(limit_val).tail(limit_val - start)
    batch_spark = spark.createDataFrame(current_batch, df_all.schema)
    
    # Bronze layer ingestion
    batch_spark.write.format("delta").mode("append").saveAsTable("weather_bronze")
    print(f"Batch {i+1} ingested into Bronze layer.")

Batch 1 ingested into Bronze layer.
Batch 2 ingested into Bronze layer.
Batch 3 ingested into Bronze layer.
Batch 4 ingested into Bronze layer.
Batch 5 ingested into Bronze layer.


In [0]:
# Read from Bronze layer
bronze_df = spark.read.table("weather_bronze")

# Silver layer ingestion 
silver_df = bronze_df.dropDuplicates(["date"]).dropna()

# Remove outliers from meantemp column in Silver layer
silver_df = silver_df.filter((col("meantemp") > -20) & (col("meantemp") < 60))

# Write to Silver layer
silver_df.write.format("delta").mode("overwrite").saveAsTable("weather_silver")

In [0]:

# Read from Silver layer
silver_df = spark.read.table("weather_silver")

# Define a window specification
windowSpec = Window.orderBy("date")

# Gold layer ingestion(Feature Engineering)
gold_df = silver_df.withColumn("prev_day_temp", lag("meantemp", 1).over(windowSpec))

#  Remove rows with null values
gold_df = gold_df.select("date", "prev_day_temp", "meantemp").dropna()

# Write to Gold layer 
gold_df.write.format("delta").mode("overwrite").saveAsTable("weather_gold")

print("Gold table created successfully!")



Gold table created successfully!


In [0]:
%sql
DESCRIBE HISTORY weather_bronze

version,timestamp,userId,userName,operation,operationParameters,job,notebook,queryHistoryStatementId,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
50,2026-02-21T15:19:22.000Z,71445245683791,n.najmehakbari@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2199524748115638),98c9e26c-2795-46e5-aaf2-85efed579d8f,0221-145446-ohe4wj7y-v2n,49.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 294, numOutputBytes -> 9549)",,Databricks-Runtime/18.0.x-aarch64-photon-scala2.13
49,2026-02-21T15:19:19.000Z,71445245683791,n.najmehakbari@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2199524748115638),aec510cf-d060-4b0a-bd9b-b2b572e4b966,0221-145446-ohe4wj7y-v2n,48.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 292, numOutputBytes -> 7907)",,Databricks-Runtime/18.0.x-aarch64-photon-scala2.13
48,2026-02-21T15:19:17.000Z,71445245683791,n.najmehakbari@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2199524748115638),a925e671-e3fa-4b1f-a7f7-86c683f4e1c8,0221-145446-ohe4wj7y-v2n,47.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 292, numOutputBytes -> 7226)",,Databricks-Runtime/18.0.x-aarch64-photon-scala2.13
47,2026-02-21T15:19:14.000Z,71445245683791,n.najmehakbari@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2199524748115638),e7817db9-5dde-4f73-8aa5-ffea0c805a8a,0221-145446-ohe4wj7y-v2n,46.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 292, numOutputBytes -> 7592)",,Databricks-Runtime/18.0.x-aarch64-photon-scala2.13
46,2026-02-21T15:19:12.000Z,71445245683791,n.najmehakbari@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2199524748115638),397b4095-af05-4abf-bbb6-5a30210fcca6,0221-145446-ohe4wj7y-v2n,45.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 292, numOutputBytes -> 8288)",,Databricks-Runtime/18.0.x-aarch64-photon-scala2.13
45,2026-02-21T15:01:24.000Z,71445245683791,n.najmehakbari@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2199524748115638),f1cba034-0eab-4968-b6d0-556540def256,0221-145446-ohe4wj7y-v2n,44.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 294, numOutputBytes -> 9549)",,Databricks-Runtime/18.0.x-aarch64-photon-scala2.13
44,2026-02-21T15:01:20.000Z,71445245683791,n.najmehakbari@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2199524748115638),0096bbfe-e069-4a6e-a09d-9357977f8da2,0221-145446-ohe4wj7y-v2n,43.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 292, numOutputBytes -> 7907)",,Databricks-Runtime/18.0.x-aarch64-photon-scala2.13
43,2026-02-21T15:01:17.000Z,71445245683791,n.najmehakbari@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2199524748115638),40bb8e89-7ebe-4571-91b5-a8d0f52e1bcd,0221-145446-ohe4wj7y-v2n,42.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 292, numOutputBytes -> 7226)",,Databricks-Runtime/18.0.x-aarch64-photon-scala2.13
42,2026-02-21T15:01:14.000Z,71445245683791,n.najmehakbari@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2199524748115638),808481cc-9291-4834-95c5-b846de8a3e3b,0221-145446-ohe4wj7y-v2n,41.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 292, numOutputBytes -> 7592)",,Databricks-Runtime/18.0.x-aarch64-photon-scala2.13
41,2026-02-21T15:01:11.000Z,71445245683791,n.najmehakbari@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2199524748115638),0ab4c255-a4d7-4b01-96b0-9878fe433689,0221-145446-ohe4wj7y-v2n,40.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 292, numOutputBytes -> 8288)",,Databricks-Runtime/18.0.x-aarch64-photon-scala2.13


In [0]:
# 
def get_spark_shape(df):
    return (df.count(), len(df.columns))

# Read from Bronze, Silver, and Gold layers
bronze_df = spark.read.table("weather_bronze")
silver_df = spark.read.table("weather_silver")
gold_df = spark.read.table("weather_gold")

# Print shapes
print(f"Bronze Layer Shape: {get_spark_shape(bronze_df)} -> (Rows, Columns)")
print(f"Silver Layer Shape: {get_spark_shape(silver_df)} -> (Data Quality applied)")
print(f"Gold Layer Shape:   {get_spark_shape(gold_df)}   -> (Features engineered)")

print("\n--- Columns in Gold Layer ---")
print(gold_df.columns)

INFO:py4j.clientserver:Received command c on object id p0


Bronze Layer Shape: (14620, 5) -> (Rows, Columns)
Silver Layer Shape: (1462, 5) -> (Data Quality applied)
Gold Layer Shape:   (1461, 3)   -> (Features engineered)

--- Columns in Gold Layer ---
['date', 'prev_day_temp', 'meantemp']


In [0]:
# --- Unit Test 1: Validate Bronze History ---
history_df = spark.sql("DESCRIBE HISTORY weather_bronze")
assert history_df.count() > 0, "Test Failed: Bronze history is empty."
print("Unit Test 1 Passed: Versioning history found.")

# --- Unit Test 2: Validate Silver Deduplication ---
# Silver count must equal unique dates in Bronze
unique_dates = spark.table("weather_bronze").select("date").distinct().count()
silver_count = spark.table("weather_silver").count()
assert silver_count == unique_dates, f"Test Failed: Silver count ({silver_count}) != Unique dates ({unique_dates})."
print(f"Unit Test 2 Passed: Silver layer successfully deduplicated to {silver_count} rows.")

# --- Unit Test 3: Validate Gold Feature Engineering ---
# Gold must have exactly 3 columns and no nulls
gold_df = spark.table("weather_gold")
assert len(gold_df.columns) == 3, "Test Failed: Gold layer does not have 3 columns."
assert gold_df.filter(col("prev_day_temp").isNull()).count() == 0, "Test Failed: Null values found in lag features."
print("Unit Test 3 Passed: Gold layer is clean and feature-engineered.")

Unit Test 1 Passed: Versioning history found.
Unit Test 2 Passed: Silver layer successfully deduplicated to 1462 rows.
Unit Test 3 Passed: Gold layer is clean and feature-engineered.


## **Assignment 4 starts here.**


**1. Delta Table Lineage and Versioning**

 This part accesses the Delta Lake transaction log to retrieve the history of the weather_gold table. Its purpose is to programmatically identify the latest data version, which is a mandatory requirement for establishing an explicit linkage between the data version and the model version.


In [0]:
# Gold table history (Delta versions)
gold_history = spark.sql("DESCRIBE HISTORY weather_gold")
display(gold_history.select("version", "timestamp", "operation", "operationParameters"))

latest_gold_version = gold_history.agg({"version":"max"}).collect()[0][0]
print("Latest weather_gold Delta version:", latest_gold_version)

INFO:py4j.clientserver:Received command c on object id p0


version,timestamp,operation,operationParameters
7,2026-02-21T15:19:29.000Z,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)"
6,2026-02-21T15:01:32.000Z,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)"
5,2026-02-21T11:44:44.000Z,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)"
4,2026-02-21T11:36:59.000Z,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)"
3,2026-02-21T11:13:02.000Z,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)"
2,2026-02-21T10:40:22.000Z,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)"
1,2026-02-14T13:11:47.000Z,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)"
0,2026-02-13T21:10:47.000Z,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)"


Latest weather_gold Delta version: 7


**2. Time-Series Data Splitting**

This section prepares the dataset for training by implementing a temporal split. Unlike standard shuffling, this respects the chronological order of climate data. An 80/20 split is used to create a training set and a validation set, ensuring that the model is evaluated on "future" data relative to its training.

In [0]:
gold_df = spark.table("weather_gold").orderBy("date")

n = gold_df.count()
train_n = int(n * 0.8)

train_df = gold_df.limit(train_n)
valid_df = gold_df.subtract(train_df)

print("Train data")
display(train_df.limit(5))

print("Valid data")
display(valid_df.limit(5))

Train data


date,prev_day_temp,meantemp
2013-01-02,10.0,7.4
2013-01-03,7.4,7.166666666666667
2013-01-04,7.166666666666667,8.666666666666666
2013-01-05,8.666666666666666,6.0
2013-01-06,6.0,7.0


Valid data


date,prev_day_temp,meantemp
2016-03-15,22.375,24.066666666666663
2016-04-13,30.2,31.75
2016-05-04,35.5,33.714285714285715
2016-05-15,36.5625,37.25
2016-05-17,37.21428571428572,37.5


In [0]:
#os.environ["MLFLOW_DFS_TMP"] = "/Volumes/main/default/mlflow_tmp"
#os.environ["MLFLOW_DFS_TMP"] = "/Volumes/workspace/mlops/mlflow_tmp"

**3. Model Registration and Traning**

The purpose of this call is to save the trained model into the MLflow Model Registry. By including a signature, you define the expected input (prev_day_temp) and output (meantemp) schemas, which ensures model reproducibility and prevents errors during production scoring.

In [0]:
# -------------------------
# Train + Log + Register (UC-safe: signature + input_example)
# -------------------------
import numpy as np
import mlflow
import mlflow.sklearn
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, mean_absolute_error
from mlflow.models.signature import infer_signature

# 1) Convert Spark train/valid -> Pandas (needed for sklearn)
train_pd = train_df.orderBy("date").toPandas()
valid_pd = valid_df.orderBy("date").toPandas()

X_train = train_pd[["prev_day_temp"]]
y_train = train_pd["meantemp"]

X_valid = valid_pd[["prev_day_temp"]]
y_valid = valid_pd["meantemp"]

# 2) Train model
model = LinearRegression()
model.fit(X_train, y_train)

# 3) Validation KPIs
valid_preds = model.predict(X_valid)
rmse = float(np.sqrt(mean_squared_error(y_valid, valid_preds)))
mae  = float(mean_absolute_error(y_valid, valid_preds))

print(f"Validation RMSE: {rmse:.4f} | MAE: {mae:.4f}")

# 4) Link model to latest Gold delta version
gold_version = spark.sql("DESCRIBE HISTORY weather_gold") \
    .agg({"version":"max"}).collect()[0][0]

# 5) MLflow setup
mlflow.set_experiment("/Users/n.najmehakbari@gmail.com/Delhi_Climate_ModelOps")

# Avoid "run already active" error
if mlflow.active_run() is not None:
    mlflow.end_run()

# 6) UC-compliant logging: signature + input_example
input_example = X_train.head(1).copy()
signature = infer_signature(input_example, model.predict(input_example))

with mlflow.start_run(run_name=f"LR_Gold_V{gold_version}"):

    mlflow.log_param("model_type", "LinearRegression")
    mlflow.log_param("gold_delta_version", int(gold_version))
    mlflow.log_param("split_ratio", 0.8)
    mlflow.log_param("features", "prev_day_temp")
    mlflow.log_param("target", "meantemp")

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("mae", mae)

    mlflow.sklearn.log_model(
        sk_model=model,
        artifact_path="model",
        registered_model_name="DelhiClimateModel",
        signature=signature,
        input_example=input_example
    )

Validation RMSE: 1.6784 | MAE: 1.2744


Uploading artifacts:   0%|          | 0/11 [00:00<?, ?it/s]

Registered model 'DelhiClimateModel' already exists. Creating a new version of this model...


Uploading artifacts:   0%|          | 0/11 [00:00<?, ?it/s]

Created version '10' of model 'workspace.default.delhiclimatemodel'.


**4. Experiment Tracking Evidence**

This block retrieves and prints metadata from the active MLflow session, such as the run_id and experiment_id. This serves as verifiable evidence required for the deliverable to prove that the ModelOps pipeline was executed and tracked in a managed environment.

In [0]:
# -------------------------
# (A) Evidence: print current MLflow run + model registry info
# -------------------------
current_run = mlflow.active_run()
if current_run is not None:
    run_id = current_run.info.run_id
    exp_id = current_run.info.experiment_id
    print("MLflow run_id:", run_id)
    print("MLflow experiment_id:", exp_id)
else:
    print("No active MLflow run (you already ended it). This is fine.")

print("Registered model name: DelhiClimateModel")

No active MLflow run (you already ended it). This is fine.
Registered model name: DelhiClimateModel


**5. Persistent ModelOps State Management**

This part creates a dedicated Delta table (modelops_state) to act as a "memory" for the pipeline. By storing the last_trained_gold_version, the system can keep track of which data version the current model is based on, enabling the automation logic required for the assignment.

In [0]:
# -------------------------
# (B) Track last trained Gold delta version in a Delta table (simple ModelOps state)
#     This is used to decide if retraining is needed when Gold changes.
# -------------------------
spark.sql("""
CREATE TABLE IF NOT EXISTS modelops_state (
  key STRING,
  value STRING
) USING delta
""")

def get_state(key: str):
    rows = spark.table("modelops_state").filter(col("key") == key).select("value").collect()
    return rows[0]["value"] if rows else None

def set_state(key: str, value: str):
    spark.sql(f"DELETE FROM modelops_state WHERE key = '{key}'")
    spark.createDataFrame([(key, value)], ["key", "value"]).write.mode("append").saveAsTable("modelops_state")

def get_latest_gold_version() -> int:
    return int(
        spark.sql("DESCRIBE HISTORY weather_gold")
        .agg({"version": "max"})
        .collect()[0][0]
    )

INFO:py4j.clientserver:Received command c on object id p0


**6. Automated Retraining Function**

This is the core logic for Model Lifecycle Management. It encapsulates the entire process—data loading, time-splitting, training, KPI calculation (RMSE and MAE), and MLflow logging—into a single reusable function. It ensures that every time a model is created, its parameters and metrics are logged consistently.

In [0]:
# -------------------------
# (C) Retrain function (sklearn) that:
#     - reads Gold (ordered by date)
#     - time-splits into train/valid
#     - trains LR
#     - logs params/metrics
#     - registers new model version in MLflow registry
# -------------------------
def retrain_and_register_sklearn(
    gold_version: int,
    experiment_path: str = "/Users/n.najmehakbari@gmail.com/Delhi_Climate_ModelOps",
    registered_model_name: str = "DelhiClimateModel",
    split_ratio: float = 0.8,
):
    # Load Gold ordered by date
    gold_pd = spark.table("weather_gold").orderBy("date").toPandas()

    # Features/target
    X_all = gold_pd[["prev_day_temp"]]
    y_all = gold_pd["meantemp"]

    # Time-based split
    split = int(len(gold_pd) * split_ratio)
    X_train, y_train = X_all.iloc[:split], y_all.iloc[:split]
    X_valid, y_valid = X_all.iloc[split:], y_all.iloc[split:]

    # Setup MLflow experiment
    mlflow.set_experiment(experiment_path)

    # Signature (same as your earlier one)
    input_schema = Schema([ColSpec("double", "prev_day_temp")])
    output_schema = Schema([ColSpec("double", "prediction")])
    signature = ModelSignature(inputs=input_schema, outputs=output_schema)
    if mlflow.active_run() is not None:
        mlflow.end_run()

    with mlflow.start_run(run_name=f"LR_Gold_V{gold_version}"):

        # Train
        model = LinearRegression()
        model.fit(X_train, y_train)

        # Validate KPIs
        valid_preds = model.predict(X_valid)
        rmse = float(np.sqrt(mean_squared_error(y_valid, valid_preds)))
        mae  = float(mean_absolute_error(y_valid, valid_preds))

        # Log linkage + parameters
        mlflow.log_param("model_type", "LinearRegression")
        mlflow.log_param("gold_delta_version", int(gold_version))
        mlflow.log_param("split_ratio", split_ratio)
        mlflow.log_param("features", "prev_day_temp")
        mlflow.log_param("target", "meantemp")

        # Log KPIs (Primary: RMSE, Secondary: MAE)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("mae", mae)

        # Log & register model (creates new registry version)
        mlflow.sklearn.log_model(
            sk_model=model,
            artifact_path="model",
            registered_model_name=registered_model_name,
            signature=signature
        )

        print(f"✅ Logged + registered {registered_model_name} for Gold version {gold_version}")
        print(f"Validation RMSE: {rmse:.4f} | MAE: {mae:.4f}")

    return model

**7. Continuous Model Update (CMU) Logic**

This block implements the Continuous Model Update mechanism. It compares the latest version of the Gold data against the version stored in the modelops_state table. If new data is detected (e.g., a new batch from Assignment 3), it automatically triggers the retraining function, fulfilling the bonus requirement for automation.

In [0]:
# -------------------------
# (D) Continuous update logic:
#     - check latest Gold delta version
#     - compare with last trained version stored in modelops_state
#     - retrain only if new version exists
# -------------------------
latest_version = get_latest_gold_version()
last_trained = get_state("last_trained_gold_version")
last_trained_int = int(last_trained) if last_trained is not None else None

print("Latest Gold delta version:", latest_version)
print("Last trained Gold delta version:", last_trained_int)

if last_trained_int is None or latest_version > last_trained_int:
    print("🔁 New Gold version detected (or first run) -> retraining...")
    model = retrain_and_register_sklearn(gold_version=latest_version)
    set_state("last_trained_gold_version", str(latest_version))
    print("✅ Updated state: last_trained_gold_version =", latest_version)
else:
    print("✅ No new Gold version -> skipping retraining")

Latest Gold delta version: 7
Last trained Gold delta version: 5
🔁 New Gold version detected (or first run) -> retraining...


Uploading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Registered model 'DelhiClimateModel' already exists. Creating a new version of this model...


Uploading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Created version '11' of model 'workspace.default.delhiclimatemodel'.


✅ Logged + registered DelhiClimateModel for Gold version 7
Validation RMSE: 1.6784 | MAE: 1.2744
✅ Updated state: last_trained_gold_version = 7


**8. Production Scoring on test.csv**

This is a mandatory requirement to validate the model on a completely external dataset. It reproduces the exact feature engineering (lagging the temperature) used during training and generates final performance metrics (RMSE/MAE). These results serve as the final proof of the model's effectiveness in a simulated production environment.

**9. Verification and Traceability**

The final display of predictions against actual values provides clear visual evidence for the report. It allows you to verify that the temporal features were calculated correctly and that the model's predictions align with the climate trends in the test dataset.

In [0]:
# -------------------------
# (E) Production scoring on test.csv (MANDATORY)
#     - build prev_day_temp feature in the same way
#     - run predictions
#     - print final RMSE (evidence for report)
# -------------------------
# Adjust path if needed (DBFS / FileStore / Workspace)
test_csv_path = "DailyDelhiClimateTest.csv"
if not os.path.exists(test_csv_path):
    print(f"⚠️ test.csv not found at: {test_csv_path}")
    print("Update `test_csv_path` to the correct location in your environment.")
else:
    test_pdf = pd.read_csv(test_csv_path)

    # Apply same lag feature engineering as Gold
    test_pdf["prev_day_temp"] = test_pdf["meantemp"].shift(1)
    test_pdf = test_pdf.dropna()

    X_test = test_pdf[["prev_day_temp"]]
    y_test = test_pdf["meantemp"]

    final_preds = model.predict(X_test)

    final_rmse = float(np.sqrt(mean_squared_error(y_test, final_preds)))
    final_mae  = float(mean_absolute_error(y_test, final_preds))

    print("\n==============================")
    print("FINAL PRODUCTION RESULT (test.csv)")
    print("RMSE:", final_rmse)
    print("MAE :", final_mae)
    print("==============================\n")

    # Show a few predictions for evidence
    preview = test_pdf[["date", "prev_day_temp", "meantemp"]].copy()
    preview["prediction"] = final_preds
    display(preview.head(10))


FINAL PRODUCTION RESULT (test.csv)
RMSE: 1.6757278347025497
MAE : 1.305134956793525



date,prev_day_temp,meantemp,prediction
2017-01-02,15.91304347826087,18.5,16.155656825526716
2017-01-03,18.5,17.11111111111111,18.67319982844745
2017-01-04,17.11111111111111,18.7,17.321577768055832
2017-01-05,18.7,18.38888888888889,18.867833405143845
2017-01-06,18.38888888888889,19.318181818181817,18.565070063616123
2017-01-07,19.318181818181817,14.708333333333334,19.469428096750875
2017-01-08,14.708333333333334,15.68421052631579,14.983271603578336
2017-01-09,15.68421052631579,14.571428571428571,15.932963946011393
2017-01-10,14.571428571428571,12.11111111111111,14.850040286196876
2017-01-11,12.11111111111111,11.0,12.45573835064601
