In [0]:
# Notebook adapted from the Data + AI Summit 2021 talk on Databricks
# by Chengyin Eng and Niall Turbitt

In [0]:
#All the imports
import mlflow
from mlflow.tracking import MlflowClient
from mlflow.exceptions import RestException
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec
from delta.tables import DeltaTable

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import make_column_selector as selector
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score

from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns

import tempfile
import os
import numpy as np
import pandas as pd
import pyspark.sql.functions as F



In [0]:
#%pip install protobuf==3.20.*

In [0]:
def check_null_proportion(new_pdf, null_proportion_threshold):
  """Report and return the features whose share of null values reaches a threshold.

  Args:
    new_pdf: pandas DataFrame to inspect.
    null_proportion_threshold: a column is flagged when its fraction of null values
      is >= this value.

  Returns:
    dict mapping each flagged column name to its null proportion (empty when no
    column is flagged). When the dict is non-empty an alert and the dict are printed.
  """
  # isnull().mean() is the per-column null count divided by the number of rows.
  null_proportions = new_pdf.isnull().mean()
  null_dict = null_proportions[null_proportions >= null_proportion_threshold].to_dict()
  if null_dict:
    print("Alert: There are feature(s) that exceed(s) the expected null threshold. Please ensure that the data is ingested correctly")
    print(null_dict)
  return null_dict

In [0]:
def check_diff_in_summary_stats(new_stats_pdf, prod_stats_pdf, num_cols, stats_threshold_limit, statistic_list):
  """Flag numeric features whose summary statistics moved by more than a relative tolerance.

  Args:
    new_stats_pdf: summary-stats DataFrame of the new data (rows = statistics,
      columns = features).
    prod_stats_pdf: summary-stats DataFrame of the production data, same layout.
    num_cols: features to check.
    stats_threshold_limit: relative tolerance, e.g. 0.05 for +/-5% of the production value.
    statistic_list: row labels of the statistics to compare (e.g. "mean", "median").

  Returns:
    Sorted numpy array of the unique feature names for which at least one statistic
    falls outside the tolerance band. A message is printed for every violation.
  """
  feature_diff_list = []
  for feature in num_cols:
    print(f"\nCHECKING {feature}.........")
    for statistic in statistic_list:
      # Scalar lookup by (row label, column label); avoids positional fallback on a
      # label-indexed Series, which pandas 2 deprecates.
      val = prod_stats_pdf.loc[str(statistic), str(feature)]
      new_metric_value = new_stats_pdf.loc[str(statistic), str(feature)]

      # Tolerance uses |val| so the band stays [lower, upper] for negative values too;
      # for positive values this equals val * (1 -/+ stats_threshold_limit).
      tolerance = abs(val) * stats_threshold_limit
      lower_val_limit = val - tolerance
      upper_val_limit = val + tolerance

      if new_metric_value < lower_val_limit:
        feature_diff_list.append(str(feature))
        print(f"\tThe {statistic} {feature} in the new data is at least {stats_threshold_limit *100}% lower than the {statistic} in the production data. Decreased from {round(val, 2)} to {round(new_metric_value,2)}.")
      elif new_metric_value > upper_val_limit:
        feature_diff_list.append(str(feature))
        print(f"\tThe {statistic} {feature} in the new data is at least {stats_threshold_limit *100}% higher than the {statistic} in the production data. Increased from {round(val, 2)} to {round(new_metric_value, 2)}.")

  return np.unique(feature_diff_list)

In [0]:
def check_diff_in_variances(reference_df, new_df, num_cols, p_threshold):
  """Run Levene's test (median-centred) per feature to detect changed variances.

  Null values are dropped before testing; otherwise scipy returns a NaN p-value and
  the feature could never be flagged.

  Args:
    reference_df: pandas DataFrame of production data.
    new_df: pandas DataFrame of incoming data.
    num_cols: numeric features to test.
    p_threshold: a feature is flagged when its p-value is <= this value.

  Returns:
    dict of flagged feature name -> Levene p-value (also printed).
  """
  var_dict = {}
  for feature in num_cols:
    column = str(feature)
    reference_values = reference_df[column].dropna()
    new_values = new_df[column].dropna()
    if reference_values.empty or new_values.empty:
      # Nothing to compare for this feature.
      continue
    _, levene_pval = stats.levene(reference_values, new_values, center="median")
    if levene_pval <= p_threshold:
      var_dict[column] = levene_pval

  if var_dict:
    print(f"The feature(s) below have significantly different variances compared to production data at p-value {p_threshold}")
    print(var_dict)
  else:
    print(f"No features have significantly different variances compared to production data at p-value {p_threshold}")
  return var_dict

