###### This is a notebook of helper functions that will be called for the modeling pipelines

In [2]:
import pickle as pkl
import scipy.sparse
import pyspark.sql.functions as F
import numpy as np
import matplotlib.pyplot as plt
import itertools
import pandas as pd
import time
import hyperopt.pyll.stochastic
# import mlflow
# import hiplot as hip

from sklearn.metrics import classification_report
from itertools import chain
from tqdm.notebook import tqdm
from hyperopt import fmin, hp, tpe, Trials, SparkTrials, STATUS_OK
from functools import partial
from pyspark.ml.linalg import Vectors, SparseVector, DenseVector, VectorUDT, _convert_to_vector
from pyspark.sql.functions import lit
from pyspark.ml.feature import VectorAssembler, StringIndexer, StringIndexerModel, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import IntegerType, FloatType, ArrayType, BooleanType, StringType
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.types import *
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, \
                                RandomForestClassificationModel, LogisticRegressionModel

In [3]:
# Default hyperopt search space used when tuning the LogisticRegression model.
#   regParam        -- sampled uniformly from [0, 1]
#   elasticNetParam -- sampled uniformly from [0, 1]
#   maxIter         -- sampled from [50, 200] in steps of q=1; consumers of the
#                      sampled value cast it to int before handing it to Spark
default_search_space = dict(
  regParam=hp.uniform('regParam', 0, 1),
  elasticNetParam=hp.uniform('elasticNetParam', 0, 1),
  maxIter=hp.quniform('maxIter', 50, 200, 1),
)

In [4]:
def check_vector_lengths(df):
  '''
  This function takes a PySpark dataframe and checks that each feature vector \
  has the same length across the whole data.

  Every column except "owner" is treated as a feature column. When a mismatch \
  is found, the length distribution of every offending column is printed. \
  A dataframe with no rows is reported as non-uniform (0), and a dataframe \
  with no feature columns as uniform (1).

  Arguments: 
    df {PySpark Dataframe} -- PySpark dataframe containing feature vectors

  Returns:
    1 or 0 {int} -- 1 signifies that the vector lengths are uniform and 0 that they are not
  '''

  feature_cols = [c for c in df.columns if c != 'owner']
  if not feature_cols:
    # Nothing to compare, so nothing can disagree
    return 1

  vector_length = udf(lambda v: len(v), IntegerType())

  # Project only the generated length columns so that input columns whose names
  # happen to start with "len_" are never mistaken for length columns
  lengths = df.select([vector_length(feat).alias("len_" + feat) for feat in feature_cols])

  # A column is uniform when grouping by its length yields exactly one group
  mismatched = [c for c in lengths.columns if lengths.groupBy(c).count().count() != 1]
  if not mismatched:
    return 1

  # Show the length distribution of each offending column, not just the last one
  for c in mismatched:
    lengths.groupBy(c).count().show()
  return 0

In [5]:
def vector_assembler(input_features, data):
  '''
  Combine several input columns into a single "features" vector column \
  using VectorAssembler.

  Arguments: 
    input_features {list} -- names of the columns to combine, in the order \
                             they should appear inside the assembled vector
    data {PySpark Dataframe} -- the dataframe to transform

  Returns:
    df {PySpark Dataframe} -- the input dataframe with an additional "features" column
  '''

  assembler = VectorAssembler(inputCols=input_features, outputCol="features")
  return assembler.transform(data)

In [6]:
def label_indexer(df):
  '''
  Fit a StringIndexer on the "owner" column and use it to add a numeric \
  "label" column. The indexer is created with handleInvalid="keep".

  Arguments: 
    df {PySpark Dataframe} -- a dataframe containing the string column "owner"

  Returns:
    df {PySpark Dataframe} -- the input dataframe with a new "label" column
  '''

  indexer = StringIndexer(inputCol="owner", outputCol="label", handleInvalid="keep")
  fitted_indexer = indexer.fit(df)
  return fitted_indexer.transform(df)

