In [1]:
import numpy as np
import pandas as pd
from sklearn import preprocessing
import matplotlib.pyplot as plt
import seaborn as sns 
import warnings
warnings.filterwarnings("ignore")

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import unittest
import pytest

## Time Travel

In [2]:
spark = SparkSession.builder\
  .appName("Refresh Raw into Icerberg Table") \
  .config("spark.hadoop.fs.s3a.s3guard.ddb.region", "us-west-2")\
  .config("spark.kerberos.access.hadoopFileSystems", "s3a://ps-uat2")\
  .config("spark.jars","/home/cdsw/lib/iceberg-spark-runtime-3.2_2.12-0.13.2.jar") \
  .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
  .config("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog") \
  .config("spark.sql.catalog.spark_catalog.type","hive") \
  .getOrCreate()

#read raw data 
df_raw = spark.sql("SELECT * FROM spark_catalog.default.pump_processed")
df_raw.show()

Setting spark.hadoop.yarn.resourcemanager.principal to dciciani
[Stage 0:>                                                          (0 + 1) / 1]

+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+---------+--------------+
|         sensor_00|         sensor_02|         sensor_04|         sensor_06|         sensor_07|         sensor_08|         sensor_09|         sensor_10|         sensor_11|sensor_51|machine_status|
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+---------+--------------+
|2.4634259999999997|          52.77777|          627.5463|13.382520000000001|          16.21094|          15.53096|          15.11863|40.336529999999996|          50.56949| 223.3796|             1|
|           2.45162|          51.86632|          630.2084|          13.64294|          16.16753|          15.76968|15.053529999999999|           39.9214|           48.3555| 211.5162|             1|
|         

                                                                                

In [3]:
from datetime import datetime

# current date and time
now = datetime.now()

timestamp = datetime.timestamp(now)
print("timestamp =", timestamp)

timestamp = 1665322047.181102


In [5]:
# query iceberg with a specific time
df_raw = spark.read \
              .option("as-of-timestamp", int(timestamp*1000)) \
              .load("spark_catalog.default.pump_raw")
df.tail(5)

                                                                                

