## Prepare
When running this notebook in automation, the following variables will be assigned at runtime automaticately based on the environment.

In [1]:
env="prod"

In [2]:
#gitUrl="$GIT_URL$"
#ingestUrl="$INGEST_URL$"
#transformUrl="$TRANSFORM_URL$"
#pipelineUrl="$PIPELINE_URL$"
#realtimeUrl="$REALTIME_URL$"
#access_token="$ACCESS_TOKEN$"

gitUrl="https://"+env+"-git-ml.spark.bluemix.net:12501/v1"
ingestUrl="https://ingestion-"+env+".spark.bluemix.net:13100/v1"
pipelineUrl="https://pipeline-"+env+".spark.bluemix.net:13200/v1"
realtimeUrl="https://"+env+"-realtime-ml.spark.bluemix.net:14500/v1"

In [3]:
import getpass
access_token = getpass.getpass('ibm.ax.token')
access_token

'ASN3i0UPQSF6xCt3ZzbXi2mCrPIW7HWiyIfzOVT2'

In [4]:
os.environ["SPARK_MASTER_IP"]

'yp-spark-dal09-env5-0031'

In [5]:
bearer_token = "Bearer "+access_token
header = {'Content-Type': 'application/json', 'Accept': 'application/json', 'Authorization': bearer_token}

# 1. Create kernel

In [6]:
import requests

instanceId = os.environ["NOTEBOOK_TENANT_ID"]

url = ingestUrl + "/ingest/kernel"
payload = {"instanceID": instanceId}

response = requests.post(url, json=payload, headers=header)

status = response.status_code
statement = response.text
if status != 200:
    raise Exception("Create Kernel failed", "status code="+str(status), statement)

kernelId = response.json()["kernelId"]
print kernelId

6f80f423-c3b0-4dfa-bfdb-021d927cc074


# 2. Load CSV file from Object Storage

In [7]:

# @hidden_cell
credentials_1 = {
  'auth_url':'https://identity.open.softlayer.com',
  'project':'object_storage_79b50d72_4e93_430e_b07c_504e2326d4c7',
  'project_id':'6ec903dcbd8a4fc5932519a51498e8ab',
  'region':'dallas',
  'user_id':'2c6baf4fd01e47838b4bd599bd40246c',
  'domain_id':'d4015e761ec34d1682e35fdcbe0c3e56',
  'domain_name':'1026663',
  'username':'member_a5823a7e172e44380882d3b89c81db57cacf85e5',
  'password':"""My7I3KxYAn*V?h!3""",
  'container':'ANov17',
  'tenantId':'undefined',
  'filename':'transactions-for-nb-pipeline.csv'
}


In [8]:
url = ingestUrl + "/ingest/load"
payload = {"instanceID": instanceId,
           "kernelID":kernelId, 
           "loadOptions": { 
                "type":"bluemixobjectstorage", 
                "connection.authurl": credentials_1["auth_url"],
                "connection.userid": credentials_1["user_id"],
                "connection.password": credentials_1["password"],
                "connection.projectid": credentials_1["project_id"],
                "connection.region": credentials_1["region"],
                "source.container": credentials_1["container"],
                "source.inferschema": "1",
                "source.filename": credentials_1["filename"],
                "source.fileformat": "csv", 
                "source.firstlineisheader": "true" } 
          }

response = requests.post(url, json=payload, headers=header)

status = response.status_code
statement = response.text
if status != 200:
    raise Exception("Load CSV from Object Storage failed", "status code="+str(status), statement)
    
print statement

