In [0]:
# storage_account_name = "jaistorageaccount"
# container_name = "sample"
# mount_point = "/mnt/rawdata"  # Choose a mount point name relevant to your data

# # Fetch service principal credentials from Databricks secrets
# application_id = dbutils.secrets.get(scope="my-secret-scope", key="application-id")
# authentication_key = dbutils.secrets.get(scope="my-secret-scope", key="application-secret")
# tenant_id = dbutils.secrets.get(scope="my-secret-scope", key="tenant-id")
# # Do not print these values: notebook output is saved with the notebook and can leak credentials.
# endpoint = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"

# configs = {
#   "fs.azure.account.auth.type": "OAuth",
#   "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
#   "fs.azure.account.oauth2.client.id": application_id,
#   "fs.azure.account.oauth2.client.secret": authentication_key,
#   "fs.azure.account.oauth2.client.endpoint": endpoint
# }

# # Source URI including container and storage account
# source = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"

# # Mount only if not already mounted
# if not any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
#     dbutils.fs.mount(
#         source = source,
#         mount_point = mount_point,
#         extra_configs = configs
#     )
#     print(f"Mount successful at {mount_point}")
# else:
#     print(f"{mount_point} is already mounted")


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth, hour
from pyspark.ml.feature import StringIndexer
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
import joblib
import os

# Reuse the active SparkSession if one already exists (getOrCreate), otherwise build one.
spark = (
    SparkSession.builder
    .appName("EndToEndPipeline")
    .getOrCreate()
)

def feature_engineering(df, exclude_cols=("PaymentIsOutstanding",)):
    """Derive date parts from ``Issued_date``, drop ID columns and index string columns.

    Args:
        df: Spark DataFrame containing an ``Issued_date`` column.
        exclude_cols: String columns to leave un-indexed. Defaults to the target
            ``PaymentIsOutstanding`` used by ``train_and_register``. Indexing the target
            here would give 0.0 to its *most frequent* value (StringIndexer's default
            ``frequencyDesc`` order). Which class becomes the positive label of the binary
            metrics would then depend on class balance instead of on the label values.

    Returns:
        A DataFrame where ``Issued_year``/``Issued_month``/``Issued_day``/``Issued_hour``
        replace ``Issued_date``, ``Unit_ID``/``Violation_ID``/``Tract`` are removed, and
        every other non-excluded string column is replaced by a same-named numeric index.
    """
    # NOTE(review): a plain cast only understands Spark's default timestamp string
    # formats; other formats become null here (or fail under ANSI mode) -- confirm
    # the source format of Issued_date.
    df = df.withColumn("Issued_date", col("Issued_date").cast("timestamp"))
    df = (
        df.withColumn("Issued_year", year("Issued_date"))
        .withColumn("Issued_month", month("Issued_date"))
        .withColumn("Issued_day", dayofmonth("Issued_date"))
        .withColumn("Issued_hour", hour("Issued_date"))
        .drop("Issued_date")
    )

    # Removed before indexing so no indexer is fitted on these columns.
    drop_cols = ['Unit_ID', 'Violation_ID', 'Tract']
    for c in drop_cols:
        if c in df.columns:
            df = df.drop(c)

    # Encode every remaining string column except the excluded ones (by default the target).
    excluded = set(exclude_cols or ())
    cat_cols = [c for c, t in df.dtypes if t == 'string' and c not in excluded]
    for c in cat_cols:
        # handleInvalid='keep' puts invalid values into an extra index bucket instead of failing.
        indexer = StringIndexer(inputCol=c, outputCol=c + "_index", handleInvalid='keep')
        df = indexer.fit(df).transform(df).drop(c).withColumnRenamed(c + "_index", c)

    return df