In [0]:
def check_dist_ks_bonferroni_test(reference_df, new_df, num_cols, p_threshold, ks_alternative="two-sided"):
    """Two-sample Kolmogorov-Smirnov test per feature, with a Bonferroni correction.

    The KS test checks whether two samples come from the same distribution. Because one
    test is run per feature, the family-wise alpha `p_threshold` is divided by the number
    of features. Null values are dropped before testing.

    Args:
        reference_df: pandas DataFrame of production data.
        new_df: pandas DataFrame of incoming data.
        num_cols: numeric features to test.
        p_threshold: family-wise significance level before correction.
        ks_alternative: alternative hypothesis passed to scipy.stats.ks_2samp.

    Returns:
        dict of flagged feature -> KS p-value (p-value <= corrected alpha); also printed.
    """
    ks_dict = {}
    if len(num_cols) == 0:
        print("No numeric features were given; skipping the KS test.")
        return ks_dict

    # Bonferroni correction
    corrected_alpha = p_threshold / len(num_cols)
    print(f"The Bonferroni-corrected alpha level is {round(corrected_alpha, 4)}. Any features with a KS test p-value below this alpha level have shifted significantly.")
    for feature in num_cols:
        reference_values = reference_df[feature].dropna()
        new_values = new_df[feature].dropna()
        if reference_values.empty or new_values.empty:
            continue
        try:
            # `method` is the documented keyword for choosing the p-value computation.
            _, ks_pval = stats.ks_2samp(reference_values, new_values, alternative=ks_alternative, method="asymp")
        except TypeError:
            # Older SciPy releases only accept the former keyword name `mode`.
            _, ks_pval = stats.ks_2samp(reference_values, new_values, alternative=ks_alternative, mode="asymp")
        if ks_pval <= corrected_alpha:
            ks_dict[feature] = ks_pval

    if ks_dict:
        print(f"The feature(s) below have significantly different distributions compared to production data at Bonferroni-corrected alpha level of {round(corrected_alpha, 4)}, according to the KS test")
        print("\t", ks_dict)
    else:
        print(f"No feature distributions have shifted according to the KS test at the Bonferroni-corrected alpha level of {round(corrected_alpha, 4)}. ")
    return ks_dict

In [0]:
def check_categorical_diffs(reference_pdf, new_pdf, cat_cols, p_threshold):
  """Compare categorical features of incoming data with reference (production) data.

  For every feature in `cat_cols` (null values ignored):
    * if the sets of observed categories differ, the feature is recorded as a category
      mismatch and no chi-square test is run for it;
    * otherwise a chi-square goodness-of-fit test compares the new category counts with
      the counts expected from the reference category proportions, matched by category
      label and rescaled to the new sample size; the feature is flagged when the p-value
      is <= `p_threshold`;
    * a change of the (first) mode is printed.

  Returns:
    (chi_dict, catdiff_list): dict of flagged feature -> chi-square p-value, and the list
    of features whose category sets differ. Both are also printed when non-empty.
  """
  chi_dict = {}
  catdiff_list = []

  for feature in cat_cols:
    # Zero counts are dropped: category-dtype columns report unused categories as 0.
    reference_counts = reference_pdf[feature].value_counts()
    reference_counts = reference_counts[reference_counts > 0]
    new_counts = new_pdf[feature].value_counts()
    new_counts = new_counts[new_counts > 0]

    if set(reference_counts.index) != set(new_counts.index):
      catdiff_list.append(feature)
    elif not new_counts.empty:
      # Align reference counts to the new data's categories by label, then rescale so
      # both totals match, as chisquare requires equal sums of observed and expected.
      reference_by_category = reference_counts.to_dict()
      observed = new_counts.to_numpy(dtype=float)
      reference = np.array([reference_by_category[c] for c in new_counts.index], dtype=float)
      expected = reference / reference.sum() * observed.sum()
      _, chi_pval = stats.chisquare(observed, expected)
      if chi_pval <= p_threshold:
        chi_dict[feature] = chi_pval

    # Check if the mode has changed (None when a column has no non-null values)
    reference_modes = reference_pdf[feature].mode(dropna=True)
    new_modes = new_pdf[feature].mode(dropna=True)
    reference_mode = reference_modes.iloc[0] if not reference_modes.empty else None
    new_mode = new_modes.iloc[0] if not new_modes.empty else None
    if reference_mode != new_mode:
      print(f"The mode for {feature} has changed from {reference_mode} to {new_mode}.")

  if chi_dict:
    print(f"The feature(s) below have significantly different category distributions compared to production data at p-value {p_threshold}, according to the chi-square test")
    print("\t", chi_dict)
  if catdiff_list:
    print("The feature(s) below have different sets of categories compared to production data:")
    print("\t", catdiff_list)

  return chi_dict, catdiff_list