[Row(sensor_00=2.4004630000000002, sensor_02=50.564240000000005, sensor_04=630.902770996094, sensor_06=15.1548, sensor_07=16.70284, sensor_08=15.653929999999999, sensor_09=15.11863, sensor_10=43.21038, sensor_11=54.526019999999995, sensor_51=231.1921, machine_status='1'),
 Row(sensor_00=2.396528, sensor_02=50.52083, sensor_04=625.9259033203131, sensor_06=15.0897, sensor_07=16.70284, sensor_08=15.697339999999999, sensor_09=15.11863, sensor_10=43.12836, sensor_11=55.11779, sensor_51=232.0602, machine_status='1'),
 Row(sensor_00=2.406366, sensor_02=50.5208320617676, sensor_04=635.6481, sensor_06=15.11863, sensor_07=16.56539, sensor_08=15.740739999999999, sensor_09=15.11863, sensor_10=42.357459999999996, sensor_11=55.99321, sensor_51=234.0856, machine_status='1'),
 Row(sensor_00=2.396528, sensor_02=50.5208320617676, sensor_04=639.8148, sensor_06=15.11863, sensor_07=16.6522, sensor_08=15.653929999999999, sensor_09=15.010129999999998, sensor_10=42.62814, sensor_11=56.49641999999999, sensor_5

In [17]:
print(spark.sql("SELECT * FROM spark_catalog.default.pump_processed").count())

220322


In [None]:
# insert more data into pump_processend

In [10]:
spark.sql("INSERT INTO spark_catalog.default.pump_processed VALUES (4219, 31294, 42, 645, 664, 7654, 12, 1321, 3124, 643, 0) ")

DataFrame[]

In [19]:
 print(spark.sql("SELECT * FROM spark_catalog.default.pump_processed").count())

220322


In [20]:
## check snapshots
spark.read.format("iceberg").load("spark_catalog.default.pump_raw.history").show(30, False)

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2022-10-05 15:12:08.85 |2469479201610401380|null               |true               |
|2022-10-09 10:31:06.146|8733053519583731400|2469479201610401380|true               |
|2022-10-09 10:36:16.943|1138532621558925961|8733053519583731400|true               |
|2022-10-09 11:06:00.172|1190870489535501765|1138532621558925961|true               |
|2022-10-09 11:28:39.689|2963303490051769365|1190870489535501765|true               |
|2022-10-09 11:33:42.045|4978643826144187955|2963303490051769365|true               |
|2022-10-09 11:39:47.776|4487245178457655377|4978643826144187955|true               |
|2022-10-09 12:29:02.996|8826355977582537112|4487245178457655377|true               |
|2022-10-09 12:38:04.999|7453359399547647724|882635597

In [8]:
## query a specific snapshot
spark.read\
    .option("snapshot-id", "4884203215810497378")\
    .table("spark_catalog.default.pump_processed").tail(5)


IllegalArgumentException: Cannot find snapshot with ID 4884203215810497378

In [15]:
spark.sql("SELECT * FROM spark_catalog.default.pump_processed.snapshots").show()

+--------------------+------------------+---------+---------+--------------------+--------------------+
|        committed_at|       snapshot_id|parent_id|operation|       manifest_list|             summary|
+--------------------+------------------+---------+---------+--------------------+--------------------+
|2022-10-09 12:39:...|297908169986604918|     null|   append|s3a://ps-uat2/war...|{spark.app.id -> ...|
+--------------------+------------------+---------+---------+--------------------+--------------------+



In [39]:
# get current snapshot
spark.read.format("iceberg").load("spark_catalog.default.pump_processed.history").toPandas().iloc[-1]['snapshot_id']

                                                                                

numpy.int64

In [6]:
spark.read\
    .option("snapshot-id", 4884203215810497378)\
    .table("spark_catalog.default.pump_processed").tail(5)


IllegalArgumentException: Cannot find snapshot with ID 4884203215810497378

## Explainability SHAP

https://medium.com/dataman-in-ai/explain-your-model-with-the-shap-values-bc36aac4de3dù

In [3]:
df = pd.read_csv("../data/pump_processed.csv")
df.drop("Unnamed: 0", axis=1, inplace=True)
df.head()

KeyError: "['Unnamed: 0'] not found in axis"

In [1]:
# read the rf model
import mlflow
import pandas as pd

logged_model = '/home/cdsw/.experiments/n03k-3b0z-nbdp-wz8l/94k7-jivb-s3j1-g7r9/artifacts/model'

loaded_model = mlflow.pyfunc.load_model(logged_model)

# Predict on a Pandas DataFrame.
#loaded_model.predict(pd.DataFrame(data))

NameError: name 'data' is not defined

In [None]:
import pandas as pd
import numpy as np
np.random.seed(0)
import matplotlib.pyplot as plt
df = pd.read_csv('/winequality-red.csv') # Load the data

from sklearn.model_selection import train_test_split
from sklearn import preprocessing

from sklearn.ensemble import RandomForestRegressor
# The target variable is 'quality'.
Y = df['quality']
X =  df[['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar','chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density','pH', 'sulphates', 'alcohol']]

# Split the data into train and test data:
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size = 0.2)
# Build the model with the random forest regression algorithm:
model = RandomForestRegressor(max_depth=6, random_state=0, n_estimators=10)
model.fit(X_train, Y_train)

#### Variable Importance Plot

In [None]:
import shap
shap_values = shap.TreeExplainer(model).shap_values(X_train)
shap.summary_plot(shap_values, X_train, plot_type="bar")

In [None]:
import matplotlib.pyplot as plt
f = plt.figure()
shap.summary_plot(rf_shap_values, X_test)
f.savefig("/summary_plot1.png", bbox_inches='tight', dpi=600)

#### SHAP Dependence Plot — Global Interpretabilityù

In [3]:
shap.dependence_plot(“alcohol”, shap_values, X_train)

SyntaxError: invalid character in identifier (1780575973.py, line 1)

#### Individual SHAP Value Plot — Local Interpretability

In [43]:
class ApiUtility:
    """A utility class for working with CML API_v2
    This class contains methods that wrap API_v2 to achieve specific
    needs that facilitate the simulation.
    Attributes:
        client (cmlapi.api.cml_service_api.CMLServiceApi)
    """

    def __init__(self):
        self.client = cmlapi.default_client()

    def get_latest_deployment_details_allmodels(self):
        """
        Given a APIv2 client object and Model Name, use APIv2 to retrieve details about the latest/current deployment.
        This function only works for models deployed within the current project.
        """

        project_id = os.environ["CDSW_PROJECT_ID"]

        # gather model details
        models = (
            self.client.list_models(project_id=project_id, async_req=True, page_size = 50)
            .get()
            .to_dict()
        )
        model_info = [
            model for model in models["models"]
        ][-1]

        model_id = model_info["id"]
        model_crn = model_info["crn"]
        model_access_key = model_info["access_key"]

        # gather latest build details
        builds = (
            self.client.list_model_builds(
                project_id=project_id, model_id=model_id, async_req=True, page_size = 50
            )
            .get()
            .to_dict()
        )
        build_info = builds["model_builds"][-1]  # most recent build

        build_id = build_info["id"]

        # gather latest deployment details
        deployments = (
            self.client.list_model_deployments(
                project_id=project_id,
                model_id=model_id,
                build_id=build_id,
                async_req=True,
                page_size = 50
            )
            .get()
            .to_dict()
        )
        deployment_info = deployments["model_deployments"][-1]  # most recent deployment

        model_deployment_crn = deployment_info["crn"]

        return {
            "model_id": model_id,
            "model_crn": model_crn,
            "model_access_key": model_access_key,
            "latest_build_id": build_id,
            "latest_deployment_crn": model_deployment_crn,
        }


In [44]:
import cdsw, time, os
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import classification_report
from cmlbootstrap import CMLBootstrap
import seaborn as sns
import cmlapi
#from src.api import ApiUtility
import sqlite3

# You can access all models with API V2
client = cmlapi.default_client()

project_id = os.environ["CDSW_PROJECT_ID"]

# You can use an APIV2-based utility to access the latest model's metadata. For more, explore the src folder
apiUtil = ApiUtility()

Model_AccessKey = apiUtil.get_latest_deployment_details_allmodels()["model_access_key"]
Deployment_CRN = apiUtil.get_latest_deployment_details_allmodels()["latest_deployment_crn"]
Model_CRN = apiUtil.get_latest_deployment_details_allmodels()["model_crn"]


In [48]:
# Get the various Model Endpoint details
HOST = os.getenv("CDSW_API_URL").split(":")[0] + "://" + os.getenv("CDSW_DOMAIN")
model_endpoint = (
    HOST.split("//")[0] + "//modelservice." + HOST.split("//")[1] + "/model"
)

In [49]:
model_endpoint

'https://modelservice.ml-95af2461-8a3.ps-sandb.a465-9q4k.cloudera.site/model'

In [52]:
# Read in the model metrics dict
model_metrics = cdsw.read_metrics(
    model_crn=Model_CRN, model_deployment_crn=Deployment_CRN
)

In [53]:
model_metrics

{'metrics': [{'modelDeploymentCrn': 'crn:cdp:ml:us-west-1:558bc1d2-8867-4357-8524-311d51259233:workspace:89b11538-6c14-45f1-a1e5-a77898deae34/5dd81621-f2bf-43e3-a745-f56bd111009a',
   'modelBuildCrn': 'crn:cdp:ml:us-west-1:558bc1d2-8867-4357-8524-311d51259233:workspace:89b11538-6c14-45f1-a1e5-a77898deae34/b6f31e95-bb3d-4bc2-92bc-24ec9af37bbf',
   'modelCrn': 'crn:cdp:ml:us-west-1:558bc1d2-8867-4357-8524-311d51259233:workspace:89b11538-6c14-45f1-a1e5-a77898deae34/cbcc3c1e-75b2-4f21-a322-2eab9565a858',
   'startTimeStampMs': 1664398564477,
   'endTimeStampMs': 1664398564554,
   'predictionUuid': 'cacc1242-0c01-4b8b-b9cd-f605545c99da',
   'metrics': {'data': {'sensor_04': '213214',
     'sensor_19': '534',
     'sensor_20': '6342',
     'sensor_21': '234234',
     'sensor_38': '647574',
     'sensor_39': '64684',
     'sensor_40': '6435',
     'sensor_41': '223',
     'sensor_42': '65784'},
    'prediction': '[0.]'}},
  {'modelDeploymentCrn': 'crn:cdp:ml:us-west-1:558bc1d2-8867-4357-8524-

In [56]:
# This is a handy way to unravel the dict into a big pandas dataframe
metrics_df = pd.io.json.json_normalize(model_metrics["metrics"])
metrics_df.tail().T


  metrics_df = pd.io.json.json_normalize(model_metrics["metrics"])


Unnamed: 0,0,1
modelDeploymentCrn,crn:cdp:ml:us-west-1:558bc1d2-8867-4357-8524-3...,crn:cdp:ml:us-west-1:558bc1d2-8867-4357-8524-3...
modelBuildCrn,crn:cdp:ml:us-west-1:558bc1d2-8867-4357-8524-3...,crn:cdp:ml:us-west-1:558bc1d2-8867-4357-8524-3...
modelCrn,crn:cdp:ml:us-west-1:558bc1d2-8867-4357-8524-3...,crn:cdp:ml:us-west-1:558bc1d2-8867-4357-8524-3...
startTimeStampMs,1664398564477,1664399949235
endTimeStampMs,1664398564554,1664399949263
predictionUuid,cacc1242-0c01-4b8b-b9cd-f605545c99da,801012ab-41fa-4a0c-a6e0-5f22b3859c83
metrics.data.sensor_04,213214,23
metrics.data.sensor_19,534,32
metrics.data.sensor_20,6342,3
metrics.data.sensor_21,234234,12


In [57]:
# Write the data to SQL lite for visualization
if not (os.path.exists("model_metrics.db")):
    conn = sqlite3.connect("model_metrics.db")
    metrics_df.to_sql(name="model_metrics", con=conn)

In [61]:
# Do some conversions & calculations on the raw metrics
metrics_df["startTimeStampMs"] = pd.to_datetime(
    metrics_df["startTimeStampMs"], unit="ms"
)
metrics_df["endTimeStampMs"] = pd.to_datetime(metrics_df["endTimeStampMs"], unit="ms")
metrics_df["processing_time"] = (
    metrics_df["endTimeStampMs"] - metrics_df["startTimeStampMs"]
).dt.microseconds * 1000

In [62]:
# Create plots for different tracked metrics
sns.set_style("whitegrid")
sns.despine(left=True, bottom=True)


<Figure size 432x288 with 0 Axes>

In [63]:
# Plot metrics.probability
prob_metrics = metrics_df.dropna(subset=["metrics.accuracy"]).sort_values(
    "startTimeStampMs"
)
sns.lineplot(
    x=range(len(prob_metrics)), y="metrics.accuracy", data=prob_metrics, color="grey"
)

# Plot processing time
time_metrics = metrics_df.dropna(subset=["processing_time"]).sort_values(
    "startTimeStampMs"
)
sns.lineplot(
    x=range(len(prob_metrics)), y="processing_time", data=prob_metrics, color="grey"
)

KeyError: ['metrics.accuracy']

In [64]:

# Plot model accuracy drift over the simulated time period
agg_metrics = metrics_df.dropna(subset=["metrics.accuracy"]).sort_values(
    "startTimeStampMs"
)
sns.barplot(
    x=list(range(1, len(agg_metrics) + 1)),
    y="metrics.accuracy",
    color="grey",
    data=agg_metrics,
)

KeyError: ['metrics.accuracy']

In [1]:
import os
import cmlapi
from cmlapi.utils import Cursor
import string
import random
import json

cluster = os.getenv("CDSW_DOMAIN")
client = cmlapi.default_client()

session_id = "".join([random.choice(string.ascii_lowercase) for _ in range(6)])

# List projects using the default sort and default page size (10)
client.list_projects(page_size = 20)

project_id = os.environ["CDSW_PROJECT_ID"]


### CREATE AN ENDPOINT AND PUSH THE  MODEL ###

# Would be nice to name it with job id rather than session id
modelReq = cmlapi.CreateModelRequest(
    name = "pump-model-RF",
    description = "Pump predictive mainten Model",
    project_id = project_id,
    disable_authentication = True
)

model = client.create_model(modelReq, project_id)

model_build_request = cmlapi.CreateModelBuildRequest(
    project_id = project_id,
    model_id = model.id,
    comment = "Deplying model as REST api model",
    file_path = "model/model_endpoint.py",
    function_name = "predict",
    kernel = "python3",
    runtime_identifier = "docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-jupyterlab-python3.8-standard:2022.04.1-b6"
)

modelBuild = client.create_model_build(
    model_build_request, project_id, model.id
)

model_deployment = cmlapi.CreateModelDeploymentRequest(
        project_id = project_id, 
        model_id = model.id, 
        build_id = modelBuild.id, 
        cpu = 1.00,
        memory = 2.00,
        replicas=4,
    )

model_deployment_response = client.create_model_deployment(
        model_deployment, 
        project_id = project_id, 
        model_id = model.id, 
        build_id = modelBuild.id
    )