In [7]:
def model_fit(trainingData):
  '''
  Fit an unregularised LogisticRegression (regParam=0, elasticNetParam=0, \
  maxIter=100) on the given data, reading the "features", "label" and \
  "weight" columns.

  Arguments: 
    trainingData {PySpark Dataframe} -- the dataframe to train the model on

  Returns:
    (lr_model, accuracy) {tuple} -- the fitted LogisticRegressionModel and the \
                                    accuracy reported by its training summary
  '''

  estimator = LogisticRegression(featuresCol="features", labelCol="label",
                                 weightCol="weight", maxIter=100,
                                 regParam=0.0, elasticNetParam=0.0)
  fitted = estimator.fit(trainingData)
  training_accuracy = fitted.summary.accuracy

  return fitted, training_accuracy

In [8]:
def build_label_dict(df):
  '''
  Build a dictionary that maps the numeric labels produced by StringIndexer \
  back to the original string values. IndexToString is applied to the "label" \
  column without an explicit label list.

  Arguments: 
    df {PySpark Dataframe} -- a dataframe already transformed with StringIndexer, \
                              i.e. containing the indexed "label" column

  Returns:
    label_dict {dict} -- a dictionary with key,value pairs {label: originalLabel}
  '''

  reverser = IndexToString(inputCol="label", outputCol="originalLabel")
  distinct_pairs = (reverser.transform(df)
                    .select('label', 'originalLabel')
                    .dropDuplicates()
                    .orderBy('label'))

  return distinct_pairs.rdd.map(lambda row: (row[0], row[1])).collectAsMap()

In [9]:
def model_transform(testData, model):
  '''
  Run a dataframe through a fitted model to generate predictions.

  Arguments: 
    testData {PySpark Dataframe} -- the dataframe to pass through the trained model
    model {LogisticRegressionModel} -- the trained model

  Returns:
    (model, predictions) {tuple} -- the model that was passed in, unchanged, and \
                                    the dataframe returned by model.transform
  '''
  scored = model.transform(testData)
  return model, scored

In [10]:
def add_class_weights(df):
  '''
  Add a per-row class weight so that under-represented owners count for more \
  during training. The weight of a class is \
  total_rows / (number_of_classes * rows_in_that_class).

  Arguments: 
    df {PySpark Dataframe} -- a dataframe containing the target classes in "owner"

  Returns:
    df {PySpark Dataframe} -- the input dataframe with a new "weight" column
  '''

  owner_counts = df.select("owner").groupBy("owner").count().collect()

  owners = [row["owner"] for row in owner_counts]
  rows_per_owner = [row["count"] for row in owner_counts]
  n_rows = sum(rows_per_owner)
  n_owners = len(owner_counts)

  weights = n_rows / (n_owners * np.array(rows_per_owner))
  weight_by_owner = dict(zip(owners, weights))

  # create_map expects a flat key, value, key, value, ... sequence of literals
  flat_literals = [F.lit(item) for item in chain.from_iterable(weight_by_owner.items())]
  owner_to_weight = F.create_map(flat_literals)

  return df.withColumn("weight", owner_to_weight.getItem(F.col("owner")))

In [11]:
def evaluation_metrics(predictions):
  '''
  Compute multiclass performance metrics from the "prediction" and "label" \
  columns of a predictions dataframe.

  Arguments: 
    predictions {PySpark Dataframe} -- a dataframe containing predicted and actual labels

  Returns:
    metrics {tuple} -- performance metrics in the following order: \
                    confusion_matrix, accuracy, fMeasure, weightedFMeasure, \
                    weightedFalsePositiveRate, weightedPrecision, weightedRecall
  '''

  scored_pairs = predictions.rdd.map(lambda row: (row.prediction, row.label))
  mc_metrics = MulticlassMetrics(scored_pairs)

  # NOTE(review): fMeasure() is called without a label; whether that is accepted
  # appears to depend on the installed PySpark version -- confirm on the cluster
  return (
    mc_metrics.confusionMatrix().toArray(),
    mc_metrics.accuracy,
    mc_metrics.fMeasure(),
    mc_metrics.weightedFMeasure(),
    mc_metrics.weightedFalsePositiveRate,
    mc_metrics.weightedPrecision,
    mc_metrics.weightedRecall,
  )