In [0]:
def compare_model_perfs(current_staging_run, current_prod_run, min_model_perf_threshold, metric_to_check, higher_is_better=True):
  """Compare a metric between the staging and production runs and print a recommendation.

  The relative improvement of staging over production is
  (staging - prod) / |prod| when higher is better, or (prod - staging) / |prod| when lower
  is better (e.g. MSE). Dividing by |prod| keeps the sign meaningful when the production
  value is negative (e.g. R^2 < 0).

  Args:
    current_staging_run: MLflow run whose `data.metrics` holds the staging metric.
    current_prod_run: MLflow run whose `data.metrics` holds the production metric.
    min_model_perf_threshold: minimum relative improvement (e.g. 0.1 for +10%) required
      to recommend promotion.
    metric_to_check: metric key to compare.
    higher_is_better: set to False for loss-like metrics where smaller is better.

  Returns:
    The relative improvement as a float (+/-inf when the production value is 0 and the
    values differ).
  """
  metric_name = str(metric_to_check)
  staging_value = current_staging_run.data.metrics[metric_name]
  prod_value = current_prod_run.data.metrics[metric_name]
  print(f"Staging run's {metric_to_check}: {round(staging_value,3)}")
  print(f"Current production run's {metric_to_check}: {round(prod_value,3)}")

  # Positive improvement always means "staging is better".
  improvement = staging_value - prod_value if higher_is_better else prod_value - staging_value
  if prod_value != 0:
    relative_improvement = improvement / abs(prod_value)
  elif improvement == 0:
    relative_improvement = 0.0
  else:
    relative_improvement = float("inf") if improvement > 0 else float("-inf")
  model_diff_percent = round(relative_improvement * 100, 2)

  if relative_improvement >= 0 and relative_improvement >= min_model_perf_threshold:
    print(f"The current staging run exceeds the model improvement threshold of at least +{min_model_perf_threshold}. You may proceed with transitioning the staging model to production now.")
  elif relative_improvement >= 0:
    print(f"CAUTION: The current staging run does not meet the improvement threshold of at least +{min_model_perf_threshold}. Transition the staging model to production with caution.")
  else:
    print(f"ALERT: The current staging run underperforms by {abs(model_diff_percent)}% when compared to the production model. Do not transition the staging model to production.")

  return relative_improvement

In [0]:
def plot_boxplots(unique_feature_diff_array, reference_pdf, new_pdf):
  """Draw side-by-side box plots (production vs incoming) for each given feature.

  Args:
    unique_feature_diff_array: sequence of column names to plot, one row per feature.
    reference_pdf: pandas DataFrame of production data (left column of plots).
    new_pdf: pandas DataFrame of incoming data (right column of plots).

  Prints a message and draws nothing when the sequence is empty.
  """
  features = list(unique_feature_diff_array)
  if not features:
    print("No features to plot.")
    return

  sns.set_theme(style="whitegrid")
  # squeeze=False keeps `ax` 2-D even for a single feature; with the default squeeze a
  # one-row grid comes back 1-D and `ax[0, 0]` raises IndexError.
  fig, ax = plt.subplots(len(features), 2, figsize=(15, max(8, 3 * len(features))), squeeze=False)
  fig.suptitle("Distribution Comparisons between Incoming Data and Production Data")
  ax[0, 0].set_title("Production Data")
  ax[0, 1].set_title("Incoming Data")

  for row, feature in enumerate(features):
    # Feature names go on the x-axis labels of both panels; text placed with annotate in
    # data coordinates can fall outside the plotted value range and not be visible.
    sns.boxplot(ax=ax[row, 0], x=reference_pdf[feature]).set_xlabel(str(feature))
    sns.boxplot(ax=ax[row, 1], x=new_pdf[feature]).set_xlabel(str(feature))

