In [None]:
#!/usr/bin/python
import pandas as pd
import json
import time, sys, os, shutil, glob, io, requests
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, Model, PipelineModel
from pyspark.sql import SQLContext
import dsx_core_utils
# define variables
# Evaluation settings for this run: the thresholds used for the performance
# rating, the evaluator type, and the dataset path relative to the project dir.
args = {
    "threshold": {"min_value": 0.3, "mid_value": 0.7, "metric": "accuracyScore"},
    "published": "false",
    "evaluator_type": "multiclass",
    "dataset": "/datasets/TradingCustomerSparkMLEval.csv",
}
# Every path below is built from DSX_PROJECT_DIR. Read it once and fail with an
# explicit message rather than the opaque "NoneType + str" TypeError that
# string concatenation would raise when the variable is unset.
project_dir = os.getenv("DSX_PROJECT_DIR")
if project_dir is None:
    raise EnvironmentError("DSX_PROJECT_DIR environment variable is not set")
model_path = project_dir + "/models/Trading Churn Risk Classification SparkML/1/model"
# create spark context
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# load the input data
input_data = project_dir + args["dataset"]
# Read through the SparkSession's DataFrameReader; wrapping the context in a
# separate SQLContext is deprecated and not needed once a session exists.
dataframe = spark.read.csv(input_data, header="true", inferSchema="true")
# load the model from disk
model_rf = PipelineModel.load(model_path)
# generate predictions
predictions = model_rf.transform(dataframe)
# Create Evaluation JSON
# The record carries the model identity, the start timestamp (taken before any
# metric is computed), the computed metrics, and a coarse performance rating.
evaluation = {
    "metrics": {},
    "modelName": "Trading Churn Risk Classification SparkML",
    "startTime": int(time.time()),
    "modelVersion": "1",
}
threshold = {'min_value': 0.3, 'mid_value': 0.7, 'metric': 'accuracyScore'}
# replace "label" below with the numeric representation of the label column that you defined while training the model
labelCol = "label"
# create evaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol=labelCol)
# compute evaluations: one evaluator pass per (output key, Spark metric name)
for score_key, spark_metric in (
    ("accuracyScore", "accuracy"),
    ("f1Score", "f1"),
    ("weightedPrecisionScore", "weightedPrecision"),
    ("weightedRecallScore", "weightedRecall"),
):
    evaluation["metrics"][score_key] = evaluator.evaluate(
        predictions, {evaluator.metricName: spark_metric}
    )
evaluation["metrics"]["threshold"] = threshold
# Rate the run by comparing the tracked metric against the two thresholds:
# at or above mid_value is "good", at or below min_value is "poor", else "fair".
tracked_score = evaluation["metrics"][threshold.get('metric', 'INVALID_METRIC')]
if tracked_score >= threshold.get('mid_value', 0.70):
    performance_rating = "good"
elif tracked_score <= threshold.get('min_value', 0.25):
    performance_rating = "poor"
else:
    performance_rating = "fair"
evaluation["performance"] = performance_rating
# Append this run's evaluation record to the model's evaluations.json history,
# creating the history as a one-element list when the file does not exist yet.
evaluations_file_path = os.getenv("DSX_PROJECT_DIR") + '/models/Trading Churn Risk Classification SparkML/1/evaluations.json'
if os.path.isfile(evaluations_file_path):
    # Read inside a context manager so the handle is closed before the same
    # file is reopened for writing below.
    with open(evaluations_file_path) as infile:
        current_evaluations = json.load(infile)
else:
    current_evaluations = []
current_evaluations.append(evaluation)
# Rewrite the whole history; keys are sorted so the output is stable.
with open(evaluations_file_path, 'w') as outfile:
    json.dump(current_evaluations, outfile, indent=4, sort_keys=True)