# Model Deployment Pipeline

<a id="model-deployment-with-streaming"></a>


This demo deploys a model that is fed by streaming data. It covers the use case of 1<sup>st</sup>-day churn.

The importance of 1<sup>st</sup>-day churn prediction:
- In some segments of the gaming industry, the average 1st day churn is as high as 70%.
- Acquiring new customers is 5x&ndash;25x more expensive than retaining existing ones.
- Reducing churn by just 5% can boost profitability by 75%.
- Improving retention has a 2x&ndash;4x greater impact on growth than acquisition.
- The probability of selling to an existing customer is 60%&ndash;70%, but only 5%&ndash;20% for a prospect.
- Churn rate also informs metrics like customer lifetime value (LTV).

This demo consists of several steps:

![Model deployment Pipeline Real-time operational Pipeline](assets/model-deployment-pipeline.png)

While this demo covers the use case of 1<sup>st</sup>-day churn, it is easy to replace the data, related features and training model and reuse the same workflow for different business cases.

These steps are covered by the following demo:

- [**1. Data generator**](functions/data-generator.ipynb) - Generates events for training and serving, and creates an enrichment table (lookup values).
- [**2. Event handler**](functions/event-handler.ipynb) - Receives data from the input. This is a common input stream for all the data, so you can replace the event source (in this case, a data generator) without affecting the rest of the flow. It also stores all incoming data to Parquet files.
- [**3. Stream to features**](functions/stream-to-features.ipynb) - Enriches the stream using the enrichment table and updates the aggregation features using the events received from the event handler.
- **4. Optional model training steps -**
  - [**4.1 Get Data Snapshot**](functions/get-data-snapshot.ipynb) - Takes a snapshot of the feature table for training.
  - [**4.2 Describe the Dataset**](https://github.com/mlrun/functions/tree/master/describe) - Runs common analysis on the dataset and produces plots such as histograms, feature importance, correlation and more.
  - [**4.3 Training**](https://github.com/mlrun/functions/tree/master/sklearn_classifier) - Runs training with multiple classification models.
  - [**4.4 Testing**](https://github.com/mlrun/functions/tree/master/test_classifier) - Tests the best-performing model.
- [**5. Serving**](https://github.com/mlrun/functions/tree/master/model_server) - Serves the model and processes the data from the enriched stream and aggregation features.
- [**6. Inference logger**](functions/event-handler.ipynb) - Uses the same event handler function as above, but only its capability to store incoming data to Parquet files.

This demo comes with a pre-trained model that uses the base features, enrichment data and derived features, all calculated from the same generated data. You can retrain the model or train a new one by running the **optional model training steps**. To train a new model, first make sure that enough data has been collected from the streams into data storage.

## About this demo

### Input Data

The data generator ([data-generator.ipynb](functions/data-generator.ipynb)) creates the following events: `new_registration`, `new_purchases`, `new_bet` and `new_win` with the following data:

| new_registration |   | new_purchases |   | new_bet    |   | new_win    |
|------------------|---|---------------|---|------------|---|------------|
| user_id          |   | user_id       |   | user_id    |   | user_id    |
| event_type       |   | event_type    |   | event_type |   | event_type |
| event_time       |   | event_time    |   | event_time |   | event_time |
| name             |   | amount        |   | bet_amount |   | win_amount |
| date_of_birth    |   |               |   |            |   |            |
| street_address   |   |               |   |            |   |            |
| city             |   |               |   |            |   |            |
| country          |   |               |   |            |   |            |
| postcode         |   |               |   |            |   |            |
| affiliate_url    |   |               |   |            |   |            |
| campaign         |   |               |   |            |   |            |

Furthermore, `new_registration` includes a `label` column that indicates whether the user has churned (1 for churned, 0 for not churned).

## Enrichment

The same data generator ([data-generator.ipynb](functions/data-generator.ipynb)) also creates the enrichment table, which maps each postcode to a socioeconomic index (`socioeconomic_idx`).
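
As a rough illustration of what enrichment means here, the sketch below adds the index to an event by looking up its postcode. The in-memory dictionary, the `enrich` helper and the sample postcodes and index values are made up for illustration; the demo itself keeps the lookup in a key-value table.

```python
# Hypothetical in-memory stand-in for the enrichment table (postcode -> index).
enrichment_table = {"10001": 3, "94105": 5}


def enrich(event: dict, table: dict, key: str = "postcode") -> dict:
    """Return a copy of the event with socioeconomic_idx added when the key is known."""
    enriched = dict(event)
    idx = table.get(str(event.get(key)))
    if idx is not None:
        enriched["socioeconomic_idx"] = idx
    return enriched


event = {"user_id": "u1", "event_type": "new_registration", "postcode": "94105"}
print(enrich(event, enrichment_table))
```

Events whose postcode is missing from the table pass through unchanged in this sketch; how the demo treats such events is not shown in this section.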

## Feature calculation

The feature calculation step ([stream-to-features.ipynb](functions/stream-to-features.ipynb)) enriches the events using the enrichment table and calculates the sum, mean, count and variance of the three amount fields (`amount`, `bet_amount` and `win_amount` for `new_purchases`, `new_bet` and `new_win` respectively). This results in the following list of fields:

- purchase_sum
- purchase_mean
- purchase_count
- purchase_var
- bet_sum
- bet_mean
- bet_count
- bet_var
- win_sum
- win_mean
- win_count
- win_var
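
The four statistics per amount field can be maintained one event at a time, without storing the full history. The sketch below uses Welford's method for this; it is only one way to do it and not necessarily how the notebook computes the features. The `RunningStats` class is hypothetical, and the choice of population variance is an assumption.

```python
from dataclasses import dataclass


@dataclass
class RunningStats:
    """Incrementally maintained count, sum, mean and variance (Welford's method)."""
    count: int = 0
    total: float = 0.0
    mean: float = 0.0
    m2: float = 0.0  # sum of squared deviations from the running mean

    def update(self, value: float) -> None:
        self.count += 1
        self.total += value
        delta = value - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (value - self.mean)

    @property
    def var(self) -> float:
        # Population variance (an assumption; the demo may use the sample variance).
        return self.m2 / self.count if self.count else 0.0

    def as_features(self, prefix: str) -> dict:
        return {f"{prefix}_sum": self.total, f"{prefix}_mean": self.mean,
                f"{prefix}_count": self.count, f"{prefix}_var": self.var}


bets = RunningStats()
for amount in (10.0, 20.0, 30.0):  # made-up bet amounts for one user
    bets.update(amount)
print(bets.as_features("bet"))
```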

## Configure

The configuration below is shared across the notebooks. Change the values in this subsection if you would like different configuration settings.

In [1]:
%pip install python-dotenv

Note: you may need to restart the kernel to use updated packages.


This demo requires access to the Iguazio multi-model data layer (V3IO). Set the environment variables in `env.txt`.
> **Note**: When running this demo from the Iguazio Data Science Platform, the variables are already configured, so you do not need
> to edit them.

In [5]:
from dotenv import load_dotenv

load_dotenv('env.txt')

True
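
Before moving on, it can help to confirm that the variables read by the later cells are actually set. The sketch below checks the four names that this notebook reads with `getenv`; the `missing_vars` helper itself is hypothetical.

```python
import os

# Variable names read by the configuration cells in this notebook.
REQUIRED_VARS = ("V3IO_API", "V3IO_ACCESS_KEY", "V3IO_FRAMESD", "V3IO_USERNAME")


def missing_vars(env=None) -> list:
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


print("Missing variables:", missing_vars())
```

An empty list means the configuration cells below should find every value they need.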

### Project

Projects in the platform are used to package multiple functions, workflows, and artifacts. Set the project base name here.

In [2]:
project_base_name = "model-deployment-pipeline"

### Optional Training

In [3]:
run_training = False

### Data

All data in the platform is stored in user-defined data containers. This demo uses the predefined "users" container. For more information, see the platform's [data-containers](https://www.iguazio.com/docs/latest-release/data-layer/containers/) documentation.

In [4]:
container = 'users'

The data path under which the stream data and key-value (KV) tables are stored:

In [6]:
from os import getenv, path, getcwd

# set parameters and  environment variables
v3io_envs = {'V3IO_API': getenv('V3IO_API'),
             'V3IO_ACCESS_KEY': getenv('V3IO_ACCESS_KEY'),
             'V3IO_FRAMESD': getenv('V3IO_FRAMESD')}
v3io_username = getenv('V3IO_USERNAME')
data_path = path.join(v3io_username, 'examples', project_base_name, 'data')

Set up the different stream information

In [8]:
from urllib.parse import urljoin
from urllib.parse import urlparse
web_api = getenv('V3IO_API')
if not urlparse(web_api).scheme:
    web_api = 'http://' + web_api
web_api_users = urljoin(web_api, container)
stream_configs = {'generated-stream': {
                        'path': path.join(data_path, 'generated-stream'),
                        'shard_count': 8},
                  'incoming-events-stream': {
                        'path': path.join(data_path, 'incoming-events-stream'),
                        'shard_count': 8
                  },
                  'serving-stream': {
                        'path': path.join(data_path, 'serving-stream'),
                        'shard_count': 8
                  },
                  'inference-stream': {
                        'path': path.join(data_path, 'inference-stream'),
                        'shard_count': 8
                  }
                 }
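
The cell above prefixes the web API address with a scheme when none is present and then joins the container name onto it. The sketch below isolates that logic in a hypothetical `normalize_endpoint` helper, using a made-up host name, and also shows a `urljoin` behavior worth knowing: a base path without a trailing slash is replaced, not extended.

```python
from urllib.parse import urljoin, urlparse


def normalize_endpoint(endpoint: str, default_scheme: str = "http") -> str:
    """Prefix the endpoint with a scheme when it has none."""
    if not urlparse(endpoint).scheme:
        return f"{default_scheme}://{endpoint}"
    return endpoint


base = normalize_endpoint("webapi.example.local")  # hypothetical host name
print(urljoin(base, "users"))

# The last path segment of the base ("api") is dropped by urljoin here.
print(urljoin("http://webapi.example.local/api", "users"))
```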

When we stream data, we associate the records with a specific partition key to ensure that similar records are assigned to the same shard. For more information, see the [stream sharding and partitioning description](https://www.iguazio.com/docs/latest-release/data-layer/stream/#stream-sharding-and-partitioning).
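
To make the idea concrete, the sketch below maps a partition key to a shard with a stable hash, so that all events of one user land on the same shard. This only illustrates the principle: the hash function that the platform actually applies is not described here, and `shard_for` is a hypothetical helper.

```python
import hashlib


def shard_for(partition_key: str, shard_count: int) -> int:
    """Map a partition key to a shard index with a stable hash (illustrative only)."""
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % shard_count


events = [{"user_id": "u1"}, {"user_id": "u2"}, {"user_id": "u1"}]
for event in events:
    print(event["user_id"], "->", shard_for(event["user_id"], shard_count=8))
```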

In [9]:
partition_attr = "user_id"

Target paths for storing the raw data and the inference data as Parquet files.
The Parquet files are written through a file mount, so the paths start with `/User`, which is mounted to the user's home directory.

In [10]:
raw_parquet_target_path = path.join(data_path.replace(v3io_username, '/User'),  'events-pq')
inference_parquet_target_path = path.join(data_path.replace(v3io_username, '/User'),  'inference-pq')
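
Note that `str.replace` swaps every occurrence of the user name, not only the leading one. If that is a concern in your environment, a prefix-only variant along the following lines may be safer. The `to_mount_path` helper and the sample path are hypothetical.

```python
import posixpath


def to_mount_path(data_path: str, username: str, mount_root: str = "/User") -> str:
    """Swap only the leading '<username>/' prefix of data_path for mount_root."""
    prefix = username.strip("/") + "/"
    if not data_path.startswith(prefix):
        raise ValueError(f"{data_path!r} does not start with {prefix!r}")
    return posixpath.join(mount_root, data_path[len(prefix):])


# Hypothetical path in which the user name also appears further down.
data_path = "admin/examples/admin-project/data"
print(to_mount_path(data_path, "admin"))    # only the leading prefix is swapped
print(data_path.replace("admin", "/User"))  # both occurrences are swapped
```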

Target path to store the enrichment table (a key-value table)

In [11]:
enrichment_table_path = path.join(data_path, 'enrichment-table')

Target path to store the calculated features

In [12]:
feature_table_path = path.join(data_path, 'feature-table')

The list of features

In [13]:
feature_list = ['socioeconomic_idx','purchase_sum','purchase_mean','purchase_count',
                'purchase_var','bet_sum','bet_mean','bet_count',
                'bet_var','win_sum','win_mean','win_count','win_var']

## Create V3IO Client

With the dataplane client you can manipulate data in the platform's multi-model data layer, including:
* Objects
* Key-values (NoSQL)
* Streams
* Containers

Under the hood, the client connects through the platform's [web API](https://www.iguazio.com/docs/latest-release/data-layer/reference/web-apis/) and wraps each low-level API with an interface. Calls are blocking, but you can use the batching interface to send multiple requests in parallel for greater performance.

In [14]:
import v3io.dataplane
v3io_client = v3io.dataplane.Client(endpoint=web_api,
                                    access_key=getenv('V3IO_ACCESS_KEY'))

## Manage Streams

#### Delete all streams

Clean up any streams left over from previous runs.

In [15]:
for stream_name, stream_config in stream_configs.items():
    resp = v3io_client.stream.delete(container=container, stream_path=stream_config['path'], 
                                     raise_for_status=v3io.dataplane.RaiseForStatus.never)
    print(f'Delete Stream call for stream {stream_name} returned with status {resp.status_code}, and content: {resp.body.decode("utf-8")}')

Delete Stream call for stream generated-stream returned with status 404, and content: {
	"ErrorCode": -2,
	"ErrorMessage": "No such file or directory"
}
Delete Stream call for stream incoming-events-stream returned with status 404, and content: {
	"ErrorCode": -2,
	"ErrorMessage": "No such file or directory"
}
Delete Stream call for stream serving-stream returned with status 404, and content: {
	"ErrorCode": -2,
	"ErrorMessage": "No such file or directory"
}
Delete Stream call for stream inference-stream returned with status 404, and content: {
	"ErrorCode": -2,
	"ErrorMessage": "No such file or directory"
}


#### Create all streams

In [16]:
for stream_name, stream_config in stream_configs.items():
    print(stream_config['path'])
    resp = v3io_client.stream.create(container=container,
                                     stream_path=stream_config['path'],
                                     shard_count=stream_config['shard_count'],
                                     raise_for_status=v3io.dataplane.RaiseForStatus.never)
    print(f'Create Stream call for stream {stream_name} returned with status {resp.status_code}, and content: {resp.body.decode("utf-8")}')

admin/examples/model-deployment-pipeline/data/generated-stream
Create Stream call for stream generated-stream returned with status 204, and content: 
admin/examples/model-deployment-pipeline/data/incoming-events-stream
Create Stream call for stream incoming-events-stream returned with status 204, and content: 
admin/examples/model-deployment-pipeline/data/serving-stream
Create Stream call for stream serving-stream returned with status 204, and content: 
admin/examples/model-deployment-pipeline/data/inference-stream
Create Stream call for stream inference-stream returned with status 204, and content: 


## Set Up the MLRun Project

Projects are created by using the `new_project` MLRun method, which receives the following parameters:

- **`name`** (Required) &mdash; the project name.
- **`context`** &mdash; the path to a local project directory (the project's context directory).
  The project directory contains a project-configuration file (default: **project.yaml**), which defines the project, and additional generated Python code.
  The project file is created when you save your project (using the `save` MLRun project method), as demonstrated in Step 6.
- **`functions`** &mdash; a list of function objects or links to function code or objects.
- **`init_git`** &mdash; set to `True` to perform Git initialization of the project directory (`context`).
  > **Note:** It's customary to store project code and definitions in a Git repository.

Projects are visible in the MLRun dashboard only after they're saved to the MLRun database, which happens whenever you run code for a project.

The following code creates a project using the `project_base_name` variable, concatenated with your current running username in the platform (**&lt;V3IO_USERNAME&gt;**), and sets the project directory to a **conf** directory in the current demo directory (**/User/demos/model-deployment-with-streaming/conf**).

> **Note:** Platform projects are shared among all users of the parent tenant, to facilitate collaboration. Therefore,
>
> - Synchronize your project execution with other users on your platform cluster, as needed, or use unique project names to avoid conflicts.
>   You can easily change the default project name for this tutorial by changing the definition of the `project_base_name` variable, defined at the beginning of the notebook.
> - Don't include in your project proprietary information that you don't want to expose to other users.
>   Note that while projects are a useful tool, you can easily develop and run code in the platform without using projects.

In [17]:
from mlrun import new_project

project_name = '-'.join(filter(None, [project_base_name, getenv('V3IO_USERNAME', None)]))
project_path = path.abspath('conf')
project = new_project(project_name, project_path, init_git=True)

print(f'Project path: {project_path}\nProject name: {project_name}')

Project path: /home/jovyan/data/demos/model-deployment-pipeline/conf
Project name: model-deployment-pipeline-admin


[MLRun](https://github.com/mlrun/mlrun) is a generic and convenient mechanism for data scientists and software developers to describe and run tasks related to machine learning in various, scalable runtime environments and ML pipelines while automatically tracking executed code, metadata, inputs, and outputs.
MLRun integrates with the Nuclio serverless framework and with the Kubeflow Pipelines framework for running ML pipelines.
The demo uses MLRun to create a project, run Nuclio serverless functions, as well as run the model training.
Before running your code, you need to set some MLRun configurations:

- <a id="gs-mlrun-config-artifcats-path"></a>**Artifacts path** &mdash; the location for storing versioned data artifacts (such as files, objects, data sets, and models) that are produced or consumed by functions, runs, and workflows.
  The path can be defined either as a local directory path or as a URL (of the format `s3://*`, `v3io://*`, etc.).
  You can set the artifacts path either by defining an `MLRUN_ARTIFACT_PATH` environment variable (which applies globally throughout the current environment) or as part of the MLRun configuration.
 
  If the target directory doesn't exist, MLRun creates it.
  You can use the notation `{{run.uid}}` in the path to signify the current run ID.
  For pipelines, you can use the notation `{{workflow.uid}}` to signify the workflow ID.
  This allows you to create a unique artifacts directory for each executed job or workflow.

  After you run an MLRun job, the artifacts directory might contain one or more of the following directories:
 
  - **plots** &mdash; a directory for storing images, figures, and plotlines.
  - **models** &mdash; a directory for storing all trained models.
  - **data** &mdash; a directory for storing any other type of data artifact, such as data sets.

The following code sets the artifacts path to an **artifacts** directory within the tutorial directory (**/User/demos/model-deployment-with-streaming/artifacts**).

In [18]:
from mlrun import mlconf

# Target location for storing pipeline artifacts
project.artifact_path = path.abspath('artifacts')
# MLRun DB path or API service URL
mlconf.dbpath = mlconf.dbpath or 'http://mlrun-api:8080'

print(f'Artifacts path: {project.artifact_path}\nMLRun DB path: {mlconf.dbpath}')

Artifacts path: /home/jovyan/data/demos/model-deployment-pipeline/artifacts
MLRun DB path: http://mlrun-api:8080
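
The `{{run.uid}}` notation described above gives each run its own artifacts directory. MLRun performs the substitution itself; the sketch below only mimics the effect with a hypothetical `render_artifact_path` helper and made-up run IDs, to show what the resulting paths look like.

```python
def render_artifact_path(template: str, values: dict) -> str:
    """Replace {{name}} placeholders with values (a stand-in for MLRun's own substitution)."""
    rendered = template
    for name, value in values.items():
        rendered = rendered.replace("{{" + name + "}}", str(value))
    return rendered


template = "/User/artifacts/{{run.uid}}"
for uid in ("run-a", "run-b"):  # made-up run IDs
    print(render_artifact_path(template, {"run.uid": uid}))
```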


## Set project's functions

#### Data Generator

In [19]:
from mlrun import code_to_function, NewTask
import nuclio

data_generator = code_to_function(name='data-generator', handler='main', kind='job', filename='functions/data-generator.ipynb')
project.set_function(data_generator)

dg_params = {'container': container,
             'output_stream_path': stream_configs['generated-stream']['path'],
             'enrichment_table_path': enrichment_table_path,
             'num_users_group1': 280,
             'num_users_group2': 120,
             'events_per_user': 200}

project.func('data-generator').set_envs(v3io_envs)

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7ff804bff190>

In [20]:
#Build the image
project.func('data-generator').deploy()

> 2021-05-14 02:51:41,969 [info] starting remote build, image: .gshaham/func-model-deployment-pipeline-admin-data-generator:latest
E0514 02:51:43.834877       1 aws_credentials.go:77] while getting AWS credentials NoCredentialProviders: no valid providers in chain. Deprecated.
	For verbose messaging see aws.Config.CredentialsChainVerboseErrors
INFO[0004] Retrieving image manifest mlrun/mlrun:0.6.3-rc12
INFO[0007] Retrieving image manifest mlrun/mlrun:0.6.3-rc12
INFO[0011] Built cross stage deps: map[]
INFO[0011] Retrieving image manifest mlrun/mlrun:0.6.3-rc12
INFO[0014] Retrieving image manifest mlrun/mlrun:0.6.3-rc12
INFO[0017] Executing 0 build triggers
INFO[0017] Unpacking rootfs as cmd RUN python -m pip install faker requires it.
INFO[0095] RUN python -m pip install faker
INFO[0095] Taking snapshot of full filesystem...
INFO[0099]

True

In [21]:
#Run the job
project.func('data-generator').run(params=dg_params, artifact_path=project.artifact_path)

> 2021-05-14 02:53:48,960 [info] starting run data-generator-main uid=094096ef34f64c41b30a4621538e0347 DB=http://mlrun-api:8080
> 2021-05-14 02:53:49,063 [info] Job is running in the background, pod: data-generator-main-29w7d
> 2021-05-14 03:36:21,684 [info] Created enrichment table with 89999 items
> 2021-05-14 03:37:12,997 [info] Records sent 80400
> 2021-05-14 03:37:13,026 [info] All data streamed successfully.
> 2021-05-14 03:37:13,129 [info] run executed, status=completed
final state: completed


| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---------|-----|------|-------|-------|------|--------|--------|------------|---------|-----------|
| model-deployment-pipeline-admin | ...538e0347 | 0 | May 14 02:54:02 | completed | data-generator-main | v3io_user=admin<br>kind=job<br>owner=admin<br>host=data-generator-main-29w7d | | container=users<br>output_stream_path=admin/examples/model-deployment-pipeline/data/generated-stream<br>enrichment_table_path=admin/examples/model-deployment-pipeline/data/enrichment-table<br>num_users_group1=280<br>num_users_group2=120<br>events_per_user=200 | | |


to track results use .show() or .logs() or in CLI: 
!mlrun get run 094096ef34f64c41b30a4621538e0347 --project model-deployment-pipeline-admin , !mlrun logs 094096ef34f64c41b30a4621538e0347 --project model-deployment-pipeline-admin
> 2021-05-14 03:37:17,646 [info] run executed, status=completed


<mlrun.model.RunObject at 0x7ff7f46a81f0>

In [22]:
from mlrun.platforms import mount_pvc
def mount_pvc_default():
    pvc_name, volume_mount_path = getenv("MLRUN_PVC_MOUNT").split(":")
    return mount_pvc(pvc_name, 'pvc', volume_mount_path)
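
The helper above expects `MLRUN_PVC_MOUNT` to hold a value of the form `<pvc-name>:<mount-path>` and fails with an opaque error when the variable is unset. A more defensive parse could look like the sketch below; `parse_pvc_mount` is a hypothetical helper and the sample value is made up.

```python
def parse_pvc_mount(value):
    """Split '<pvc-name>:<mount-path>' into its two parts, or fail with a clear message."""
    if not value or ":" not in value:
        raise ValueError("MLRUN_PVC_MOUNT must look like '<pvc-name>:<mount-path>'")
    pvc_name, mount_path = value.split(":", 1)
    if not pvc_name or not mount_path:
        raise ValueError("MLRUN_PVC_MOUNT has an empty PVC name or mount path")
    return pvc_name, mount_path


print(parse_pvc_mount("my-pvc:/home/jovyan/data"))  # made-up example value
```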

#### Event Handler

In [23]:
event_handler = code_to_function(name='event-handler', handler='handler', kind='nuclio', filename='functions/event-handler.ipynb')
project.set_function(event_handler)

eh_envs = {'PARQUET_SINK_FLAG': 'true',
           'STREAM_SINK_FLAG': 'true',
           'PARQUET_TARGET_PATH' : raw_parquet_target_path,
           'PARQUET_BATCH_SIZE': 8192,
           'TS_KEY': 'event_time',
           'TS_FORMAT': '%Y-%m-%d %H:%M:%S.%f',
           'CONTAINER': container,
           'OUTPUT_STREAM_PATH': stream_configs['incoming-events-stream']['path'],
           'PARTITION_ATTR': partition_attr}

project.func('event-handler').set_envs({**v3io_envs, **eh_envs})
project.func('event-handler').apply(mount_pvc_default())

generated_stream = '/'.join(s.strip('/') for s in [web_api_users, stream_configs['generated-stream']['path']]) + '@eh'
project.func('event-handler').add_trigger('serving_stream',
                                           nuclio.triggers.V3IOStreamTrigger(url=generated_stream,
                                                                             maxWorkers=stream_configs['generated-stream']['shard_count']+2,
                                                                             seekTo='earliest'))

project.func('event-handler').spec.replicas=1

In [24]:
project.func('event-handler').deploy()

> 2021-05-14 04:40:11,440 [info] Starting remote function deploy
2021-05-14 04:40:11  (info) Deploying function
2021-05-14 04:40:11  (info) Building
2021-05-14 04:40:11  (info) Staging files and preparing base images
2021-05-14 04:40:11  (info) Building processor image
2021-05-14 04:41:15  (info) Build complete
2021-05-14 04:41:30  (info) Function deploy complete
> 2021-05-14 04:41:30,432 [info] function deployed, address=192.168.65.4:30512


'http://192.168.65.4:30512'
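
The trigger URL used above is the web API address of the container, followed by the stream path and an `@eh` suffix, which appears to name the consumer group that the function reads with. The sketch below reproduces that string construction in a hypothetical `stream_trigger_url` helper, using made-up host and path values.

```python
def stream_trigger_url(web_api_container: str, stream_path: str, consumer_group: str) -> str:
    """Join the base URL and stream path with single slashes and append '@<consumer group>'."""
    parts = [web_api_container, stream_path]
    return "/".join(part.strip("/") for part in parts) + "@" + consumer_group


url = stream_trigger_url("http://webapi.example.local/users/",  # hypothetical host
                         "/admin/examples/data/generated-stream",
                         "eh")
print(url)
```

Stripping the slashes from each part before joining avoids a doubled `//` in the middle of the URL when either part is configured with a leading or trailing slash.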

#### Stream to Features

In [26]:
stream_to_features = code_to_function(name='stream-to-features', handler='handler', kind='nuclio', filename='functions/stream-to-features.ipynb')
project.set_function(stream_to_features)

stf_envs = {'FEATURE_TABLE_PATH': feature_table_path,
            'SERVING_EVENTS': ",".join(['bet','win']),
            'FEATURE_LIST': ",".join(feature_list),
            'CONTAINER': container,
            'OUTPUT_STREAM_PATH': stream_configs['serving-stream']['path'],
            'PARTITION_ATTR': partition_attr,
            'ENRICHMENT_TABLE_PATH': enrichment_table_path,
            'ENRICHMENT_KEY':"postcode"}

project.func('stream-to-features').set_envs({**v3io_envs, **stf_envs})

incoming_events_stream = '/'.join(s.strip('/') for s in [web_api_users, stream_configs['incoming-events-stream']['path']]) + '@stf'
project.func('stream-to-features').add_trigger('serving_stream',
                                               nuclio.triggers.V3IOStreamTrigger(url=incoming_events_stream,
                                                                                 maxWorkers=stream_configs['incoming-events-stream']['shard_count']+2,
                                                                                 seekTo='earliest'))

project.func('stream-to-features').spec.readiness_timeout = 200
project.func('stream-to-features').spec.replicas=1
project.func('stream-to-features').deploy()

> 2021-05-14 05:11:37,427 [info] Starting remote function deploy
2021-05-14 05:11:37  (info) Deploying function
2021-05-14 05:11:37  (info) Building
2021-05-14 05:11:37  (info) Staging files and preparing base images
2021-05-14 05:11:37  (info) Building processor image
2021-05-14 05:12:05  (info) Build complete
2021-05-14 05:12:19  (info) Function deploy complete
> 2021-05-14 05:12:20,268 [info] function deployed, address=192.168.65.4:30780


'http://192.168.65.4:30780'

#### Get Data Snapshot (part of optional model training)

In [28]:
if run_training:
    get_data_snapshot = code_to_function(name='get-data-snapshot', handler='snapshot_data', kind='job', filename='functions/get-data-snapshot.ipynb')
    project.set_function(get_data_snapshot)

    # set parameters and  environment variables
    gds_params = {'container': container, 
                  'table_path': feature_table_path, 
                  'columns': ['label']+feature_list, 
                  'format': 'csv'}

    project.func('get-data-snapshot').set_envs(v3io_envs)
    project.func('get-data-snapshot').apply(mount_pvc_default())
    project.func('get-data-snapshot').deploy()

> 2021-05-14 05:16:24,899 [info] starting remote build, image: .gshaham/func-model-deployment-pipeline-admin-get-data-snapshot:latest
E0514 05:16:27.577780       1 aws_credentials.go:77] while getting AWS credentials NoCredentialProviders: no valid providers in chain. Deprecated.
	For verbose messaging see aws.Config.CredentialsChainVerboseErrors
INFO[0003] Retrieving image manifest mlrun/mlrun:0.6.3-rc12
INFO[0005] Retrieving image manifest mlrun/mlrun:0.6.3-rc12
INFO[0008] Built cross stage deps: map[]
INFO[0008] Retrieving image manifest mlrun/mlrun:0.6.3-rc12
INFO[0010] Retrieving image manifest mlrun/mlrun:0.6.3-rc12
INFO[0013] Executing 0 build triggers
INFO[0013] Unpacking rootfs as cmd RUN pip install v3io-frames==0.8.* requires it.
INFO[0086] RUN pip install v3io-frames==0.8.*
INFO[0086] Taking snapshot of full filesystem...

In [29]:
if run_training:
    snapshot_data_run = project.func('get-data-snapshot').run(params=gds_params, artifact_path=project.artifact_path)

> 2021-05-14 05:18:30,300 [info] starting run get-data-snapshot-snapshot_data uid=97ae61d16e2445d9b0326a28a689ec8a DB=http://mlrun-api:8080
> 2021-05-14 05:18:30,441 [info] Job is running in the background, pod: get-data-snapshot-snapshot-data-wxgxp
> 2021-05-14 05:18:46,349 [info] Saving snapshot data set to /home/jovyan/data/demos/model-deployment-pipeline/artifacts/data ...
> 2021-05-14 05:18:46,446 [info] run executed, status=completed
final state: completed


| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---------|-----|------|-------|-------|------|--------|--------|------------|---------|-----------|
| model-deployment-pipeline-admin | ...a689ec8a | 0 | May 14 05:18:44 | completed | get-data-snapshot-snapshot_data | v3io_user=admin<br>kind=job<br>owner=admin<br>host=get-data-snapshot-snapshot-data-wxgxp | | container=users<br>table_path=admin/examples/model-deployment-pipeline/data/feature-table<br>columns=['label', 'socioeconomic_idx', 'purchase_sum', 'purchase_mean', 'purchase_count', 'purchase_var', 'bet_sum', 'bet_mean', 'bet_count', 'bet_var', 'win_sum', 'win_mean', 'win_count', 'win_var']<br>format=csv | | snapshot_dataset |


to track results use .show() or .logs() or in CLI: 
!mlrun get run 97ae61d16e2445d9b0326a28a689ec8a --project model-deployment-pipeline-admin , !mlrun logs 97ae61d16e2445d9b0326a28a689ec8a --project model-deployment-pipeline-admin
> 2021-05-14 05:18:49,690 [info] run executed, status=completed


#### Describe the Dataset (part of optional model training)
-------------------
You can review the plots under `artifacts/plots/`.

In [30]:
if run_training:
    project.set_function('hub://describe', 'describe')

    project.func('describe').apply(mount_pvc_default())
    describe_run = project.func('describe').run(params={'label_column': 'label'},
                                inputs={"table":
                                        snapshot_data_run.outputs['snapshot_dataset']},
                                artifact_path=project.artifact_path)

> 2021-05-14 05:18:51,729 [info] starting run describe-summarize uid=59871720b41048308ed61646cdb06290 DB=http://mlrun-api:8080
> 2021-05-14 05:18:51,799 [info] Job is running in the background, pod: describe-summarize-jf4ql
> 2021-05-14 05:19:39,365 [info] run executed, status=completed
final state: completed


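| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---------|-----|------|-------|-------|------|--------|--------|------------|---------|-----------|
| model-deployment-pipeline-admin | ...cdb06290 | 0 | May 14 05:19:04 | completed | describe-summarize | v3io_user=admin<br>kind=job<br>owner=admin<br>host=describe-summarize-jf4ql | table | label_column=label | | histograms<br>violin<br>imbalance<br>imbalance-weights-vec<br>correlation-matrix<br>correlation |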


to track results use .show() or .logs() or in CLI: 
!mlrun get run 59871720b41048308ed61646cdb06290 --project model-deployment-pipeline-admin , !mlrun logs 59871720b41048308ed61646cdb06290 --project model-deployment-pipeline-admin
> 2021-05-14 05:19:41,232 [info] run executed, status=completed


#### Training (part of optional model training)
---------------------
The function's source and full docstrings are available in the [sklearn_classifier](https://github.com/mlrun/functions/tree/master/sklearn_classifier) function directory.

In [31]:
if run_training:
    project.set_function('hub://sklearn_classifier', 'train')
    project.func('train').apply(mount_pvc_default())
    
    # Configure the models to train
    models = ["sklearn.ensemble.RandomForestClassifier", 
              "sklearn.linear_model.LogisticRegression",
              "sklearn.ensemble.AdaBoostClassifier"]
    
    # Create a training task
    train_task = NewTask(name="train",
                         params={"sample": -1,
                                 "label_column": "label",
                                 "test_size": 0.10},
                         inputs={"dataset": snapshot_data_run.outputs['snapshot_dataset']})
    
    # Run the training task
    train_run = project.func('train').run(train_task.with_hyper_params({'model_pkg_class': models},
                                                                        selector='max.accuracy'),
                                                                        artifact_path=project.artifact_path)

> 2021-05-14 05:19:41,672 [info] starting run train uid=0874ef3e22994477a2399e49b706bdf3 DB=http://mlrun-api:8080
> 2021-05-14 05:19:41,754 [info] Job is running in the background, pod: train-xw4sv
> 2021-05-14 05:19:49,510 [info] best iteration=2, used criteria max.accuracy
> 2021-05-14 05:19:49,616 [info] run executed, status=completed
lbfgs failed to converge (status=1):
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
final state: completed


| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---------|-----|------|-------|-------|------|--------|--------|------------|---------|-----------|
| model-deployment-pipeline-admin | ...b706bdf3 | 0 | May 14 05:19:45 | completed | train | v3io_user=admin<br>kind=job<br>owner=admin | dataset | sample=-1<br>label_column=label<br>test_size=0.1 | best_iteration=2<br>accuracy=0.6666666666666666<br>test-error=0.3333333333333333<br>rocauc=0.7777777777777779<br>brier_score=0.2685584863385404<br>f1-score=0.5714285714285715<br>precision_score=0.5<br>recall_score=0.6666666666666666 | test_set<br>probability-calibration<br>confusion-matrix<br>precision-recall-binary<br>roc-binary<br>model<br>iteration_results |


to track results use .show() or .logs() or in CLI: 
!mlrun get run 0874ef3e22994477a2399e49b706bdf3 --project model-deployment-pipeline-admin , !mlrun logs 0874ef3e22994477a2399e49b706bdf3 --project model-deployment-pipeline-admin
> 2021-05-14 05:19:50,926 [info] run executed, status=completed


In [32]:
if run_training:
    # Display the name of the selected model
    print(f'Best model: {models[train_run.outputs["best_iteration"]-1]}')

    # Display the accuracy for the optimal run iteration
    print(f'Accuracy: {train_run.outputs["accuracy"]}')


Best model: sklearn.linear_model.LogisticRegression
Accuracy: 0.6666666666666666
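
The training run tries each model class as one hyperparameter iteration and keeps the one selected by `max.accuracy`. The indexing in the cell above implies that iterations are numbered from 1. The sketch below mimics that selection with a hypothetical `best_iteration` helper; the per-iteration accuracies are made up, since the run output above reports only the winning value.

```python
def best_iteration(results: list, metric: str = "accuracy") -> int:
    """Return the 1-based index of the iteration with the highest metric value."""
    best_index = max(range(len(results)), key=lambda i: results[i][metric])
    return best_index + 1


models = ["sklearn.ensemble.RandomForestClassifier",
          "sklearn.linear_model.LogisticRegression",
          "sklearn.ensemble.AdaBoostClassifier"]

# Made-up accuracies, one per model, in the same order as `models`.
iteration_results = [{"accuracy": 0.58}, {"accuracy": 0.67}, {"accuracy": 0.50}]

winner = best_iteration(iteration_results)
print(f"Best iteration: {winner}, model: {models[winner - 1]}")
```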


#### Testing (part of optional model training)

In [33]:
if run_training:
    project.set_function('hub://test_classifier', 'test')
    project.func('test').apply(mount_pvc_default())
    
    test_task = NewTask(name="test",
                        params={"label_column": "label",
                                "plots_dest": path.join("plots", "test")},
                        inputs={"models_path": train_run.outputs['model'],
                                "test_set": train_run.outputs['test_set']}
                        )
    test_run = project.func('test').run(test_task,
                        artifact_path=project.artifact_path)

> 2021-05-14 05:22:55,903 [info] starting run test uid=164d4ccad1f741e397e90e294fd45243 DB=http://mlrun-api:8080
> 2021-05-14 05:22:56,035 [info] Job is running in the background, pod: test-275mn
> 2021-05-14 05:23:01,710 [info] run executed, status=completed
final state: completed


| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| model-deployment-pipeline-admin | ...4fd45243 | 0 | May 14 05:23:00 | completed | test | v3io_user=admin<br>kind=job<br>owner=admin<br>host=test-275mn | models_path<br>test_set | label_column=label<br>plots_dest=plots/test | accuracy=0.75<br>test-error=0.25<br>rocauc=1.0<br>brier_score=0.15086661857015546<br>f1-score=0.8<br>precision_score=1.0<br>recall_score=0.6666666666666666 | probability-calibration<br>confusion-matrix<br>precision-recall-binary<br>roc-binary<br>test_set_preds |


to track results use .show() or .logs() or in CLI: 
!mlrun get run 164d4ccad1f741e397e90e294fd45243 --project model-deployment-pipeline-admin , !mlrun logs 164d4ccad1f741e397e90e294fd45243 --project model-deployment-pipeline-admin
> 2021-05-14 05:23:02,186 [info] run executed, status=completed


In [35]:
if run_training:
    # Display the model accuracy
    print(f'Test Accuracy: {test_run.outputs["accuracy"]}')

Test Accuracy: 0.75
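
The test results reported above (accuracy, test-error, precision, recall and F1) all derive from the same confusion matrix. The sketch below shows the standard formulas. The counts passed in are an assumption: they are the smallest set of counts consistent with the reported values, not figures taken from the run itself. The `binary_metrics` helper is hypothetical and is not part of the `test_classifier` function.

```python
def binary_metrics(tp, fp, fn, tn):
    """Compute common binary-classification metrics from confusion-matrix counts."""
    total = tp + fp + fn + tn
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return {
        "accuracy": (tp + tn) / total,
        "test-error": (fp + fn) / total,
        "precision_score": precision,
        "recall_score": recall,
        "f1-score": 2 * precision * recall / (precision + recall),
    }


# Assumed counts: 2 true positives, 0 false positives, 1 false negative, 1 true negative.
metrics = binary_metrics(tp=2, fp=0, fn=1, tn=1)
for name, value in metrics.items():
    print(f"{name}: {value:.4f}")
```

With so few samples, each misclassified record moves the accuracy by a large step, so these figures are best read as a smoke test of the pipeline rather than as an estimate of model quality.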


#### Serving

In [36]:
project.set_function('hub://model_server:development', 'serving')

serving = project.func('serving').apply(mount_pvc_default())
# Serve the model from the training run when one exists; otherwise fall back to the bundled pickle.
if 'train_run' in locals() and train_run.outputs.get('model') is not None:
    serving.add_model('my_model', train_run.outputs.get('model'))
else:
    serving.add_model('my_model', path.join(getcwd(), 'assets/model.pkl'))
        
serving.set_envs({'INFERENCE_STREAM' : path.join(container, stream_configs['inference-stream']['path']) })
serving.set_envs(v3io_envs)


serving_stream = '/'.join(s.strip('/') for s in [web_api_users, stream_configs['serving-stream']['path']]) + '@ms'
serving.add_trigger('serving_stream',
                    nuclio.triggers.V3IOStreamTrigger(url=serving_stream,
                                                      maxWorkers=stream_configs['serving-stream']['shard_count']+2,
                                                      seekTo='earliest'))
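# Drop the HTTP trigger entry if present; the default avoids a KeyError when the key is missing.
serving.spec.config.pop('spec.triggers.http', None)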
serving.spec.readiness_timeout = 200
serving.spec.replicas = 1

serving.deploy()

> 2021-05-14 05:23:28,268 [info] Starting remote function deploy
2021-05-14 05:23:28  (info) Deploying function
2021-05-14 05:23:28  (info) Building
2021-05-14 05:23:28  (info) Staging files and preparing base images
2021-05-14 05:23:28  (info) Building processor image
2021-05-14 05:23:54  (info) Build complete
2021-05-14 05:24:30  (info) Function deploy complete
> 2021-05-14 05:24:31,440 [info] function deployed, address=192.168.65.4:31520


'http://192.168.65.4:31520'
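
The serving cell builds its stream trigger URL by stripping the slashes from each part, joining the parts with `/`, and appending `@ms`. The suffix after `@` appears to name the consumer group that reads the stream, but treat that reading as an assumption. The sketch below isolates the string handling. The `build_stream_url` helper and the sample values are hypothetical; the notebook defines the real `web_api_users` and `stream_configs` earlier.

```python
def build_stream_url(base_url, stream_path, consumer_group):
    """Join URL parts with single slashes and append '@<consumer_group>'."""
    joined = "/".join(part.strip("/") for part in [base_url, stream_path])
    return f"{joined}@{consumer_group}"


# Hypothetical values for illustration only.
web_api_users = "http://v3io-webapi:8081/users/"
stream_path = "/admin/demo/serving_stream/"

print(build_stream_url(web_api_users, stream_path, "ms"))
```

Stripping only the leading and trailing slashes keeps the `//` after the scheme intact while preventing doubled or missing separators where the parts meet.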

#### Inference logger

In [37]:
# Reuse the event-handler function to log the inference stream to Parquet files.
inference_logger = code_to_function(name='inference-logger', handler='handler', kind='nuclio', filename='functions/event-handler.ipynb')
project.set_function(inference_logger)

il_envs = {'PARQUET_SINK_FLAG': 'true',
           'STREAM_SINK_FLAG': 'false',
           'PARQUET_TARGET_PATH' : inference_parquet_target_path,
           'PARQUET_BATCH_SIZE': 8192,
           'TS_KEY': 'when',
           'TS_FORMAT': '%Y-%m-%d %H:%M:%S.%f',
           'FEATURES': ",".join(feature_list),
           'PREDICTIONS': 'about_to_churn',
           'CONTAINER': container}
project.func('inference-logger').set_envs({**v3io_envs, **il_envs})

project.func('inference-logger').apply(mount_pvc_default())

inference_stream = '/'.join(s.strip('/') for s in [web_api_users, stream_configs['inference-stream']['path']]) + '@il'
project.func('inference-logger').add_trigger('inference_stream',
                                               nuclio.triggers.V3IOStreamTrigger(url=inference_stream,
                                                                                 maxWorkers=stream_configs['inference-stream']['shard_count']+2,
                                                                                 seekTo='earliest'))
project.func('inference-logger').spec.replicas=1
project.func('inference-logger').deploy()

> 2021-05-14 05:24:35,723 [info] Starting remote function deploy
2021-05-14 05:24:35  (info) Deploying function
2021-05-14 05:24:35  (info) Building
2021-05-14 05:24:35  (info) Staging files and preparing base images
2021-05-14 05:24:35  (info) Building processor image
2021-05-14 05:24:58  (info) Build complete
2021-05-14 05:25:11  (info) Function deploy complete
> 2021-05-14 05:25:11,538 [info] function deployed, address=192.168.65.4:32593


'http://192.168.65.4:32593'
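
The inference logger receives its configuration through environment variables such as `TS_KEY`, `TS_FORMAT`, `FEATURES` and `PREDICTIONS`. The sketch below shows one way a handler might apply them to an incoming event. It is an assumption about the intended behaviour, not the code in `functions/event-handler.ipynb`. The `parse_event` helper, the feature names and the sample event are all hypothetical.

```python
from datetime import datetime


def parse_event(event, env):
    """Pick the configured columns out of an event and parse its timestamp."""
    features = env["FEATURES"].split(",")
    row = {name: event.get(name) for name in features}
    row[env["PREDICTIONS"]] = event.get(env["PREDICTIONS"])
    row[env["TS_KEY"]] = datetime.strptime(event[env["TS_KEY"]], env["TS_FORMAT"])
    return row


# Hypothetical configuration mirroring the keys set in the cell above.
env = {
    "TS_KEY": "when",
    "TS_FORMAT": "%Y-%m-%d %H:%M:%S.%f",
    "FEATURES": ",".join(["n_events", "avg_session"]),  # hypothetical feature names
    "PREDICTIONS": "about_to_churn",
}

# Hypothetical event; the extra field is dropped because it is not configured.
event = {
    "when": "2021-05-14 05:24:35.723000",
    "n_events": 12,
    "avg_session": 3.5,
    "about_to_churn": 1,
    "unused": "ignored",
}

row = parse_event(event, env)
print(row)
```

In the cell above, `{**v3io_envs, **il_envs}` merges the two dictionaries so that a key present in both takes its value from `il_envs`.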

### Save the Project

In [38]:
project.save()

## Done