In [0]:
def cleanup_registered_model(registry_model_name):
  """Archive every version of a registered model and then delete the model.

  Prints which model is being deleted, or that there is nothing to delete when the
  registry holds no versions under `registry_model_name`.
  """
  client = MlflowClient()
  versions = client.search_model_versions(filter_string=f'name="{registry_model_name}"')

  if not versions:
    print("No registered models to delete")
    return

  print(f"Deleting following registered model: {registry_model_name}")

  # Move every version to Archived before deleting; a failed transition for an
  # individual version is deliberately ignored (best effort).
  for version in versions:
    try:
      client.transition_model_version_stage(name=version.name,
                                            version=version.version,
                                            stage="Archived")
    except mlflow.exceptions.RestException:
      pass

  client.delete_registered_model(registry_model_name)

In [0]:
def get_delta_version(delta_path):
  """Return the version number of the most recent commit of the Delta table at `delta_path`.

  Uses the notebook-global `spark` session.
  """
  # history() lists commits newest first; limiting to one row yields the latest commit.
  latest_commit = DeltaTable.forPath(spark, delta_path).history(1).first()
  return latest_commit["version"]

In [0]:
def create_summary_stats_pdf(pdf):
  """Build a summary-statistics table for all columns of `pdf`.

  Starts from `describe(include="all")` and appends two rows:
    * "median": per-column median for numeric columns (NaN for non-numeric ones);
    * "null_count": number of null values per column.

  Returns:
    pandas DataFrame with statistics as rows and the columns of `pdf` as columns.
  """
  stats_pdf = pdf.describe(include="all")
  # numeric_only=True: median of category/object columns raises TypeError in pandas >= 2.
  # The row assignment aligns on column labels, leaving NaN for non-numeric columns.
  stats_pdf.loc["median"] = pdf.median(numeric_only=True)
  stats_pdf.loc["null_count"] = pdf.isna().sum()

  return stats_pdf

In [0]:
def log_summary_stats_pdf_as_csv(pdf):
  """Write `pdf` to CSV and log it as an MLflow artifact of the active run.

  The CSV is logged under the artifact directory "summary_stats.csv" (the second argument
  of `mlflow.log_artifact` is a directory), so readers of the artifact must look for the
  CSV file inside that directory.
  """
  # Write into a private temporary directory instead of re-opening a NamedTemporaryFile
  # by name while it is still open, which is not portable across platforms.
  with tempfile.TemporaryDirectory(prefix="summary_stats_") as tmp_dir:
    csv_path = os.path.join(tmp_dir, "summary_stats.csv")
    pdf.to_csv(csv_path)
    mlflow.log_artifact(csv_path, "summary_stats.csv")

In [0]:
def load_summary_stats_pdf_from_run(run, local_tmp_dir):
  """Download a run's "summary_stats.csv" artifact and load it into a pandas DataFrame.

  Args:
    run: MLflow run whose artifacts contain "summary_stats.csv".
    local_tmp_dir: local directory under which the artifact is downloaded (created,
      including parents, if missing).

  Returns:
    pandas DataFrame indexed by the statistic names (first CSV column).

  Raises:
    FileNotFoundError: if the downloaded artifact directory contains no CSV file.
  """
  client = MlflowClient()
  os.makedirs(local_tmp_dir, exist_ok=True)
  # Download into a fresh sub-directory: when `local_tmp_dir` is reused for several runs,
  # files from earlier downloads would otherwise sit next to this run's CSV and could be
  # picked up instead of it.
  download_dir = tempfile.mkdtemp(prefix=f"{run.info.run_id}_", dir=local_tmp_dir)
  local_path = client.download_artifacts(run.info.run_id, "summary_stats.csv", download_dir)
  print(f"Summary stats artifact downloaded in: {local_path}")

  # The artifact is usually a directory named summary_stats.csv containing the CSV file.
  if os.path.isdir(local_path):
    csv_files = sorted(name for name in os.listdir(local_path) if name.endswith(".csv"))
    if not csv_files:
      raise FileNotFoundError(f"No CSV file found in downloaded artifact directory {local_path}")
    summary_stats_path = os.path.join(local_path, csv_files[0])
  else:
    summary_stats_path = local_path

  # Load the csv into a pandas DataFrame
  summary_stats_pdf = pd.read_csv(summary_stats_path, index_col="Unnamed: 0")

  return summary_stats_pdf