In [12]:
def optimize_gridsearch(data):
  '''
  Run a grid search (TrainValidationSplit with an 80/20 split, scored by f1) \
  to find the best parameters for a Logistic Regression classifier.

  Arguments: 
    data {list:PySpark Dataframe} -- either [trainingData] or [trainingData, testData]. \
                   The first element is always used for the search; the second, \
                   when present, is scored with the best model.

  Returns:
    bestModel {LogisticRegressionModel} -- the model trained with the best parameters. \
                   Returned on its own when only training data was supplied.
    (bestModel, predictions) {tuple} -- returned instead when test data was supplied; \
                   predictions is the scored test dataframe.
  '''

  train_df = data[0]

  estimator = LogisticRegression(featuresCol="features", labelCol="label", weightCol="weight",
                                 maxIter=20, regParam=0.0, elasticNetParam=0)

  # Candidate values explored by the search
  grid_builder = ParamGridBuilder()
  grid_builder.addGrid(estimator.regParam, [0.0, 0.1, 0.3])      # regularization parameter
  grid_builder.addGrid(estimator.elasticNetParam, [0.0, 0.1])    # Elastic Net Parameter (Ridge = 0)
  grid_builder.addGrid(estimator.maxIter, [20, 50, 100])         # number of iterations
  param_grid = grid_builder.build()

  f1_evaluator = MulticlassClassificationEvaluator(metricName="f1")

  splitter = TrainValidationSplit(estimator=estimator,
                                  estimatorParamMaps=param_grid,
                                  evaluator=f1_evaluator,
                                  trainRatio=0.8)
  fitted_search = splitter.fit(train_df)

  if len(data) <= 1:
    return fitted_search.bestModel

  predictions = fitted_search.transform(data[1])
  print("f1 Score: ", f1_evaluator.evaluate(predictions))
  return fitted_search.bestModel, predictions

In [13]:
def plot_confusion_matrix(confusion_matrix, predictions, normalize=True, cmap = 'winter'):
  '''
  This function takes a confusion matrix and returns a plot of it where every \
  point outside of the diagonal is a misclassification. By default, each row \
  is normalized by its total.

  Arguments: 
    confusion_matrix {array} -- An array representing the confusion matrix from the classifer
    predictions {PySpark Dataframe} -- Not used for plotting; kept so that existing \
                                       callers do not have to change
    normalize {bool} -- divides each row by its sum so rows are comparable. Rows \
                        that sum to zero are left as zeros
    cmap {string} -- colormap for the plot

  Returns:
    Display of the confusion matrix plot
  '''

  matrix = np.asarray(confusion_matrix)

  if normalize:
    row_totals = matrix.sum(axis=1, keepdims=True)
    # Divide empty rows by 1 instead of 0 so they stay 0 rather than becoming NaN
    safe_totals = np.where(row_totals == 0, 1, row_totals)
    matrix = matrix.astype('float') / safe_totals
    title = "Normalized confusion matrix"
  else:
    title = 'Confusion matrix, without normalization'

  plt.figure()
  plt.imshow(matrix, interpolation='nearest', cmap=cmap)
  plt.title(title)
  plt.colorbar()
  plt.ylabel('True label')
  plt.xlabel('Predicted label')
  # Lay out after the labels exist so they are accounted for
  plt.tight_layout()

  display(plt.show())

In [14]:
def objective_func(training_data, validation_data, params):
  '''
  Objective evaluated by hyperopt for one set of hyperparameters. It trains a \
  Logistic Regression model on the training data, scores the validation data \
  and reports the negated F-measure as the loss to minimise.

  Arguments: 
    training_data {PySpark Dataframe} -- dataframe with "features", "label" and "weight" used for fitting
    validation_data {PySpark Dataframe} -- dataframe scored to compute the loss
    params {dict} -- sampled values for 'elasticNetParam', 'maxIter' and 'regParam'

  Returns:
    results {dict} -- 'loss' (negative F-measure), 'status' and 'eval_time' for that trial
  '''

  # hyperopt hands back floats; Spark requires an integer iteration count
  chosen = {
    'regParam': float(params['regParam']),
    'elasticNetParam': float(params['elasticNetParam']),
    'maxIter': int(params['maxIter']),
  }

  estimator = LogisticRegression(featuresCol="features", labelCol="label",
                                 weightCol="weight", **chosen)
  fitted = estimator.fit(training_data)
  scored = fitted.transform(validation_data)

  scored_pairs = scored.rdd.map(lambda row: (row.prediction, row.label))
  f_measure = MulticlassMetrics(scored_pairs).fMeasure()

  return {'loss': -f_measure, 'status': STATUS_OK, 'eval_time': time.time()}

