Score function templates for IBM Watson OpenScale
IBM Watson OpenScale users must pass a custom score function as an input when generating the common configuration package for a subscription through the notebook. This page provides score function templates that can be used for reference.
- The only input to the score function is the training data containing all the feature columns.
  - training_data_frame: (type: pandas.DataFrame)
- The output of the score function varies depending on the problem_type.
  - For classification problems, the output should be a tuple of (probabilities, predictions). Both probabilities and predictions are of type numpy.ndarray.
  - For regression problems, the output should be a numpy.ndarray of predictions.
- predictions: (type: numpy.ndarray)
  - The data type of this array should be the same as the data type of the dataset class label, e.g. ["A", "B", "C", ...]
  - The shape of this array should be the same as the length of the training_data_frame.
- probabilities: (type: numpy.ndarray)
  - Each element should be a probability vector with values between 0 and 1, e.g. [[0.50, 0.20, 0.15, 0.15], [0.60, 0.10, 0.05, 0.25], ...]
  - The shape of this array should be the length of the training_data_frame x the number of unique classes in the label column.
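The output contract above can be checked before a score function is handed to the notebook. The following is a minimal sketch only: the helper name validate_score_output and the problem_type strings "classification" and "regression" are illustrative assumptions, not OpenScale identifiers.

```python
import numpy as np


def validate_score_output(output, n_rows, problem_type, n_classes=None):
    """Check a score function's return value against the contract above.

    Hypothetical helper: the name and the problem_type strings are
    assumptions made for this sketch.
    """
    if problem_type == "regression":
        predictions = np.asarray(output)
        if predictions.shape != (n_rows,):
            raise ValueError("predictions must have shape ({},)".format(n_rows))
        return True

    # Classification: a tuple of (probabilities, predictions)
    probabilities, predictions = output
    probabilities = np.asarray(probabilities, dtype=float)
    predictions = np.asarray(predictions)
    if predictions.shape != (n_rows,):
        raise ValueError("predictions must have shape ({},)".format(n_rows))
    if probabilities.shape != (n_rows, n_classes):
        raise ValueError(
            "probabilities must have shape ({}, {})".format(n_rows, n_classes))
    if ((probabilities < 0) | (probabilities > 1)).any():
        raise ValueError("probabilities must lie between 0 and 1")
    return True


# Hand-written sample output for a 4-class problem with 2 rows
probs = np.array([[0.50, 0.20, 0.15, 0.15], [0.60, 0.10, 0.05, 0.25]])
preds = np.array(["A", "B"])
ok = validate_score_output(
    (probs, preds), n_rows=2, problem_type="classification", n_classes=4)
```

Calling such a check on a small sample of the training data is a cheap way to catch a wrong column name or a transposed array before the configuration package is generated.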
This section provides the score function templates for models deployed in WML. Two formats are provided (local model, online model), and the user is free to choose either.
The templates below are common to binary and multi-class classification cases.
- The model stored in WML is retrieved and loaded into the local environment. This model is then used to score.
Limitations of running in WML local mode:
- If a model is trained and deployed using WML AutoAI, local mode does not work because the runtime used to train the model is not known.
- If a model is generated and deployed using WML Model Builder, local mode does not work because the WML Python client does not support this context.
def score(training_data_frame):
    WML_CREDENTIALS = {
        <EDIT THIS>
    }
    try:
        # Supply the space id and the model id
        space_id = <EDIT THIS>
        model_id = <EDIT THIS>
        # Retain feature columns from user selection
        feature_columns = list(training_data_frame.columns)
        # Load the WML model into a local object
        from ibm_watson_machine_learning import APIClient
        wml_client = APIClient(WML_CREDENTIALS)
        wml_client.set.default_space(space_id)
        model = wml_client.repository.load(model_id)
        # Predict the training data locally
        # Example of a Spark-based model (customize the lines below for your model framework)
        from pyspark.sql import SparkSession
        spark = SparkSession.builder.master("local").appName("drift").getOrCreate()
        spark_frame = spark.createDataFrame(training_data_frame)
        spark_frame.printSchema()
        score_predictions = model.transform(spark_frame)
        score_predictions_pd = score_predictions.select("*").toPandas()
        probability_column_name = <EDIT THIS>
        prediction_column_name = <EDIT THIS>
        import numpy as np
        probability_array = np.array(
            [x.tolist() for x in score_predictions_pd[probability_column_name]])
        prediction_vector = np.array(
            [x for x in score_predictions_pd[prediction_column_name]])
        return probability_array, prediction_vector
    except Exception as ex:
        raise Exception("Scoring failed. Error: {}".format(str(ex)))
- Install the Python library ibm_watson_machine_learning using pip install ibm-watson-machine-learning. The snippets below use this Python client to score against the online endpoint of a WML model. Please be aware that a cost will be incurred for scoring using this method.
def score(training_data_frame):
    # To be filled by the user
    WML_CREDENTIALS = {
        <EDIT THIS>
    }
    try:
        deployment_id = <EDIT THIS>
        space_id = <EDIT THIS>
        # The data type of the label column and the prediction column should be the same.
        # Make sure that the label column and the prediction column array
        # contain the same unique class labels.
        # Edit these if your prediction column has a different name or if the model has any meta columns
        prediction_column_name = "prediction"
        probability_column_name = "probability"
        label_column_name = "label"
        meta_columns = ["TO_BE_EDITED"]
        if meta_columns and not (isinstance(meta_columns, list)):
            raise ValueError("Meta columns are of incorrect type. Need to be of type list of strings.")
        training_df = training_data_frame.copy()
        meta_payload = {}
        cols_to_remove = [label_column_name]
        # Prepare meta payload values if meta columns are available in dataframe
        if meta_columns and all(col in training_data_frame.columns for col in meta_columns):
            meta_df = training_df[meta_columns].copy()
            meta_df = meta_df.fillna('')
            meta_fields = meta_df.columns.tolist()
            meta_values = meta_df[meta_fields].values.tolist()
            cols_to_remove.extend(meta_columns)
            meta_payload = {
                "fields": meta_fields,
                "values": meta_values
            }
        # Remove the label and meta columns from the dataframe
        for col in cols_to_remove:
            if col in training_df.columns:
                del training_df[col]
        # Read the feature columns for scoring
        fields = training_df.columns.tolist()
        values = training_df[fields].values.tolist()
        payload_scoring = {"input_data": [{"fields": fields, "values": values, "meta": meta_payload}]}
        # Score against the online WML deployment
        from ibm_watson_machine_learning import APIClient
        wml_client = APIClient(WML_CREDENTIALS)
        wml_client.set.default_space(space_id)
        scoring_response = wml_client.deployments.score(deployment_id, payload_scoring)
        score_predictions = scoring_response.get("predictions")[0]
        # list.index() raises ValueError rather than returning -1, so test membership first
        response_fields = list(score_predictions.get("fields"))
        if probability_column_name not in response_fields or prediction_column_name not in response_fields:
            raise Exception("Missing prediction/probability column in the scoring response")
        prob_col_index = response_fields.index(probability_column_name)
        predict_col_index = response_fields.index(prediction_column_name)
        import numpy as np
        probability_array = np.array([value[prob_col_index] for value in score_predictions.get("values")])
        prediction_vector = np.array([value[predict_col_index] for value in score_predictions.get("values")])
        return probability_array, prediction_vector
    except Exception as ex:
        raise Exception("Scoring failed. Error: {}".format(str(ex)))
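The request and response handling in the online template can be tried offline on hand-written data, without contacting WML. The sketch below assumes the fields/values layout used in the template above; the helper names build_payload and parse_predictions and the sample column names are hypothetical.

```python
import numpy as np


def build_payload(fields, values, meta_fields=None, meta_values=None):
    """Assemble a request body in the fields/values layout of the template."""
    meta = {}
    if meta_fields:
        meta = {"fields": meta_fields, "values": meta_values}
    return {"input_data": [{"fields": fields, "values": values, "meta": meta}]}


def parse_predictions(response, prediction_column, probability_column):
    """Pull the probability and prediction columns out of a response dict."""
    result = response["predictions"][0]
    fields = list(result["fields"])
    for column in (prediction_column, probability_column):
        if column not in fields:
            raise KeyError("Missing column in scoring response: {}".format(column))
    prob_index = fields.index(probability_column)
    pred_index = fields.index(prediction_column)
    probabilities = np.array([row[prob_index] for row in result["values"]])
    predictions = np.array([row[pred_index] for row in result["values"]])
    return probabilities, predictions


# Hand-written sample data; no call to WML is made here.
payload = build_payload(["age", "income"], [[31, 52000], [45, 61000]])
sample_response = {
    "predictions": [{
        "fields": ["prediction", "probability"],
        "values": [["No Risk", [0.8, 0.2]], ["Risk", [0.3, 0.7]]],
    }]
}
probabilities, predictions = parse_predictions(
    sample_response, "prediction", "probability")
```

Keeping the parsing step in its own function makes it easy to adapt when a deployment returns differently named columns.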
def score(training_data_frame):
    # To be filled by the user
    WML_CREDENTIALS = {
        <EDIT THIS>
    }
    try:
        deployment_id = <EDIT THIS>
        space_id = <EDIT THIS>
        # Edit these if your prediction column has a different name or if the model has any meta columns
        prediction_column_name = "prediction"
        label_column_name = "label"
        meta_columns = ["TO_BE_EDITED"]
        if meta_columns and not (isinstance(meta_columns, list)):
            raise ValueError("Meta columns are of incorrect type. Need to be of type list of strings.")
        training_df = training_data_frame.copy()
        meta_payload = {}
        cols_to_remove = [label_column_name]
        # Prepare meta payload values if meta columns are available in dataframe
        if meta_columns and all(col in training_data_frame.columns for col in meta_columns):
            meta_df = training_df[meta_columns].copy()
            meta_df = meta_df.fillna('')
            meta_fields = meta_df.columns.tolist()
            meta_values = meta_df[meta_fields].values.tolist()
            cols_to_remove.extend(meta_columns)
            meta_payload = {
                "fields": meta_fields,
                "values": meta_values
            }
        # Remove the label and meta columns from the dataframe
        for col in cols_to_remove:
            if col in training_df.columns:
                del training_df[col]
        # Read the feature columns for scoring
        fields = training_df.columns.tolist()
        values = training_df[fields].values.tolist()
        payload_scoring = {"input_data": [{"fields": fields, "values": values, "meta": meta_payload}]}
        # Score against the online WML deployment
        from ibm_watson_machine_learning import APIClient
        wml_client = APIClient(WML_CREDENTIALS)
        wml_client.set.default_space(space_id)
        scoring_response = wml_client.deployments.score(deployment_id, payload_scoring)
        score_predictions = scoring_response.get("predictions")[0]
        # list.index() raises ValueError rather than returning -1, so test membership first
        response_fields = list(score_predictions.get("fields"))
        if prediction_column_name not in response_fields:
            raise Exception("Missing prediction column in the scoring response")
        predict_col_index = response_fields.index(prediction_column_name)
        import numpy as np
        prediction_vector = np.array([value[predict_col_index] for value in score_predictions.get("values")])
        return prediction_vector
    except Exception as ex:
        raise Exception("Scoring failed. Error: {}".format(str(ex)))
This section provides the score function templates for models deployed in Azure Model Engine. Note that the online scoring endpoints of Azure Studio will be used. Please be aware that a cost will be incurred for scoring using this method.
The following examples are provided for two kinds of endpoints:
- with workspaces in the scoring URL. Example: https://ussouthcentral.services.azureml.net/workspaces/<workspace_id>/services/<service_id>/execute?api-version=2.0&details=true
- with subscriptions in the scoring URL. Example: https://ussouthcentral.services.azureml.net:443/subscriptions/<subscription_id>/services/<service_id>/execute?api-version=2.0&format=swagger
The request and response schema depend on the selected endpoint. If the request and response of your scoring URL use a different schema, please adjust the following code snippets accordingly.
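To make the difference between the two request schemas concrete, the sketch below builds both payload shapes from the same rows. It is an illustration only: the helper name build_azure_payload and the rule of switching on the URL path are assumptions, and the ids in the sample URLs are placeholders.

```python
def build_azure_payload(scoring_url, columns, rows):
    """Return a request body matching the kind of scoring URL.

    columns is a list of feature names and rows a list of value lists,
    e.g. df.columns.tolist() and df.values.tolist() for a pandas DataFrame.
    Hypothetical helper written for this sketch.
    """
    if "/workspaces/" in scoring_url:
        # Column-oriented layout: names once, then one list per row
        input1 = {"ColumnNames": columns, "Values": rows}
    elif "/subscriptions/" in scoring_url:
        # Record-oriented layout: one {column: value} dict per row
        input1 = [dict(zip(columns, row)) for row in rows]
    else:
        raise ValueError("Unrecognized scoring URL: {}".format(scoring_url))
    return {"Inputs": {"input1": input1}, "GlobalParameters": {}}


columns = ["age", "workclass"]
rows = [[28, "Private"], [41, "State-gov"]]

workspace_payload = build_azure_payload(
    "https://ussouthcentral.services.azureml.net/workspaces/ws/services/svc/execute",
    columns, rows)
subscription_payload = build_azure_payload(
    "https://ussouthcentral.services.azureml.net:443/subscriptions/sub/services/svc/execute",
    columns, rows)
```

The templates that follow hard-code one layout each, so a helper like this is only useful when the same notebook has to target both kinds of endpoint.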
def get_scoring_request(training_data_frame):
    input_values = training_data_frame.values.tolist()
    feature_columns = list(training_data_frame.columns)
    # Payload
    payload = {
        "Inputs": {
            "input1": {
                "ColumnNames": feature_columns,
                "Values": input_values
            }
        },
        "GlobalParameters": {}
    }
    return payload
def get_prediction_probability_using_scoring_response(
        response, prediction_column, probability_column):
    # Assumed response json structure:
    # {
    #     "Results": {
    #         "output1": {
    #             "type": "DataTable",
    #             "value": {
    #                 "ColumnNames": [
    #                 ],
    #                 "ColumnTypes": [
    #                 ],
    #                 "Values": [
    #                     [],
    #                     []
    #                 ]
    #             }
    #         }
    #     }
    # }
    results = response.json()["Results"]["output1"]["value"]
    # list.index() raises ValueError rather than returning -1, so test membership first
    column_names = list(results.get('ColumnNames'))
    if probability_column not in column_names or prediction_column not in column_names:
        raise Exception("Missing prediction/probability column in the scoring response")
    prob_col_index = column_names.index(probability_column)
    predict_col_index = column_names.index(prediction_column)
    # Get score label from first entry
    first_entry = results.get('Values')[0]
    score_label = first_entry[predict_col_index]
    score_prob_1 = float(first_entry[prob_col_index])
    main_label = True
    if score_prob_1 < 0.5:
        # The score label is not the main label of interest
        main_label = False
    output = [[value[predict_col_index], 1 - float(value[prob_col_index]) if
               (value[predict_col_index] == score_label and not main_label) else
               float(value[prob_col_index])] for value in results.get('Values')]
    import numpy as np
    # Construct predicted_label array
    predicted_vector = np.array([value[0] for value in output])
    # Construct probabilities array
    probability_array = np.array([[value[1], (1 - value[1])] for value in output])
    return probability_array, predicted_vector
def score(training_data_frame):
    azure_scoring_url = <REQUEST RESPONSE URL FROM AZURE MODEL>
    token = <PRIMARY_KEY FROM AZURE MODEL>
    # Edit these if your prediction and probability columns have different names
    prediction_column_name = "Scored Labels"
    probability_column_name = "Scored Probabilities"
    cols_to_remove = []
    # Edit this if your training dataframe contains a label column
    label_column = "TO_BE_EDITED"
    cols_to_remove.append(label_column)
    # Edit this if your training dataframe contains meta columns
    meta_columns = ["TO_BE_EDITED"]
    cols_to_remove.extend(meta_columns)
    # Remove the meta columns/label column from the feature columns
    feature_cols = [col for col in training_data_frame.columns if col not in cols_to_remove]
    try:
        payload = get_scoring_request(training_data_frame=training_data_frame[feature_cols])
        import requests
        response = requests.post(
            azure_scoring_url,
            json=payload,
            headers={
                "Authorization": "Bearer {}".format(token)
            })
        if not response.ok:
            raise Exception(str(response.content))
        return get_prediction_probability_using_scoring_response(
            response=response,
            prediction_column=prediction_column_name,
            probability_column=probability_column_name)
    except Exception as ex:
        raise Exception("Scoring failed. {}".format(str(ex)))
def get_scoring_request(training_data_frame):
    input_data = training_data_frame.to_json(orient="records")
    # Payload
    import json
    payload = {
        "Inputs": {
            "input1": json.loads(input_data)
        },
        "GlobalParameters": {}
    }
    return payload
def get_prediction_probability_using_scoring_response(
        response, prediction_column, probability_column):
    # Assumed response json structure:
    # {
    #     "Results": {
    #         "output1": [
    #             {
    #                 "age": "28",
    #                 "workclass": "Private",
    #                 "education": "Masters",
    #                 "marital-status": "Married-civ-spouse",
    #                 "occupation": "Adm-clerical",
    #                 "relationship": "Wife",
    #                 "race": "White",
    #                 "sex": "Male",
    #                 "capital-gain": "0",
    #                 "capital-loss": "0",
    #                 "hours-per-week": "10",
    #                 "native-country": "United-States",
    #                 "Scored Labels": "<=50K",
    #                 "Scored Probabilities": "0.189918905496597"
    #             }
    #         ]
    #     }
    # }
    results = response.json()["Results"]["output1"]
    # Get score label from first entry
    first_entry = results[0]
    score_label = first_entry[prediction_column]
    score_prob_1 = float(first_entry[probability_column])
    main_label = True
    if score_prob_1 < 0.5:
        # The score label is not the main label of interest
        main_label = False
    output = [[value[prediction_column], 1 - float(value[probability_column]) if
               (value[prediction_column] == score_label and not main_label) else
               float(value[probability_column])] for value in results]
    import numpy as np
    predicted_vector = np.array([value[0] for value in output])
    probability_array = np.array([[value[1], (1 - value[1])] for value in output])
    return probability_array, predicted_vector
def score(training_data_frame):
    azure_scoring_url = <REQUEST RESPONSE URL FROM AZURE MODEL>
    token = <PRIMARY_KEY FROM AZURE MODEL>
    # Edit these if your prediction and probability columns have different names
    prediction_column_name = "Scored Labels"
    probability_column_name = "Scored Probabilities"
    cols_to_remove = []
    # Edit this if your training dataframe contains a label column
    label_column = "TO_BE_EDITED"
    cols_to_remove.append(label_column)
    # Edit this if your training dataframe contains meta columns
    meta_columns = ["TO_BE_EDITED"]
    cols_to_remove.extend(meta_columns)
    # Remove the meta columns/label column from the feature columns
    feature_cols = [col for col in training_data_frame.columns if col not in cols_to_remove]
    try:
        payload = get_scoring_request(training_data_frame=training_data_frame[feature_cols])
        import requests
        response = requests.post(
            azure_scoring_url,
            json=payload,
            headers={
                "Authorization": "Bearer {}".format(token)
            })
        if not response.ok:
            raise Exception(str(response.content))
        return get_prediction_probability_using_scoring_response(
            response=response,
            prediction_column=prediction_column_name,
            probability_column=probability_column_name)
    except Exception as ex:
        raise Exception("Scoring failed. {}".format(str(ex)))
def get_scoring_request(training_data_frame):
    input_values = training_data_frame.values.tolist()
    feature_columns = list(training_data_frame.columns)
    # Payload
    payload = {
        "Inputs": {
            "input1": {
                "ColumnNames": feature_columns,
                "Values": input_values
            }
        },
        "GlobalParameters": {}
    }
    return payload
def get_prediction_probability_using_scoring_response(
        response, prediction_column, probability_column):
    # Assumed response json structure:
    # {
    #     "Results": {
    #         "output1": {
    #             "type": "DataTable",
    #             "value": {
    #                 "ColumnNames": [
    #                 ],
    #                 "ColumnTypes": [
    #                 ],
    #                 "Values": [
    #                     [],
    #                     []
    #                 ]
    #             }
    #         }
    #     }
    # }
    results = response.json()["Results"]["output1"]["value"]
    result_column_names = list(results.get('ColumnNames'))
    predict_col_index = result_column_names.index(prediction_column)
    # Indexes of all columns whose name starts with the probability prefix
    prob_col_indexes = [result_column_names.index(column_name) for column_name in result_column_names
                        if column_name.startswith(probability_column, 0)]
    # Compute for all values
    score_label_list = []
    score_prob_list = []
    for value in results.get("Values"):
        score_label_list.append(value[predict_col_index])
        # Construct the probability vector for this row
        score_prob_values = [float(value[index]) for index in range(len(value))
                             if index in prob_col_indexes]
        score_prob_list.append(score_prob_values)
    import numpy as np
    # Construct predicted_label bucket
    predicted_vector = np.array(score_label_list)
    # Scored probabilities
    probability_array = np.array(score_prob_list)
    return probability_array, predicted_vector
def score(training_data_frame):
    azure_scoring_url = <REQUEST RESPONSE URL FROM AZURE MODEL>
    token = <PRIMARY_KEY FROM AZURE MODEL>
    # Edit these if your prediction and probability columns have different names/prefixes
    prediction_column_name = "Scored Labels"
    probability_column_prefix = "Scored Probabilities"
    cols_to_remove = []
    # Edit this if your training dataframe contains a label column
    label_column = "TO_BE_EDITED"
    cols_to_remove.append(label_column)
    # Edit this if your training dataframe contains meta columns
    meta_columns = ["TO_BE_EDITED"]
    cols_to_remove.extend(meta_columns)
    # Remove the meta columns/label column from the feature columns
    feature_cols = [col for col in training_data_frame.columns if col not in cols_to_remove]
    try:
        payload = get_scoring_request(training_data_frame=training_data_frame[feature_cols])
        import requests
        response = requests.post(
            azure_scoring_url,
            json=payload,
            headers={
                "Authorization": "Bearer {}".format(token)
            })
        if not response.ok:
            raise Exception(str(response.content))
        return get_prediction_probability_using_scoring_response(
            response=response,
            prediction_column=prediction_column_name,
            probability_column=probability_column_prefix)
    except Exception as ex:
        raise Exception("Scoring failed. {}".format(str(ex)))
def get_scoring_request(training_data_frame):
    input_data = training_data_frame.to_json(orient="records")
    # Payload
    import json
    payload = {
        "Inputs": {
            "input1": json.loads(input_data)
        },
        "GlobalParameters": {}
    }
    return payload
def get_prediction_probability_using_scoring_response(
        response, prediction_column, probability_column):
    # Assumed response json structure:
    # {
    #     "Results": {
    #         "output1": [
    #             {
    #                 "age": "28",
    #                 "workclass": "Private",
    #                 "education": "Masters",
    #                 "marital-status": "Married-civ-spouse",
    #                 "occupation": "Adm-clerical",
    #                 "relationship": "Wife",
    #                 "race": "White",
    #                 "sex": "Male",
    #                 "capital-gain": "0",
    #                 "capital-loss": "0",
    #                 "hours-per-week": "10",
    #                 "native-country": "United-States",
    #                 "Scored Labels": "<=50K",
    #                 "Scored Probabilities for class <=50K": 0.81,
    #                 "Scored Probabilities for class >50K": 0.19
    #             }
    #         ]
    #     }
    # }
    results = response.json()["Results"]["output1"]
    # Compute for all values
    score_label_list = []
    score_prob_list = []
    for value in results:
        score_label_list.append(value[prediction_column])
        # Construct the probability vector for this row
        score_prob_values = [float(prob) for key, prob in value.items()
                             if key.startswith(probability_column, 0)]
        score_prob_list.append(score_prob_values)
    import numpy as np
    # Construct predicted_label bucket
    predicted_vector = np.array(score_label_list)
    # Scored probabilities
    probability_array = np.array(score_prob_list)
    return probability_array, predicted_vector
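The multi-class templates gather every column whose name starts with the probability prefix. The sketch below shows that idea on hand-written records. It is an assumption-laden illustration: the helper name collect_class_probabilities is hypothetical, and unlike the template it sorts the matching keys so that the column order does not depend on the key order of each record.

```python
import numpy as np


def collect_class_probabilities(records, prediction_column, probability_prefix):
    """Build (probabilities, predictions) from record-oriented results.

    Hypothetical helper; sorting the keys is a choice made for this sketch.
    """
    labels = [record[prediction_column] for record in records]
    # Take the probability keys from the first record and fix their order
    prob_keys = sorted(
        key for key in records[0] if key.startswith(probability_prefix))
    probabilities = [[float(record[key]) for key in prob_keys] for record in records]
    return np.array(probabilities), np.array(labels), prob_keys


# Hand-written sample; the second record lists its keys in a different order
sample_records = [
    {"Scored Labels": "<=50K",
     "Scored Probabilities for class <=50K": 0.81,
     "Scored Probabilities for class >50K": 0.19},
    {"Scored Probabilities for class >50K": 0.6,
     "Scored Probabilities for class <=50K": 0.4,
     "Scored Labels": ">50K"},
]
probability_array, predicted_vector, class_columns = collect_class_probabilities(
    sample_records, "Scored Labels", "Scored Probabilities")
```

Whichever ordering is used, the columns of the probability array should line up with the class labels consistently across all rows.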
def score(training_data_frame):
    azure_scoring_url = <REQUEST RESPONSE URL FROM AZURE MODEL>
    token = <PRIMARY_KEY FROM AZURE MODEL>
    # Edit these if your prediction and probability columns have different names/prefixes
    prediction_column_name = "Scored Labels"
    probability_column_prefix = "Scored Probabilities"
    cols_to_remove = []
    # Edit this if your training dataframe contains a label column
    label_column = "TO_BE_EDITED"
    cols_to_remove.append(label_column)
    # Edit this if your training dataframe contains meta columns
    meta_columns = ["TO_BE_EDITED"]
    cols_to_remove.extend(meta_columns)
    # Remove the meta columns/label column from the feature columns
    feature_cols = [col for col in training_data_frame.columns if col not in cols_to_remove]
    try:
        payload = get_scoring_request(training_data_frame=training_data_frame[feature_cols])
        import requests
        response = requests.post(
            azure_scoring_url,
            json=payload,
            headers={
                "Authorization": "Bearer {}".format(token)
            })
        if not response.ok:
            raise Exception(str(response.content))
        return get_prediction_probability_using_scoring_response(
            response=response,
            prediction_column=prediction_column_name,
            probability_column=probability_column_prefix)
    except Exception as ex:
        raise Exception("Scoring failed. {}".format(str(ex)))
def score(training_data_frame):
    azure_scoring_url = <REQUEST RESPONSE URL FROM AZURE MODEL>
    token = <PRIMARY_KEY FROM AZURE MODEL>
    # Edit this if your prediction column has a different name
    prediction_column_name = "Scored Labels"
    cols_to_remove = []
    # Edit this if your training dataframe contains a label column
    label_column = "TO_BE_EDITED"
    cols_to_remove.append(label_column)
    # Edit this if your training dataframe contains meta columns
    meta_columns = ["TO_BE_EDITED"]
    cols_to_remove.extend(meta_columns)
    # Remove the meta columns/label column from the feature columns
    feature_cols = [col for col in training_data_frame.columns if col not in cols_to_remove]
    try:
        input_values = training_data_frame[feature_cols].values.tolist()
        # Payload
        import requests
        from datetime import datetime, timedelta
        payload = {
            "Inputs": {
                "input1": {
                    "ColumnNames": feature_cols,
                    "Values": input_values
                }
            },
            "GlobalParameters": {}
        }
        headers = {"Authorization": "Bearer " + token}
        start = datetime.utcnow()
        response = requests.post(azure_scoring_url, json=payload, headers=headers)
        if not response.ok:
            raise Exception(str(response.content))
        response_time = (datetime.utcnow() - start).total_seconds() * 1000
        print(response_time)
        # Assumed response json structure:
        # {
        #     "Results": {
        #         "output1": {
        #             "type": "DataTable",
        #             "value": {
        #                 "ColumnNames": [
        #                 ],
        #                 "ColumnTypes": [
        #                 ],
        #                 "Values": [
        #                     [],[]
        #                 ]
        #             }
        #         }
        #     }
        # }
        # If your scoring response does not match the above schema,
        # please modify the code below to extract the prediction array
        # Extract results
        results = response.json()["Results"]["output1"]["value"]
        result_column_names = list(results.get("ColumnNames"))
        predict_col_index = result_column_names.index(prediction_column_name)
        # Compute for all values
        score_label_list = []
        for value in results.get("Values"):
            score_label_list.append(value[predict_col_index])
        import numpy as np
        # Construct predicted_label bucket
        predicted_vector = np.array(score_label_list)
        return predicted_vector
    except Exception as ex:
        raise Exception("Scoring failed. {}".format(str(ex)))
def get_prediction_using_scoring_response(
        response, prediction_column):
    # Assumed response json structure:
    # {
    #     "Results": {
    #         "output1": [
    #             {
    #                 "age": "28",
    #                 "workclass": "Private",
    #                 "education": "Masters",
    #                 "marital-status": "Married-civ-spouse",
    #                 "occupation": "Adm-clerical",
    #                 "relationship": "Wife",
    #                 "race": "White",
    #                 "sex": "Male",
    #                 "capital-gain": "0",
    #                 "capital-loss": "0",
    #                 "hours-per-week": "10",
    #                 "native-country": "United-States",
    #                 "Scored Labels": "10000"
    #             }
    #         ]
    #     }
    # }
    results = response.json()["Results"]["output1"]
    import numpy as np
    import warnings
    prediction_vector = np.array([value[prediction_column] for value in results])
    # Regression predictions returned as strings are converted to floats
    if (np.issubdtype(prediction_vector.dtype, np.str_)) or (np.issubdtype(prediction_vector.dtype, np.object_)):
        warnings.warn("The predictions for the regression problem are strings. Converting them to floats.")
        prediction_vector = prediction_vector.astype(float)
    return prediction_vector
def score(training_data_frame):
    azure_scoring_url = <REQUEST RESPONSE URL FROM AZURE MODEL>
    token = <PRIMARY_KEY FROM AZURE MODEL>
    # Edit this if your prediction column has a different name
    prediction_column_name = "Scored Labels"
    cols_to_remove = []
    # Edit this if your training dataframe contains a label column
    label_column = "TO_BE_EDITED"
    cols_to_remove.append(label_column)
    # Edit this if your training dataframe contains meta columns
    meta_columns = ["TO_BE_EDITED"]
    cols_to_remove.extend(meta_columns)
    # Remove the meta columns/label column from the feature columns
    feature_cols = [col for col in training_data_frame.columns if col not in cols_to_remove]
    try:
        import json
        import requests
        input_data = training_data_frame[feature_cols].to_json(orient="records")
        payload = {
            "Inputs": {
                "input1": json.loads(input_data)
            },
            "GlobalParameters": {}
        }
        response = requests.post(
            azure_scoring_url,
            json=payload,
            headers={
                "Authorization": "Bearer {}".format(token)
            })
        if not response.ok:
            raise Exception(str(response.content))
        return get_prediction_using_scoring_response(
            response=response,
            prediction_column=prediction_column_name)
    except Exception as ex:
        raise Exception("Scoring failed. {}".format(str(ex)))
This section provides the score function templates for models deployed in Azure ML Service. Note that the online scoring endpoints of Azure ML Service will be used. Please be aware that a cost will be incurred for scoring using this method.
def score(training_data_frame):
    az_scoring_uri = <EDIT THIS>
    api_key = <DEPLOYMENT API KEY>
    # Edit these if your prediction and probability columns have different names
    prediction_column_name = "Scored Labels"
    probability_column_name = "Scored Probabilities"
    cols_to_remove = []
    # Edit this if your training dataframe contains a label column
    label_column = "TO_BE_EDITED"
    cols_to_remove.append(label_column)
    # Edit this if your training dataframe contains meta columns
    meta_columns = ["TO_BE_EDITED"]
    cols_to_remove.extend(meta_columns)
    # Remove the meta columns/label column from the feature columns
    feature_cols = [col for col in training_data_frame.columns if col not in cols_to_remove]
    try:
        input_values = training_data_frame[feature_cols].values.tolist()
        input_data = [{field: value for field, value in zip(feature_cols, input_value)} for input_value in input_values]
        payload = {
            "input": input_data
        }
        import requests
        import json
        import numpy as np
        import time
        headers = {"Content-Type": "application/json", "Authorization": ("Bearer " + api_key)}
        start_time = time.time()
        response = requests.post(az_scoring_uri, json=payload, headers=headers)
        if not response.ok:
            raise Exception(str(response.content))
        response_time = int((time.time() - start_time) * 1000)
        print(response_time)
        # Assumed response json structure:
        # {
        #     "output": [
        #         {
        #             "Scored Labels": "Risk",
        #             "Scored Probabilities": []
        #         }
        #     ]
        # }
        # If your scoring response does not match the above schema,
        # please modify the code below to extract the prediction and probabilities arrays
        # json.loads() only works here if response.json() returns a JSON-encoded string;
        # drop it if response.json() already returns a dict
        response_dict = json.loads(response.json())
        output = response_dict["output"]
        # Compute for all values
        score_label_list = []
        score_prob_list = []
        for value in output:
            score_label_list.append(value[prediction_column_name])
            score_prob_list.append(value[probability_column_name])
        return np.array(score_prob_list), np.array(score_label_list)
    except Exception as ex:
        raise Exception("Scoring failed. {}".format(str(ex)))
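The Azure ML Service template sends one record per row and decodes the response body twice. The sketch below isolates those two steps on hand-written data. The helper names to_records and decode_output are hypothetical, and accepting both an already-parsed dict and a JSON-encoded string is an assumption made here so that the same code covers either kind of deployment.

```python
import json


def to_records(columns, rows):
    """Turn column names plus row value lists into one dict per row."""
    return [dict(zip(columns, row)) for row in rows]


def decode_output(body):
    """Return the "output" list whether body is a dict or a JSON string.

    body stands for the value returned by response.json() in the template.
    """
    if isinstance(body, str):
        body = json.loads(body)
    return body["output"]


# Hand-written sample data; no endpoint is called here.
records = to_records(["age", "sex"], [[28, "Male"], [40, "Female"]])
payload = {"input": records}

parsed_body = {"output": [{"Scored Labels": "Risk", "Scored Probabilities": [0.3, 0.7]}]}
string_body = json.dumps(parsed_body)

labels_from_dict = [row["Scored Labels"] for row in decode_output(parsed_body)]
labels_from_string = [row["Scored Labels"] for row in decode_output(string_body)]
```

Checking the type before decoding avoids the TypeError that json.loads raises when it is handed a dict.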
def score(training_data_frame):
    az_scoring_uri = <EDIT THIS>
    api_key = <DEPLOYMENT API KEY>
    # Edit this if your prediction column has a different name
    prediction_column_name = "Scored Labels"
    cols_to_remove = []
    # Edit this if your training dataframe contains a label column
    label_column = "TO_BE_EDITED"
    cols_to_remove.append(label_column)
    # Edit this if your training dataframe contains meta columns
    meta_columns = ["TO_BE_EDITED"]
    cols_to_remove.extend(meta_columns)
    # Remove the meta columns/label column from the feature columns
    feature_cols = [col for col in training_data_frame.columns if col not in cols_to_remove]
    try:
        input_values = training_data_frame[feature_cols].values.tolist()
        input_data = [{field: value for field, value in zip(feature_cols, input_value)} for input_value in input_values]
        payload = {
            "input": input_data
        }
        import requests
        import json
        import numpy as np
        import time
        headers = {"Content-Type": "application/json", "Authorization": ("Bearer " + api_key)}
        start_time = time.time()
        response = requests.post(az_scoring_uri, json=payload, headers=headers)
        if not response.ok:
            raise Exception(str(response.content))
        response_time = int((time.time() - start_time) * 1000)
        print(response_time)
        # Assumed response json structure:
        # {
        #     "output": [
        #         {
        #             "Scored Labels": 123
        #         }
        #     ]
        # }
        # If your scoring response does not match the above schema,
        # please modify the code below to extract the prediction array
        # json.loads() only works here if response.json() returns a JSON-encoded string;
        # drop it if response.json() already returns a dict
        response_dict = json.loads(response.json())
        output = response_dict["output"]
        # Compute for all values
        score_label_list = []
        for value in output:
            score_label_list.append(value[prediction_column_name])
        return np.array(score_label_list)
    except Exception as ex:
        raise Exception("Scoring failed. {}".format(str(ex)))
This section provides the score function templates for models deployed in AWS SageMaker Model Engine. Note that the online scoring endpoints of SageMaker will be used. Please be aware that a cost will be incurred for scoring using this method.
The snippets below assume that the input datasets are one-hot encoded for categorical columns and label-encoded for the label column.
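Because the SageMaker templates send the features as CSV text, it can help to see what that text looks like for an already encoded dataset. The sketch below serializes two rows the same way the templates do, with numpy.savetxt and the "%g" format. The helper name to_csv_payload and the sample columns (an age value followed by three one-hot indicator columns) are assumptions made for illustration.

```python
import io

import numpy as np


def to_csv_payload(feature_matrix):
    """Serialize numeric feature rows to headerless CSV text."""
    buffer = io.BytesIO()
    np.savetxt(buffer, np.asarray(feature_matrix), delimiter=",", fmt="%g")
    # Drop the trailing newline that savetxt writes after the last row
    return buffer.getvalue().decode().rstrip()


# Hypothetical rows: age, then one-hot columns for a 3-category feature
encoded_rows = [[35, 1, 0, 0], [52, 0, 0, 1]]
csv_payload = to_csv_payload(encoded_rows)
print(csv_payload)
```

Any column that is still a string at this point makes the "%g" format fail, which is why the categorical columns need to be encoded first.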
def score(training_data_frame):
    SAGEMAKER_CREDENTIALS = {
        "access_key_id": <EDIT THIS>,
        "secret_access_key": <EDIT THIS>,
        "region": <EDIT THIS>
    }
    # User input needed
    endpoint_name = <EDIT THIS>
    # Edit these if your prediction and probability columns have different names
    prediction_column_name = "predicted_label"
    probability_column_name = "score"
    cols_to_remove = []
    # Edit this if your training dataframe contains a label column
    label_column = "TO_BE_EDITED"
    cols_to_remove.append(label_column)
    # Edit this if your training dataframe contains meta columns
    meta_columns = ["TO_BE_EDITED"]
    cols_to_remove.extend(meta_columns)
    # Remove the meta columns/label column from the feature columns
    feature_cols = [col for col in training_data_frame.columns if col not in cols_to_remove]
    try:
        access_id = SAGEMAKER_CREDENTIALS.get("access_key_id")
        secret_key = SAGEMAKER_CREDENTIALS.get("secret_access_key")
        region = SAGEMAKER_CREDENTIALS.get("region")
        # Convert the training data frame to bytes
        import io
        import numpy as np
        train_df_bytes = io.BytesIO()
        np.savetxt(train_df_bytes, training_data_frame[feature_cols].values, delimiter=",", fmt="%g")
        payload_data = train_df_bytes.getvalue().decode().rstrip()
        # Score the training data
        import requests
        import time
        import json
        import boto3
        runtime = boto3.client("sagemaker-runtime", region_name=region, aws_access_key_id=access_id, aws_secret_access_key=secret_key)
        start_time = time.time()
        response = runtime.invoke_endpoint(EndpointName=endpoint_name, ContentType="text/csv", Body=payload_data)
        if not (("ResponseMetadata" in response) and ("HTTPStatusCode" in response["ResponseMetadata"]) and (response["ResponseMetadata"]["HTTPStatusCode"] == 200)):
            raise Exception(str(response))
        response_time = int((time.time() - start_time) * 1000)
        results_decoded = json.loads(response["Body"].read().decode())
        # Extract the details
        results = results_decoded["predictions"]
        predicted_label_list = []
        score_prob_list = []
        for result in results:
            predicted_label_list.append(result[prediction_column_name])
            # Note: the probability always belongs to the same class label
            score_prob_list.append(result[probability_column_name])
        predicted_vector = np.array(predicted_label_list)
        probability_array = np.array([[prob, 1 - prob] for prob in score_prob_list])
        return probability_array, predicted_vector
    except Exception as ex:
        raise Exception("Scoring failed. {}".format(str(ex)))
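The last step of the binary template expands a single score per row into a two-column probability array. The sketch below reproduces that step on a hand-written response body. The helper name parse_binary_response is hypothetical, and the sample body only mirrors the "predictions" layout assumed by the template above; the response of a real endpoint may differ.

```python
import json

import numpy as np


def parse_binary_response(body_text, prediction_column="predicted_label",
                          probability_column="score"):
    """Turn a decoded response body into (probabilities, predictions)."""
    results = json.loads(body_text)["predictions"]
    predictions = np.array([row[prediction_column] for row in results])
    scores = np.array([float(row[probability_column]) for row in results])
    # The score refers to the same class in every row, so its complement
    # is the probability of the other class.
    probabilities = np.column_stack([scores, 1.0 - scores])
    return probabilities, predictions


# Hand-written sample body; no endpoint is invoked here.
sample_body = json.dumps({
    "predictions": [
        {"predicted_label": 1, "score": 0.75},
        {"predicted_label": 0, "score": 0.25},
    ]
})
probability_array, predicted_vector = parse_binary_response(sample_body)
```

Each row of the resulting array sums to 1, and the array has the shape that the output contract requires: the number of rows x 2.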
def score(training_data_frame):
    SAGEMAKER_CREDENTIALS = {
        "access_key_id": <EDIT THIS>,
        "secret_access_key": <EDIT THIS>,
        "region": <EDIT THIS>
    }
    # User input needed
    endpoint_name = <EDIT THIS>
    # Edit these if your prediction and probability columns have different names
    prediction_column_name = "predicted_label"
    probability_column_name = "score"
    cols_to_remove = []
    # Edit this if your training dataframe contains a label column
    label_column = "TO_BE_EDITED"
    cols_to_remove.append(label_column)
    # Edit this if your training dataframe contains meta columns
    meta_columns = ["TO_BE_EDITED"]
    cols_to_remove.extend(meta_columns)
    # Remove the meta columns/label column from the feature columns
    feature_cols = [col for col in training_data_frame.columns if col not in cols_to_remove]
    try:
        access_id = SAGEMAKER_CREDENTIALS.get("access_key_id")
        secret_key = SAGEMAKER_CREDENTIALS.get("secret_access_key")
        region = SAGEMAKER_CREDENTIALS.get("region")
        # Convert the training data frame to bytes
        import io
        import numpy as np
        train_df_bytes = io.BytesIO()
        np.savetxt(train_df_bytes, training_data_frame[feature_cols].values, delimiter=",", fmt="%g")
        payload_data = train_df_bytes.getvalue().decode().rstrip()
        # Score the training data
        import requests
        import time
        import json
        import boto3
        runtime = boto3.client("sagemaker-runtime", region_name=region, aws_access_key_id=access_id, aws_secret_access_key=secret_key)
        start_time = time.time()
        response = runtime.invoke_endpoint(EndpointName=endpoint_name, ContentType="text/csv", Body=payload_data)
        if not (("ResponseMetadata" in response) and ("HTTPStatusCode" in response["ResponseMetadata"]) and (response["ResponseMetadata"]["HTTPStatusCode"] == 200)):
            raise Exception(str(response))
        response_time = int((time.time() - start_time) * 1000)
        results_decoded = json.loads(response["Body"].read().decode())
        # Extract the details
        results = results_decoded["predictions"]
        predicted_vector_list = []
        probability_array_list = []
        for value in results:
            predicted_vector_list.append(value[prediction_column_name])
            probability_array_list.append(value[probability_column_name])
        # Convert to numpy arrays
        probability_array = np.array(probability_array_list)
        predicted_vector = np.array(predicted_vector_list)
        return probability_array, predicted_vector
    except Exception as ex:
        raise Exception("Scoring failed. {}".format(str(ex)))
def score(training_data_frame):
    SAGEMAKER_CREDENTIALS = {
        "access_key_id": <EDIT THIS>,
        "secret_access_key": <EDIT THIS>,
        "region": <EDIT THIS>
    }
    # User input needed
    endpoint_name = <EDIT THIS>
    # edit this if your prediction column has different name
    prediction_column_name = "score"
    cols_to_remove = []
    # edit these if your training dataframe contains label column
    label_column = "TO_BE_EDITED"
    cols_to_remove.append(label_column)
    # edit these if your training dataframe contains meta columns
    meta_columns = ["TO_BE_EDITED"]
    cols_to_remove.extend(meta_columns)
    # Removing the meta columns/label column from the feature_columns
    feature_cols = [col for col in training_data_frame.columns if col not in cols_to_remove]
    try:
        access_id = SAGEMAKER_CREDENTIALS.get("access_key_id")
        secret_key = SAGEMAKER_CREDENTIALS.get("secret_access_key")
        region = SAGEMAKER_CREDENTIALS.get("region")
        # Convert the training data frames to bytes
        import io
        import numpy as np
        train_df_bytes = io.BytesIO()
        np.savetxt(train_df_bytes, training_data_frame[feature_cols].values, delimiter=",", fmt="%g")
        payload_data = train_df_bytes.getvalue().decode().rstrip()
        # Score the training data
        import requests
        import time
        import json
        import boto3
        runtime = boto3.client("sagemaker-runtime", region_name=region, aws_access_key_id=access_id, aws_secret_access_key=secret_key)
        start_time = time.time()
        response = runtime.invoke_endpoint(EndpointName=endpoint_name, ContentType="text/csv", Body=payload_data)
        if not(("ResponseMetadata" in response) and ("HTTPStatusCode" in response["ResponseMetadata"]) and (response["ResponseMetadata"]["HTTPStatusCode"] == 200)):
            raise Exception(str(response))
        response_time = int((time.time() - start_time)*1000)
        results_decoded = json.loads(response["Body"].read().decode())
        # Extract the details
        results = results_decoded["predictions"]
        predicted_vector_list = []
        for value in results:
            predicted_vector_list.append(value[prediction_column_name])
        # Convert to numpy arrays
        predicted_vector = np.array(predicted_vector_list)
        return predicted_vector
    except Exception as ex:
        raise Exception("Scoring failed. {}".format(str(ex)))
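The SageMaker templates above serialize the feature columns to a headerless CSV string before calling `invoke_endpoint`. The following is a minimal sketch of that serialization step in isolation, so it can be checked without an endpoint. The helper name `build_csv_payload` and the sample frame are hypothetical, and the sketch assumes all feature columns are numeric, because `fmt="%g"` cannot format strings.

```python
import io

import numpy as np
import pandas as pd


def build_csv_payload(training_data_frame, feature_cols):
    """Serialize the selected feature columns to a headerless CSV string."""
    buffer = io.BytesIO()
    # Same call as in the templates: one row per line, comma separated
    np.savetxt(buffer, training_data_frame[feature_cols].values, delimiter=",", fmt="%g")
    # Drop the trailing newline that savetxt writes after the last row
    return buffer.getvalue().decode().rstrip()


# Hypothetical training data with a label column that must not be sent
frame = pd.DataFrame({"age": [25, 40], "income": [1200.5, 3000.0], "label": [0, 1]})
payload = build_csv_payload(frame, ["age", "income"])
print(payload)
```

Printing the payload before scoring is a cheap way to confirm that the label and meta columns were actually removed.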
This section provides the score function templates for models deployed in the SPSS model engine. The online scoring endpoint of the deployment is used for scoring. Be aware that scoring with this method incurs a cost.
def score(training_data_frame):
    SPSS_CREDENTIALS = {
        "username": <EDIT THIS>,
        "password": <EDIT THIS>
    }
    # To be filled by the user - model scoring url
    scoring_url = <EDIT THIS>
    # "id" - Identifier for the scoring configuration being used to generate scores
    scoring_id = <EDIT THIS>
    # edit these if your prediction and probability column have different prefixes
    prediction_column_prefix = "$N-"
    probability_column_prefix = "$NP-"
    cols_to_remove = []
    # edit these if your training dataframe contains label column
    label_column = "TO_BE_EDITED"
    cols_to_remove.append(label_column)
    # edit these if your training dataframe contains meta columns
    meta_columns = ["TO_BE_EDITED"]
    cols_to_remove.extend(meta_columns)
    # Removing the meta columns/label column from the feature_columns
    feature_cols = [col for col in training_data_frame.columns if col not in cols_to_remove]
    try:
        import requests
        import time
        import json
        import numpy as np
        # Build one {"name", "value"} entry per feature for every row
        training_data_dict = training_data_frame[feature_cols].to_dict(orient="records")
        request_input_row = [{"input": [{"name": key, "value": value} for key, value in row.items()]}
                             for row in training_data_dict]
        payload_scoring = {
            "id": scoring_id,
            "requestInputTable": [{
                "requestInputRow": request_input_row
            }]
        }
        # Credentials used to authenticate the scoring request
        username = SPSS_CREDENTIALS.get("username")
        password = SPSS_CREDENTIALS.get("password")
        start_time = time.time()
        response = requests.post(scoring_url, json=payload_scoring, auth=(username, password))
        if not response.ok:
            error_msg = "Scoring failed : " + str(response.status_code)
            if response.content is not None:
                error_msg = error_msg + ", " + response.content.decode("utf-8")
            raise Exception(error_msg)
        response_time = int((time.time() - start_time)*1000)
        print(response_time)
        # Convert response to dict
        score_predictions = json.loads(response.text)
        output_column_names = list(score_predictions.get("columnNames")["name"])
        # identify prediction and probability column names
        probability_column_names = [item for item in output_column_names
                                    if item.startswith(probability_column_prefix)]
        if len(probability_column_names) == 0:
            raise Exception("No probability column found. Please specify probability column name.")
        prediction_column_name = [item for item in output_column_names
                                  if item.startswith(prediction_column_prefix)]
        if len(prediction_column_name) != 1:
            raise Exception(
                "Either no prediction column found or more than one is found. Please specify prediction column name.")
        prediction_column_name = prediction_column_name[0]
        # identify prediction and probability column indexes
        prob_col_indexes = [output_column_names.index(prob_col_name) for prob_col_name in probability_column_names]
        predict_col_index = output_column_names.index(prediction_column_name)
        if len(prob_col_indexes) == 0 or predict_col_index < 0:
            raise Exception("Missing prediction/probability column in the scoring response")
        probability_array = []
        prediction_vector = []
        for value in score_predictions.get("rowValues"):
            response_prediction = value["value"][predict_col_index]["value"]
            prediction_vector.append(response_prediction)
            response_prob_array = []
            for prob_col_index in prob_col_indexes:
                response_prob_array.append(float(value["value"][prob_col_index]["value"]))
            probability_array.append(response_prob_array)
        probability_array = np.array(probability_array)
        prediction_vector = np.array(prediction_vector)
        return probability_array, prediction_vector
    except Exception as ex:
        raise Exception("Scoring failed. {}".format(str(ex)))
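The classification template above locates the prediction and probability columns by prefix and then walks `rowValues`. The sketch below isolates that parsing step against a hand-written response so it can be tested offline. The function name `parse_spss_response` and the sample column names and values are hypothetical; the response layout (`columnNames` / `rowValues`) is only assumed from the way the template reads it.

```python
import numpy as np


def parse_spss_response(score_predictions, prediction_prefix="$N-", probability_prefix="$NP-"):
    """Return (probabilities, predictions) from a columnNames/rowValues response."""
    names = list(score_predictions["columnNames"]["name"])
    prob_indexes = [i for i, name in enumerate(names) if name.startswith(probability_prefix)]
    pred_indexes = [i for i, name in enumerate(names) if name.startswith(prediction_prefix)]
    if not prob_indexes or len(pred_indexes) != 1:
        raise ValueError("Expected exactly one prediction column and at least one probability column")
    probabilities, predictions = [], []
    for row in score_predictions["rowValues"]:
        cells = row["value"]
        predictions.append(cells[pred_indexes[0]]["value"])
        # Values may arrive as strings, so convert probabilities to float
        probabilities.append([float(cells[i]["value"]) for i in prob_indexes])
    return np.array(probabilities), np.array(predictions)


# Hypothetical response, shaped the way the template reads it
sample_response = {
    "columnNames": {"name": ["age", "$N-risk", "$NP-low", "$NP-high"]},
    "rowValues": [
        {"value": [{"value": "25"}, {"value": "low"}, {"value": "0.8"}, {"value": "0.2"}]},
        {"value": [{"value": "40"}, {"value": "high"}, {"value": "0.3"}, {"value": "0.7"}]},
    ],
}
probabilities, predictions = parse_spss_response(sample_response)
```

Note that the `"$N-"` prefix does not match names starting with `"$NP-"`, which is why the two prefix filters never pick up each other's columns.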
def score(training_data_frame):
    SPSS_CREDENTIALS = {
        "username": <EDIT THIS>,
        "password": <EDIT THIS>
    }
    # To be filled by the user - model scoring url
    scoring_url = <EDIT THIS>
    # "id" - Identifier for the scoring configuration being used to generate scores
    scoring_id = <EDIT THIS>
    # edit this if your prediction column has different prefix
    prediction_column_prefix = "$N-"
    cols_to_remove = []
    # edit these if your training dataframe contains label column
    label_column = "TO_BE_EDITED"
    cols_to_remove.append(label_column)
    # edit these if your training dataframe contains meta columns
    meta_columns = ["TO_BE_EDITED"]
    cols_to_remove.extend(meta_columns)
    # Removing the meta columns/label column from the feature_columns
    feature_cols = [col for col in training_data_frame.columns if col not in cols_to_remove]
    try:
        import requests
        import time
        import json
        import warnings
        import numpy as np
        # Build one {"name", "value"} entry per feature for every row
        training_data_dict = training_data_frame[feature_cols].to_dict(orient="records")
        request_input_row = [{"input": [{"name": key, "value": value} for key, value in row.items()]}
                             for row in training_data_dict]
        payload_scoring = {
            "id": scoring_id,
            "requestInputTable": [{
                "requestInputRow": request_input_row
            }]
        }
        # Credentials used to authenticate the scoring request
        username = SPSS_CREDENTIALS.get("username")
        password = SPSS_CREDENTIALS.get("password")
        start_time = time.time()
        response = requests.post(scoring_url, json=payload_scoring, auth=(username, password))
        if not response.ok:
            error_msg = "Scoring failed : " + str(response.status_code)
            if response.content is not None:
                error_msg = error_msg + ", " + response.content.decode("utf-8")
            raise Exception(error_msg)
        response_time = int((time.time() - start_time)*1000)
        print(response_time)
        # Convert response to dict
        score_predictions = json.loads(response.text)
        output_column_names = list(score_predictions.get("columnNames")["name"])
        # identify prediction column name
        prediction_column_name = [item for item in output_column_names
                                  if item.startswith(prediction_column_prefix)]
        if len(prediction_column_name) != 1:
            raise Exception(
                "Either no prediction column found or more than one is found. Please specify prediction column name.")
        prediction_column_name = prediction_column_name[0]
        # identify prediction column index
        predict_col_index = output_column_names.index(prediction_column_name)
        if predict_col_index < 0:
            raise Exception("Missing prediction column in the scoring response")
        prediction_vector = []
        for value in score_predictions.get("rowValues"):
            response_prediction = value["value"][predict_col_index]["value"]
            prediction_vector.append(response_prediction)
        prediction_vector = np.array(prediction_vector)
        if np.issubdtype(prediction_vector.dtype, np.str_):
            warnings.warn("The predictions for the regression problem are strings. Converting them to floats.")
            prediction_vector = prediction_vector.astype(float)
        return prediction_vector
    except Exception as ex:
        raise Exception("Scoring failed. {}".format(str(ex)))
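Both SPSS templates above build the same request body: each row of the training data becomes a list of `{"name", "value"}` pairs under `requestInputRow`. As an illustration, here is a minimal sketch of that construction on its own. The helper name `build_spss_payload`, the sample frame, and the scoring id are hypothetical.

```python
import pandas as pd


def build_spss_payload(training_data_frame, feature_cols, scoring_id):
    """Build the request body in the layout used by the SPSS templates."""
    records = training_data_frame[feature_cols].to_dict(orient="records")
    request_input_row = [
        {"input": [{"name": name, "value": value} for name, value in record.items()]}
        for record in records
    ]
    return {
        "id": scoring_id,
        "requestInputTable": [{"requestInputRow": request_input_row}],
    }


# Hypothetical two-row training frame with a label column that is not sent
frame = pd.DataFrame({"age": [25, 40], "job": ["clerk", "nurse"], "label": ["A", "B"]})
payload = build_spss_payload(frame, ["age", "job"], "hypothetical-scoring-id")
rows = payload["requestInputTable"][0]["requestInputRow"]
```

Inspecting `rows` for a handful of records before posting helps catch a label or meta column that was left in `feature_cols`.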
This section provides the score function templates for models deployed in a custom engine. The online scoring endpoint of the custom engine is used for scoring. Be aware that scoring with this method incurs a cost.
def score(training_data_frame):
    CUSTOM_ENGINE_CREDENTIALS = {
        "url": <EDIT THIS>,
        "username": <EDIT THIS>,
        "password": <EDIT THIS>
    }
    # To be filled by the user - model scoring url
    scoring_url = <EDIT THIS>
    # The data type of the label column and prediction column should be the same.
    # Make sure that the label column and the prediction column array
    # have the same unique class labels
    prediction_column_name = <EDIT THIS>
    probability_column_name = <EDIT THIS>
    cols_to_remove = []
    # edit these if your training dataframe contains label column
    label_column = "TO_BE_EDITED"
    cols_to_remove.append(label_column)
    # edit these if your training dataframe contains meta columns
    meta_columns = ["TO_BE_EDITED"]
    cols_to_remove.extend(meta_columns)
    # Removing the meta columns/label column from the feature_columns
    feature_cols = [col for col in training_data_frame.columns if col not in cols_to_remove]
    try:
        training_data_rows = training_data_frame[feature_cols].values.tolist()
        payload_scoring = {
            "fields": feature_cols,
            "values": training_data_rows
        }
        # Credentials used to authenticate the scoring request
        username = CUSTOM_ENGINE_CREDENTIALS.get("username")
        password = CUSTOM_ENGINE_CREDENTIALS.get("password")
        import requests
        import time
        start_time = time.time()
        response = requests.post(scoring_url, json=payload_scoring, auth=(username, password))
        if not response.ok:
            raise Exception(str(response.content))
        response_time = int((time.time() - start_time)*1000)
        print(response_time)
        # Convert response to dict
        import json
        score_predictions = json.loads(response.text)
        # list.index raises ValueError for a missing name, so check membership first
        response_fields = list(score_predictions.get("fields"))
        if probability_column_name not in response_fields or prediction_column_name not in response_fields:
            raise Exception("Missing prediction/probability column in the scoring response")
        prob_col_index = response_fields.index(probability_column_name)
        predict_col_index = response_fields.index(prediction_column_name)
        import numpy as np
        probability_array = np.array([value[prob_col_index] for value in score_predictions.get("values")])
        prediction_vector = np.array([value[predict_col_index] for value in score_predictions.get("values")])
        return probability_array, prediction_vector
    except Exception as ex:
        raise Exception("Scoring failed. {}".format(str(ex)))
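The custom engine template above exchanges data in a `fields` / `values` layout: column names in one list, rows in another. The following sketch shows both directions of that exchange without a live endpoint. The helper names, the sample frame, and the sample response are hypothetical and only mirror what the template expects.

```python
import numpy as np
import pandas as pd


def build_fields_values_payload(training_data_frame, feature_cols):
    """Build the {"fields", "values"} request body used by the template."""
    return {
        "fields": list(feature_cols),
        "values": training_data_frame[feature_cols].values.tolist(),
    }


def extract_column(score_predictions, column_name):
    """Pull one named column out of a {"fields", "values"} response."""
    fields = list(score_predictions["fields"])
    if column_name not in fields:
        raise ValueError("Missing column in the scoring response: " + column_name)
    index = fields.index(column_name)
    return np.array([row[index] for row in score_predictions["values"]])


# Hypothetical training data and a hypothetical response from the engine
frame = pd.DataFrame({"age": [25, 40], "income": [1200.5, 3000.0], "label": ["A", "B"]})
payload = build_fields_values_payload(frame, ["age", "income"])

sample_response = {
    "fields": ["prediction", "probability"],
    "values": [["A", [0.9, 0.1]], ["B", [0.2, 0.8]]],
}
predictions = extract_column(sample_response, "prediction")
probabilities = extract_column(sample_response, "probability")
```

Checking membership before calling `list.index` matters here because `index` raises `ValueError` for a missing name instead of returning a negative number.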
def score(training_data_frame):
    CUSTOM_ENGINE_CREDENTIALS = {
        "url": <EDIT THIS>,
        "username": <EDIT THIS>,
        "password": <EDIT THIS>
    }
    # To be filled by the user - model scoring url
    scoring_url = <EDIT THIS>
    # The data type of the label column and prediction column should be the same.
    prediction_column_name = <EDIT THIS>
    cols_to_remove = []
    # edit these if your training dataframe contains label column
    label_column = "TO_BE_EDITED"
    cols_to_remove.append(label_column)
    # edit these if your training dataframe contains meta columns
    meta_columns = ["TO_BE_EDITED"]
    cols_to_remove.extend(meta_columns)
    # Removing the meta columns/label column from the feature_columns
    feature_cols = [col for col in training_data_frame.columns if col not in cols_to_remove]
    try:
        training_data_rows = training_data_frame[feature_cols].values.tolist()
        payload_scoring = {
            "fields": feature_cols,
            "values": training_data_rows
        }
        # Credentials used to authenticate the scoring request
        username = CUSTOM_ENGINE_CREDENTIALS.get("username")
        password = CUSTOM_ENGINE_CREDENTIALS.get("password")
        import requests
        import time
        start_time = time.time()
        response = requests.post(scoring_url, json=payload_scoring, auth=(username, password))
        if not response.ok:
            raise Exception(str(response.content))
        response_time = int((time.time() - start_time)*1000)
        print(response_time)
        # Convert response to dict
        import json
        score_predictions = json.loads(response.text)
        # list.index raises ValueError for a missing name, so check membership first
        response_fields = list(score_predictions.get("fields"))
        if prediction_column_name not in response_fields:
            raise Exception("Missing prediction column in the scoring response")
        predict_col_index = response_fields.index(prediction_column_name)
        import numpy as np
        prediction_vector = np.array([value[predict_col_index] for value in score_predictions.get("values")])
        import warnings
        if np.issubdtype(prediction_vector.dtype, np.str_):
            warnings.warn("The predictions for the regression problem are strings. Converting them to floats.")
            prediction_vector = prediction_vector.astype(float)
        return prediction_vector
    except Exception as ex:
        raise Exception("Scoring failed. {}".format(str(ex)))
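Whichever template is used, the classification output has to satisfy the contract stated at the top of this page: `predictions` has one entry per row of `training_data_frame`, and `probabilities` has shape rows x number of classes with values between 0 and 1. A small check like the one below can be run on the result of `score` before generating the configuration package. The function `check_classification_output` is a hypothetical helper, not part of any IBM library, and the number of classes is passed in explicitly as an assumption.

```python
import numpy as np
import pandas as pd


def check_classification_output(training_data_frame, probabilities, predictions, n_classes):
    """Raise if the (probabilities, predictions) pair violates the documented contract."""
    n_rows = len(training_data_frame)
    if not isinstance(probabilities, np.ndarray) or not isinstance(predictions, np.ndarray):
        raise TypeError("probabilities and predictions must be numpy.ndarray")
    if predictions.shape[0] != n_rows:
        raise ValueError("predictions must have one entry per training row")
    if probabilities.shape != (n_rows, n_classes):
        raise ValueError("probabilities must have shape (rows, number of classes)")
    if ((probabilities < 0) | (probabilities > 1)).any():
        raise ValueError("probabilities must lie between 0 and 1")
    return True


# Hypothetical two-row example with two classes
frame = pd.DataFrame({"age": [25, 40]})
good_probabilities = np.array([[0.9, 0.1], [0.2, 0.8]])
good_predictions = np.array(["A", "B"])
is_valid = check_classification_output(frame, good_probabilities, good_predictions, n_classes=2)
```

For a regression template only the length check on `predictions` applies, since no probabilities are returned.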