{"outDfName":"res1480628235649","responseCode":"Success","message":"Code completed successfully!","moreInfo":"Rows:\n{\"PRODUCT_LINE\":\"Personal Accessories\",\"GENDER\":\"M\",\"AGE\":27,\"MARITAL_STATUS\":\"Single\",\"PROFESSION\":\"Professional\"}\n{\"PRODUCT_LINE\":\"Personal Accessories\",\"GENDER\":\"F\",\"AGE\":39,\"MARITAL_STATUS\":\"Married\",\"PROFESSION\":\"Other\"}\n{\"PRODUCT_LINE\":\"Mountaineering Equipment\",\"GENDER\":\"F\",\"AGE\":39,\"MARITAL_STATUS\":\"Married\",\"PROFESSION\":\"Other\"}\n{\"PRODUCT_LINE\":\"Personal Accessories\",\"GENDER\":\"F\",\"AGE\":56,\"MARITAL_STATUS\":\"Unspecified\",\"PROFESSION\":\"Hospitality\"}\n{\"PRODUCT_LINE\":\"Golf Equipment\",\"GENDER\":\"M\",\"AGE\":45,\"MARITAL_STATUS\":\"Married\",\"PROFESSION\":\"Retired\"}\n"}


In [9]:
dfName = response.json()["outDfName"]

# 3. Split data into train, validate and test data

In [10]:
url = ingestUrl + "/ingest/split"
payload = {"instanceID": instanceId, 
           "kernelID": kernelId, 
           "inDfName": dfName, 
           "train": 60, 
           "test": 20, 
           "validate": 20, 
           "timeoutInSeconds": 600
          }
response = requests.post(url, json=payload, headers=header)

status = response.status_code
statement = response.text
if status != 200:
    raise Exception("Split DataFrame into train/validate/test data failed", "status code="+str(status), statement)
    
print statement

{"responseCode":"Success","validateDFName":"validate_res1480628267539","testDFName":"test_res1480628267539","message":"Code completed successfully!","trainDfName":"train_res1480628267539"}


In [11]:
trainDfName = response.json()["trainDfName"]
validateDfName = response.json()["validateDFName"]
testDfName = response.json()["testDFName"]

# 4. Create a model using some StringIndexer transformers and Naive Bayes Classifier

In [12]:
outModelName = "outModel"
pipelineRef = "iautopipeline"
url =  pipelineUrl + "/pipeline/execution?instance_id="+instanceId+"&kernel_id="+kernelId
payload = {"meta": {},
           "inputDataBindings": {"id1": {"dataName": trainDfName}},
           "outputBindings": {"id4": {"model": outModelName}},
           "pipeline": {
               "title": "Simple linear pipeline",
               "reference": pipelineRef,
               "kind": "scala",
               "nodes": [{"ref": "id1","className": "org.apache.spark.ml.feature.StringIndexer",
                          "params": {"inputCol": "GENDER","outputCol": "gender_code"}},
                         {"ref": "id2","className": "org.apache.spark.ml.feature.StringIndexer",
                          "params": {"inputCol": "MARITAL_STATUS","outputCol": "marital_status_code"}},
                         {"ref": "id3","className": "org.apache.spark.ml.feature.StringIndexer",
                          "params": {"inputCol": "PROFESSION","outputCol": "profession_code"}},
                         {"ref": "id4","className": "org.apache.spark.ml.feature.StringIndexer",
                          "params": {"inputCol": "PRODUCT_LINE","outputCol": "label"}},
                         {"ref": "id5","className": "org.apache.spark.ml.feature.VectorAssembler",
                          "params": {"inputCols": ["gender_code", "AGE", "marital_status_code", "profession_code"],
                                     "outputCol": "features"}},
                         {"ref": "id6","className": "org.apache.spark.ml.classification.NaiveBayes",
                          "params": {"featuresCol": "features", "labelCol":"label", "modelType":"multinomial"}}],
               "links": [{"from": "id1","to": "id2"},
                         {"from": "id2","to": "id3"},
                         {"from": "id3","to": "id4"},
                         {"from": "id4","to": "id5"},
                         {"from": "id5","to": "id6"}]
               }
          }

response = requests.post(url, json=payload, headers=header)

status = response.status_code
statement = response.text
if status != 200:
    raise Exception("Submits a pipeline for execution failed", "status code="+str(status), statement)
    
print statement

{
  "pipelineRef": "iautopipeline",
  "outputs": [{
    "type": "model",
    "ref": "outModel"
  }]
}


