# Setting Up A Machine Learning Deployment Workflow on the C3 AI Suite


### For the Data Science Course: As stated in the course materials, you do not need to edit anything in this notebook! You should simply execute this notebook with "Run All" from the Cell menu.

This notebook sets up an `MLProject`. In the next notebook, you will attach your already-trained model to this project, which shows how to reuse artifacts you have created and how to update an existing project.

If you would like to watch a walkthrough video of this notebook, you can do so [here](https://developer.c3.ai/data-science?qd=1) (scroll down to ML Model Deployment Workflow video).


**Note**: This workflow applies to any time series ML use case. For this example, we consider a simple binary classification problem for our SmartBulb Predictive Maintenance use case.

 * [Machine Learning Productionizing Workflow](#1)
    * [Pre-deployment]
        * [1. Define Use Case](#1.1)
        * [2. Create a segment of your population](#1.2)
        * [3. Define how to retrieve and format data as features, data mask, and labels](#1.3)
        * [Using an untrained ML Pipeline]
            * [4. Define the steps in your machine learning pipeline flow](#1.4)
            * [5. Configure the pipeline with the data specifications and tie it to the project](#1.5)
            * [6. Train the model on the combined data from the subjects in the defined segment](#1.6)
            * [7. Use the trained model to predict on a small subset of data](#1.7)
    * [Deployment]
        * [8. Configure how model predictions will be persisted](#1.8)
        * [9. Deploy the trained model to a group of subjects](#1.9)
    * [Post-deployment]
        * [10. Evaluate Model and persist predictions and feature contributions on the subjects with the deployed model](#1.10)


In [None]:
import pandas as pd
import time
import matplotlib.pyplot as plt
from datetime import timedelta


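def monitor_job_status(job, sleepFor=10):
    """Poll `job` every `sleepFor` seconds until it reaches a terminal state, then return its status."""
    terminal_states = ["completed", "failed", "completed_with_errors"]
    status = job.status()
    while status.status not in terminal_states:
        print(status.status)
        time.sleep(sleepFor)
        status = job.status()
    return status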
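
The helper above keeps polling until the job reaches a terminal state, with no upper bound on the wait. The sketch below shows the same loop with a timeout. It assumes only that a job exposes `status().status`, as used above; `FakeJob`, `FakeStatus`, `wait_for_job`, and the `timeout` parameter are hypothetical names for illustration and are not part of the C3 API.

```python
import time

TERMINAL_STATES = {"completed", "failed", "completed_with_errors"}


class FakeStatus:
    """Hypothetical stand-in for the object returned by job.status()."""

    def __init__(self, status):
        self.status = status


class FakeJob:
    """Hypothetical job that reports 'running' a fixed number of times, then 'completed'."""

    def __init__(self, polls_until_done):
        self._remaining = polls_until_done

    def status(self):
        if self._remaining > 0:
            self._remaining -= 1
            return FakeStatus("running")
        return FakeStatus("completed")


def wait_for_job(job, sleep_for=10, timeout=3600):
    """Poll job.status() until a terminal state; raise TimeoutError after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        current = job.status()  # query the job once per iteration
        if current.status in TERMINAL_STATES:
            return current
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still '{current.status}' after {timeout} seconds")
        time.sleep(sleep_for)


final_status = wait_for_job(FakeJob(polls_until_done=3), sleep_for=0.01, timeout=5)
print(final_status.status)
```

A timeout is useful when the notebook is executed with "Run All", because a job that never reaches a terminal state would otherwise block every later cell.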

### Only use the cell below if the notebook has failed unexpectedly and you cannot troubleshoot the errors.

**Note**: The following code block cleans up the objects created in this `MLProject` so that you can restart the process.

**Warning**: This removes all model-deployment artifacts you created under the project. Do not execute it unless you want to start over!

In [None]:
# project.cleanUp(c3.MLCleanSpec(
#     mlModels=True,
#     mlPipelines=True,
#     mlPopulationSegments=True,
#     mlPredictionConfigurations=True,
#     mlPredictionModelConfigurations=True,
#     mlProjectSubjects=True,
#     predictionDataSourceSpecs=True,
#     trainingDataSourceSpecs=True,
#     predictions=True,
#     contributions=True,
#     scores=True
# ))
# project.remove()


### Step 1: Define Use Case <a class="anchor" id="1.1"></a>



| Type | Function |
|:---:|:---:|
| `MLProject` | Represents the ML use case you are trying to solve with your model, e.g. Predictive Maintenance, Churn Prediction, Demand Forecasting. |
| `MLSubject` | Represents the object that you want to make predictions on. This subject should have data implicitly *attached* to it that you will use to make ML features. In your data model, there should be some C3 type representing this object. |
| `MLProjectSubject` | Represents the object (`MLSubject`), but only in the context of your particular ML use case. |


If you were setting this up yourself, you would configure two `.c3typ` files: one for the `MLSubject` and one for the `MLProjectSubject`. See [Create a Type to model the ML Subject](https://developer.c3.ai/docs/7.24.0/guide/guide-ml-ds/modelDeployment#section:1.1) for information on how to configure them.

For this course, the course package has been configured for you.



In [None]:
project_name = "SmartBulbPredictiveMaintenance"
subject_type_name = "SmartBulb"
project_subject_type_name = "SmartBulbProjectSubject"

In [None]:
project = c3.MLProject(
    id=project_name,
    name=project_name,
    sourceType=subject_type_name,
    projectSubjectType=project_subject_type_name
).upsert()
# Trigger an asynchronous job that populates the project subject type (SmartBulbProjectSubject)
subject_creation_job = project.createProjectSubjects()


Wait for creation of `MLProjectSubject` entities to complete.

In [None]:
# subject_creation_job.status()

monitor_job_status(subject_creation_job)

### Step 2: Create a segment of your population <a class="anchor" id="1.2"></a>
Key C3 Types:
- `MLPopulationSegment` represents a logical grouping of the subjects (`MLSubject`) defined by a C3 fetch filter (`MLPopulationSegment#subjectFilter`). See [here](https://developer.c3.ai/docs/7.24.0/guide/guide-ml-ds/modelDeployment#section:1.2) for more information.


We create train, validation and test population segments:

In [None]:
training_segment = c3.MLPopulationSegment(
    subjectFilter="!(contains(id, 'SMBLB1') || contains(id, 'SMBLB2'))", # Which subjects to include in the group
    mlProject=project,
    name="TrainingBulbs",
).upsert()


In [None]:
validation_segment = c3.MLPopulationSegment(
    subjectFilter="contains(id, 'SMBLB1')", # Which subjects to include in the group
    mlProject=project,
    name="ValidationBulbs",
).upsert()


In [None]:
test_segment = c3.MLPopulationSegment(
    subjectFilter="contains(id, 'SMBLB2')", # Which subjects to include in the group
    mlProject=project,
    name="TestingBulbs",
).upsert()


In [None]:
test_segment
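
To see which subjects each filter selects, the three expressions can be mimicked in plain Python. This is only a sketch: it assumes `contains(id, 'X')` behaves as a substring match, and the bulb ids below are made up for illustration. Under that assumption, `contains(id, 'SMBLB1')` also matches ids such as `SMBLB10`, so the validation segment is not limited to a single bulb.

```python
# Plain-Python stand-ins for the three subjectFilter expressions
# (assumption: contains(id, 'X') is a substring test).
SEGMENT_FILTERS = {
    "TrainingBulbs": lambda bulb_id: not ("SMBLB1" in bulb_id or "SMBLB2" in bulb_id),
    "ValidationBulbs": lambda bulb_id: "SMBLB1" in bulb_id,
    "TestingBulbs": lambda bulb_id: "SMBLB2" in bulb_id,
}


def segments_for(bulb_id):
    """Return the names of every segment whose filter matches bulb_id."""
    return [name for name, matches in SEGMENT_FILTERS.items() if matches(bulb_id)]


# Hypothetical ids, chosen to show the substring behaviour.
sample_ids = ["SMBLB1", "SMBLB2", "SMBLB3", "SMBLB10", "SMBLB25", "SMBLB42"]
assignment = {bulb_id: segments_for(bulb_id) for bulb_id in sample_ids}

for bulb_id, names in assignment.items():
    print(bulb_id, names)
```

Checking that every id lands in exactly one segment is a quick way to confirm that the training, validation, and test groups do not overlap.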

Assign `MLProjectSubjects` in the `MLProject` to the `MLPopulationSegments` you just created.

In [None]:
assignment_action = project.assignSegments()


### Step 3: Define how to retrieve and format data as features, data mask, and labels <a class="anchor" id="1.3"></a>
Key C3 Types:
- `MLDataSourceSpec` defines where the data lies and how to extract the features for each subject to create the dataset for model training or prediction. For time series data, `EvalMetricsDatasetMLDataSourceSpec` comes out-of-the-box, but if you're using a different kind of data, you can create your own `MLDataSourceSpec` by following this [tutorial](https://developer.c3.ai/docs/7.24.0/topic/custom-mldatasourcespec).

In [None]:
# List of C3 metric names (SimpleMetric or CompoundMetric)
features = [
    "AverageTemperature",
    "AveragePower",
]

# We will use this to discard data AFTER a bulb has failed
mask = 'HasEverFailed'
label = 'WillFailNextMonth'
train_start_date = "2016-01-01" # datetime string for start of training period
train_end_date = "2021-01-01" # datetime string for end of training period
time_series_interval = "DAY" # string specifying interval of data (See Interval type for more info)

source_spec = c3.EvalMetricsDatasetMLDataSourceSpec(
    name="training_smartbulb",
    srcType=subject_type_name,
    features=features,
    maskMetric=mask,
    target=label,
    start=train_start_date,
    end=train_end_date,
    interval=time_series_interval
).upsert()
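
The role of the mask can be pictured with a few rows for one bulb. The sketch below assumes that rows where the mask metric is nonzero are dropped before features and labels are extracted, which matches the comment about discarding data after a failure. The rows are invented, `split_features_and_labels` is a hypothetical helper, and the real filtering is done by `EvalMetricsDatasetMLDataSourceSpec`, whose exact mask convention should be confirmed in its documentation.

```python
# Invented daily rows for a single bulb that fails on 2020-03-04.
example_rows = [
    {"day": "2020-03-01", "AverageTemperature": 71.2, "AveragePower": 9.8,
     "HasEverFailed": 0, "WillFailNextMonth": 1},
    {"day": "2020-03-02", "AverageTemperature": 74.5, "AveragePower": 10.1,
     "HasEverFailed": 0, "WillFailNextMonth": 1},
    {"day": "2020-03-03", "AverageTemperature": 79.9, "AveragePower": 10.6,
     "HasEverFailed": 0, "WillFailNextMonth": 1},
    {"day": "2020-03-04", "AverageTemperature": 0.0, "AveragePower": 0.0,
     "HasEverFailed": 1, "WillFailNextMonth": 0},
    {"day": "2020-03-05", "AverageTemperature": 0.0, "AveragePower": 0.0,
     "HasEverFailed": 1, "WillFailNextMonth": 0},
]


def split_features_and_labels(rows, feature_names, mask_name, label_name):
    """Drop masked rows, then return (X, y) as plain Python lists."""
    kept = [row for row in rows if not row[mask_name]]  # assumption: nonzero mask means "discard"
    X = [[row[name] for name in feature_names] for row in kept]
    y = [row[label_name] for row in kept]
    return X, y


X, y = split_features_and_labels(
    example_rows,
    feature_names=["AverageTemperature", "AveragePower"],
    mask_name="HasEverFailed",
    label_name="WillFailNextMonth",
)
print(len(X), "rows kept out of", len(example_rows))
```

Without the mask, the post-failure rows would enter training as negative examples even though the bulb is no longer operating.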


## Note regarding Steps 4 to 7

**Steps 4 to 7 are required only if you want to train a new machine learning pipeline and then deploy it. If you are continuing from a tuned ML pipeline from the MLExperimentation workflow, skip to Step 8.**

### Step 4: Define the machine learning pipeline <a class="anchor" id="1.4"></a>
Key Types:
- `MLPipeline` defines steps of machine learning algorithms/models in the workflow. See [Working with ML Pipeline](https://developer.c3.ai/docs/7.24.0/guide/guide-ml-ds/machine-learning-pipeline) for more details about `MLPipeline` and [Custom Machine Learning Pipelines](https://developer.c3.ai/docs/7.24.0/guide/guide-ml-ds/custom-machine-learning-pipelines) for how to configure a pipeline that is not available out-of-the-box.


Machine learning pipelines can be deeply nested, but in this example we will use a simple `RandomForestClassifier` from sklearn.

In [None]:
interpret_technique = c3.TreeInterpreterInterpretTechnique() # Specify MLInterpretTechnique here or None
scoring_metrics = [c3.MLAccuracyMetric(), c3.MLPrecisionMetric(), c3.MLRecallMetric(), c3.MLF1ScoreMetric()]

untrained_pipeline = c3.MLSerialPipeline(
    steps=[
        c3.MLStep(
            pipe=c3.SklearnPipe( # Specify MLLeafPipe here
                technique=c3.SklearnTechnique( # Add associated MLTechnique with hyperparameters here
                    name="ensemble.RandomForestClassifier",
                    processingFunctionName="predict"
                ),
                interpretTechnique=interpret_technique
            ),
            name="rfPipeline"
        )
    ],
    scoringMetrics=c3.MLScoringMetric.toScoringMetricMap(scoring_metrics)
).upsert()
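
The four scoring metrics configured above are named after the standard binary-classification quantities. The sketch below computes those quantities in plain Python from their textbook definitions. It is not the C3 implementation, and `binary_scores` and the example labels are hypothetical.

```python
def binary_scores(y_true, y_pred):
    """Compute accuracy, precision, recall and F1 for 0/1 labels."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)  # true positives
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)  # false positives
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)  # false negatives
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)  # true negatives

    accuracy = (tp + tn) / len(pairs)
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}


# Invented labels: 1 means "will fail next month".
example_true = [0, 0, 1, 1, 1, 0, 0, 1]
example_pred = [0, 1, 1, 1, 0, 0, 0, 1]

example_scores = binary_scores(example_true, example_pred)
print(example_scores)
```

For predictive maintenance, failures are usually rare, so precision and recall tend to be more informative than accuracy alone.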


### Step 5: Configure the pipeline with the data specifications and tie it to the project <a class="anchor" id="1.5"></a>
Key Types:
- `MLModel` combines the machine learning pipeline with the training data specifications and the inference data specifications into one entity. See [here](https://developer.c3.ai/docs/7.24.0/guide/guide-ml-ds/modelDeployment#section:1.3) for more information.

In [None]:
ml_model = c3.MLModel.createFromPipeline(
    pipeline=untrained_pipeline.get(), 
    trainingDataSourceSpec=source_spec.get(),
    spec=c3.MLModelCreateSpec(
        predictionDataSourceSpec=source_spec.get(),
        mlProject=project
    )
).upsert(spec=c3.UpsertSpec(returnInclude="this"))


### Step 6: Train the model on the combined data from the subjects in the defined segment <a class="anchor" id="1.6"></a>
- `MLPopulationSegmentTrainingJob` is a model training job that is kicked off when `MLModel#train` is called. It parallelizes the creation of the dataset and then trains the model. See [here](https://developer.c3.ai/docs/7.24.0/type/MLPopulationSegmentTrainingJob) for more information.

In [None]:
training_job = ml_model.train(training_segment.get(),
                              spec=c3.MLTrainingJobOptions(persistData=True))


Wait for model training to finish.

In [None]:
# training_job.status()

monitor_job_status(training_job)

Retrieve the resulting trained model from the training job.

In [None]:
trained_ml_model = c3.MLModel.fetch(
    spec=c3.FetchSpec(filter=f"configuration.id == '{training_job.trainedModelConfiguration.id}'")
).objs[0]

In [None]:
trained_ml_model

### Step 7: Use the trained model to predict on a small subset of data <a class="anchor" id="1.7"></a>
Key Types:
- `MLPartialDataSourceSpec` exists to allow users to override certain fields of the `MLDataSourceSpec` during inference. In the case of time series data, you might want to override the `start` and `end` dates whenever you make a new prediction because you received new data. See [here](https://developer.c3.ai/docs/7.24.0/topic/custom-mldatasourcespec#section:1.2) for more information.

In [None]:
prediction_subjects = getattr(c3, subject_type_name).fetch({ # Get a sample subject to predict on
    "filter": test_segment.get().subjectFilter,
    "limit": 1
}).objs

partial_data_source_spec = c3.EvalMetricsMLPartialDataSourceSpec(start="2018-05-01",
                                                                 end="2021-01-01") # MLPartialDataSourceSpec subtype for your dataset

sample_predictions = trained_ml_model.predictForSubjects(
    partialDataSpec=partial_data_source_spec, 
    subjects=prediction_subjects
)

pred_df = c3.Dataset.toPandas(sample_predictions)
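
The idea of a partial spec, overriding a few fields of a full data source spec at inference time while leaving the rest untouched, can be sketched with a plain dataclass. `DataSpec` and `with_partial_spec` are hypothetical stand-ins for illustration and are not C3 types or methods.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class DataSpec:
    """Hypothetical stand-in for a data source spec; not a C3 type."""
    features: tuple
    target: str
    start: str
    end: str
    interval: str


def with_partial_spec(base, **overrides):
    """Return a copy of `base` in which only the non-None override fields are replaced."""
    chosen = {name: value for name, value in overrides.items() if value is not None}
    return replace(base, **chosen)


training_spec = DataSpec(
    features=("AverageTemperature", "AveragePower"),
    target="WillFailNextMonth",
    start="2016-01-01",
    end="2021-01-01",
    interval="DAY",
)

# Override only the date range, as the prediction cell above does.
prediction_spec = with_partial_spec(training_spec, start="2018-05-01", end="2021-01-01")
print(prediction_spec)
```

Because the base spec is never modified, the same trained model can be pointed at a new date range each time new data arrives.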

Retrieve ground truth labels:

In [None]:
y_true_dataset = source_spec.get().withPartialSpec(partial_data_source_spec)\
                                        .getTargetDataForSources([prediction_subjects[0].id])

y_true = c3.Dataset.toPandas(y_true_dataset)                                  

Additional formatting on the prediction and ground truth dataframes:

In [None]:
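def format_df(df):
    """Split the '<source>_<timestamp>' index into 'source' and 'timestamp' columns (modifies df in place)."""
    parts = df.index.str.split('_')
    df['source'] = parts.str[0]
    df['timestamp'] = pd.to_datetime(parts.str[1], format="%Y-%m-%dT%H:%M:%S.%f")
    return df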

pred_df = format_df(pred_df)
y_true = format_df(y_true)
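
The formatting step relies on each row's index having the form `<source>_<timestamp>`. The standard-library sketch below shows the same parsing for a single key. The example key is invented, and `split_index_key` is a hypothetical helper that mirrors `format_df` for one value.

```python
from datetime import datetime

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"  # same format string as in format_df


def split_index_key(key):
    """Split '<source>_<timestamp>' into the source id and a datetime."""
    parts = key.split("_")
    source = parts[0]
    timestamp = datetime.strptime(parts[1], TIMESTAMP_FORMAT)
    return source, timestamp


# Invented key in the assumed index layout.
source, timestamp = split_index_key("SMBLB2_2018-05-01T00:00:00.000")
print(source, timestamp.isoformat())
```

Note that splitting on `_` and taking the first two parts only works when the subject id itself contains no underscore.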

View prediction series for the sample subject.

In [None]:
fig, ax = plt.subplots(figsize=(16, 6))

ax.plot(pred_df['timestamp'], pred_df['prediction'], color='tab:orange', label='predictions')
ax.plot(y_true['timestamp'], y_true[label], color='tab:blue', label='ground truth')
ax.set_xlabel('Time')
ax.set_ylabel('Status')
ax.set_title(f'Comparing Predictions to Ground Truth for {prediction_subjects[0].id}')
ax.legend(loc='upper left');

### Step 8: Configure how model predictions will be persisted <a class="anchor" id="1.8"></a>
Key C3 Types:
- `MLPredictionPersister`
- `MLModelTimedPredictionSeries`

We will use the default time series prediction persister here.

In [None]:
trained_ml_model = trained_ml_model.get() # or c3.MLModel.get('model_id') if using a model trained earlier
trained_ml_model.predictionPersisters = None # Specify MLPredictionPersisters or None to use default
trained_ml_model.merge(spec=c3.MergeSpec(mergeInclude="predictionPersisters"))


### Step 9: Deploy the trained model to a group of subjects <a class="anchor" id="1.9"></a>
Deploying the model to a group of subjects means that the model is used to make and persist predictions on each subject. The group of subjects (`MLPopulationSegment`) that the model is deployed to is often the same group it was trained on, but the model can also be deployed to other subjects (e.g. subjects that do not have enough data to train on).

In [None]:
test_segment.deployModels(
    [training_job.trainedModelConfiguration.get()], 
    statusLabel="LIVE" # "LIVE", "CHALLENGER", OR "CANDIDATE"
)


# test_segment.deployModels(
#     [trained_ml_model.configuration], 
#     statusLabel="CHALLENGER" # "LIVE", "CHALLENGER", OR "CANDIDATE"
# )


In [None]:
update_job = test_segment.updateModels()


Wait for model to be deployed.

In [None]:
# update_job.status()

monitor_job_status(update_job)

### Step 10: Evaluate Model and persist predictions and feature contributions on the subjects with the deployed model <a class="anchor" id="1.10"></a>

We will now submit jobs to score, predict and interpret results from our machine learning model. The results are persisted automatically and can be efficiently queried from other applications.

In [None]:
partial_data_source_spec = c3.EvalMetricsMLPartialDataSourceSpec(start="2016-01-01",
                                                                 end="2021-01-01") # MLPartialDataSourceSpec subtype for your dataset


In [None]:
score_job = test_segment.score(
    partialDataSpec=partial_data_source_spec
)

prediction_job = test_segment.predict(
    partialDataSpec=partial_data_source_spec
)

interpret_job = test_segment.interpret(
    partialDataSpec=partial_data_source_spec
)


In [None]:
# score_job.status()

monitor_job_status(score_job)

Wait for prediction and feature contribution jobs to complete.

In [None]:
# prediction_job.status()

monitor_job_status(prediction_job)

In [None]:
# interpret_job.status()

monitor_job_status(interpret_job)

View the results of the prediction job for a single subject.

In [None]:
predictions_emr = c3.MLModel.evalMetrics(spec=c3.EvalMetricsSpec(
    ids=[trained_ml_model.id], expressions=["MLProjectPrediction"], 
    start="2016-01-01", end="2021-01-01",
    bindings=[{"subjectId": "SMBLB2"}],
    interval="HOUR",
    resultKey=c3.Lambda.fromPython("lambda expression, bindings: expression + ('_' + bindings.get('featName', '') if expression == 'MLProjectContribution' else '')")
))

In [None]:
c3.EvalMetricsResult.toPandas(predictions_emr, multiIndexed=True).droplevel(0).plot(figsize=(16, 4))

View the results of the interpret job for a single subject:

In [None]:
feature_contributions_emr = c3.MLModel.evalMetrics(spec=c3.EvalMetricsSpec(
    ids=[trained_ml_model.id], expressions=["MLProjectContribution"], 
    start="2016-01-01", end="2021-01-01",
    bindings=[{"featName": feat, "subjectId": "SMBLB2"} for feat in features],
    interval="DAY",
    resultKey=c3.Lambda.fromPython("lambda expression, bindings: expression + ('_' + bindings.get('featName', '') if expression == 'MLProjectContribution' else '')")
))
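
The `resultKey` lambda used in the two `EvalMetricsSpec` calls above is ordinary Python, so the keys it builds can be checked locally. The sketch assumes the lambda receives the expression name and a dict-like `bindings` object. How C3 actually invokes it is not shown in this notebook, and `result_key` is a hypothetical name for the same expression written as a function.

```python
def result_key(expression, bindings):
    """Same logic as the resultKey lambda string passed to EvalMetricsSpec above."""
    return expression + (
        "_" + bindings.get("featName", "") if expression == "MLProjectContribution" else ""
    )


feature_names = ["AverageTemperature", "AveragePower"]

# One binding per feature, as in the feature-contribution query.
contribution_keys = [
    result_key("MLProjectContribution", {"featName": name, "subjectId": "SMBLB2"})
    for name in feature_names
]

# The prediction query has no featName binding, so the key is the bare expression.
prediction_key = result_key("MLProjectPrediction", {"subjectId": "SMBLB2"})

print(contribution_keys)
print(prediction_key)
```

Appending the feature name gives each contribution series a distinct key, so the per-feature results do not collide under a single expression name.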

In [None]:
c3.EvalMetricsResult.toPandas(feature_contributions_emr, multiIndexed=True).droplevel(0).plot(figsize=(16, 4))


View the scores produced by the scoring job:

In [None]:
start = trained_ml_model.get("scores.data.this").scores[0].data[0].start
scores_emr = c3.MLModel.evalMetrics(spec=c3.EvalMetricsSpec(
                ids=[trained_ml_model.id], expressions=["Score"], 
                start=start - timedelta(hours=3), end=start + timedelta(hours=12),
                bindings=[{'scoringMetricName': 'MLAccuracyMetric'},
                          {'scoringMetricName': 'MLF1ScoreMetric'},
                          {'scoringMetricName': 'MLPrecisionMetric'},
                          {'scoringMetricName': 'MLRecallMetric'}],
                interval="DAY",
                resultKey=c3.Lambda.fromPython("lambda expression, bindings: expression + '_' + bindings['scoringMetricName']")
))


In [None]:
c3.EvalMetricsResult.toPandas(scores_emr, multiIndexed=True).droplevel(0).plot(figsize=(12, 4), marker='x', grid=True, subplots=True)

**Congratulations! You have just deployed a `LIVE` machine learning model in production using best practices that allow you to scale seamlessly. Now your applications can begin querying your model to generate predictions, feature contributions and scores that are automatically persisted.**

### View the ids of the created MLProject and of the trained MLModel from the project (the "LIVE" model) in the cells below.

In [None]:
# project

In [None]:
project.id

In [None]:
trained_ml_model.id