In [0]:
def load_delta_table_from_run(run):
  """Reload the Delta table snapshot recorded in a run's params.

  Reads the "delta_path" and "delta_version" params of `run` and returns a Spark
  DataFrame of that table at that version (time travel via versionAsOf).
  """
  params = run.data.params
  path, version = params["delta_path"], params["delta_version"]
  print(f"Loading Delta table from path: {path}; version: {version}")
  return (spark.read
          .format("delta")
          .option("versionAsOf", version)
          .load(path))

In [0]:
def transition_model(model_version, stage):
    """Move `model_version` to `stage`, archiving the versions already in that stage.

    Returns the updated ModelVersion object returned by the registry client.
    """
    return MlflowClient().transition_model_version_stage(
        name=model_version.name,
        version=model_version.version,
        stage=stage,
        archive_existing_versions=True,
    )

In [0]:
def fetch_model_version(registry_model_name, stage="Staging"):
    """Return a latest ModelVersion of a registered model, selected by stage.

    If the model has exactly one latest version, that version is returned whatever its
    stage is. Otherwise the first latest version whose stage equals `stage` is returned.

    Raises:
        IndexError: if no registered model has this name, or if no latest version is in
            `stage` (raised with a descriptive message).
    """
    client = MlflowClient()
    filter_string = f'name="{registry_model_name}"'
    matches = client.search_registered_models(filter_string=filter_string)
    if not matches:
        raise IndexError(f"No registered model named '{registry_model_name}' was found")
    latest_versions = matches[0].latest_versions

    # A single latest version is returned regardless of its stage (e.g. a freshly
    # registered version that has not been transitioned yet).
    if len(latest_versions) == 1:
        return latest_versions[0]

    in_stage = [mv for mv in latest_versions if mv.current_stage == stage]
    if not in_stage:
        available = sorted(str(mv.current_stage) for mv in latest_versions)
        raise IndexError(
            f"Registered model '{registry_model_name}' has no version in stage '{stage}' "
            f"(latest versions are in stages: {available})"
        )
    return in_stage[0]

In [0]:
def get_run_from_registered_model(registry_model_name, stage="Staging"):
    """Return the MLflow run that produced the registry version chosen for `stage`."""
    selected_version = fetch_model_version(registry_model_name, stage)
    return mlflow.get_run(selected_version.run_id)


In [0]:
def create_sklearn_rf_pipeline(model_params, seed=42):
  """Build an sklearn Pipeline: per-dtype preprocessing followed by a RandomForestRegressor.

  Columns of pandas "category" dtype get most-frequent imputation plus one-hot encoding
  (unknown categories ignored); every other column gets median imputation.

  Args:
    model_params: keyword arguments forwarded to RandomForestRegressor.
    seed: random_state for the forest.
  """
  numeric_steps = [("imputer", SimpleImputer(strategy="median"))]
  categorical_steps = [
      ("imputer", SimpleImputer(strategy="most_frequent")),
      ("ohe", OneHotEncoder(handle_unknown="ignore")),
  ]

  # Column routing is dtype-based via make_column_selector.
  preprocessor = ColumnTransformer(transformers=[
      ("numeric", Pipeline(steps=numeric_steps), selector(dtype_exclude="category")),
      ("categorical", Pipeline(steps=categorical_steps), selector(dtype_include="category")),
  ])

  forest = RandomForestRegressor(random_state=seed, **model_params)
  return Pipeline(steps=[("preprocessor", preprocessor), ("rf", forest)])


