# Working with CML API v2

## Establish Python Client

In [3]:
# Install cmlapi package
# (only attempted when `cmlapi` is not already importable in this environment)
try:
    import cmlapi
except ModuleNotFoundError:
    import os

    # Build the download location from CDSW_API_URL: drop its last character and
    # append "2" (presumably turning a ".../v1" URL into ".../v2" -- TODO confirm).
    # NOTE(review): os.getenv returns None when the variable is unset, in which
    # case the slice below fails with a TypeError.
    cluster = os.getenv("CDSW_API_URL")[:-1] + "2"
    # IPython shell escape: pip-install the client tarball found under that URL.
    !pip3 install {cluster}/python.tar.gz
    import cmlapi

from cmlapi.utils import Cursor
import string
import random
import json

In [4]:
import sys
import os
import pickle

%load_ext lab_black

In [5]:
client = cmlapi.default_client()

## Using Model Metrics Feature

In [6]:
import cdsw
import time

import sklearn
import numpy as np
import pandas as pd

### Get Model Deployment Details via CML APIv2

You must first get the Model details, then use them to obtain the Build details, and then use those to obtain the Model Deployment details.

More details on what each of these abstractions are can be found in the [Cloudera Docs](https://docs.cloudera.com/machine-learning/cloud/models/topics/ml-model-concepts-and-terminology.html).

#### Get Model Details

In [90]:
# The current project is identified through the CDSW_PROJECT_ID environment variable.
project_id = os.environ["CDSW_PROJECT_ID"]

# The call is issued with async_req=True, so the result is fetched with .get()
# and then converted to plain Python containers with .to_dict().
models = (
    client.list_models(project_id=project_id, async_req=True)
    .get()
    .to_dict()
)

In [91]:
# The last list entry is treated as the most recent model.
# NOTE(review): this relies on the ordering of the API response -- confirm.
model_info = models["models"][-1]

# Pull out the identifiers that later cells refer to.
model_id, model_crn, model_access_key = (
    model_info[field] for field in ("id", "crn", "access_key")
)

In [92]:
model_info

{'id': '069b6410-4142-49fb-b0e6-9b281c2e5e5a',
 'name': 'Price Regressor8',
 'description': 'adgasdf',
 'creator': {'username': 'areed',
  'name': 'Andrew Reed',
  'email': 'areed@cloudera.com'},
 'access_key': 'mvmfmryln256qsbfu3u1vmlfltocq9io',
 'deletion_status': '',
 'created_at': datetime.datetime(2021, 11, 23, 15, 0, 10, 343379, tzinfo=tzlocal()),
 'updated_at': datetime.datetime(2021, 11, 23, 15, 0, 10, 343379, tzinfo=tzlocal()),
 'crn': 'crn:cdp:ml:us-west-1:12a0079b-1591-4ca0-b721-a446bda74e67:workspace:1e08299d-97ac-4d5e-8c21-d77745ce0a1c/069b6410-4142-49fb-b0e6-9b281c2e5e5a',
 'auth_enabled': False}

#### Get Build Details

In [93]:
# List the builds that belong to the selected model. The call is issued with
# async_req=True, so the result is fetched with .get() before .to_dict().
builds = client.list_model_builds(
    project_id=project_id,
    model_id=model_id,
    async_req=True,
).get().to_dict()

# The last list entry is treated as the most recent build.
build_info = builds["model_builds"][-1]
build_id = build_info["id"]

In [94]:
build_info

{'id': '099a0f74-14a5-4e46-9802-d20674e3e71e',
 'model_id': '069b6410-4142-49fb-b0e6-9b281c2e5e5a',
 'creator': {'username': 'areed',
  'name': 'Andrew Reed',
  'email': 'areed@cloudera.com'},
 'comment': 'Initial revision.',
 'file_path': 'predict.py',
 'function_name': 'predict',
 'engine_image': '',
 'kernel': 'Python 3.8',
 'created_at': datetime.datetime(2021, 11, 23, 15, 0, 10, 348131, tzinfo=tzlocal()),
 'updated_at': datetime.datetime(2021, 11, 23, 15, 0, 55, 953662, tzinfo=tzlocal()),
 'status': 'built',
 'deletion_status': '',
 'crn': 'crn:cdp:ml:us-west-1:12a0079b-1591-4ca0-b721-a446bda74e67:workspace:1e08299d-97ac-4d5e-8c21-d77745ce0a1c/099a0f74-14a5-4e46-9802-d20674e3e71e',
 'built_at': datetime.datetime(1, 1, 1, 0, 0, tzinfo=tzlocal()),
 'runtime_identifier': 'docker.repository.cloudera.com/cdsw/ml-runtime-workbench-python3.8-standard:2021.09.1-b5'}

In [95]:
build_id

'099a0f74-14a5-4e46-9802-d20674e3e71e'

#### Get Deployment Details

In [96]:
# Deployments are looked up per build, so both the model id and the build id
# are passed along with the project id.
deployments = client.list_model_deployments(
    project_id=project_id,
    model_id=model_id,
    build_id=build_id,
    async_req=True,
).get().to_dict()

# The last list entry is treated as the most recent deployment.
deployment_info = deployments["model_deployments"][-1]
model_deployment_crn = deployment_info["crn"]

In [97]:
model_deployment_crn

'crn:cdp:ml:us-west-1:12a0079b-1591-4ca0-b721-a446bda74e67:workspace:1e08299d-97ac-4d5e-8c21-d77745ce0a1c/864e0f70-6089-49ef-95a4-fcd8ef117aa6'

In [98]:
def get_latest_deployment_details(client, model_name):
    """
    Given an APIv2 client object and a Model Name, use APIv2 to retrieve details about
    the latest/current deployment of that model.

    This function only works for models deployed within the current project; the
    project is taken from the CDSW_PROJECT_ID environment variable.

    "Latest" means the last entry of the build / deployment lists returned by the
    client. NOTE(review): this assumes the API lists entries oldest-first and that
    the wanted entry is on the returned page -- confirm ordering and pagination.

    Args:
        client: APIv2 client exposing list_models, list_model_builds and
            list_model_deployments.
        model_name: Exact name of the model to look up. When several models share
            the name, the first one in the listing is used.

    Returns:
        dict with the keys "model_name", "model_id", "model_crn",
        "model_access_key", "latest_build_id" and "latest_deployment_crn".

    Raises:
        KeyError: if CDSW_PROJECT_ID is not set.
        IndexError: if no model with that name is listed, the model has no builds,
            or its latest build has no deployments.
    """

    project_id = os.environ["CDSW_PROJECT_ID"]

    # gather model details
    models = client.list_models(project_id=project_id, async_req=True).get().to_dict()
    model_info = next(
        (model for model in models["models"] if model["name"] == model_name), None
    )
    if model_info is None:
        # Same exception type the bare `[0]` lookup used to raise, but with a
        # message that says what is actually missing.
        raise IndexError(
            f"No model named {model_name!r} found in project {project_id!r}"
        )

    model_id = model_info["id"]
    model_crn = model_info["crn"]
    model_access_key = model_info["access_key"]

    # gather latest build details
    builds = (
        client.list_model_builds(
            project_id=project_id, model_id=model_id, async_req=True
        )
        .get()
        .to_dict()
    )
    if not builds["model_builds"]:
        raise IndexError(f"Model {model_name!r} ({model_id}) has no builds")
    build_info = builds["model_builds"][-1]  # most recent build

    build_id = build_info["id"]

    # gather latest deployment details
    deployments = (
        client.list_model_deployments(
            project_id=project_id, model_id=model_id, build_id=build_id, async_req=True
        )
        .get()
        .to_dict()
    )
    if not deployments["model_deployments"]:
        raise IndexError(
            f"Build {build_id} of model {model_name!r} has no deployments"
        )
    deployment_info = deployments["model_deployments"][-1]  # most recent deployment

    model_deployment_crn = deployment_info["crn"]

    return {
        "model_name": model_name,
        "model_id": model_id,
        "model_crn": model_crn,
        "model_access_key": model_access_key,
        "latest_build_id": build_id,
        "latest_deployment_crn": model_deployment_crn,
    }

In [99]:
latest_deployment_details = get_latest_deployment_details(
    client=client, model_name="Price Regressor8"
)

In [100]:
latest_deployment_details

{'model_name': 'Price Regressor8',
 'model_id': '069b6410-4142-49fb-b0e6-9b281c2e5e5a',
 'model_crn': 'crn:cdp:ml:us-west-1:12a0079b-1591-4ca0-b721-a446bda74e67:workspace:1e08299d-97ac-4d5e-8c21-d77745ce0a1c/069b6410-4142-49fb-b0e6-9b281c2e5e5a',
 'model_access_key': 'mvmfmryln256qsbfu3u1vmlfltocq9io',
 'latest_build_id': '099a0f74-14a5-4e46-9802-d20674e3e71e',
 'latest_deployment_crn': 'crn:cdp:ml:us-west-1:12a0079b-1591-4ca0-b721-a446bda74e67:workspace:1e08299d-97ac-4d5e-8c21-d77745ce0a1c/864e0f70-6089-49ef-95a4-fcd8ef117aa6'}

### Load up prod_df to make inference and log metrics

The model metrics feature is not built to handle batch inference - while we can pass batches of data back and forth, the `cdsw.track_metric()` method is set to make one database write (aka one UUID) per request - this means it cannot store large arrays of predictions in a single write. 

Therefore, we will send one request per record via a loop. This will give us a UUID and separate database entry for each prediction.

In [101]:
train_df = pd.read_pickle("../data/working/train_df.pkl")

In [102]:
def cast_date_as_str_for_json(df):
    """Given a dataframe, return the same dataframe with non-numeric columns cast as string.

    Every column whose dtype is not exactly int64 or float64 is converted with
    ``astype(str)`` so that the records can be JSON serialized. The conversion is
    done in place: ``df`` itself is modified and then returned.
    """

    for column, dt in df.dtypes.items():
        if dt.type not in (np.int64, np.float64):
            # Replace the whole column. `df.loc[:, column] = ...` writes into the
            # existing column and, on recent pandas versions, can coerce the
            # strings back to the old dtype (e.g. datetime64), undoing the cast.
            df[column] = df[column].astype(str)
    return df

In [103]:
# data must be JSON serializable, therefore convert df to list of dict records
# also first change any TimeStamp dtype columns to string

# data_input = {"df": cast_date_as_str_for_json(train_df).to_dict(orient="records")}

# response = cdsw.call_model(
#     model_access_key=latest_deployment_details["model_access_key"], ipt=data_input
# )

## Test online inference

In [104]:
input_records = cast_date_as_str_for_json(train_df).to_dict(orient="records")

In [108]:
import cdsw
import concurrent
import threading


class ThreadedModelRequest:
    """
    Send many single-record requests to a deployed model concurrently.

    Multi-threading is used to overlap the I/O wait of the individual synchronous
    API calls to the model endpoint, which is the bottleneck when a large number of
    records has to be scored one request at a time.

    Note - the per-record call can also be made with cdsw.call_model(); see
    call_model_cdsw().
    """

    def __init__(self, deployment_details, n_threads=2):
        """
        Args:
            deployment_details: dict holding at least the key "model_access_key".
            n_threads: maximum number of worker threads used by threaded_call.
        """
        self.n_threads = n_threads
        self.deployment_details = deployment_details
        self.model_service_url = cdsw._get_model_call_endpoint()
        # Per-thread storage so that every worker thread gets its own HTTP session.
        self.thread_local = threading.local()

    def get_session(self):
        """Return the calling thread's requests.Session, creating it on first use."""
        # Imported here because `requests` is not imported anywhere else in the
        # notebook; without this the method fails with a NameError on a fresh kernel.
        import requests

        if not hasattr(self.thread_local, "session"):
            self.thread_local.session = requests.Session()
        return self.thread_local.session

    def call_model(self, record):
        """
        POST a single record to the model endpoint.

        Args:
            record: JSON-serializable dict that contains an "id" key.

        Returns:
            Tuple of (record["id"], uuid found under response["response"]["uuid"]).
        """

        headers = {
            "Content-Type": "application/json",
        }
        data = {
            "accessKey": self.deployment_details["model_access_key"],
            "request": {"record": record},
        }

        session = self.get_session()
        response = session.post(
            url=self.model_service_url,
            headers=headers,
            data=json.dumps(data),
        ).json()

        return record["id"], response["response"]["uuid"]

    def call_model_cdsw(self, record):
        """
        Alternative to call_model that goes through cdsw.call_model().

        Not used by threaded_call: per the original author's note it performs 42%
        slower than call_model and threading cannot be properly implemented with it.
        """

        response = cdsw.call_model(
            model_access_key=self.deployment_details["model_access_key"],
            ipt={"record": record},
        )

        return record["id"], response["response"]["uuid"]

    def threaded_call(self, records):
        """
        Call the model once per record using a pool of self.n_threads threads.

        Args:
            records: iterable of records accepted by call_model.

        Returns:
            dict with "start_timestamp_ms" and "end_timestamp_ms" (wall-clock
            milliseconds taken before and after all calls) and "id_uuid_mapping"
            (record id -> uuid returned for that record).
        """
        # `import concurrent` alone does not load the `futures` submodule, so the
        # executor class is imported explicitly.
        from concurrent.futures import ThreadPoolExecutor

        start_timestamp_ms = int(round(time.time() * 1000))

        with ThreadPoolExecutor(max_workers=self.n_threads) as executor:
            # map() yields the results in the order of `records`.
            results = list(executor.map(self.call_model, records))

        end_timestamp_ms = int(round(time.time() * 1000))

        return {
            "start_timestamp_ms": start_timestamp_ms,
            "end_timestamp_ms": end_timestamp_ms,
            "id_uuid_mapping": dict(results),
        }

In [109]:
test_records = input_records[:2000]

In [111]:
%%time
tmr = ThreadedModelRequest(deployment_details=latest_deployment_details)
test = tmr.threaded_call(test_records)

CPU times: user 7.12 s, sys: 187 ms, total: 7.31 s
Wall time: 1min 6s


In [118]:
test["start_timestamp_ms"]

1637679735775

## Query Metric Store

In [113]:
metrics = cdsw.read_metrics(
    model_deployment_crn=latest_deployment_details["latest_deployment_crn"]
)

In [116]:
metrics["metrics"][0]

{'modelDeploymentCrn': 'crn:cdp:ml:us-west-1:12a0079b-1591-4ca0-b721-a446bda74e67:workspace:1e08299d-97ac-4d5e-8c21-d77745ce0a1c/864e0f70-6089-49ef-95a4-fcd8ef117aa6',
 'modelBuildCrn': 'crn:cdp:ml:us-west-1:12a0079b-1591-4ca0-b721-a446bda74e67:workspace:1e08299d-97ac-4d5e-8c21-d77745ce0a1c/099a0f74-14a5-4e46-9802-d20674e3e71e',
 'modelCrn': 'crn:cdp:ml:us-west-1:12a0079b-1591-4ca0-b721-a446bda74e67:workspace:1e08299d-97ac-4d5e-8c21-d77745ce0a1c/069b6410-4142-49fb-b0e6-9b281c2e5e5a',
 'startTimeStampMs': 1637679773605,
 'endTimeStampMs': 1637679773612,
 'predictionUuid': '6b3a7aee-7272-47c7-a8d0-47b036a3d79b',
 'metrics': {'input_features': {'view': 3,
   'zipcode': 98034,
   'bedrooms': 3,
   'sqft_lot': 13095,
   'bathrooms': 4,
   'condition': 3,
   'waterfront': 0,
   'sqft_living': 3880},
  'predicted_result': 1090097.0154739222}}

In [194]:
??cdsw

[0;31mType:[0m        module
[0;31mString form:[0m <module 'cdsw' from '/usr/local/lib/python3.8/site-packages/cdsw.py'>
[0;31mFile:[0m        /usr/local/lib/python3.8/site-packages/cdsw.py
[0;31mSource:[0m     
[0;34m"""[0m
[0;34mCDSW[0m
[0;34m====[0m
[0;34m[0m
[0;34mUtilities for Python on Cloudera Data Science Workbench.[0m
[0;34m"""[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mos[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mshutil[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0msys[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mtime[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mtraceback[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mre[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0murllib[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mbase64[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0muuid[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0msubprocess[0m[0;34m[0m
[0;34m[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mrequests[0m[0;34m[0m
[0;3

#### Test `cdsw.read_metrics()`

In [189]:
metrics = cdsw.read_metrics(
    model_deployment_crn=latest_deployment_details["latest_deployment_crn"]
)

In [158]:
metrics["metrics"][0]

{'modelDeploymentCrn': 'crn:cdp:ml:us-west-1:12a0079b-1591-4ca0-b721-a446bda74e67:workspace:1e08299d-97ac-4d5e-8c21-d77745ce0a1c/57bbbe61-1ff6-4099-b8cc-aa9e64d7a3b6',
 'modelBuildCrn': 'crn:cdp:ml:us-west-1:12a0079b-1591-4ca0-b721-a446bda74e67:workspace:1e08299d-97ac-4d5e-8c21-d77745ce0a1c/ad92b008-0c7e-4958-aa79-2a284b365379',
 'modelCrn': 'crn:cdp:ml:us-west-1:12a0079b-1591-4ca0-b721-a446bda74e67:workspace:1e08299d-97ac-4d5e-8c21-d77745ce0a1c/6154e1d9-cd26-44e0-beb1-d76284fcf815',
 'startTimeStampMs': 1637599752886,
 'endTimeStampMs': 1637599752895,
 'predictionUuid': '4054acb6-32fe-428d-b929-eb1b6186ed01',
 'metrics': {'input_features': {'view': 0,
   'zipcode': 98102,
   'bedrooms': 3,
   'sqft_lot': 1572,
   'bathrooms': 2.25,
   'condition': 3,
   'waterfront': 0,
   'sqft_living': 1750},
  'predicted_result': 624104.3176691445}}

#### Local walkthrough inference testing

In [25]:
# Load the pickled model object from the parent directory.
# NOTE(review): pickle.load can execute arbitrary code while loading, so this
# should only be pointed at a model file from a trusted source.
with open("../model.pkl", "rb") as f:
    model = pickle.load(f)

In [None]:
inpu

In [103]:
single_record

{'id': 1962200037,
 'price': 626000.0,
 'bedrooms': 3,
 'bathrooms': 2.25,
 'sqft_living': 1750,
 'sqft_lot': 1572,
 'floors': 2.5,
 'waterfront': 0,
 'view': 0,
 'condition': 3,
 'grade': 9,
 'sqft_above': 1470,
 'sqft_basement': 280,
 'yr_built': 2005,
 'yr_renovated': 0,
 'zipcode': 98102,
 'lat': 47.6498,
 'long': -122.321,
 'sqft_living15': 2410,
 'sqft_lot15': 3050,
 'date_sold': '2014-05-02',
 'date_listed': '2014-03-04'}

In [105]:
# Use the first of the prepared input records for a local prediction.
single_record = input_records[0]

# Build a one-row DataFrame from the record, run the loaded model on it and
# unwrap the single prediction with .item().
df = pd.DataFrame.from_records([single_record])
result = model.predict(df).item()

In [118]:
get_active_feature_names(model.named_steps["preprocess"])

['bedrooms',
 'bathrooms',
 'sqft_living',
 'sqft_lot',
 'waterfront',
 'zipcode',
 'condition',
 'view']

In [106]:
result

624104.3176691445

In [107]:
def get_active_feature_names(
    # String annotation: it is not evaluated when the function is defined, so the
    # definition does not depend on the sklearn.compose submodule being loaded.
    column_transformer: "sklearn.compose.ColumnTransformer",
):
    """Inspect the transformer steps in a given sklearn.ColumnTransformer to collect and
    return the names of all features that are not dropped as part of the pipeline.

    Args:
        column_transformer: fitted ColumnTransformer; its ``named_transformers_``
            mapping is read, and every entry other than the string "drop" must
            expose ``feature_names_in_``.

    Returns:
        list of feature names, concatenated in the order of ``named_transformers_``;
        an empty list when every step is dropped.
    """

    active_transformers = [
        transformer
        for transformer in column_transformer.named_transformers_.values()
        if transformer != "drop"
    ]

    # np.concatenate raises on an empty sequence, so handle "nothing active" here.
    if not active_transformers:
        return []

    return np.concatenate(
        [transformer.feature_names_in_ for transformer in active_transformers]
    ).tolist()

In [108]:
active_feats = get_active_feature_names(model.named_steps["preprocess"])

In [119]:
df[active_feats]

Unnamed: 0,bedrooms,bathrooms,sqft_living,sqft_lot,waterfront,zipcode,condition,view
0,3,2.25,1750,1572,0,98102,3,0


In [116]:
df[active_feats].to_dict(orient="records")[0]

{'bedrooms': 3,
 'bathrooms': 2.25,
 'sqft_living': 1750,
 'sqft_lot': 1572,
 'waterfront': 0,
 'zipcode': 98102,
 'condition': 3,
 'view': 0}

In [None]:
df.to_dict()

In [161]:
df

Unnamed: 0,id,price,bedrooms,bathrooms,sqft_living,sqft_lot,floors,waterfront,view,condition,...,sqft_basement,yr_built,yr_renovated,zipcode,lat,long,sqft_living15,sqft_lot15,date_sold,date_listed
0,1962200037,626000.0,3,2.25,1750,1572,2.5,0,0,3,...,280,2005,0,98102,47.6498,-122.321,2410,3050,2014-05-02,2014-03-04


In [None]:
['id', 'date_sold', 'date_listed']

### Testing

In [7]:
with open("../model.pkl", "rb") as f:
    model = pickle.load(f)

In [9]:
ct = model.named_steps["preprocess"]

In [12]:
num_pipeline = ct.named_transformers_["numerical"]

In [18]:
num_pipeline

Pipeline(steps=[('impute', SimpleImputer()), ('standardize', StandardScaler()),
                ('scale', MinMaxScaler())])

In [19]:
ct.get_feature_names()

AttributeError: Transformer numerical (type Pipeline) does not provide get_feature_names.

In [52]:
?? cdsw

[0;31mType:[0m        module
[0;31mString form:[0m <module 'cdsw' from '/usr/local/lib/python3.8/site-packages/cdsw.py'>
[0;31mFile:[0m        /usr/local/lib/python3.8/site-packages/cdsw.py
[0;31mSource:[0m     
[0;34m"""[0m
[0;34mCDSW[0m
[0;34m====[0m
[0;34m[0m
[0;34mUtilities for Python on Cloudera Data Science Workbench.[0m
[0;34m"""[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mos[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mshutil[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0msys[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mtime[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mtraceback[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mre[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0murllib[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mbase64[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0muuid[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0msubprocess[0m[0;34m[0m
[0;34m[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mrequests[0m[0;34m[0m
[0;3