# Anomaly detection in cellular networks

## Introduction

The purpose of this notebook is to solve an anomaly detection problem proposed as a competition on the Kaggle InClass platform.

## Problem description

### Context:

Traditionally, the design of a cellular network focuses on optimizing energy and resources so that the network operates smoothly even during peak hours (i.e. periods with higher traffic load). 
However, this means that cells are overprovisioned with radio resources most of the time. 
Next-generation cellular networks call for dynamic management and configuration, so that they can adapt to varying user demand as efficiently as possible in terms of energy savings and use of frequency resources. 
If the network operator could anticipate those variations in users' traffic demand, it could manage its scarce (and expensive) network resources more efficiently.
Current research in mobile networks looks to Machine Learning (ML) techniques to help manage those resources. 
In this case, you will explore how ML can detect abnormal network usage that would justify changing the configuration of the base station.


### Objective

The objective of the network optimization team is to analyze traces of past activity, which will be used to train an ML system capable of classifying samples of current activity as:
 - 0 (normal): current activity corresponds to the normal behavior of any working day. Therefore, no reconfiguration or redistribution of resources is needed.
 - 1 (unusual): current activity slightly differs from the behavior usually observed for that time of the day (e.g. due to a strike, demonstration, sports event, etc.), which should trigger a reconfiguration of the base station.

### Dataset

The dataset was obtained from a real LTE deployment. Over two weeks, different metrics were gathered every 15 minutes from a set of 10 base stations, each with a different number of cells. 

The dataset is provided as a CSV file, where each row corresponds to a sample obtained from one particular cell at a certain time. Each data example contains the following features:

 - Time : hour of the day (in the format hh:mm) when the sample was generated.
 - CellName: text string used to uniquely identify the cell that generated the current sample. CellName is in the form xαLTE, where x identifies the base station and α the cell within that base station (see the example in the right figure).
 - PRBUsageUL and PRBUsageDL: level of resource utilization in that cell, measured as the percentage of Physical Resource Blocks (PRB) that were in use in the previous 15 minutes. Uplink (UL) and downlink (DL) are measured separately.
 - meanThrDL and meanThrUL: average carried traffic (in Mbps) during the past 15 minutes. Uplink (UL) and downlink (DL) are measured separately.
 - maxThrDL and maxThrUL: maximum carried traffic (in Mbps) measured in the last 15 minutes. Uplink (UL) and downlink (DL) are measured separately.
 - meanUEDL and meanUEUL: average number of user equipment (UE) devices that were simultaneously active during the last 15 minutes. Uplink (UL) and downlink (DL) are measured separately.
 - maxUEDL and maxUEUL: maximum number of user equipment (UE) devices that were simultaneously active during the last 15 minutes. Uplink (UL) and downlink (DL) are measured separately.
 - maxUE_UL+DL: maximum number of user equipment (UE) devices that were active simultaneously in the last 15 minutes, regardless of UL and DL.
 - Unusual: labels for supervised learning. A value of 0 determines that the sample corresponds to normal operation, a value of 1 identifies unusual behavior.
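The processed parquet files loaded later expose `hour` and `minutes` columns rather than the raw `Time` string. The sketch below shows one way such fields could be derived from a raw row. It assumes `Time` is a plain `hh:mm` string and that `CellName` is a base-station number followed by a single cell letter and the suffix `LTE` (for example `3BLTE`). The regular expression, `Sample` and `parse_sample` are hypothetical, not the preprocessing actually used to build the parquet files.

```python
import re
from typing import NamedTuple

# Assumed CellName layout: base-station number + one cell letter + "LTE", e.g. "3BLTE"
CELL_RE = re.compile(r"^(?P<station>\d+)(?P<cell>[A-Za-z])LTE$")


class Sample(NamedTuple):
    hour: int
    minutes: int
    station: int
    cell: str


def parse_sample(time_str: str, cell_name: str) -> Sample:
    """Split a raw 'hh:mm' Time value and an 'xαLTE' CellName into separate fields."""
    hh, mm = time_str.strip().split(":")  # raises ValueError if not exactly hh:mm
    hour, minutes = int(hh), int(mm)
    if not (0 <= hour < 24 and 0 <= minutes < 60):
        raise ValueError(f"unexpected Time value: {time_str!r}")
    match = CELL_RE.match(cell_name.strip())
    if match is None:
        raise ValueError(f"unexpected CellName value: {cell_name!r}")
    return Sample(hour, minutes, int(match["station"]), match["cell"].upper())


print(parse_sample("10:45", "3BLTE"))  # Sample(hour=10, minutes=45, station=3, cell='B')
```

Raising on malformed values, rather than silently skipping them, makes unexpected raw formats visible early.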

## Libraries

In [None]:
import os
import shutil
import sys
import random
from zipfile import ZipFile
from IPython.display import Image

#Analysis
import pyspark
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    print('WARN: Something wrong with pyspark library. Please check configuration settings!')
from pyspark.sql.types import DoubleType
    
#Feature Engineering
from pyspark.sql.functions import col, when, lit, array, explode, rand, udf
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
#Model Training
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
#Model Registry
import pandas as pd
from sasctl import pzmm
from sasctl import Session
from sasctl.services import model_repository
import getpass

    
# Reloads functions each time so you can edit a script and not need to restart the kernel
%load_ext autoreload
%autoreload 2

## Helpers
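The `metrics` helper below builds a confusion matrix with Spark's `crosstab` and then derives the statistics through three branches that guard against division by zero. As a hedged, Spark-free sketch, the same formulas can be written with a single zero-safe division. `safe_div` and `binary_metrics` are hypothetical names. Returning 0.0 for an empty denominator matches the zeros the helper assigns in its special cases.

```python
from typing import Dict


def safe_div(num: float, den: float) -> float:
    """Return num / den, or 0.0 when the denominator is zero."""
    return num / den if den else 0.0


def binary_metrics(tp: int, tn: int, fp: int, fn: int) -> Dict[str, float]:
    """Compute the statistics printed by metrics() directly from confusion-matrix counts."""
    precision = safe_div(tp, tp + fp)
    recall = safe_div(tp, tp + fn)  # identical to sensitivity
    return {
        "Accuracy": safe_div(tp + tn, tp + tn + fp + fn),
        "Sensitivity": recall,
        "Specificity": safe_div(tn, tn + fp),
        "Precision": precision,
        "Recall": recall,
        "F1": safe_div(2 * precision * recall, precision + recall),
    }


print(binary_metrics(tp=40, tn=50, fp=10, fn=0))
```

Sensitivity and Recall are the same quantity. Both are kept only so that the output lines up with the helper's printout and its result DataFrame.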

In [None]:
def get_root_dir(src: str, max_nest: int) -> str:
    '''
    Find the project root by walking up the directory tree
    from the current directory until a folder named src is found.
    :param src: name of the folder to look for (e.g. 'src')
    :param max_nest: maximum number of parent levels to climb
    :return: absolute path of the directory that contains src,
             or of the current directory if src is not found
    '''
    root_dir = os.curdir
    nest = 0
    while src not in os.listdir(root_dir) and nest < max_nest:
        root_dir = os.path.join(os.pardir, root_dir)  # Move one level up
        nest += 1
    # If src was not found within max_nest levels, fall back to the current directory
    if src in os.listdir(root_dir):
        return os.path.abspath(root_dir)
    return os.path.abspath(os.curdir)

def set_src(root_dir: str, src: str) -> str:
    '''
    Append root_dir/src to sys.path so that the python
    packages/scripts it contains can be imported
    :param root_dir: root path
    :param src: name of the source folder
    :return: last sys.path entry (to check the result)
    '''
    if src in os.listdir(root_dir):
        src_dir = os.path.join(root_dir, src)
        sys.path.append(src_dir)
    return sys.path[-1]


def set_folder (root_dir: str, folder: str) -> str:
    '''
    Set the folder path based on the folder name
    :param root_dir: root path
    :param folder: folder name
    :return: folder_path from root
    '''
    folder_path = os.path.join(
        root_dir, folder) if folder in os.listdir(root_dir) else os.curdir
    return folder_path

def set_path(path: str, dirname: str) -> str:
    '''
    Set the entire path given a directory name
    :param path: parent path
    :param dirname: directory (or file) name to append
    :return: new path
    '''
    return os.path.join(path, dirname)


def unzip(inpath: str, outpath: str) -> None:
    '''
    Unzip a compressed file
    :param inpath: path of zip
    :param outpath: path to unzip
    :return: None
    '''
    with ZipFile(inpath, 'r') as zf:
        zf.extractall(outpath)
    
def metrics(dataframe, actual, predicted):
    '''
    Calculates evaluation metrics from predicted results
    (uses the global `spark` session to build the output DataFrame)
    :param dataframe: spark.sql.dataframe with the real and predicted values
    :param actual: Name of column with observed target values
    :param predicted: Name of column with predicted values
    :return: spark.sql.dataframe with one row of metrics
    '''

    # Along each row are the actual values and down each column are the predicted
    dataframe = dataframe.withColumn(actual, col(actual).cast('integer'))
    dataframe = dataframe.withColumn(predicted, col(predicted).cast('integer'))
    cm = dataframe.crosstab(actual, predicted)
    cm = cm.sort(cm.columns[0], ascending=True)

    # Adds missing column in case just one class was predicted
    if not '0' in cm.columns:
        cm = cm.withColumn('0', lit(0))
    if not '1' in cm.columns:
        cm = cm.withColumn('1', lit(0))

    # Subsets values from confusion matrix
    zero = cm.filter(cm[cm.columns[0]] == 0.0)
    first_0 = zero.take(1)

    one = cm.filter(cm[cm.columns[0]] == 1.0)
    first_1 = one.take(1)

    # Read counts by column name: a column added above with withColumn
    # is appended at the end, so positional indexing could swap '0' and '1'
    tn = first_0[0]['0']
    fp = first_0[0]['1']
    fn = first_1[0]['0']
    tp = first_1[0]['1']

    # Calculate metrics from values in the confusion matrix
    if (tp == 0):
        acc = float((tp + tn) / (tp + tn + fp + fn))
        sen = 0
        spe = float((tn) / (tn + fp))
        prec = 0
        rec = 0
        f1 = 0
    elif (tn == 0):
        acc = float((tp + tn) / (tp + tn + fp + fn))
        sen = float((tp) / (tp + fn))
        spe = 0
        prec = float((tp) / (tp + fp))
        rec = float((tp) / (tp + fn))
        f1 = 2 * float((prec * rec) / (prec + rec))
    else:
        acc = float((tp + tn) / (tp + tn + fp + fn))
        sen = float((tp) / (tp + fn))
        spe = float((tn) / (tn + fp))
        prec = float((tp) / (tp + fp))
        rec = float((tp) / (tp + fn))
        f1 = 2 * float((prec * rec) / (prec + rec))

    # Print results
    print('Confusion Matrix and Statistics: \n')
    cm.show()

    print('True Positives:', tp)
    print('True Negatives:', tn)
    print('False Positives:', fp)
    print('False Negatives:', fn)
    print('Total:', dataframe.count(), '\n')

    print('Accuracy: {0:.2f}'.format(acc))
    print('Sensitivity: {0:.2f}'.format(sen))
    print('Specificity: {0:.2f}'.format(spe))
    print('Precision: {0:.2f}'.format(prec))
    print('Recall: {0:.2f}'.format(rec))
    print('F1-score: {0:.2f}'.format(f1))
    # Create spark dataframe with results
    l = [(acc, sen, spe, prec, rec, f1)]
    df = spark.createDataFrame(l, ['Accuracy', 'Sensitivity', 'Specificity', 'Precision', 'Recall', 'F1'])

    return (df)

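def get_output_variables(names, labels, eventprob):
    '''
    Build a small sample DataFrame describing the model outputs,
    used by pzmm to generate the output variable metadata.
    The probability columns hold random values because pzmm only
    needs them to infer the column names and types.
    :param names: output column names (two class probabilities, classification, event probability)
    :param labels: class labels, one per sample row
    :param eventprob: value for the event probability column
    :return: outputVar, a pandas DataFrame with two sample rows
    '''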
    outputVar = pd.DataFrame(columns=names)
    outputVar[names[0]] = [random.random(), random.random()]
    outputVar[names[1]] = [random.random(), random.random()]
    outputVar[names[2]] = labels
    outputVar[names[3]] = eventprob
    return outputVar

def zip_folder(folder_to_zip_path, rmtree=False):
    '''
    Create a zip archive of a folder next to the folder itself
    (e.g. <path>/gbt_pipeline -> <path>/gbt_pipeline.zip)
    :param folder_to_zip_path: path of the folder to archive
    :param rmtree: if True, delete the original folder after archiving
    :return: zipath, the path of the created .zip file
    '''
    root_dir = os.path.dirname(folder_to_zip_path)
    base_dir = os.path.basename(folder_to_zip_path)
    zipath = shutil.make_archive(
        folder_to_zip_path,         # archive name (without extension)
        'zip',                      # the archive format - or tar, bztar, gztar
        root_dir=root_dir,          # parent of the folder to zip
        base_dir=base_dir)          # folder to zip name
    if rmtree:
        shutil.rmtree(folder_to_zip_path)  # remove the original (unzipped) folder
    return zipath

# Extract P(Unusual = 1) from the probability vector as a plain Python float
extract1_udf = udf(lambda value: value[1].item(), DoubleType())

## Setup

In [None]:
root_dir = get_root_dir('src', 5)
src_dir = set_src(root_dir, 'src')
data_dir = set_folder(root_dir, 'data')
raw_data_dir = set_path(data_dir, 'raw')
interim_data_dir = set_path(data_dir, 'interim')
processed_data_dir = set_path(data_dir, 'processed')
figures_dir = set_folder(root_dir, 'figures')
features_dir = set_folder(root_dir, 'features')
index_features_dir = set_path(features_dir, 'indexstr')
ohe_features_dir = set_path(features_dir, 'ohe')
std_features_dir = set_path(features_dir, 'std')
models_dir = set_folder(root_dir, 'models')
deliverables_path = set_folder(root_dir, 'deliverables')
model_version_dir = set_path(deliverables_path, 'pyspark_GBTClassifier')
gbt_pipe_dir = set_path(model_version_dir, 'gbt_pipeline')

## 1. Data

## Initiate Spark session

In [None]:
# Create (or reuse) a local Spark session named 'Anomaly Detection' that uses all available cores
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Anomaly Detection") \
    .getOrCreate()

In [None]:
spark.getActiveSession()

## Load

### Set path

In [None]:
train_path = set_path(processed_data_dir, 'ML-MATT-CompetitionQT1920_train_processed.parquet')
test_path = set_path(processed_data_dir, 'ML-MATT-CompetitionQT1920_test_processed.parquet')

### Load data

In [None]:
train_df = spark.read.parquet(train_path)
test_df = spark.read.parquet(test_path)

In [None]:
train_df.printSchema()

In [None]:
train_df.show(5)

## 2. Load ML Pipeline

In [None]:
gbt_pipe = Pipeline.load(gbt_pipe_dir)

## 3. Test the pipeline
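The cells below print `rawPrediction`, `probability` and `prediction` for each row. To my understanding of Spark's `GBTClassificationModel` with its default logistic loss, the ensemble produces a single margin m, `rawPrediction` is `[-m, m]`, and the probability of class 1 is 1 / (1 + exp(-2m)). Treat this as an assumption to verify against your Spark version. The pure-Python sketch below (`sigmoid_2x` and `gbt_outputs` are hypothetical names) reproduces that mapping. Index 1 of the probability vector is what `extract1_udf` reads as P(Unusual = 1).

```python
import math
from typing import List, Tuple


def sigmoid_2x(margin: float) -> float:
    """Numerically stable 1 / (1 + exp(-2 * margin))."""
    if margin >= 0:
        return 1.0 / (1.0 + math.exp(-2.0 * margin))
    e = math.exp(2.0 * margin)
    return e / (1.0 + e)


def gbt_outputs(margin: float) -> Tuple[List[float], List[float], float]:
    """Return (rawPrediction, probability, prediction) for a binary GBT margin (assumed logistic loss)."""
    raw = [-margin, margin]
    p1 = sigmoid_2x(margin)       # assumed P(Unusual = 1)
    probability = [1.0 - p1, p1]  # index 1 is what extract1_udf extracts
    prediction = 1.0 if margin > 0 else 0.0  # argmax of rawPrediction; argmax picks index 0 on a tie
    return raw, probability, prediction


for m in (-1.0, 0.0, 0.5):
    print(m, gbt_outputs(m))
```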

In [None]:
gbt_model = gbt_pipe.fit(train_df)
predictions_train = gbt_model.transform(train_df)
predictions_train.select('CellName', 'features', 'Unusual', 'rawPrediction', 'probability', 'prediction').show(5)
metrics(predictions_train, 'Unusual', 'prediction')

In [None]:
predictions_test = gbt_model.transform(test_df)
predictions_test.select('CellName', 'features', 'Unusual', 'rawPrediction', 'probability', 'prediction').show(5)
metrics(predictions_test, 'Unusual', 'prediction')

## 4. Create Model for versioning in SAS Model Manager

As usual, we create some metadata files to take advantage of SAS Model Manager.
In this case, because we want to migrate the workload to GCP, we need as much information as possible, so we create:

1. requirement.txt
2. *.py with etl, train model and score data
3. *.json with all metainfo (inputs, outputs, model properties...)
4. other files (the YAML configuration used by the scripts)
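Before zipping, it can help to confirm that every file listed above actually landed in the deliverables folder. The sketch below is a hypothetical helper (`missing_artifacts`). The expected names are taken from the cells in this section. The pzmm-generated ones may differ across sasctl versions, so adjust the list if needed.

```python
import os
import tempfile
from typing import Iterable, List

# File names written by the cells of this section (pzmm names assumed from the comments there)
EXPECTED_ARTIFACTS = [
    "requirement.txt",
    "train.py",
    "score.py",
    "demo-config.yml",
    "inputVar.json",
    "outputVar.json",
    "ModelProperties.json",
    "dmcas_fitstat.json",
    "dmcas_roc.json",
    "dmcas_lift.json",
]


def missing_artifacts(folder: str, expected: Iterable[str] = EXPECTED_ARTIFACTS) -> List[str]:
    """Return the expected file names that are not present in folder (all of them if it does not exist)."""
    present = set(os.listdir(folder)) if os.path.isdir(folder) else set()
    return [name for name in expected if name not in present]


# Quick self-check on a throwaway folder; in this notebook use missing_artifacts(model_version_dir)
with tempfile.TemporaryDirectory() as tmp:
    open(os.path.join(tmp, "train.py"), "w").close()
    print(missing_artifacts(tmp))
```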

### requirement.txt
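Because the same package has to run both in SAS Viya and on Dataproc, it may be worth checking the active environment against the pins written in the cell below. This is a minimal sketch, assuming Python 3.8+ for `importlib.metadata`. `parse_pins` and `check_pins` are hypothetical helpers that only understand exact `name==version` lines.

```python
from importlib import metadata
from typing import Dict, List, Tuple


def parse_pins(text: str) -> Dict[str, str]:
    """Parse 'name==version' lines, ignoring blank lines and '#' comments."""
    pins = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()
        if not line:
            continue
        name, sep, version = line.partition("==")
        if not sep:
            raise ValueError(f"expected an exact pin, got {line!r}")
        pins[name.strip()] = version.strip()
    return pins


def check_pins(pins: Dict[str, str]) -> List[Tuple[str, str, str]]:
    """Return (package, pinned, installed) for each mismatch; installed is 'missing' if absent."""
    mismatches = []
    for name, pinned in pins.items():
        try:
            installed = metadata.version(name)
        except metadata.PackageNotFoundError:
            installed = "missing"
        if installed != pinned:
            mismatches.append((name, pinned, installed))
    return mismatches


# e.g. with open('../deliverables/pyspark_GBTClassifier/requirement.txt') as f:
#          print(check_pins(parse_pins(f.read())))
```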

In [None]:
%%writefile ../deliverables/pyspark_GBTClassifier/requirement.txt
numpy==1.19.4
pandas==1.1.4
pyspark==3.0.1
PyYAML==5.3.1

### *.json with all metainfo
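The cells below delegate fit statistics and ROC/lift tables to pzmm (`calculateFitStat`, `generateROCLiftStat`). As an independent sanity check on the AUC those files should report, the area under the ROC curve can be computed from the `trainData`/`testData` frames (columns `Unusual` and `p2`) with the rank-sum (Mann-Whitney) formulation. This sketches the standard statistic, not pzmm's implementation. Inside Spark, `BinaryClassificationEvaluator` (imported in the Libraries cell) computes `areaUnderROC` directly.

```python
from typing import Sequence


def roc_auc(labels: Sequence[int], scores: Sequence[float]) -> float:
    """Area under the ROC curve via average ranks (tied scores share their mean rank)."""
    pairs = sorted(zip(scores, labels))
    n_pos = sum(1 for _, y in pairs if y == 1)
    n_neg = len(pairs) - n_pos
    if n_pos == 0 or n_neg == 0:
        raise ValueError("AUC needs at least one positive and one negative label")
    rank_sum = 0.0
    i = 0
    while i < len(pairs):
        # Find the block of tied scores pairs[i:j]
        j = i
        while j < len(pairs) and pairs[j][0] == pairs[i][0]:
            j += 1
        avg_rank = (i + 1 + j) / 2.0  # mean of the 1-based ranks i+1 .. j
        rank_sum += avg_rank * sum(1 for _, y in pairs[i:j] if y == 1)
        i = j
    # Mann-Whitney U of the positives, normalised by the number of positive/negative pairs
    return (rank_sum - n_pos * (n_pos + 1) / 2.0) / (n_pos * n_neg)


# e.g. roc_auc(testData['Unusual'].tolist(), testData['p2'].tolist())
```

Sorting once keeps this O(n log n), which matters on the full test set, where a pairwise comparison of every positive with every negative would be slow.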

In [None]:
# Base metainfo

inSample = train_df.limit(1).toPandas()  # one sample row is enough to infer input types
js = pzmm.JSONFiles()

# inputVar.json
js.writeVarJSON(inSample, isInput=True, jPath=model_version_dir)

# outputVar.json
names=['P_UNUSUAL0', 'P_UNUSUAL1', 'EM_CLASSIFICATION', 'EM_EVENTPROBABILITY']
labels=['0', '1']
eventprob=0.5
outSample = get_output_variables(names, labels, eventprob)
js.writeVarJSON(outSample, isInput=False, jPath=model_version_dir)

# ModelProperties.json
modelname='pyspark_gbtClassifier'
target='Unusual'
predictors=['CellName', 'hour', 'minutes', 'PRBUsageUL', 'PRBUsageDL', 
            'meanThr_DL', 'meanThr_UL', 'maxThr_DL', 'maxThr_UL', 'meanUE_DL', 
            'meanUE_UL', 'maxUE_DL', 'maxUE_UL']
js.writeModelPropertiesJSON(modelName=modelname,
                            modelDesc='A pyspark GBTClassifier for Network anomaly detection',
                            targetVariable=target,
                            modelType='Boosted Tree',
                            modelPredictors=predictors,
                            targetEvent=1,
                            numTargetCategories=1,
                            eventProbVar='EM_EVENTPROBABILITY',
                            jPath=model_version_dir,
                            modeler='ivnard')

In [None]:
# Advanced metainfo

# dmcas_fitstat.json
trainData = predictions_train.withColumn('p2', extract1_udf('probability')) \
                             .select('Unusual', 'p2').toPandas()
testData = predictions_test.withColumn('p2', extract1_udf('probability')) \
                           .select('Unusual', 'p2').toPandas()
js.calculateFitStat(trainData=trainData, testData=testData, jPath=model_version_dir)

# dmcas_roc.json, dmcas_lift.json
print('Provide username and password to Viya Server login')
username = input('Username: ')
password = getpass.getpass('Password: ')
host = 'rusid1.rus.sas.com'
sess = Session(host, username, password, verify_ssl=False, protocol='http')
conn = sess.as_swat()
js.generateROCLiftStat(target, 1, conn, trainData=trainData, testData=testData, jPath=model_version_dir)

### *.py to train the model and score data
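`build_pipeline` in train.py chains five stages: `StringIndexer` on `hour` and `minutes`, `OneHotEncoder` with `dropLast=False`, `VectorAssembler`, `MinMaxScaler` and finally `GBTClassifier`. To make the first transformations concrete, here is a hedged pure-Python imitation of what the indexing, encoding and scaling stages do to a few values. The function names are hypothetical. The alphabetical tie-break is this sketch's own choice; check Spark's documentation for its exact ordering rules.

```python
from collections import Counter
from typing import Dict, List, Sequence


def string_index(values: Sequence[str]) -> Dict[str, int]:
    """Frequency-descending label -> index map (ties broken alphabetically in this sketch)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: i for i, v in enumerate(ordered)}


def one_hot(index: int, size: int) -> List[float]:
    """One-hot vector with dropLast=False, i.e. one slot per category."""
    vec = [0.0] * size
    vec[index] = 1.0
    return vec


def min_max_scale(column: Sequence[float]) -> List[float]:
    """Rescale to [0, 1]; a constant column maps to 0.5, the midpoint of the default range."""
    lo, hi = min(column), max(column)
    if hi == lo:
        return [0.5] * len(column)
    return [(x - lo) / (hi - lo) for x in column]


hours = ["10", "10", "11", "23"]
index = string_index(hours)  # the most frequent value gets index 0
encoded = [one_hot(index[h], len(index)) for h in hours]
prb_ul = [5.0, 20.0, 12.5, 5.0]
print(index, encoded[-1], min_max_scale(prb_ul))
```

Because the scaler sits after the assembler in the pipeline, it rescales the one-hot slots too. A 0/1 column that contains both values is left unchanged by a [0, 1] min-max scaling, so only the numerical features are effectively affected.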

In [None]:
%%writefile ../deliverables/pyspark_GBTClassifier/train.py
# -*- coding: utf-8 -*-

"""
train.py is the training module for our project.
Remarks: The model package is designed so that
it can run both in SAS Viya and on GCP Dataproc

Steps:
1 - Read data (both server and gcp cloud storage)
2 - Build Machine Learning Pipeline
3 - Serialize the Pipeline and all data

Author: Ivan Nardini (ivan.nardini@sas.com)
"""

# Libraries ------------------------------------------------------------------------------------------------------------
import logging
import logging.config
import argparse
import yaml

#from helpers import read_parquet, write_parquet, metrics, save_pipeline

import pyspark
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline, PipelineModel

try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    print('WARN: Something wrong with pyspark library. Please check configuration settings!')


# Helpers --------------------------------------------------------------------------------------------------------------

def read_parquet (session: SparkSession, filepath: str) -> pyspark.sql.DataFrame:
    '''
    Read a parquet file
    :param session: SparkSession
    :param filepath: the path of parquet datafile
    :return: pyspark.sql.DataFrame
    '''
    return session.read.parquet(filepath)


def write_parquet (df: pyspark.sql.DataFrame, filepath: str) -> None:
    '''
    Write a parquet file
    :param df: DataFrame to store
    :param filepath: the path of parquet datafile
    :return: None
    '''
    df.write.mode('overwrite').parquet(filepath)  # explicit format, independent of spark.sql.sources.default


def metrics(session: SparkSession, dataframe: pyspark.sql.DataFrame, actual: str,
            predicted: str) -> pyspark.sql.DataFrame:
    '''
    Calculates evaluation metrics from predicted results

    :param session: SparkSession used to build the output DataFrame
    :param dataframe: spark.sql.dataframe with the real and predicted values
    :param actual: Name of column with observed target values
    :param predicted: Name of column with predicted values
    :return: pyspark.sql.DataFrame with one row of metrics
    '''

    # Along each row are the actual values and down each column are the predicted
    dataframe = dataframe.withColumn(actual, col(actual).cast('integer'))
    dataframe = dataframe.withColumn(predicted, col(predicted).cast('integer'))
    cm = dataframe.crosstab(actual, predicted)
    cm = cm.sort(cm.columns[0], ascending=True)

    # Adds missing column in case just one class was predicted
    if not '0' in cm.columns:
        cm = cm.withColumn('0', lit(0))
    if not '1' in cm.columns:
        cm = cm.withColumn('1', lit(0))

    # Subsets values from confusion matrix
    zero = cm.filter(cm[cm.columns[0]] == 0.0)
    first_0 = zero.take(1)

    one = cm.filter(cm[cm.columns[0]] == 1.0)
    first_1 = one.take(1)

    # Read counts by column name: a column added above with withColumn
    # is appended at the end, so positional indexing could swap '0' and '1'
    tn = first_0[0]['0']
    fp = first_0[0]['1']
    fn = first_1[0]['0']
    tp = first_1[0]['1']

    # Calculate metrics from values in the confusion matrix
    if (tp == 0):
        acc = float((tp + tn) / (tp + tn + fp + fn))
        sen = 0
        spe = float((tn) / (tn + fp))
        prec = 0
        rec = 0
        f1 = 0
    elif (tn == 0):
        acc = float((tp + tn) / (tp + tn + fp + fn))
        sen = float((tp) / (tp + fn))
        spe = 0
        prec = float((tp) / (tp + fp))
        rec = float((tp) / (tp + fn))
        f1 = 2 * float((prec * rec) / (prec + rec))
    else:
        acc = float((tp + tn) / (tp + tn + fp + fn))
        sen = float((tp) / (tp + fn))
        spe = float((tn) / (tn + fp))
        prec = float((tp) / (tp + fp))
        rec = float((tp) / (tp + fn))
        f1 = 2 * float((prec * rec) / (prec + rec))

    # Print results
    print('Confusion Matrix and Statistics: \n')
    cm.show()

    print('True Positives:', tp)
    print('True Negatives:', tn)
    print('False Positives:', fp)
    print('False Negatives:', fn)
    print('Total:', dataframe.count(), '\n')

    print('Accuracy: {0:.2f}'.format(acc))
    print('Sensitivity: {0:.2f}'.format(sen))
    print('Specificity: {0:.2f}'.format(spe))
    print('Precision: {0:.2f}'.format(prec))
    print('Recall: {0:.2f}'.format(rec))
    print('F1-score: {0:.2f}'.format(f1))

    # Create spark dataframe with results
    l = [(acc, sen, spe, prec, rec, f1)]
    df = session.createDataFrame(l, ['Accuracy', 'Sensitivity', 'Specificity', 'Precision', 'Recall', 'F1'])
    return df


extract0_udf = udf(lambda value: value[0].item(), DoubleType())
extract1_udf = udf(lambda value: value[1].item(), DoubleType())

def save_pipeline(pipeline: PipelineModel, filepath: str) -> None:
    '''
    Serialize the fitted pipeline, overwriting any previous version
    :param pipeline: fitted PipelineModel to save
    :param filepath: destination path (local or gs://)
    :return: None
    '''
    pipeline.write().overwrite().save(path=filepath)

# Builders -------------------------------------------------------------------------------------------------------------

def build_pipeline (pipeconfig: dict) -> pyspark.ml.Pipeline:
    '''
    Build a Pipeline instance based on config file
    :param pipeconfig: metadata dictionary
    :return: pyspark.ml.Pipeline
    '''

    # Pipeline metadata
    cats = pipeconfig['variables']['categoricals']
    nums = pipeconfig['variables']['numericals']
    index_names = pipeconfig['metadata']['index_names']
    encoded_names = pipeconfig['metadata']['encoded_names']
    vect_name = pipeconfig['metadata']['vect_name']
    feats_name = pipeconfig['metadata']['feats_name']
    labelcol = pipeconfig['model']['labelCol']
    maxdepth = pipeconfig['model']['maxDepth']
    maxbins = pipeconfig['model']['maxBins']
    maxiter = pipeconfig['model']['maxIter']
    seed = pipeconfig['model']['seed']

    # Build stages
    stageone = StringIndexer(inputCols=cats,
                             outputCols=index_names)

    stagetwo = OneHotEncoder(dropLast=False,
                             inputCols=stageone.getOutputCols(),
                             outputCols=encoded_names)

    stagethree = VectorAssembler(inputCols=nums + stagetwo.getOutputCols(),
                                 outputCol=vect_name)

    stagefour = MinMaxScaler(inputCol=stagethree.getOutputCol(),
                             outputCol=feats_name)

    stagefive = GBTClassifier(featuresCol=stagefour.getOutputCol(),
                              labelCol=labelcol,
                              maxDepth=maxdepth,
                              maxBins=maxbins,
                              maxIter=maxiter,
                              seed=seed)
    pipeline = Pipeline(stages=[stageone, stagetwo, stagethree, stagefour, stagefive])

    return pipeline

# Main -----------------------------------------------------------------------------------------------------------------

def run_training (args):

    # Read configuration
    logging.info('Read config file.')
    with open(args.configfile, "r") as cf:
        config = yaml.load(cf, Loader=yaml.FullLoader)
    sparksession = config['sparksession']
    data = config['data']
    pipeline = config['pipeline']
    output = config['output']

    # Create a spark session
    logging.info('Instantiate the {0} Spark session'.format(sparksession['appName']))
    spark = SparkSession.builder \
        .master(sparksession['master']) \
        .appName(sparksession['appName']) \
        .getOrCreate()

    # Load Data
    logging.info('Load train and test data')
    train_df = read_parquet(spark, data['train_datapath'])
    test_df = read_parquet(spark, data['test_datapath'])

    # Execute training
    logging.info('Train {0} Pipeline'.format(pipeline['model']['method']))
    train_pipe = build_pipeline(pipeline)
    gbt_model = train_pipe.fit(train_df)

    # Evaluate
    logging.info('Evaluate the model')
    predictions_test = gbt_model.transform(test_df)
    predictions_test.select(output['showschema_train']).show(5)
    metrics_df = metrics(spark, predictions_test, 'Unusual', 'prediction')

    # Save training data
    logging.info('Save all the process outputs')
    # Save trained pipeline
    logging.info('Saving pipeline...')
    save_pipeline(gbt_model, output['pipeline_path'])
    # Save predictions
    logging.info('Saving predictions...')
    predictions_test_fmt = predictions_test.withColumn('P_Unusual0', extract0_udf('probability')).withColumn(
         'P_Unusual1', extract1_udf('probability')).select(output['columnschema_train'])
    write_parquet(predictions_test_fmt, output['test_scored_path'])
    # Save metrics
    logging.info('Saving metrics...')
    write_parquet(metrics_df, output['metrics_scored_path'])


if __name__ == "__main__":
    #logging.config.fileConfig("../../config/logging/local.conf")
    logging.basicConfig(format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                        datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.INFO)
    logger = logging.getLogger(__name__)
    parser = argparse.ArgumentParser(description="Train Pyspark GBTClassifier")
    parser.add_argument('--configfile', required=True, help='path to configuration yaml file')
    args = parser.parse_args()
    run_training(args)


In [None]:
%%writefile ../deliverables/pyspark_GBTClassifier/score.py
# -*- coding: utf-8 -*-

"""
score.py is the scoring module for our project.
Remarks: The model package is designed so that
it can run both in SAS Viya and on GCP Dataproc

Steps:
1 - Read data (both server and gcp cloud storage)
2 - Read serialized pipeline (both server and gcp cloud storage)
3 - Score (or transform) data
4 - Store scored data (both server and gcp cloud storage)

Author: Ivan Nardini (ivan.nardini@sas.com)
"""

# Libraries ------------------------------------------------------------------------------------------------------------
import logging
import logging.config
import argparse
import yaml

# from helpers import read_parquet, write_parquet, load_model

import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml import PipelineModel

try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    print('WARN: Something wrong with pyspark library. Please check configuration settings!')

# Helpers --------------------------------------------------------------------------------------------------------------

def read_parquet (session: SparkSession, filepath: str) -> pyspark.sql.DataFrame:
    '''
    Read a parquet file
    :param session: SparkSession
    :param filepath: the path of parquet datafile
    :return: pyspark.sql.DataFrame
    '''
    return session.read.parquet(filepath)


def write_parquet (df: pyspark.sql.DataFrame, filepath: str) -> None:
    '''
    Write a parquet file
    :param df: DataFrame to store
    :param filepath: the path of parquet datafile
    :return: None
    '''
    df.write.mode('overwrite').parquet(filepath)  # explicit format, independent of spark.sql.sources.default


def load_model(filepath: str) -> PipelineModel:
    '''
    Load the fitted pipeline
    :param filepath: path of the serialized PipelineModel (local or gs://)
    :return: PipelineModel
    '''
    return PipelineModel.load(filepath)

extract0_udf = udf(lambda value: value[0].item(), DoubleType())
extract1_udf = udf(lambda value: value[1].item(), DoubleType())

# Builders -------------------------------------------------------------------------------------------------------------

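def score_model(data: pyspark.sql.DataFrame, model: PipelineModel) -> pyspark.sql.DataFrame:
    '''
    Apply the fitted pipeline to new data
    :param data: DataFrame to score
    :param model: fitted PipelineModel
    :return: DataFrame with rawPrediction, probability and prediction columns added
    '''
    return model.transform(data)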

# Main -----------------------------------------------------------------------------------------------------------------

def run_scoring(args):
    # Read Configuration
    logging.info('Read config file.')
    with open(args.configfile, "r") as cf:
        config = yaml.load(cf, Loader=yaml.FullLoader)
    sparksession = config['sparksession']
    data = config['data']
    output = config['output']

    # Initiate the Spark session
    logging.info('Instantiate the {0} Spark session'.format(sparksession['appName']))
    spark = SparkSession.builder \
        .master(sparksession['master']) \
        .appName(sparksession['appName']) \
        .getOrCreate()

    # Load Data
    logging.info('Load data to score')
    datatoscore = read_parquet(spark, data['datatoscore_path'])

    # Load Model
    logging.info('Load trained pipeline')
    pipemodel = load_model(output['pipeline_path'])

    # Score data
    datascored = score_model(datatoscore, pipemodel)
    datascored.select(output['showschema_score']).show(5)

    # Store scored data
    logging.info('Save all the process outputs')
    datascored_fmt = datascored.withColumn('P_Unusual0', extract0_udf('probability')).withColumn(
         'P_Unusual1', extract1_udf('probability')).select(output['columnschema_score'])
    write_parquet(datascored_fmt, output['datascored_path'])


if __name__ == "__main__":
    logging.basicConfig(format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                        datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.INFO)
    logger = logging.getLogger(__name__)
    parser = argparse.ArgumentParser(description="Score with Pyspark GBTClassifier")
    parser.add_argument('--configfile', required=True, help='path to configuration yaml file')
    args = parser.parse_args()
    run_scoring(args)


### Others

Here, we need to ask IT for the correct gs:// paths before submitting the job.
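Once IT provides the bucket, a quick pre-flight check can catch a path that still points to the local filesystem, or a missing key, before the job is submitted. The sketch below is hypothetical (`check_config`, `PATH_KEYS`). The key names come from the demo-config.yml written below. It only checks the shape of the values, not that the objects exist in Cloud Storage.

```python
from typing import Any, Dict, List

# Storage-path keys read by train.py and score.py (names taken from demo-config.yml)
PATH_KEYS = {
    "data": ["train_datapath", "test_datapath", "datatoscore_path"],
    "output": ["test_scored_path", "metrics_scored_path", "pipeline_path", "datascored_path"],
}


def check_config(config: Dict[str, Any]) -> List[str]:
    """Return human-readable problems found in a parsed YAML config (empty list if none)."""
    problems = []
    for section, keys in PATH_KEYS.items():
        for key in keys:
            value = (config.get(section) or {}).get(key)
            if value is None:
                problems.append(f"{section}.{key} is missing")
            elif not str(value).startswith("gs://") or not str(value)[5:].split("/", 1)[0]:
                problems.append(f"{section}.{key} is not a gs://bucket/... URI: {value!r}")
    # StringIndexer and OneHotEncoder need one output name per categorical input
    pipe = config.get("pipeline") or {}
    cats = (pipe.get("variables") or {}).get("categoricals", [])
    meta = pipe.get("metadata") or {}
    for name in ("index_names", "encoded_names"):
        if len(meta.get(name, [])) != len(cats):
            problems.append(f"pipeline.metadata.{name} must have {len(cats)} entries")
    return problems


# e.g. with open('../deliverables/pyspark_GBTClassifier/demo-config.yml') as f:
#          print(check_config(yaml.safe_load(f)))
```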

In [None]:
%%writefile ../deliverables/pyspark_GBTClassifier/demo-config.yml
sparksession:
  master: 'local[*]'
  appName: 'Anomaly Detection'
data:
  train_datapath: gs://network-spark-migrate/data/ML-MATT-CompetitionQT1920_train_processed.parquet
  test_datapath: gs://network-spark-migrate/data/ML-MATT-CompetitionQT1920_test_processed.parquet
  datatoscore_path: gs://network-spark-migrate/data/ML-MATT-CompetitionQT1920_val_processed.parquet
pipeline:
  variables:
    categoricals: ['hour', 'minutes']
    numericals: ['PRBUsageUL', 'PRBUsageDL', 'meanThr_DL', 'meanThr_UL', 'maxThr_DL', 'maxThr_UL', 'meanUE_DL', 'meanUE_UL', 'maxUE_DL', 'maxUE_UL']
  metadata:
    index_names: ['hour_index', 'minutes_index']
    encoded_names: ['hour_encoded', 'minutes_encoded']
    vect_name: 'vars_vectorized'
    feats_name: 'features'
  model:
    method: 'GBTClassifier'
    labelCol: 'Unusual'
    maxDepth: 5
    maxBins: 32
    maxIter: 3
    seed: 888
output:
  showschema_train: ['CellName', 'features', 'Unusual', 'rawPrediction', 'probability', 'prediction']
  showschema_score: ['CellName', 'features', 'rawPrediction', 'probability', 'prediction']
  columnschema_train: ['CellName', 'Unusual', 'hour', 'minutes', 'PRBUsageUL', 'PRBUsageDL', 'meanThr_DL', 'meanThr_UL', 'maxThr_DL', 'maxThr_UL', 'meanUE_DL', 'meanUE_UL', 'maxUE_DL', 'maxUE_UL', 'P_Unusual0', 'P_Unusual1']
  columnschema_score: ['CellName', 'hour', 'minutes', 'PRBUsageUL', 'PRBUsageDL', 'meanThr_DL', 'meanThr_UL', 'maxThr_DL', 'maxThr_UL', 'meanUE_DL', 'meanUE_UL', 'maxUE_DL', 'maxUE_UL', 'P_Unusual0', 'P_Unusual1']
  test_scored_path: gs://network-spark-migrate/output/data/
  metrics_scored_path: gs://network-spark-migrate/output/metrics/
  pipeline_path: gs://network-spark-migrate/output/model/
  datascored_path: gs://network-spark-migrate/output/scored/

## Time to package and ship all to SAS Model Manager
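`zip_folder` passes the parent directory as `root_dir` and the folder name as `base_dir` to `shutil.make_archive`, so every entry in the archive is stored under the folder name (for example `gbt_pipeline/...`) rather than under an absolute path. The throwaway example below reproduces that call on a temporary directory to show the resulting layout. `archive_members` is a hypothetical helper, and the `metadata/part-00000` file is made up for illustration.

```python
import os
import shutil
import tempfile
import zipfile
from typing import List


def archive_members(folder: str) -> List[str]:
    """Zip `folder` with the same make_archive arguments as zip_folder() and list the entries."""
    parent, name = os.path.split(os.path.abspath(folder))
    archive = shutil.make_archive(os.path.join(parent, name), "zip", root_dir=parent, base_dir=name)
    with zipfile.ZipFile(archive) as zf:
        return sorted(zf.namelist())


with tempfile.TemporaryDirectory() as tmp:
    # Stand-in for a saved Spark pipeline folder
    os.makedirs(os.path.join(tmp, "gbt_pipeline", "metadata"))
    with open(os.path.join(tmp, "gbt_pipeline", "metadata", "part-00000"), "w") as f:
        f.write("{}")
    print(archive_members(os.path.join(tmp, "gbt_pipeline")))
```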

In [None]:
# Zip gbt_pipeline folder
zip_folder(gbt_pipe_dir, rmtree=True)

In [None]:
# Zip deliverables folder
model_zipath = zip_folder(model_version_dir)

In [None]:
projectname = 'Network anomaly detection'
modelname = 'pyspark_GBTClassifier'
new_project = False  # set to True the first time, to create the project

with Session(hostname=host, username=username, password=password, verify_ssl=False):

    if new_project:
        model_repository.create_project(project=projectname,
                                        repository='Public',
                                        function='classification')

    # Upload the zipped deliverables as a new model version
    with open(model_zipath, 'rb') as model_zip:
        model_repository.import_model_from_zip(modelname,
                                               projectname,
                                               file=model_zip,
                                               version='new')

## Conclusion

We have versioned the model in SAS Model Manager. We are now almost ready to migrate the workload to Google Cloud Platform.