# Validate the model

In [13]:
# https://ibm-jira.svl.ibm.com:8443/browse/SM-270

url =  pipelineUrl + "/pipeline/model/metrics?instance_id="+instanceId+"&kernel_id="+kernelId+"&model_reference="+outModelName+"&df_reference="+validateDfName+"&model_type=binary"

response = requests.get(url, headers=header)

status = response.status_code
statement = response.text
if status != 200:
    raise Exception("Validate model failed", "status code="+str(status), statement)
    
print statement

{"recallByThreshold":[{"threshold":3.0,"metric":0.03847210772190162},{"threshold":2.0,"metric":0.04506732618851333},{"threshold":1.0,"metric":0.32907392140697994},{"threshold":0.0,"metric":1.0}],"precisionByThreshold":[{"threshold":3.0,"metric":0.9688581314878892},{"threshold":2.0,"metric":0.9318181818181818},{"threshold":1.0,"metric":0.7571925387290547},{"threshold":0.0,"metric":0.6014378976944054}],"areaUnderPR":0.7397561363205685,"fMeasureByThreshold":[{"threshold":3.0,"metric":0.07400555041628122},{"threshold":2.0,"metric":0.08597640891218873},{"threshold":1.0,"metric":0.45876831721099515},{"threshold":0.0,"metric":0.7511223489344135}],"roc":[{"threshold":0.0,"metric":0.0},{"threshold":0.0018660584698320546,"metric":0.03847210772190162},{"threshold":0.004976155919552146,"metric":0.04506732618851333},{"threshold":0.15923698942566866,"metric":0.32907392140697994},{"threshold":1.0,"metric":1.0},{"threshold":1.0,"metric":1.0}],"areaUnderROC":0.5877415695619432}



# Evaluate test data

In [14]:
url =  pipelineUrl + "/pipeline/model/metrics?instance_id="+instanceId+"&kernel_id="+kernelId+"&model_reference="+outModelName+"&df_reference="+testDfName+"&model_type=binary"

response = requests.get(url, headers=header)

status = response.status_code
statement = response.text
if status != 200:
    raise Exception("Evaluate test data failed", "status code="+str(status), statement)
    
print statement

{"recallByThreshold":[{"threshold":3.0,"metric":0.03416919261504547},{"threshold":2.0,"metric":0.03995591071920639},{"threshold":1.0,"metric":0.331082942959493},{"threshold":0.0,"metric":1.0}],"precisionByThreshold":[{"threshold":3.0,"metric":0.9763779527559056},{"threshold":2.0,"metric":0.9446254071661238},{"threshold":1.0,"metric":0.7587622355541522},{"threshold":0.0,"metric":0.6011263872784496}],"areaUnderPR":0.742101214268422,"fMeasureByThreshold":[{"threshold":3.0,"metric":0.06602768903088392},{"threshold":2.0,"metric":0.07666886979510905},{"threshold":1.0,"metric":0.46100719424460435},{"threshold":0.0,"metric":0.7508793709911028}],"roc":[{"threshold":0.0,"metric":0.0},{"threshold":0.0012458471760797341,"metric":0.03416919261504547},{"threshold":0.0035299003322259138,"metric":0.03995591071920639},{"threshold":0.15863787375415284,"metric":0.331082942959493},{"threshold":1.0,"metric":1.0},{"threshold":1.0,"metric":1.0}],"areaUnderROC":0.5888428675106127}



# 5. Create a repository

In [15]:
import requests

repositoryName = "clRepo_Nov29"
url = gitUrl + "/repos"
payload = {"repoName": repositoryName}

response = requests.post(url, json=payload, headers=header)

status = response.status_code
statement = response.text

if status != 201 and status != 409:
    raise Exception("Create repository failed", "status code="+str(status), statement)
    
print statement

{"message":"repoName:clRepo_Nov29 already exists"}


# 6. Save Model