In [15]:
def optimize_hyperopt(data, search_space = default_search_space, \
                    algo = tpe.suggest, n_evals = 32):
  
  '''
  This function takes the training data as input and performs bayesian hyperopt \
  tuning to generate the best hyperparameters and return the best model. If the \
  testing data is also passed then it will return predictions for it as well.

  The first dataframe is split 80/20 into tuning-train and tuning-validation \
  sets; once the best parameters are known the model is refitted on the whole \
  of the first dataframe.
   
  Arguments: 
    data {list:PySpark Dataframe} -- [trainingData] or [trainingData, testData]
    search_space {dict} -- A dictionary of the parameter distributions to search from
    algo {callable} -- The hyperopt suggest function to be used. By default it is tpe.suggest
    n_evals {int} -- number of hyperparameter combinations to evaluate
    
  Returns:
    results {tuple} -- If only training data was provided, it returns (trials, fitted model) \
                       If testing data is also provided, it returns (trials, fitted model, predictions)
  '''
  
  trainData = data[0]
  training_data, validation_data = trainData.randomSplit([0.8, 0.2])

  objective = partial(objective_func, training_data, validation_data)
  # A plain Trials object is used: trials are launched one after another from
  # the driver, and each trial is itself a Spark ML fit
  trials = Trials()
  fmin(
    fn=objective,
    space=search_space,
    algo=algo,
    trials=trials,
    max_evals=n_evals)

  best_params = hyperopt.space_eval(search_space, trials.argmin)

  lr = LogisticRegression(featuresCol="features", labelCol="label", weightCol="weight")
  # Estimator.fit expects a param map keyed by Param objects, with values of the
  # type each Param declares (e.g. an int maxIter, whereas hyperopt yields floats).
  # Names in the search space that are not estimator params are skipped.
  paramMap = {}
  for name, value in best_params.items():
    if not lr.hasParam(name):
      continue
    param = lr.getParam(name)
    paramMap[param] = param.typeConverter(value)

  # Retrain using the best parameters on the full training data
  full_model = lr.fit(trainData, paramMap)

  if len(data) > 1:
    predictions = full_model.transform(data[1])
    return trials, full_model, predictions

  return trials, full_model

In [16]:
def _dump_pickle(obj, path):
  '''Pickle obj to path, closing the file handle even if serialisation fails.'''
  with open(path, 'wb') as handle:
    pkl.dump(obj, handle)