In [0]:
def train_sklearn_rf_model(run_name, delta_path, model_params, misc_params, seed=42):
  """Train a RandomForest pipeline on a Delta table and log everything to a new MLflow run.

  Args:
    run_name: name given to the MLflow run.
    delta_path: path of the Delta table used for training. Its path and the version that
      was read are logged as params "delta_path" / "delta_version".
    model_params: keyword arguments passed to RandomForestRegressor.
    misc_params: dict with at least "target_col", "num_cols", "cat_cols" and "month";
      it is also logged as the artifact "preprocessing_params.json".
    seed: random seed for the train/test split and for the forest.

  Returns:
    The run object yielded by mlflow.start_run for this training run.
  """
  with mlflow.start_run(run_name=run_name) as run:

    # Enable MLflow autologging
    mlflow.autolog(log_input_examples=True, silent=True)

    # Resolve the version first and read exactly that snapshot, so the logged version
    # always matches the data trained on even if the table is written concurrently.
    delta_version = get_delta_version(delta_path)
    df = spark.read.format("delta").option("versionAsOf", delta_version).load(delta_path)
    mlflow.log_param("delta_path", delta_path)
    mlflow.log_param("delta_version", delta_version)

    # Track misc parameters used in pipeline creation (preprocessing) as json artifact
    mlflow.log_dict(misc_params, "preprocessing_params.json")
    target_col = misc_params["target_col"]
    num_cols = misc_params["num_cols"]
    cat_cols = misc_params["cat_cols"]

    # Convert Spark DataFrame to pandas, as we will be training an sklearn model
    pdf = df.toPandas()
    # Category dtype routes these columns to the one-hot branch of the pipeline
    for c in cat_cols:
      pdf[c] = pdf[c].astype("category")

    # Create summary statistics pandas DataFrame and log as a csv to MLflow
    summary_stats_pdf = create_summary_stats_pdf(pdf)
    log_summary_stats_pdf_as_csv(summary_stats_pdf)

    # Track number of total instances and "month"
    mlflow.log_param("num_instances", pdf.shape[0])
    mlflow.log_param("month", misc_params["month"])

    # "month" is bookkeeping rather than a feature; drop it only when the table has it.
    X = pdf.drop(columns=[target_col]).drop(columns=["month"], errors="ignore")
    y = pdf[target_col]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=seed)

    # Track train/test data info as params
    mlflow.log_param("num_training_instances", X_train.shape[0])
    mlflow.log_param("num_test_instances", X_test.shape[0])

    # Pass `seed` through so it controls the forest as well as the split.
    rf_pipeline = create_sklearn_rf_pipeline(model_params, seed=seed)
    rf_pipeline.fit(X_train, y_train)

    # Model signature derived from the configured feature lists. Assumes numeric features
    # are integer-valued and categorical ones are strings — TODO confirm for new feature sets.
    input_schema = Schema(
      [ColSpec("integer", col) for col in num_cols]
      + [ColSpec("string", col) for col in cat_cols]
    )
    output_schema = Schema([ColSpec("double")])
    signature = ModelSignature(input_schema, output_schema)
    mlflow.sklearn.log_model(rf_pipeline, "model", signature=signature)

    # Evaluate the model on the held-out split
    predictions = rf_pipeline.predict(X_test)
    test_mse = mean_squared_error(y_test, predictions)
    r2 = r2_score(y_test, predictions)
    mlflow.log_metrics({"test_mse": test_mse,
                        "test_r2": r2})

  return run

In [0]:
# Drift-check and model-comparison thresholds:
#   stats_threshold_limit  - relative tolerance for summary-statistic changes (5%)
#   p_threshold            - significance level for the statistical tests
#   min_model_r2_threshold - minimum relative improvement expected from a new model
stats_threshold_limit, p_threshold, min_model_r2_threshold = 0.05, 0.05, 0.1

In [0]:
# Current Databricks user name, read from the notebook context tags
username = (
    dbutils.notebook.entry_point.getDbutils()
    .notebook()
    .getContext()
    .tags()
    .apply("user")
)
print(username)

demo_yt_mlops@outlook.com


In [0]:
workspace_project_home = "/Users/{}/DT-2023".format(username)  # per-user workspace folder for this project


In [0]:
# MLflow experiment under the project folder; set_experiment creates it if missing
experiment_path = f"{workspace_project_home}/airbnb_hawaii"
mlflow.set_experiment(experiment_path)

2023/02/05 09:23:26 INFO mlflow.tracking.fluent: Experiment with name '/Users/demo_yt_mlops@outlook.com/DT-2023/airbnb_hawaii' does not exist. Creating a new experiment.
Out[27]: <Experiment: artifact_location='dbfs:/databricks/mlflow-tracking/4119769509188008', creation_time=1675589006729, experiment_id='4119769509188008', last_update_time=1675589006729, lifecycle_stage='active', name='/Users/demo_yt_mlops@outlook.com/DT-2023/airbnb_hawaii', tags={'mlflow.experiment.sourceName': '/Users/demo_yt_mlops@outlook.com/DT-2023/airbnb_hawaii',
 'mlflow.experimentType': 'MLFLOW_EXPERIMENT',
 'mlflow.ownerEmail': 'demo_yt_mlops@outlook.com',
 'mlflow.ownerId': '5792412687435845'}>