In [16]:
gitPath = repositoryName+"/iautotest"
url =  pipelineUrl + "/pipeline/model/storage?instance_id="+instanceId+"&kernel_id="+kernelId+"&from_reference="+outModelName+"&path="+gitPath

response = requests.post(url, headers=header)

status = response.status_code
statement = response.text
if status != 200:
    raise Exception("Save model failed", "status code="+str(status), statement)
    
print statement

{
  "reference": "outModel",
  "artifactId": "180"
}


In [17]:
artifactId = response.json()["artifactId"]

# Get meta data of the model

In [18]:
url = gitUrl + "/repos/artifacts/meta/"+artifactId

meta_header={'Accept': 'application/json', 'Authorization': bearer_token}

response = requests.get(url, headers=meta_header)

status = response.status_code
statement = response.text
if status != 200:
    raise Exception("Get model meta data failed", "status code="+str(status), statement)
    
print statement

{
  "artifactId" : 180,
  "baseArtifactId" : 116,
  "version" : 2,
  "commitHash" : "94a8cdc323f6daa203eea782fe00dc04bdc569c9",
  "name" : "iautotest",
  "repoPath" : "clRepo_Nov29/iautotest",
  "artifactType" : "model",
  "runTime" : "spark",
  "description" : "model",
  "modelOwner" : "jycli@ca.ibm.com",
  "project" : null,
  "inputSchema" : "{\"schema\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"PRODUCT_LINE\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{\\\"columnInfo\\\":{\\\"columnPrimaryKey\\\":false,\\\"columnTypeName\\\":\\\"varchar\\\",\\\"columnSigned\\\":true,\\\"columnType\\\":12,\\\"columnLength\\\":1024,\\\"columnNullable\\\":true,\\\"columnScale\\\":0}}},{\\\"name\\\":\\\"GENDER\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{\\\"columnInfo\\\":{\\\"columnPrimaryKey\\\":false,\\\"columnTypeName\\\":\\\"varchar\\\",\\\"columnSigned\\\":true,\\\"columnType\\\":12,\\\"columnLength\\\":1024,\\\"columnNul

# 6. Realtime Deploy the model

In [19]:
url =  realtimeUrl + "/deployment/model/realtime"

payload = {"modelId": artifactId,
           "avgRequest":10
}

response = requests.post(url, json=payload, headers=header)

status = response.status_code
statement = response.text

# if reaching the maximum deploy limit in the realtime deploy
if status == 402:
    # list active deployed model 
    url_1 = realtimeUrl+"/deployment/model/realtime?status=ACTIVE"
    response = requests.get(url_1, headers=header)
    if response.status_code != 200:
        raise Exception("List ACTIVE deployed model failed when trying to freeup resource", response.text)
    # delete the 1st active deployed model
    depId = response.json()["deployedInfo"][0].get("deploymentId")
    url_2 = realtimeUrl+"/deployment/model/realtime/"+depId
    response = requests.delete(url_2, headers=header)
    if response.status_code != 204:
        raise Exception("Delete deployed model failed when trying to freeup resource", response.text)
    # re-deploy
    response = requests.post(url, json=payload, headers=header)
    status = response.status_code
    statement = response.text
    if status != 201:
        raise Exception("Something went wrong Re-Realtime Deploy failed", "status code="+str(status), statement)
elif status != 201:
    raise Exception("Realtime Deploy failed", "status code="+str(status), statement)
    
print statement

{"deploymentId":"227","scoringEndPoint":"https://prod-scoring-ml.spark.bluemix.net:32768/v1/score/227","scoringApiKey":"hlmh+NzxdE5MqdQVDdRENg==","feedBackEndPoints":{"realtime":"N/A","batch":"N/A"}}


In [20]:
scoringEndPoint = response.json()["scoringEndPoint"]
scoringApiKey = response.json()["scoringApiKey"]
deploymentId = response.json()["deploymentId"]

In [21]:
url =  realtimeUrl + "/deployment/model/realtime?status=ACTIVE"

response = requests.get(url, headers=header)

status = response.status_code
statement = response.text

if status != 200:
    raise Exception("List ACTIVE deployed model failed", "status code="+str(status), statement)
    
print statement

{"deployedInfo":[{"modelId":"180","deploymentId":"227","avgRequest":10,"peakRequest":10,"status":"ACTIVE","schema":"{\"schema\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"PRODUCT_LINE\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{\\\"columnInfo\\\":{\\\"columnPrimaryKey\\\":false,\\\"columnTypeName\\\":\\\"varchar\\\",\\\"columnSigned\\\":true,\\\"columnType\\\":12,\\\"columnLength\\\":1024,\\\"columnNullable\\\":true,\\\"columnScale\\\":0}}},{\\\"name\\\":\\\"GENDER\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{\\\"columnInfo\\\":{\\\"columnPrimaryKey\\\":false,\\\"columnTypeName\\\":\\\"varchar\\\",\\\"columnSigned\\\":true,\\\"columnType\\\":12,\\\"columnLength\\\":1024,\\\"columnNullable\\\":true,\\\"columnScale\\\":0}}},{\\\"name\\\":\\\"AGE\\\",\\\"type\\\":\\\"integer\\\",\\\"nullable\\\":true,\\\"metadata\\\":{\\\"columnInfo\\\":{\\\"columnPrimaryKey\\\":false,\\\"columnTypeName\\\":\\\"integer\\\",\\\"co

# 7. Predict calling the scoring endpoint

In [22]:
payload = {"record":["M",65,"Married","Professional"]}

scoring_header = {'Content-Type': 'application/json', 'Accept': 'application/json', 'Authorization': scoringApiKey}

response = requests.post(scoringEndPoint, json=payload, headers=scoring_header)

status = response.status_code
statement = response.text

if status != 200:
    raise Exception("Scoring endpoint prediction failed", "status code="+str(status), statement)
    
print statement

{"result":{"gender_code":0.0,"GENDER":"M","features":{"values":[0.0,65.0,0.0,1.0]},"marital_status_code":0.0,"prediction":3.0,"AGE":65,"profession_code":1.0,"MARITAL_STATUS":"Married","rawPrediction":{"values":[-9.099853873246575,-10.676481070981191,-10.61705677351417,-8.984461572740972,-12.408242362820573]},"PROFESSION":"Professional","probability":{"values":[0.38686452726726267,0.07995367266670689,0.08484886989514148,0.43418333401545434,0.014149596155434636]}}}


### 8. Delete the deployed model to free up resources

In [None]:
url =  realtimeUrl + "/deployment/model/realtime/"+deploymentId

response = requests.delete(url, headers=header)

if response.status_code != 204:
    raise Exception("Delete deployed model failed", "status code="+str(response.status_code), response.text)
    
print response.status_code

204


### 9. Delete the kernel after run the test

In [16]:
url = ingestUrl + "/ingest/kernel/"+kernelId+"?instanceID="+instanceId

response = requests.delete(url, headers=header)

status = response.status_code

if  status != 204 and status != 200:
    raise Exception("Delete kernel="+kernelId+" failed for instanceId="+instanceId, "status code="+str(status), response.text)
    
print status

Exception: (u'Delete kernel=581bf919-226e-4cf6-86d9-8681e23b4123 failed for instanceId=507aaee9-5ebb-4722-94c8-84c432140908', 'status code=404', u'{"responseCode":"Not found","message":"Kernel Not Found"}')

## 10. Delete the artifact

In [28]:
url = gitUrl + "/repos/artifacts/"+artifactId

delete_header = {'Content-Type': 'application/json', 'Authorization': bearer_token}
response = requests.delete(url, headers=delete_header)

if response.status_code != 204:
    raise Exception("Delete artifact failed", "status code="+str(response.status_code), response.text)
    
print response.status_code

200


## 11. Delete the repository

In [29]:
url = gitUrl + "/repos/"+repositoryName

response = requests.delete(url, headers=header)

if response.status_code != 204:
    raise Exception("Delete repository failed", "status code="+str(response.status_code), response.text)
    
print response.status_code

200