def training_pipeline(data, datestamp, train_existing_model = True, hyperparameter_tuning = False, \
                      training_split = 0.8, model_validate = True, evaluate_metrics = False):
  '''
  This is the main function to be called for executing the model training pipeline. It takes \
  a PySpark dataframe which is first transformed, and trained using the fixed model parameters or \
  with hyperparameter tuning, following which the prediction metrics are optionally evaluated. \
  All artefacts are written to storage and only their locations are returned.
   
  Arguments: 
    data {PySpark Dataframe} -- A PySpark Dataframe containing feature vectors, "owner" and "vulnerability_id"
    datestamp {str} -- suffix embedded in every output path / table name
    train_existing_model {bool} -- If TRUE, the fixed model parameters of model_fit are used for training
    hyperparameter_tuning {bool} -- If TRUE, hyperopt tuning is performed to get the optimum parameters. \
                                    Takes precedence over train_existing_model
    training_split {float} -- fraction in range (0,1) that will be the training data after randomSplit
    model_validate {bool} -- If TRUE, it splits the dataset into (train, test) using randomSplit
                             If FALSE, it passes the whole data into the model and generates no predictions
    evaluate_metrics {bool} -- If TRUE (and model_validate is TRUE), it calculates the performance \
                               metrics of the model predictions
    
  Returns:
    results {dict} -- locations of the produced artefacts. Depending on the flags it contains \
                    'model', 'label_dict', 'trained_owners_dict', 'validation_predictions', \
                    'metrics', 'classificationReport' and 'training_accuracy' (for hyperparameter \
                    tuning this is the best score found on the tuning validation split). \
                    If model_validate and both training flags are FALSE, it contains only \
                    'transformed_df' and 'label_dict'.

  Raises:
    ValueError -- if model_validate is TRUE but neither hyperparameter_tuning nor \
                  train_existing_model is set
    SystemExit -- if the feature vectors do not all have the same length
  '''
  import sys  # not imported at module level in this notebook

  if model_validate and not (hyperparameter_tuning or train_existing_model):
    # Previously this only printed a hint and then failed on an unbound variable
    raise ValueError("Please make one of these parameters True: "
                     "[hyperparameter_tuning, existing_model] "
                     "OR make model_validate == False")

  if check_vector_lengths(data)==0:
    sys.exit('Feature vector lengths are different')
    
  label_dict_path = '/dbfs/dbfs/FileStore/label_dicts/label_mapping_{}.pkl'.format(datestamp)
  trials_path = '/dbfs/dbfs/FileStore/metrics/trials_{}.pkl'.format(datestamp)
  model_path = '/dbfs/FileStore/models/model_{}.sav'.format(datestamp)
  validation_predictions_tablename = 'validation_predictions_{}'.format(datestamp)
  trained_owners_dict_path = '/dbfs/dbfs/FileStore/trained_owners/trained_owners_{}.pkl'.format(datestamp)
  metrics_path = '/dbfs/dbfs/FileStore/metrics/validation_metrics_{}.pkl'.format(datestamp)
  classification_report_path = '/dbfs/dbfs/FileStore/metrics/validation_classification_report_{}.pkl'.format(datestamp)
  transformed_df_path = 'transformed_df_{}'.format(datestamp)
  results = {}
  
  # NOTE(review): the feature order comes from iterating a set, so it is not
  # guaranteed to match the order used when the saved model is later applied --
  # confirm that training and scoring assemble the columns in the same order
  input_features = list(set(data.columns) - set(['owner','vulnerability_id']))
  df = vector_assembler(input_features, data).select('features','owner','vulnerability_id')
  df = label_indexer(df)
  label_dict = build_label_dict(df)
  _dump_pickle(label_dict, label_dict_path)
  df = add_class_weights(df)
  
  if not model_validate:
    if not (hyperparameter_tuning or train_existing_model):
      # Nothing to train: just persist the transformed data
      df.write.saveAsTable(transformed_df_path)
      results.update({'transformed_df': transformed_df_path, 'label_dict': label_dict_path})
      return results

    if hyperparameter_tuning:
      trials, model_fitted = optimize_hyperopt([df])
      _dump_pickle(trials, trials_path)
      results['training_accuracy'] = trials.best_trial['result']['loss']*-1
    else:
      model_fitted, accuracy = model_fit(df)
      results['training_accuracy'] = accuracy

    trained_owners_dict = df.groupBy('owner').count().rdd.map(lambda x : (x[0],x[1])).collectAsMap()
    model_fitted.save(model_path)
    _dump_pickle(trained_owners_dict, trained_owners_dict_path)
    results.update({'model': model_path, 'label_dict': label_dict_path, \
            'trained_owners_dict': trained_owners_dict_path})
    return results

  (trainingData, testData) = df.randomSplit([training_split, 1 - training_split])
  if hyperparameter_tuning:
    trials, model, predictions = optimize_hyperopt([trainingData, testData])
    results['training_accuracy'] = trials.best_trial['result']['loss']*-1
    _dump_pickle(trials, trials_path)
  else:
    model_fitted, accuracy = model_fit(trainingData)
    results['training_accuracy'] = accuracy
    model, predictions = model_transform(testData, model_fitted)
  model.save(model_path)

  predictions = predictions.withColumn('ml_predicted_owner', udf(lambda x: label_dict[x])('prediction'))
  predictions = predictions.withColumn('array_probs', udf(lambda v: v.toArray().tolist(), \
                                                      ArrayType(FloatType()))('probability'))
  predictions = predictions.withColumn('confidence_factor', udf(lambda x: max(x), FloatType())('array_probs'))
  predictions = predictions.withColumn('ml_owner', col('ml_predicted_owner'))
  predictions.write.saveAsTable(validation_predictions_tablename)

  trained_owners_dict = trainingData.groupBy('owner').count().rdd.map(lambda x : (x[0],x[1])).collectAsMap()
  _dump_pickle(trained_owners_dict, trained_owners_dict_path)

  results.update({'model': model_path, 'label_dict': label_dict_path, \
          'trained_owners_dict': trained_owners_dict_path, \
          'validation_predictions': validation_predictions_tablename})

  if evaluate_metrics:
    metrics = evaluation_metrics(predictions)
    # Collect both columns in one pass so actual and predicted labels are
    # guaranteed to be row-aligned, and unwrap the Rows into plain values
    label_rows = predictions.select('owner', 'ml_predicted_owner').collect()
    actual_labels = [row['owner'] for row in label_rows]
    predicted_labels = [row['ml_predicted_owner'] for row in label_rows]
    classificationReport = classification_report(actual_labels, predicted_labels)
    _dump_pickle(metrics, metrics_path)
    _dump_pickle(classificationReport, classification_report_path)
    results.update({'metrics': metrics_path, 'classificationReport': classification_report_path})

  return results

