# Project demo

This notebook shows how to run the different parts of the project through API calls:
- landing new parsed data
- training
    - parsing new data for training
    - training ML models (through separate folders)
- scoring
    - parsing new data for scoring
    - scoring ML models
    
To interact with the platform we use the **Watson Data API**, which is available both [on premise (CPD)](https://cloud.ibm.com/apidocs/watson-data-api-cpd) and [on Cloud](https://cloud.ibm.com/apidocs/watson-data-api), with some small differences between the two.

In [1]:
import os
import requests
import json
import time

# silence warnings: the requests below use verify=False, so TLS certificates are not verified
import warnings
import urllib3
warnings.simplefilter(action='ignore', category=urllib3.exceptions.InsecureRequestWarning)

# 0. Setup connection to the API

If this demo notebook is being run from within CPD, the environment already contains a token that can be used to authenticate to the Watson Data API. Otherwise, uncomment the cell marked for running outside the platform to obtain a token, and set `project_id` manually.

The account used to authenticate needs **editor** permissions on the project.
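
Because `os.environ.get` returns `None` when a variable is not set, a missing token would otherwise be sent as the literal string `Bearer None`. A minimal sketch of a guard around header construction follows; the helper name `build_headers` is hypothetical and not part of the project:

```python
def build_headers(token):
    """Return the JSON request headers used for most calls in this notebook.

    Hypothetical helper: raises early instead of sending 'Bearer None'.
    """
    if not token:
        raise ValueError(
            "No token available: set USER_ACCESS_TOKEN or request a token first"
        )
    return {
        "authorization": f"Bearer {token}",
        "content-type": "application/json",
    }
```

Failing at this point gives a clearer error than an authentication failure on the first API call.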

In [2]:
# if running this notebook from within the platform:
token = os.environ.get('USER_ACCESS_TOKEN')
project_id = os.environ.get('PROJECT_ID')
host = "https://dse-cp4d301-gmc1.cpolab.ibm.com" # do not include ending / here

In [3]:
# if running this notebook from outside the platform:
# first, get a token:
# auth_headers = {
#     "cache-control": "no-cache",
#     "content-type": "application/json",
# }
# credentials = {"username": "jerome.kafrouni@ibm.com",
#                "password": 'xxx'} # input password of account to be used to authenticate
# host = "https://dse-cp4d25-cluster3.datascienceelite.com"
# response = requests.post(host + "/icp4d-api/v1/authorize",
#                          headers=auth_headers,
#                          data=json.dumps(credentials),
#                          verify=False)
# token = response.json()['token']

# next, input the project_id:
# project_id = 'xxx' # input this

In [4]:
# IMPORTANT use these specific headers:
headers = {'authorization': 'Bearer %s'%(token),
           'content-type': 'application/json'}

# 1. Land new data to be transformed for training/scoring

- We use the `asset_files` API to upload new files into this project's storage with a `PUT` request.
- We store new files in a `landing_zone` folder in the project's shared storage.
- Note that we do not create `assets` associated with these `asset_files`, so the uploaded files will not show up in the project's UI.

## 1.1 Upload new file

- For the purpose of this demo, the file being uploaded is already stored in CPD (for convenience).
- Here we do not check whether the file already exists; the upload overwrites by default. See the [docs](https://cloud.ibm.com/apidocs/watson-data-api-cpd#put-new-asset) to change that behavior.
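
One way to avoid an accidental overwrite without changing the upload request is to list the target folder first (as in section 1.2) and look for the path. The sketch below only inspects a list of entries; it assumes each entry carries a `path` key prefixed with `data_asset/`, as in the listings shown in this notebook. The function name `asset_file_exists` is hypothetical.

```python
def asset_file_exists(resources, target_filename):
    """Return True if target_filename appears in a folder listing.

    resources: the 'resources' list returned when listing asset files.
    target_filename: path relative to data_asset, e.g. '/scoring/landing_zone/x.json'.
    """
    expected = "data_asset/" + target_filename.lstrip("/")
    return any(item.get("path") == expected for item in resources)


# Example with an entry shaped like the listing output in section 1.2
listing = [{"path": "data_asset/scoring/landing_zone/151020.4.json", "type": "file"}]
already_there = asset_file_exists(listing, "/scoring/landing_zone/151020.4.json")
```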

In [5]:
new_file = '/project_data/data_asset/temp_for_demo/151020.1.json' # in practice this would be local storage outside of CPD
target_filename = '/scoring/landing_zone/151020.1.json' # choose name for uploaded file

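# for this PUT request we must not set a JSON content-type:
# requests sets the multipart/form-data content-type itself when files= is used
headers_new_file = {'authorization': 'Bearer %s'%(token)}

# open the file in a context manager so the handle is closed after the upload
with open(new_file, 'rb') as f:
    r = requests.put(host + "/v2/asset_files/data_asset" + target_filename,
                     headers=headers_new_file,
                     verify=False,
                     files={'file': f},
                     params={"project_id": project_id})
r.json()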

{'status': 'Asset created: The asset was successfully uploaded.'}

## 1.2 List assets

We can check the top level `data_asset` folder first:

In [6]:
r = requests.get(host + "/v2/asset_files/data_asset",
                 headers=headers,
                 verify=False,
                 params={"project_id": project_id})
r.json()['resources']

[{'path': 'data_asset/madi-0.0.1.zip',
  'etag': 'W/"1aa2a7-172817735fa"',
  'size': 1745575,
  'last_modifed': 'Thu, 04 Jun 2020 22:32:52 GMT',
  'type': 'file',
  'mime_type': 'application/zip'},
 {'path': 'data_asset/scoring',
  'etag': 'W/"59-17280d3b15b"',
  'size': 4,
  'last_modifed': 'Thu, 04 Jun 2020 19:34:16 GMT',
  'type': 'folder'},
 {'path': 'data_asset/temp_for_demo',
  'etag': 'W/"1b-1727aca3c21"',
  'size': 1,
  'last_modifed': 'Wed, 03 Jun 2020 15:26:13 GMT',
  'type': 'folder'}]

And then check that our new file was indeed added in the right location:

In [7]:
r = requests.get(host + "/v2/asset_files/data_asset/scoring/landing_zone",
                 headers=headers,
                 verify=False,
                 params={"project_id": project_id})
r.json()['resources']

[{'path': 'data_asset/scoring/landing_zone/151020.1.json',
  'etag': 'W/"5471cad-17285ca8a51"',
  'size': 88546477,
  'last_modifed': 'Fri, 05 Jun 2020 18:42:22 GMT',
  'type': 'file',
  'mime_type': 'application/json'},
 {'path': 'data_asset/scoring/landing_zone/151020.4.json',
  'etag': 'W/"563e0f1-1727a942fd5"',
  'size': 90431729,
  'last_modifed': 'Wed, 03 Jun 2020 14:27:11 GMT',
  'type': 'file',
  'mime_type': 'application/json'},
 {'path': 'data_asset/scoring/landing_zone/151020.json',
  'etag': 'W/"36d9b67b-1727c52862f"',
  'size': 920237691,
  'last_modifed': 'Wed, 03 Jun 2020 22:34:42 GMT',
  'type': 'file',
  'mime_type': 'application/json'}]

# 2. Jobs overview

To run the different parts of the project we use [notebook jobs](https://cloud.ibm.com/apidocs/watson-data-api-cpd#get-list-of-jobs-under-a-project), which can be created and run both from the UI and programmatically through the API.
- A notebook can be run as a job by:
    - creating a **job** associated with the notebook.
    - launching **runs** of that job.
- A run can take environment variables as input, which is how we pass parameters (e.g. the name of the input file).
- To create a job for a notebook, the notebook must have a saved version (from the notebook UI: `File > Save Version`).
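
On the receiving side, a notebook run as a job can read its parameters from the process environment. The sketch below is an assumption about how such a notebook might do it, not the project's actual code: the variable names `INPUT_FILE` and `OUTPUT_FILE` are the ones passed later in this notebook, while the helper `read_job_params` is hypothetical.

```python
import os


def read_job_params(env=os.environ):
    """Read INPUT_FILE and OUTPUT_FILE from a mapping of environment variables.

    Raises KeyError listing every missing variable, so a misconfigured run
    fails at the top of the notebook rather than midway through.
    """
    names = ("INPUT_FILE", "OUTPUT_FILE")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise KeyError("Missing job parameters: " + ", ".join(missing))
    return env["INPUT_FILE"], env["OUTPUT_FILE"]


# Example with an explicit mapping instead of the real environment
input_file, output_file = read_job_params(
    {"INPUT_FILE": "151020.4.json", "OUTPUT_FILE": "151020.4"}
)
```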

**TODO should this section be split between training and scoring?**

## 2.1 List existing jobs

In [8]:
r = requests.get(host + "/v2/jobs",
                 headers=headers,
                 verify=False,
                 params={"project_id": project_id})
job_list = r.json()['results']
job_list

[{'metadata': {'name': 'scoring-step-3-rf',
   'description': '',
   'asset_id': '130f729b-dc53-4e2e-ac91-996eb64334a1',
   'owner_id': '1000331040',
   'version': 0},
  'entity': {'job': {'asset_ref': '58fd72d8-cd01-4dad-aea1-ac006d8e766c',
    'asset_ref_type': 'notebook',
    'configuration': {'env_id': 'jupconda36-faebd5fd-620f-4b26-937a-8e6a3750823a',
     'env_type': 'notebook',
     'env_variables': []},
    'last_run_initiator': '1000331040',
    'last_run_time': '2020-06-04T19:42:42Z',
    'last_run_status': 'Completed',
    'last_run_status_timestamp': 1591299817232,
    'last_run_id': 'bef0e060-86b4-4b96-818a-0a7876e6f3b3'}}},
 {'metadata': {'name': 'scoring-step-2-madi-analytics',
   'description': '',
   'asset_id': '0d8346ce-1709-453c-bafd-73145310942c',
   'owner_id': '1000331040',
   'version': 0},
  'entity': {'job': {'asset_ref': '1740bf2a-c9d4-40ab-9275-d448e153eeab',
    'asset_ref_type': 'notebook',
    'configuration': {'env_id': 'defaultsparkpython36-faebd5fd-620

## 2.2 List runs for existing jobs

In [9]:
job_id = r.json()['results'][0]['metadata']['asset_id']

In [10]:
r = requests.get(host + f"/v2/jobs/{job_id}/runs",
                 headers=headers,
                 verify=False,
                 params={"project_id": project_id})
run_list = r.json()['results']
run_list

[{'metadata': {'name': 'job run',
   'description': '',
   'asset_id': 'bef0e060-86b4-4b96-818a-0a7876e6f3b3',
   'owner_id': '1000331040',
   'created': 0,
   'created_at': '2020-06-04T19:42:42Z'},
  'entity': {'job_run': {'job_ref': '130f729b-dc53-4e2e-ac91-996eb64334a1',
    'state': 'Completed',
    'isScheduledRun': False,
    'configuration': {'env_id': 'jupconda36-faebd5fd-620f-4b26-937a-8e6a3750823a',
     'env_type': 'notebook',
     'env_variables': ['INPUT_FILE=151020.4.csv', 'OUTPUT_FILE=151020.4.csv'],
     'command_line_arguments': []},
    'runtime_job_id': '1591299764-1000331040:86244612276e8dee',
    'duration': '52'}}},
 {'metadata': {'name': 'job run',
   'description': '',
   'asset_id': '948d91e6-6455-4a2f-897a-5cb8d1bda64f',
   'owner_id': '1000331040',
   'created': 0,
   'created_at': '2020-06-04T19:33:25Z'},
  'entity': {'job_run': {'job_ref': '130f729b-dc53-4e2e-ac91-996eb64334a1',
    'state': 'Failed',
    'isScheduledRun': False,
    'configuration': {'env_

# 3. Run each step

For the purpose of the demo we clean up all the `output*` folders before running the jobs:

In [82]:
!rm -rf /project_data/data_asset/scoring/output*
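
The three steps below are chained through file names: each step's `OUTPUT_FILE` becomes the next step's `INPUT_FILE`. As a sketch of that naming scheme, the hypothetical helper `step_env_variables` derives the `env_variables` lists used in sections 3.1 to 3.3 from the name of the landed file. It assumes the landed file ends in `.json` and that the feature extraction step writes `.csv`.

```python
def step_env_variables(landed_file):
    """Map each scoring job name to the env_variables list for its run."""
    suffix = ".json"
    # Strip the .json suffix to get the base name shared by all steps
    base = landed_file[: -len(suffix)] if landed_file.endswith(suffix) else landed_file
    return {
        "scoring-step-1-parser": [f"INPUT_FILE={landed_file}", f"OUTPUT_FILE={base}"],
        "scoring-step-2-madi-analytics": [f"INPUT_FILE={base}", f"OUTPUT_FILE={base}.csv"],
        "scoring-step-3-rf": [f"INPUT_FILE={base}.csv", f"OUTPUT_FILE={base}.csv"],
    }


steps = step_env_variables("151020.4.json")
```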

## 3.1 Run parsing job

The endpoint is the same as for listing runs, except that this time we send a `POST` request. See the [docs](https://cloud.ibm.com/apidocs/watson-data-api-cpd#start-a-run-for-a-job).

In [83]:
parsing_job_id = [x['metadata']['asset_id'] for x in job_list if x['metadata']['name'] == 'scoring-step-1-parser'][0]
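
The list comprehension followed by `[0]` raises a bare `IndexError` when no job has the expected name. A slightly more defensive lookup could look like the sketch below; `find_job_id` is a hypothetical helper, and it assumes `job_list` entries have the `metadata.name` and `metadata.asset_id` fields shown in section 2.1.

```python
def find_job_id(job_list, name):
    """Return the asset_id of the single job called `name`."""
    matches = [
        job["metadata"]["asset_id"]
        for job in job_list
        if job["metadata"]["name"] == name
    ]
    if len(matches) != 1:
        raise LookupError(f"Expected exactly one job named {name!r}, found {len(matches)}")
    return matches[0]


# Example with an entry shaped like the job listing output
sample_jobs = [
    {"metadata": {"name": "scoring-step-3-rf",
                  "asset_id": "130f729b-dc53-4e2e-ac91-996eb64334a1"}},
]
rf_job_id = find_job_id(sample_jobs, "scoring-step-3-rf")
```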

In [84]:
run_config = {
    'job_run': {
        'configuration': {'env_variables': ["INPUT_FILE=151020.4.json",
                                            "OUTPUT_FILE=151020.4"]}
}}
r = requests.post(host + f"/v2/jobs/{parsing_job_id}/runs",
                  headers=headers,
                  verify=False,
                  params={"project_id": project_id},
                  data=json.dumps(run_config))
parsing_run = r.json()
parsing_run_id = parsing_run['metadata']['asset_id']
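
The run payload has the same shape for every step, so it can be built from a plain dict of parameters. This is a minimal sketch; `make_run_config` is a hypothetical helper that only reproduces the structure of the `run_config` dict used above.

```python
import json


def make_run_config(params):
    """Build the body of a 'start run' request from a dict of parameters.

    Each key/value pair becomes one 'KEY=value' entry in env_variables.
    """
    env_variables = [f"{key}={value}" for key, value in params.items()]
    return {"job_run": {"configuration": {"env_variables": env_variables}}}


run_config_example = make_run_config(
    {"INPUT_FILE": "151020.4.json", "OUTPUT_FILE": "151020.4"}
)
# Serialized body, as passed to requests.post(..., data=...)
body = json.dumps(run_config_example)
```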

### Get run status:

In [85]:
state = 'Running'
while state in ['Running', 'Starting']:
    r = requests.get(host + f"/v2/jobs/{parsing_job_id}/runs/{parsing_run_id}",
                     headers=headers,
                     verify=False,
                     params={"project_id": project_id})
    parsing_run_status = r.json()
    state = parsing_run_status['entity']['job_run']['state']
    time.sleep(5)
parsing_run_status

{'metadata': {'name': 'job run',
  'description': '',
  'asset_id': '9bb4e18f-a07a-4b2f-82e7-893995435b48',
  'owner_id': '1000331040',
  'created': 1591298256115,
  'created_at': '2020-06-04T19:17:36Z'},
 'entity': {'job_run': {'job_ref': '985524dd-97d5-4dc2-9fcf-45fd664f04ae',
   'state': 'Completed',
   'isScheduledRun': False,
   'configuration': {'env_id': 'jupconda36-faebd5fd-620f-4b26-937a-8e6a3750823a',
    'env_type': 'notebook',
    'env_variables': ['INPUT_FILE=151020.4.json', 'OUTPUT_FILE=151020.4'],
    'command_line_arguments': []},
   'runtime_job_id': '1591298257-1000331040:4d129ffe996c0574',
   'duration': '85'}}}
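
The polling loop is repeated for every step and never gives up if a run stays in `Running`. A reusable version with a timeout might look like the sketch below. The helper `wait_for_run` is hypothetical; it treats only `Running` and `Starting` as pending, which are the two states checked in this notebook, and any other state is returned as final.

```python
import time

PENDING_STATES = ("Running", "Starting")


def wait_for_run(fetch_state, timeout=600, poll_interval=5,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_state() until it returns a non-pending state.

    fetch_state: zero-argument callable returning the run's current state.
    Raises TimeoutError if the run is still pending after `timeout` seconds.
    """
    deadline = clock() + timeout
    while True:
        state = fetch_state()
        if state not in PENDING_STATES:
            return state
        if clock() >= deadline:
            raise TimeoutError(f"Run still {state!r} after {timeout} seconds")
        sleep(poll_interval)


# Example with canned states instead of real API calls
states = iter(["Starting", "Running", "Completed"])
final_state = wait_for_run(lambda: next(states), sleep=lambda seconds: None)
```

In this notebook, `fetch_state` would wrap the `GET` request on `/v2/jobs/{job_id}/runs/{run_id}` and return `r.json()['entity']['job_run']['state']`.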

### Check run logs (if needed):

In [34]:
# r = requests.get(host + f"/v2/jobs/{parsing_job_id}/runs/{parsing_run_id}/logs",
#                  headers=headers,
#                  verify=False,
#                  params={"project_id": project_id})
# r.json()

## 3.2 Run feature extraction job

The endpoint is the same as for listing runs except this time we do a `POST` request. See [docs](https://cloud.ibm.com/apidocs/watson-data-api-cpd#start-a-run-for-a-job).

**Warning: this step expects the output file name to end in `.csv` or `.parquet`.**

In [86]:
fe_job_id = [x['metadata']['asset_id'] for x in job_list if x['metadata']['name'] == 'scoring-step-2-madi-analytics'][0]

**Optional - pick an environment configuration**

We can specify a custom environment configuration if the job needs to scale up.
- Step 1: create a custom environment, where CPU/memory can be selected (for the Python environment) as well as the Spark executor configuration (number of executors, and CPU/memory for each).
- Step 2: run the following cell to identify the environment id (`asset_id`).
- Step 3: in the next cell, set `'env_id'` in the run configuration to specify the custom environment.

In [165]:
import project_lib
project = project_lib.Project()
[x for x in project.get_assets() if x['type'] == 'environment']

[{'name': 'Bigger spark environment (custom)',
  'asset_id': '445bd0aa-ce61-4c26-badf-5660cd3265ad',
  'type': 'environment'}]
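
Rather than copying the `asset_id` by hand, the environment can be looked up by name and merged into a run configuration. This is only a sketch: `add_environment` is a hypothetical helper, and it assumes the asset entries have the `name`, `asset_id` and `type` keys shown in the output above.

```python
import copy


def add_environment(run_config, assets, env_name, env_type):
    """Return a copy of run_config that targets the environment called env_name."""
    ids = [
        asset["asset_id"]
        for asset in assets
        if asset.get("type") == "environment" and asset.get("name") == env_name
    ]
    if len(ids) != 1:
        raise LookupError(f"Expected one environment named {env_name!r}, found {len(ids)}")
    updated = copy.deepcopy(run_config)  # leave the caller's dict untouched
    updated["job_run"]["configuration"].update({"env_id": ids[0], "env_type": env_type})
    return updated


assets_example = [{"name": "Bigger spark environment (custom)",
                   "asset_id": "445bd0aa-ce61-4c26-badf-5660cd3265ad",
                   "type": "environment"}]
base_config = {"job_run": {"configuration": {"env_variables": ["INPUT_FILE=151020.4"]}}}
custom_config = add_environment(
    base_config, assets_example, "Bigger spark environment (custom)", "default_spark"
)
```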

In [182]:
run_config = {
    'job_run': {
        'configuration': {'env_variables': ["INPUT_FILE=151020.4",
                                            "OUTPUT_FILE=151020.4.csv"],
                          'env_id': '445bd0aa-ce61-4c26-badf-5660cd3265ad',
                          'env_type': 'default_spark'
                         }
}}
r = requests.post(host + f"/v2/jobs/{fe_job_id}/runs",
                  headers=headers,
                  verify=False,
                  params={"project_id": project_id},
                  data=json.dumps(run_config))
fe_run = r.json()
fe_run_id = fe_run['metadata']['asset_id']

In [183]:
fe_run

{'metadata': {'rov': {'mode': 0, 'collaborator_ids': {}},
  'project_id': 'faebd5fd-620f-4b26-937a-8e6a3750823a',
  'sandbox_id': 'faebd5fd-620f-4b26-937a-8e6a3750823a',
  'usage': {'last_updated_at': '2020-06-04T21:47:30Z',
   'last_updater_id': '1000331040',
   'last_update_time': 1591307250928,
   'last_accessed_at': '2020-06-04T21:47:30Z',
   'last_access_time': 1591307250928,
   'last_accessor_id': '1000331040',
   'access_count': 0},
  'name': 'job run',
  'description': '',
  'tags': [],
  'asset_type': 'job_run',
  'origin_country': 'us',
  'rating': 0,
  'total_ratings': 0,
  'catalog_id': 'b881c41c-fa17-4dcf-9128-fd5610c4ba63',
  'created': 1591307250928,
  'created_at': '2020-06-04T21:47:30Z',
  'owner_id': '1000331040',
  'size': 0,
  'version': 2,
  'asset_state': 'available',
  'asset_attributes': ['job_run'],
  'asset_id': '530f6e0f-ebed-4be7-89c7-6756ba909476',
  'asset_category': 'USER'},
 'entity': {'job_run': {'job_ref': '0d8346ce-1709-453c-bafd-73145310942c',
   'st

### Get run status:

In [169]:
state = 'Running'
while state in ['Running', 'Starting']:
    r = requests.get(host + f"/v2/jobs/{fe_job_id}/runs/{fe_run_id}",
                     headers=headers,
                     verify=False,
                     params={"project_id": project_id})
    fe_run_status = r.json()
    state = fe_run_status['entity']['job_run']['state']
    time.sleep(5)
fe_run_status

{'metadata': {'name': 'job run',
  'description': '',
  'asset_id': '1af1b1d5-7620-4fd7-98f8-008f37e46022',
  'owner_id': '1000331040',
  'created': 1591306573646,
  'created_at': '2020-06-04T21:36:13Z'},
 'entity': {'job_run': {'job_ref': '0d8346ce-1709-453c-bafd-73145310942c',
   'state': 'Completed',
   'isScheduledRun': False,
   'configuration': {'env_id': 'defaultsparkpython36-faebd5fd-620f-4b26-937a-8e6a3750823a',
    'env_type': 'default_spark',
    'env_variables': ['INPUT_FILE=151020.4', 'OUTPUT_FILE=151020.4.csv'],
    'command_line_arguments': []},
   'runtime_job_id': '1cc471b9-21bb-4313-a093-386af5ef1fda',
   'duration': '117'}}}

Status of the run launched with the custom environment:

In [171]:
state = 'Running'
while state in ['Running', 'Starting']:
    r = requests.get(host + f"/v2/jobs/{fe_job_id}/runs/{fe_run_id}",
                     headers=headers,
                     verify=False,
                     params={"project_id": project_id})
    fe_run_status = r.json()
    state = fe_run_status['entity']['job_run']['state']
    time.sleep(5)
fe_run_status

{'metadata': {'name': 'job run',
  'description': '',
  'asset_id': 'afea28ee-768b-4408-b10b-83caf810039c',
  'owner_id': '1000331040',
  'created': 1591306823227,
  'created_at': '2020-06-04T21:40:23Z'},
 'entity': {'job_run': {'job_ref': '0d8346ce-1709-453c-bafd-73145310942c',
   'state': 'Completed',
   'isScheduledRun': False,
   'configuration': {'env_id': 'defaultsparkpython36-faebd5fd-620f-4b26-937a-8e6a3750823a',
    'env_type': 'default_spark',
    'env_variables': ['INPUT_FILE=151020.4', 'OUTPUT_FILE=151020.4.csv'],
    'command_line_arguments': []},
   'runtime_job_id': '9fcc6dd8-268d-403c-be8d-2944c44c3859',
   'duration': '127'}}}

### Check run logs (if needed):

In [91]:
# r = requests.get(host + f"/v2/jobs/{fe_job_id}/runs/{fe_run_id}/logs",
#                  headers=headers,
#                  verify=False,
#                  params={"project_id": project_id})
# r.json()

## 3.3 Run model scoring job

The endpoint is the same as for listing runs, except that this time we send a `POST` request. See the [docs](https://cloud.ibm.com/apidocs/watson-data-api-cpd#start-a-run-for-a-job).

In [123]:
model_job_id = [x['metadata']['asset_id'] for x in job_list if x['metadata']['name'] == 'scoring-step-3-rf'][0]

In [126]:
run_config = {
    'job_run': {
        'configuration': {'env_variables': ["INPUT_FILE=151020.4.csv",
                                            "OUTPUT_FILE=151020.4.csv"]}
}}
r = requests.post(host + f"/v2/jobs/{model_job_id}/runs",
                  headers=headers,
                  verify=False,
                  params={"project_id": project_id},
                  data=json.dumps(run_config))
model_run = r.json()
model_run_id = model_run['metadata']['asset_id']

### Get run status:

In [127]:
state = 'Running'
while state in ['Running', 'Starting']:
    r = requests.get(host + f"/v2/jobs/{model_job_id}/runs/{model_run_id}",
                     headers=headers,
                     verify=False,
                     params={"project_id": project_id})
    model_run_status = r.json()
    state = model_run_status['entity']['job_run']['state']
    time.sleep(5)
model_run_status

{'metadata': {'name': 'job run',
  'description': '',
  'asset_id': 'bef0e060-86b4-4b96-818a-0a7876e6f3b3',
  'owner_id': '1000331040',
  'created': 1591299762527,
  'created_at': '2020-06-04T19:42:42Z'},
 'entity': {'job_run': {'job_ref': '130f729b-dc53-4e2e-ac91-996eb64334a1',
   'state': 'Completed',
   'isScheduledRun': False,
   'configuration': {'env_id': 'jupconda36-faebd5fd-620f-4b26-937a-8e6a3750823a',
    'env_type': 'notebook',
    'env_variables': ['INPUT_FILE=151020.4.csv', 'OUTPUT_FILE=151020.4.csv'],
    'command_line_arguments': []},
   'runtime_job_id': '1591299764-1000331040:86244612276e8dee',
   'duration': '52'}}}

### Check run logs (if needed):

In [128]:
# r = requests.get(host + f"/v2/jobs/{model_job_id}/runs/{model_run_id}/logs",
#                  headers=headers,
#                  verify=False,
#                  params={"project_id": project_id})
# r.json()['results']

### Inspect final output

In [130]:
import pandas as pd
results = pd.read_csv('/project_data/data_asset/scoring/output_step_3/151020.4.csv')

In [132]:
results[results['prob_anomaly'] > 0]

| Unnamed: 0 | inet_s | inet_d | port_s | port_d | protocol | Packets | mean | min | max | prediction | prob_anomaly |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 1365 | 168430100.0 | 168430090.0 | 49156 | 102 | TCP | 1437 | 153.0 | 153 | 153 | 0 | 0.046253 |
| 1366 | 168430100.0 | 168430090.0 | 49156 | 102 | S7COMM | 1437 | 153.0 | 153 | 153 | 0 | 0.040274 |
| 1371 | 168430090.0 | 168430100.0 | 102 | 49156 | S7COMM | 1438 | 104.0 | 104 | 104 | 0 | 0.009715 |
| 1372 | 168430090.0 | 168430100.0 | 102 | 49156 | TCP | 1438 | 104.0 | 104 | 104 | 0 | 0.01121 |
