# Modeler Flows Jobs: Orchestration
This notebook shows how to invoke *3 Modeler flows* that have been configured as jobs.

More information about the APIs used in this notebook (**Watson Data API**) is available in the product documentation: https://cloud.ibm.com/apidocs/watson-data-api-cpd

APIs are typically used for *automation* and *orchestration*. For ease of demonstration, we implemented the code in a notebook that is part of a Watson Studio project. The same Python code can also be saved as a Python script and executed from an external environment (such as a CI/CD platform).

Since we use the terms automation and orchestration, let's define them. **Automation** is invoking a process programmatically without human interaction, usually based on a schedule or a trigger. **Orchestration** is combining multiple steps into a single process.

Any type of asset that can be configured to run as a *Job* in a Watson Studio project can be used with this API. The current version of Cloud Pak for Data supports jobs for:
- Modeler flows
- Notebooks
- Python scripts
- R scripts
- Refinery flows

There are several use cases for automation and orchestration of jobs. Here are a few examples:
- Automate and/or orchestrate data preparation (implemented in scripts, flows, or notebooks) based on a schedule or an external trigger
- Automate and/or orchestrate model retraining (implemented in scripts, flows, or notebooks) based on a schedule or an external trigger
- Automate and/or orchestrate testing and deployment of data science assets into Deployment Spaces. 

In this notebook we will show a simple example which you can expand to fit your use cases. 

*Note: This notebook has been created and tested in Cloud Pak for Data Hybrid Cloud*

## Step 1: Manually create Modeler Flows Jobs
While it's possible to create a job with an API, we think that in most scenarios jobs will be created and tested manually, and then used with automation/orchestration.

Create a job for each Modeler flow that you want to test. For example, this project provides 3 Modeler flows: *Data Preparation*, *Model Scoring C5*, and *Post Scoring Rules*.

1. Open each flow and save a version (look for the Versions icon in the top-right toolbar, which is the same button as for Output). Jobs require versioned notebooks and flows.
2. Create a job using the Job icon in the top toolbar.
3. Test the job in the project UI. 

## Step 2: Invoke the Jobs
To construct the API call, we will need to get the following information:
1. *Authorization token*: required for all calls to the Cloud Pak for Data API
2. *Project id*: needed as a parameter for the job invocation REST request
3. *Asset id* of each job: needed as a parameter for the job invocation REST request

In [None]:
# If you're running in a Watson Studio project, the token is available as an environment variable
#import os
#token = os.environ['USER_ACCESS_TOKEN']

# In this notebook we demonstrate retrieving a token via the API, which is required for code running outside of Cloud Pak for Data
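A script that runs both inside and outside Cloud Pak for Data can choose the token source at runtime. The sketch below assumes, as the comment above says, that `USER_ACCESS_TOKEN` is set only inside a Watson Studio project; `get_token` is a hypothetical helper, and the fallback is any callable that returns a token (for example, a wrapper around the authorize request in the next cells).

```python
import os
from typing import Callable


def get_token(fetch_token_via_api: Callable[[], str]) -> str:
    """Return a bearer token, preferring the one Watson Studio provides.

    Assumption: inside a Watson Studio project the token is exposed as the
    USER_ACCESS_TOKEN environment variable; elsewhere we call the fallback.
    """
    token = os.environ.get("USER_ACCESS_TOKEN")
    if token:
        return token
    # Outside Cloud Pak for Data, generate a token with the authorize API
    return fetch_token_via_api()


# Example with a stand-in fallback used when the variable is not set
print(get_token(lambda: "token-from-api"))
```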

In [None]:
# Define variables that need to be changed or reused

# TO DO: change to the hostname (and port, if defined) of your cluster

# If using a market cluster in North America (in TEC), the value should be 'https://ibm-nginx-svc.cpdmkt.svc' (this value is the same for ALL market clusters)
# For all other clusters, use the CPD URL that ends with .io, for example, 'https://cpdmkt-cpd-cpdmkt.apps.cpd.12-181-164-84.nip.io'
cpd_hostname = "***"

# TO DO: change to userid and password that exists in the CPD cluster. These credentials will be used to generate a token
username = "***"
password = "***"

In [None]:
import json

import requests

headers = {
    'Content-Type': 'application/json',
}

# json.dumps produces valid JSON and escapes special characters in the password
data = json.dumps({"username": username, "password": password})

# Construct the request URL
requestURL = cpd_hostname + "/icp4d-api/v1/authorize"

# verify=False skips TLS certificate verification (common with self-signed cluster certificates)
response = requests.post(requestURL, headers=headers, data=data, verify=False)
# Fail fast on bad credentials or a wrong hostname instead of a KeyError below
response.raise_for_status()

token = response.json()['token']

# Print token just for a demo - remove in production
print(token)
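To reuse the token request from an external script (for example, in a CI/CD pipeline), it can be wrapped in a function. This is a minimal sketch: `request_token` is a hypothetical name, and `post` is any callable that behaves like `requests.post`, which keeps the function testable without a live cluster.

```python
import json
from typing import Any, Callable


def request_token(post: Callable[..., Any], hostname: str,
                  username: str, password: str, verify: bool = False) -> str:
    """Call the /icp4d-api/v1/authorize endpoint and return the bearer token."""
    response = post(
        hostname + "/icp4d-api/v1/authorize",
        headers={"Content-Type": "application/json"},
        # json.dumps escapes quotes and backslashes in the credentials
        data=json.dumps({"username": username, "password": password}),
        verify=verify,
    )
    # Raise on 4xx/5xx responses instead of failing later on a missing key
    response.raise_for_status()
    return response.json()["token"]
```

Usage in this notebook would be `token = request_token(requests.post, cpd_hostname, username, password)`.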

In [None]:
#Next, we will get the project id. We can use the project-lib library to perform this task. 

# Import the lib
from project_lib import Project
project = Project.access()

projectMetadata = project.get_metadata()

# Let's print the output. 
# Metadata is returned as a nested dictionary. Project id is listed as 'guid'
print(type(projectMetadata))
print(projectMetadata)

In [None]:
# Get the project id and print it for verification
projectID = projectMetadata['metadata']['guid']
print(projectID)
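project-lib is only available inside Watson Studio, so a script running elsewhere needs another source for the project id. A minimal sketch, assuming the caller exports a `PROJECT_ID` environment variable when project-lib metadata is not available (the variable name is an assumption; Watson Studio runtimes may also set it, but verify this in your environment). `resolve_project_id` is a hypothetical helper.

```python
import os
from typing import Optional


def resolve_project_id(project_metadata: Optional[dict] = None) -> str:
    """Return the project id from project-lib metadata or the environment."""
    if project_metadata is not None:
        # project-lib nests the id under metadata -> guid
        return project_metadata["metadata"]["guid"]
    # Assumption: external scripts set PROJECT_ID themselves
    project_id = os.environ.get("PROJECT_ID")
    if not project_id:
        raise RuntimeError("Set PROJECT_ID when project-lib is unavailable")
    return project_id
```

Inside this notebook, `resolve_project_id(projectMetadata)` returns the same value as `projectID`.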

In [None]:
# Get the Job id
project.get_assets()

In [None]:
# Manually look up the asset_ids for the Modeler jobs that you created and save them in variables. The asset_id will be used to construct the REST request URL.
# Make sure to get the ID for the Job, and not Job Run. 
jobID_1 = "3266f869-aeef-4e80-95ec-dec5eef127ed"
jobID_2 = "a273928b-74ae-4efe-9750-e218a9b160e3"
jobID_3 = "9b40b525-0c3e-48ae-9b9f-128f915593e2"
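Instead of copying job IDs by hand, the jobs in a project can be listed with the Watson Data API (`GET /v2/jobs?project_id=...`). The sketch below assumes each entry in the response's `results` array carries `metadata.name` and `metadata.asset_id`; check the response shape in the API reference before relying on it. `job_ids_by_name` is a hypothetical helper, and the job name in the comment is a placeholder.

```python
from typing import Dict


def job_ids_by_name(jobs_response: dict) -> Dict[str, str]:
    """Map job name -> job id from a parsed list-jobs response.

    Assumed shape: {"results": [{"metadata": {"name": ..., "asset_id": ...}}]}
    """
    return {
        job["metadata"]["name"]: job["metadata"]["asset_id"]
        for job in jobs_response.get("results", [])
    }


# Usage against a live cluster (not executed here; job name is a placeholder):
# jobs = requests.get(cpd_hostname + "/v2/jobs",
#                     params={"project_id": projectID},
#                     headers={"Authorization": "Bearer " + token},
#                     verify=False).json()
# jobID_1 = job_ids_by_name(jobs)["Data Preparation job"]

sample = {"results": [{"metadata": {"name": "Data Preparation job",
                                    "asset_id": "3266f869"}}]}
print(job_ids_by_name(sample))
```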

In [None]:
# We can reuse the header and body (dataDict) for invocation of all jobs

headers = {
     'Authorization': 'Bearer ' + token,
     'accept': 'application/json',
     'Content-Type': 'application/json'
}


# This JSON body can be used with any job invocation, even if the job definition doesn't use these environment variables

dataDict = {
   "job_run": {
        "configuration": {
            "env_variables": [
                "variable1=test1",
                "variable2=test2"
            ]
        }
    }
}

data = json.dumps(dataDict)

# Print if debugging
# print(headers)
# print(data)
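When different jobs need different environment variables, the body can be generated from a dictionary rather than edited by hand. A minimal sketch that reproduces the `dataDict` structure above; `build_job_run_body` is a hypothetical helper, and the `KEY=VALUE` string format is taken from the example body.

```python
import json
from typing import Mapping


def build_job_run_body(env_vars: Mapping[str, str]) -> str:
    """Build the JSON body for POST /v2/jobs/{job_id}/runs.

    Each key/value pair becomes a "KEY=VALUE" string, as in dataDict.
    """
    body = {
        "job_run": {
            "configuration": {
                "env_variables": [f"{k}={v}" for k, v in env_vars.items()]
            }
        }
    }
    return json.dumps(body)


print(build_job_run_body({"variable1": "test1", "variable2": "test2"}))
```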

In [None]:
#Construct the URL for invoking the job. We are using this REST endpoint: https://cloud.ibm.com/apidocs/watson-data-api-cpd#job-runs-create
url1 = cpd_hostname + "/v2/jobs/" + jobID_1 + "/runs?project_id=" + projectID
url2 = cpd_hostname + "/v2/jobs/" + jobID_2 + "/runs?project_id=" + projectID
url3 = cpd_hostname + "/v2/jobs/" + jobID_3 + "/runs?project_id=" + projectID

# print(url1)
# print(url2)
# print(url3)
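The same URL pattern is used for starting a run and for checking a run's status, so it can be built in one place. A minimal sketch mirroring the string concatenation in this notebook; `job_runs_url` is a hypothetical helper, and `urlencode` escapes the query parameter.

```python
from typing import Optional
from urllib.parse import urlencode


def job_runs_url(hostname: str, job_id: str, project_id: str,
                 run_id: Optional[str] = None) -> str:
    """Return the runs URL for a job, or the URL of one run if run_id is set."""
    path = f"{hostname.rstrip('/')}/v2/jobs/{job_id}/runs"
    if run_id is not None:
        path += f"/{run_id}"
    return path + "?" + urlencode({"project_id": project_id})


print(job_runs_url("https://cpd.example.com", "job-1", "proj-1"))
print(job_runs_url("https://cpd.example.com", "job-1", "proj-1", run_id="run-9"))
```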

In [None]:
# Invoke the 1st job
response = requests.post(url1, headers=headers, data=data, verify=False)

responseContent = response.content
# print(responseContent)

In [None]:
# If we want to check the job status, we need to get the run ID, which is called asset_id
runID = json.loads(responseContent)['metadata']['asset_id']
print(runID)

In [None]:
url = cpd_hostname + "/v2/jobs/" + jobID_1 + "/runs/" + runID + "?project_id=" + projectID
print(url)

In [None]:
response = requests.get(url, headers=headers, verify=False)
responseContent = response.content
print(responseContent)

In [None]:
# Job status is reported in the "state" field
jobStatus = json.loads(responseContent)['entity']['job_run']['state']
print(jobStatus)
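The loops that follow keep polling while the state is unfinished, and the next job should start only if the previous run succeeded. A minimal sketch of classifying states, assuming the Watson Data API reports states such as `Queued`, `Starting`, `Running`, `Paused`, `Resuming`, `Canceling`, `Canceled`, `Failed`, `Completed`, `CompletedWithErrors`, and `CompletedWithWarnings`; confirm the list against the API reference for your version. `is_active` and `is_success` are hypothetical helpers.

```python
# Assumed state names; confirm against the job runs API reference.
ACTIVE_STATES = {"Queued", "Starting", "Running", "Paused", "Resuming", "Canceling"}
SUCCESS_STATES = {"Completed", "CompletedWithWarnings"}


def is_active(state: str) -> bool:
    """True while the run has not reached a final state."""
    return state in ACTIVE_STATES


def is_success(state: str) -> bool:
    """True for final states that should allow the next job to start.

    CompletedWithErrors is deliberately treated as a failure here.
    """
    return state in SUCCESS_STATES


print(is_active(jobStatus) if "jobStatus" in globals() else is_active("Running"))
```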

In [None]:
# Status lookup can also be implemented in a loop. This is useful when you need to invoke a 2nd job after the first one completes
import time

# "Queued" (a state a run may report before it starts) is also treated as unfinished
while jobStatus in ("Queued", "Starting", "Running"):
    response = requests.get(url, headers=headers, verify=False)
    responseContent = response.content
    jobStatus = json.loads(responseContent)['entity']['job_run']['state']
    print(jobStatus)
    # Wait for 20 seconds before checking status again
    time.sleep(20)
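The open-ended loop above polls forever if a run never finishes. A minimal sketch of the same polling with a timeout; `wait_for_run` is a hypothetical helper, `get_state` is any callable returning the current state string, and `sleep` is injectable so the function can be tested without waiting.

```python
import time
from typing import Callable, Iterable


def wait_for_run(get_state: Callable[[], str], poll_seconds: float = 20,
                 timeout_seconds: float = 3600,
                 active_states: Iterable[str] = ("Queued", "Starting", "Running"),
                 sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_state() until it leaves active_states; return the final state.

    Raises TimeoutError if the run is still active after timeout_seconds.
    """
    active = set(active_states)
    deadline = time.monotonic() + timeout_seconds
    state = get_state()
    while state in active:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Run still {state} after {timeout_seconds}s")
        sleep(poll_seconds)
        state = get_state()
        print(state)
    return state
```

In this notebook, `get_state` could be `lambda: requests.get(url, headers=headers, verify=False).json()['entity']['job_run']['state']`.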

In [None]:
# It's possible to refactor this code into a function for invoking jobs. We repeat the code in this example for ease of debugging

# Invoke the 2nd job
response = requests.post(url2, headers=headers, data=data, verify=False)

responseContent = response.content

# If we want to check the job status, we need to get the run ID, which is called asset_id
runID = json.loads(responseContent)['metadata']['asset_id']

url = cpd_hostname + "/v2/jobs/" + jobID_2 + "/runs/" + runID + "?project_id=" + projectID

response = requests.get(url, headers=headers, verify=False)
responseContent = response.content
print(responseContent)
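As the comment at the top of this cell suggests, starting a job and waiting for it can be refactored into one function. A minimal sketch of that refactoring; `run_job_and_wait` is a hypothetical name, `post` and `get` are callables that behave like `requests.post` and `requests.get`, and `verify=False` matches the notebook's handling of cluster certificates.

```python
import time
from typing import Any, Callable


def run_job_and_wait(post: Callable[..., Any], get: Callable[..., Any],
                     hostname: str, job_id: str, project_id: str,
                     headers: dict, body: str, poll_seconds: float = 20,
                     verify: bool = False,
                     sleep: Callable[[float], None] = time.sleep) -> str:
    """Start one job run and block until it leaves Queued/Starting/Running.

    Returns the final state string reported by the API.
    """
    runs_url = f"{hostname}/v2/jobs/{job_id}/runs?project_id={project_id}"
    response = post(runs_url, headers=headers, data=body, verify=verify)
    response.raise_for_status()
    # The run id is returned as metadata.asset_id
    run_id = response.json()["metadata"]["asset_id"]

    run_url = f"{hostname}/v2/jobs/{job_id}/runs/{run_id}?project_id={project_id}"
    while True:
        status = get(run_url, headers=headers, verify=verify)
        status.raise_for_status()
        state = status.json()["entity"]["job_run"]["state"]
        print(job_id, state)
        if state not in ("Queued", "Starting", "Running"):
            return state
        sleep(poll_seconds)
```

Usage: `final_state = run_job_and_wait(requests.post, requests.get, cpd_hostname, jobID_2, projectID, headers, data)`.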

In [None]:
jobStatus = json.loads(responseContent)['entity']['job_run']['state']
print(jobStatus)

In [None]:
# "Queued" (a state a run may report before it starts) is also treated as unfinished
while jobStatus in ("Queued", "Starting", "Running"):
    response = requests.get(url, headers=headers, verify=False)
    responseContent = response.content
    jobStatus = json.loads(responseContent)['entity']['job_run']['state']
    print(jobStatus)
    # Wait for 20 seconds before checking status again
    time.sleep(20)

In [None]:
# Invoke the 3rd job
response = requests.post(url3, headers=headers, data=data, verify=False)

responseContent = response.content

# If we want to check the job status, we need to get the run ID, which is called asset_id
runID = json.loads(responseContent)['metadata']['asset_id']

url = cpd_hostname + "/v2/jobs/" + jobID_3 + "/runs/" + runID + "?project_id=" + projectID

response = requests.get(url, headers=headers, verify=False)
responseContent = response.content
print(responseContent)

In [None]:
jobStatus = json.loads(responseContent)['entity']['job_run']['state']
print(jobStatus)

In [None]:
# "Queued" (a state a run may report before it starts) is also treated as unfinished
while jobStatus in ("Queued", "Starting", "Running"):
    response = requests.get(url, headers=headers, verify=False)
    responseContent = response.content
    jobStatus = json.loads(responseContent)['entity']['job_run']['state']
    print(jobStatus)
    # Wait for 10 seconds before checking status again
    time.sleep(10)
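The three jobs above run one after another, but the notebook starts the next job regardless of how the previous run ended. A minimal sketch of an orchestration step that stops the chain on failure, assuming `Completed` and `CompletedWithWarnings` are the states you treat as success. `run_in_sequence` is a hypothetical helper; each step is a name plus a callable that starts a job, polls it as in the cells above, and returns its final state.

```python
from typing import Callable, Dict, Iterable, List, Tuple


def run_in_sequence(steps: List[Tuple[str, Callable[[], str]]],
                    success_states: Iterable[str] = ("Completed", "CompletedWithWarnings"),
                    ) -> Dict[str, str]:
    """Run each step in order and stop at the first one that does not succeed.

    Returns a mapping of step name -> final state for the steps that ran.
    """
    ok = set(success_states)
    results: Dict[str, str] = {}
    for name, run in steps:
        state = run()
        results[name] = state
        if state not in ok:
            print(f"Stopping: {name} ended in state {state}")
            break
    return results
```

With the flows in this project, the steps could be `("Data Preparation", ...)`, `("Model Scoring C5", ...)`, and `("Post Scoring Rules", ...)`, so scoring never runs on data from a failed preparation run.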

**Written by: Elena Lowery, April 2021**