In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC ## Demo_train_Notebook1
# MAGIC This notebook is a minimal prototype for training a sample ML model on Databricks.
# MAGIC 
# MAGIC **Purpose:** Demonstrate full notebook lifecycle for automation & job integration.
# MAGIC 
# MAGIC - Uses mock data
# MAGIC - Trains fast
# MAGIC - Logs to MLflow
# MAGIC - No hardcoding
# MAGIC - Fully portable using config

# Mock dbutils if not in Databricks environment
try:
    dbutils
except NameError:
    class DBUtilsMock:
        """Minimal stand-in for ``dbutils`` when the name is not defined.

        Every navigation step returns the mock itself, and the instance is
        callable, so both attribute-style and call-style access work. A chain
        such as
        ``dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()``
        therefore resolves to the placeholder address returned by ``get``.
        """

        @property
        def notebook(self):
            # Property rather than method: ``dbutils.notebook`` is used as an
            # attribute, while ``.notebook()`` still works via ``__call__``.
            return self

        @property
        def entry_point(self):
            return self

        def __call__(self):
            return self

        def getDbutils(self):
            return self

        def getContext(self):
            return self

        def userName(self):
            return self

        def get(self):
            """Return the placeholder user address."""
            return "mock_user@example.com"

    dbutils = DBUtilsMock()

# Now proceed with the rest of the notebook

In [0]:
# Install dependencies (versions are unpinned; pin them for reproducible runs)
%pip install pandas scikit-learn mlflow --quiet


In [0]:
# COMMAND ----------
# Section 2 - Imports & Setup

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
from datetime import datetime
import json
import os
from urllib.parse import urlparse

# Load runtime config from mlops_config.json
CONFIG_PATH = "mlops_config.json"
if not os.path.exists(CONFIG_PATH):
    raise FileNotFoundError(f"❌ Config file not found: {CONFIG_PATH}")

# Decode explicitly as UTF-8 so parsing does not depend on the platform's
# locale default encoding.
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
    config = json.load(f)

# Missing keys fall back to placeholder values instead of raising.
repo_name = config.get("repo_name", "unknown-repo")
branch_name = config.get("branch_name", "unknown")

# Derive env from branch: only "main" maps to prod, everything else is dev.
env = "prod" if branch_name == "main" else "dev"


# Get current user
# `dbutils` is supplied by Databricks or by the guard in the first cell, so no
# second mock class is defined here (a duplicate would only shadow the first).
try:
    user_email = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
except Exception:
    # Deliberately broad, best-effort fallback: covers `dbutils` being undefined
    # (NameError) as well as any failure while walking the context chain.
    import getpass
    user_email = getpass.getuser()

In [0]:
# COMMAND ----------
# Section 3 - Data Ingestion (Mock)

# Seeded generator so the mock data (and therefore the reported accuracy) is
# identical on every Restart & Run All; 42 matches the split's random_state.
rng = np.random.default_rng(42)

data = pd.DataFrame({
    "feature1": rng.random(100),
    "feature2": rng.random(100),
    "label": rng.integers(0, 2, 100)  # upper bound exclusive -> labels in {0, 1}
})


In [0]:
# COMMAND ----------
# Section 4 - Preprocessing

# Feature matrix: the two feature columns; target vector: the label column.
X = data.loc[:, ["feature1", "feature2"]]
y = data.loc[:, "label"]


In [0]:
%python
# COMMAND ----------
# Section 5.1 - Save training data to UC table (dynamic path discovery)
try:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()

    # Combine features + label
    train_df_combined = X_train.copy()
    train_df_combined["label"] = y_train

    # Convert to Spark DF
    spark_train_df = spark.createDataFrame(train_df_combined)

    # Step 1: Dynamically find a writable catalog + schema
    catalogs = [row['catalog'] for row in spark.sql("SHOW CATALOGS").collect()]
    selected_catalog = None
    selected_schema = None

    for cat in catalogs:
        try:
            schemas = spark.sql(f"SHOW SCHEMAS IN {cat}").collect()
            for s in schemas:
                schema_name = s['databaseName']
                # Simple logic: pick the first one that starts with your repo name or "default"
                if repo_name in schema_name or schema_name.lower() == "default":
                    selected_catalog = cat
                    selected_schema = schema_name
                    break
            if selected_catalog: break
        except Exception as e:
            continue  # some catalogs like system may error — just skip

    # Step 2: Build UC path
    if not selected_catalog or not selected_schema:
        raise Exception("❌ Could not determine a valid UC catalog and schema.")

    uc_table_path = f"{selected_catalog}.{selected_schema}.train_input_data"

    # Step 3: Save to UC Delta table
    spark_train_df.write.mode("overwrite").format("delta").saveAsTable(uc_table_path)

    print(f"✅ Training data saved to Unity Catalog table: {uc_table_path}")

except Exception as e:
    print(f"⚠️ Skipping UC write (non-Databricks env): {e}")

In [0]:
# COMMAND ----------
# Section 5 - Train/Test Split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


In [0]:
# COMMAND ----------
# Section 6 - Model Training

# Fit a default-configured logistic regression on the training split;
# fit() returns the estimator itself, so construction and fitting can be chained.
model = LogisticRegression().fit(X_train, y_train)


In [0]:
# COMMAND ----------
# Section 7 - Evaluation

# Score the held-out split and report plain accuracy.
predictions = model.predict(X_test)
acc = accuracy_score(y_true=y_test, y_pred=predictions)
print(f"Accuracy: {acc}")


In [0]:
# COMMAND ----------
# Section 8 - MLflow Tracking

# Experiment path and run name are both derived from user, repo and env.
experiment_path = f"/Users/{user_email}/{repo_name}_train_{env}"
run_name = f"{repo_name}-train-{env}"

# Create the experiment if no experiment with this name exists yet.
client = MlflowClient()
existing_experiment = client.get_experiment_by_name(experiment_path)
if existing_experiment is None:
    client.create_experiment(experiment_path)

mlflow.set_experiment(experiment_path)

# End any run that is still active before starting a new one.
if mlflow.active_run() is not None:
    mlflow.end_run()

with mlflow.start_run(run_name=run_name):
    # Tags describing where this run came from.
    run_tags = {
        "project": repo_name,
        "notebook": "Demo_train_Notebook1",
        "branch": branch_name,
        "env": env,
        "owner": user_email,
        "run_type": "train",
        "date": datetime.today().strftime('%Y-%m-%d'),
    }
    mlflow.set_tags(run_tags)

    # Parameters, metric, then the fitted model artifact.
    mlflow.log_param("model_type", "LogisticRegression")
    mlflow.log_param("train_rows", len(X_train))
    mlflow.log_param("features", X.columns.tolist())
    mlflow.log_metric("accuracy", acc)
    mlflow.sklearn.log_model(model, "model")

    print(f"✅ Model logged to MLflow under run '{run_name}' on branch '{branch_name}' and env '{env}'")


In [0]:
# COMMAND ----------
# Section 10 - Clean Exit

# Final status message, printed when this last cell is executed.
print("✅ Training complete and model logged.")