In [0]:
# Resolve the id of the experiment stored at `experiment_path`
experiment_id = mlflow.get_experiment_by_name(experiment_path).experiment_id


In [0]:
# Display the experiment id as this cell's output
experiment_id

Out[29]: '4119769509188008'

In [0]:
# MLflow Model Registry name used by the registry helper functions
registry_model_name = "airbnb_hawaii"

In [0]:
# Locations for temporary artifacts and the simulated datasets
project_home_dir = f"/Users/{username}/ram_drift/"
project_local_tmp_dir = f"/dbfs{project_home_dir}tmp/"  # local-filesystem view via the /dbfs mount
data_project_dir = f"{project_home_dir}data/"

# Uploaded source Delta table
upload_data_loc = "/FileStore/tables/temp_dir/airbnb-hawaii.delta"

In [0]:
# Same table as the upload location; reuse the variable so the two paths cannot drift apart
raw_delta_path = upload_data_loc

In [0]:
# Per-month Delta locations written by the data-simulation cell
month_0_delta_path = f"{data_project_dir}month_0_delta"
# Month 1 is stored twice: with injected data errors, and clean ("fixed")
month_1_error_delta_path = f"{data_project_dir}month_1_error_delta"
month_1_fixed_delta_path = f"{data_project_dir}month_1_fixed_delta"
month_2_delta_path = f"{data_project_dir}month_2_delta"

# Gold table location, removed recursively so the demo starts without an existing table
gold_delta_path = f"{data_project_dir}airbnb_hawaii_delta"
dbutils.fs.rm(gold_delta_path, True)

Out[32]: True

In [0]:
# First run: keep only the target column plus the numeric and categorical features
target_col = "price"
num_cols = [
    "accommodates",
    "bedrooms",
    "beds",
    "minimum_nights",
    "number_of_reviews",
    "number_of_reviews_ltm",
    "review_scores_rating",
]
cat_cols = [
    "host_is_superhost",
    "neighbourhood_cleansed",
    "property_type",
    "room_type",
]
cols_to_keep = [target_col, *num_cols, *cat_cols]

airbnb_df = spark.read.format("delta").load(upload_data_loc).select(cols_to_keep)

In [0]:
airbnb_df.show(n=2)  # preview the first two rows

+-----+------------+--------+----+--------------+-----------------+---------------------+--------------------+-----------------+----------------------+----------------+---------------+
|price|accommodates|bedrooms|beds|minimum_nights|number_of_reviews|number_of_reviews_ltm|review_scores_rating|host_is_superhost|neighbourhood_cleansed|   property_type|      room_type|
+-----+------------+--------+----+--------------+-----------------+---------------------+--------------------+-----------------+----------------------+----------------+---------------+
|150.0|           2|       1|   1|             3|               11|                    2|                  88|                t|          South Kohala|Entire apartment|Entire home/apt|
| 85.0|           2|       1|   1|             5|              168|                    2|                  93|                f|            South Kona|Entire apartment|Entire home/apt|
+-----+------------+--------+----+--------------+-----------------+--------

In [0]:
# Split the data into three equally weighted "months" (seeded for reproducibility)
df_0, df_1, df_2 = airbnb_df.randomSplit(weights=[1.0, 1.0, 1.0], seed=42)

df_0.write.format("delta").mode("overwrite").save(month_0_delta_path)
df_1.write.format("delta").mode("overwrite").save(month_1_fixed_delta_path)

# Month 1 with simulated ingestion errors:
#   - some neighbourhood entries are nulled out, as if cleansed incorrectly
#   - missing ratings are filled with 0
#   - ratings are divided by 20 (scaled to be between 0 and 5)
df_1_err = (
    df_1
    .withColumn(
        "neighbourhood_cleansed",
        F.when(
            F.col("neighbourhood_cleansed").isin("Primary Urban Center", "Kihei-Makena", "Lahaina", "North Kona"),
            F.lit(None),
        ).otherwise(F.col("neighbourhood_cleansed")),
    )
    .fillna(0, subset=["review_scores_rating"])
    .withColumn("review_scores_rating", F.col("review_scores_rating")/20)
)
df_1_err.write.format("delta").mode("overwrite").save(month_1_error_delta_path)

# Month 2 with price drift: each price gains a random extra of up to 2x itself
df_2_err = df_2.withColumn("price", F.col("price") + (2*F.col("price")*F.rand(seed=42)))
df_2_err.write.format("delta").mode("overwrite").save(month_2_delta_path)