In [17]:
def testing_pipeline(data, datestamp, model_path = '/dbfs/FileStore/models/tvs_lr_model_apr23_1.sav',\
                    label_dict_path = '/dbfs/FileStore/tables/label_mapping_apr23_1.pkl'):
  '''
  This is the main function to be called for executing the model testing pipeline. It takes \
  a PySpark dataframe which it transforms, passes through an existing model, and writes the \
  predictions to a table.
   
  Arguments: 
    data {PySpark Dataframe} -- A PySpark Dataframe containing "vulnerability_id" and the feature \
                                columns. Every column other than "vulnerability_id" is used as a feature
    datestamp {str} -- suffix of the output table name; it is also cast to a long and stored \
                       in the "created_date" column
    model_path {string:path} -- path in dbfs where the trained model is saved
    label_dict_path {string: path} -- path in dbfs where the label-owner mapping dictionary is saved
    
  Returns:
    results {dict} -- {'predictions': name of the table the predictions were written to}
  '''
  
  testing_predictions_tablename = 'prediction_results_{}'.format(datestamp)
  
  # NOTE(review): the feature order comes from iterating a set, so it is not
  # guaranteed to match the order the model was trained with -- confirm that
  # training and scoring assemble the columns in the same order
  input_features = list(set(data.columns) - set(['vulnerability_id']))
  df = vector_assembler(input_features, data).select('features','vulnerability_id')
  model = LogisticRegressionModel.load((model_path))

  # SECURITY: unpickling executes arbitrary code; label_dict_path must only ever
  # point at a file written by the training pipeline
  with open(label_dict_path, 'rb') as handle:
    label_dict = pkl.load(handle)

  predictions = model.transform(df)
  
  predictions = predictions.withColumn('ml_predicted_owner', udf(lambda x: label_dict[x])('prediction'))
  predictions = predictions.withColumn('array_probs', udf(lambda v: v.toArray().tolist(), \
                                                      ArrayType(FloatType()))('probability'))
  predictions = predictions.withColumn('confidence_factor', udf(lambda x: max(x), FloatType())('array_probs'))
  predictions = predictions.withColumn('ml_owner', col('ml_predicted_owner'))
  predictions = predictions.withColumn('created_date',lit(datestamp).cast(LongType()))
  
  output_columns = ['vulnerability_id','ml_owner','ml_predicted_owner','confidence_factor','created_date']
  predictions.select(*output_columns).write.saveAsTable(testing_predictions_tablename)
  
  return {'predictions': testing_predictions_tablename}