def replace_missing(df):
    """Impute nulls in numeric columns with each column's approximate median.

    Numeric columns are detected from ``df.dtypes`` (the same simple-type strings
    ``feature_engineering`` inspects) rather than from ``str(dataType)``. The text
    form of ``str(dataType)`` differs across PySpark versions (e.g. ``'IntegerType'``
    vs ``'IntegerType()'``).

    Args:
        df: Spark DataFrame.

    Returns:
        A DataFrame with nulls in numeric columns filled. Non-numeric columns, and
        numeric columns with no non-null values, are left unchanged. ``df`` itself is
        returned when there is nothing to fill.
    """
    numeric_types = {'tinyint', 'smallint', 'int', 'bigint', 'float', 'double'}
    medians = {}
    for col_name, dtype in df.dtypes:
        if dtype in numeric_types or dtype.startswith('decimal'):
            # relativeError=0.01 trades a little precision for a much cheaper quantile.
            quantiles = df.approxQuantile(col_name, [0.5], 0.01)
            # approxQuantile returns [] when the column has no usable (non-null) values.
            if quantiles:
                medians[col_name] = quantiles[0]
    # A single fill for all columns instead of one projection per column.
    return df.na.fill(medians) if medians else df

def select_features(df):
    """Remove the columns this pipeline excludes from the model's feature set.

    Names missing from ``df`` are skipped. This makes the call safe even after
    ``feature_engineering`` has already dropped some of them.

    Returns:
        ``df`` without ``License_Plate_State``, ``Tract``, ``Unit_ID`` and
        ``Violation_ID`` (``df`` itself when none of them is present).
    """
    unwanted = ('License_Plate_State', 'Tract', 'Unit_ID', 'Violation_ID')
    present = [name for name in unwanted if name in df.columns]
    if not present:
        return df
    return df.drop(*present)

def split_data(df, train_fraction=0.8, seed=42):
    """Randomly split ``df`` into a training and a test DataFrame.

    Args:
        df: DataFrame exposing ``randomSplit`` (a Spark DataFrame in this pipeline).
        train_fraction: Share of rows for the training split. Must lie strictly between
            0 and 1; a weight of 0 would make one split empty by construction, and the
            downstream evaluation needs both splits.
        seed: Random seed so the split is reproducible (defaults to the previously
            hard-coded 42).

    Returns:
        ``[train_df, test_df]`` as returned by ``randomSplit``. Split sizes are
        approximate, because Spark samples rows independently.

    Raises:
        ValueError: if ``train_fraction`` is not in the open interval (0, 1).
    """
    if not 0 < train_fraction < 1:
        raise ValueError(
            f"train_fraction must be strictly between 0 and 1, got {train_fraction!r}"
        )
    return df.randomSplit([train_fraction, 1 - train_fraction], seed=seed)

def train_and_register(train_df, test_df, model_output_path, registered_model_name):
    """Train a RandomForest on ``train_df``, evaluate it on ``test_df`` and register it in MLflow.

    The target ``PaymentIsOutstanding`` is label-encoded by a ``LabelEncoder`` fitted
    on the string form of the training labels (classes sorted). For a two-class
    target, the positive class of the reported recall/precision/F1 is therefore
    ``le.classes_[1]``.

    Args:
        train_df: Spark DataFrame of feature columns plus ``PaymentIsOutstanding``.
        test_df: Spark DataFrame with the same columns as ``train_df``.
        model_output_path: Local-filesystem directory (created if needed) that receives
            ``model.joblib``. It is used with ``os``/``joblib``, so on Databricks it must be
            a ``/dbfs/...`` path for the file to land on DBFS.
        registered_model_name: Name passed to ``mlflow.sklearn.log_model`` for registration.

    Raises:
        ValueError: from ``LabelEncoder.transform`` if the test split contains a target
            value that never occurs in the training split.
    """
    from mlflow.models import infer_signature
    from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

    # Both splits are collected to the driver; assumes they fit in driver memory.
    train_pd = train_df.toPandas()
    test_pd = test_df.toPandas()

    target = "PaymentIsOutstanding"
    le = LabelEncoder()
    train_pd[target] = le.fit_transform(train_pd[target].astype(str))
    test_pd[target] = le.transform(test_pd[target].astype(str))

    X_train = train_pd.drop(columns=[target])
    y_train = train_pd[target]
    X_test = test_pd.drop(columns=[target])
    y_test = test_pd[target]

    model = RandomForestClassifier(random_state=42)
    model.fit(X_train, y_train)

    os.makedirs(model_output_path, exist_ok=True)
    model_file = os.path.join(model_output_path, "model.joblib")
    joblib.dump(model, model_file)

    # Evaluate on the held-out split.
    y_pred = model.predict(X_test)
    # average='binary' raises for more than two classes, so use it only for a binary target.
    average = "binary" if len(le.classes_) == 2 else "weighted"
    acc = accuracy_score(y_test, y_pred)
    rec = recall_score(y_test, y_pred, average=average)
    prec = precision_score(y_test, y_pred, average=average)
    f1 = f1_score(y_test, y_pred, average=average)

    # Store the input columns/types and the output type with the model. MLflow uses this
    # signature to enforce the input schema at inference time, and Databricks' Unity
    # Catalog registry requires it.
    signature = infer_signature(X_test, y_pred)

    with mlflow.start_run():
        mlflow.sklearn.log_model(
            model,
            artifact_path="model",
            registered_model_name=registered_model_name,
            signature=signature,
        )
        mlflow.log_metric("accuracy", acc)
        mlflow.log_metric("recall", rec)
        mlflow.log_metric("precision", prec)
        mlflow.log_metric("f1_score", f1)

    print(f"Model training and registration successful! Model trained, registered as '{registered_model_name}', and metrics logged.")

def run_pipeline(raw_data_path, model_output_path, registered_model_name, infer_schema=True):
    """Run the end-to-end pipeline on a headered CSV and register the trained model.

    The steps run in this order: read, ``feature_engineering``, ``replace_missing``,
    ``select_features``, ``split_data``, ``train_and_register``.

    Args:
        raw_data_path: A path Spark can read. On Databricks this is a ``dbfs:/...``
            (or scheme-less ``/mnt/...``) path, not the ``/dbfs/...`` local FUSE path.
        model_output_path: Local directory forwarded to ``train_and_register``.
        registered_model_name: MLflow registered model name.
        infer_schema: Let Spark infer column types (default True). Without it every
            column is read as a string, and ``feature_engineering`` would then label-index
            numeric features as if they were categories. Inference costs one extra pass
            over the file.
    """
    df = (
        spark.read
        .option("header", "true")
        .option("inferSchema", infer_schema)
        .csv(raw_data_path)
    )
    df = feature_engineering(df)
    df = replace_missing(df)
    df = select_features(df)
    train_df, test_df = split_data(df)

    train_and_register(train_df, test_df, model_output_path, registered_model_name)

# Example run
if __name__ == "__main__":
    # Spark reads through DBFS paths, so the input uses the dbfs:/ mount path.
    # The /dbfs/ FUSE prefix is for local-file APIs (os, joblib). train_and_register
    # uses those APIs for model_path, so model_path takes the /dbfs/ prefix and the
    # model file persists on the mount instead of the driver's local disk.
    raw_path = "dbfs:/mnt/rawdata/reduced_file.csv"
    model_path = "/dbfs/mnt/models"
    registered_name = "my_spark_native"
    run_pipeline(raw_path, model_path, registered_name)


In [0]:
# import mlflow.pyfunc
# import pandas as pd

# # Load the model by registered name and version
# model_name = "my_spark_native"  # must match registered_name used for run_pipeline above
# model_version = 1  # or "latest" to load the newest registered version

# model_uri = f"models:/{model_name}/{model_version}"  

# # Load model as a PyFunc model (generic for most flavors)
# model = mlflow.pyfunc.load_model(model_uri)

# # Predict on new data - input as Pandas DataFrame or Spark DataFrame (converted to pandas)
# data = pd.DataFrame({
#     "Issued_year": [2023],
#     "Issued_month": [5],
#     "Issued_day": [3],
#     "Issued_hour": [10],
#     "Community_Name": [1],
#     "Sector": [1],
#     "Side": [1],
#     "Hardship_Index": [50.5],
#     "Per_capita_income": [15957.00],
#     "Percent_unemployed": [5.0],
#     "Percent_without_diploma": [10.0],
#     "Percent_households_below_poverty": [12.0],
#     "Neighborhood": [1],
#     "Ward": [1],
#     "ZIP": [12345],
#     "Police_District": [1],
#     "Plate_Type": [1],
#     # Add other features your model requires
# })

# # If Spark DataFrame, convert to pandas
# if hasattr(data, "toPandas"):
#     data = data.toPandas()

# predictions = model.predict(data)
# print(predictions)

