In [1]:
import findspark # set up the Spark environment in Python. It helps locate the Spark installation and adds it to the system path
findspark.init() # Initializes the Spark environment so that you can use PySpark in your Python script.

from pyspark import SparkConf # This module is used to configure Spark properties
from pyspark.sql import SparkSession, functions as F # SparkSession allows you to create df, register df as tables, execute SQL over tables, etc.
from pyspark.sql.window import Window # Used to define window specifications for window functions
from pyspark.sql.functions import when, lit, lag, last, col, lead, countDistinct, udf, pandas_udf, PandasUDFType, coalesce, \
                                month, year, concat, date_format, format_string, last_day, months_between, greatest, least, abs, \
                                dayofweek, isnan, count # these functions are used for different DataFrame operations
from pyspark.sql.types import *
from pyspark import SparkContext

from pyspark.sql import SparkSession
# # from joblibspark import register_spark

# Create the SparkContext for this kernel, or reuse the one that already exists.
sc = SparkContext.getOrCreate() # creates or retrieves an existing SparkContext instance.
# Register utils.py with the context so it is shipped alongside tasks.
# NOTE(review): the relative path 'utils.py' depends on the notebook's working directory,
# and utils is not imported in the cells visible here — confirm it is still needed.
sc.addPyFile('utils.py') # ensuring that file is distributed to each worker node in the cluster

# for shared metastore (shared across all users)
#spark = SparkSession.builder.appName("Fundamental_features2024").getOrCreate()
#spark.sql("USE 2024_06_18");
from pyspark.ml import Pipeline, PipelineModel
from sklearn import preprocessing
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.model_selection import TimeSeriesSplit
from scipy.stats import zscore

In [2]:
import findspark
from pyspark.sql.functions import sum, when, lit, lag, last, col, lead, countDistinct, udf, pandas_udf, PandasUDFType, coalesce, month, year, concat, date_format, format_string, last_day, months_between, greatest, least, abs, dayofweek, isnan, count, to_date, struct
# lit creates a column with a literal value.
# lag/lead window functions access a value in a column from a previous or next row.
# last and col used for selecting the last value in a window and referencing columns
# coalesce, greatest, least, abs: Various functions for handling null values, selecting maximum/minimum values, and calculating absolute values.
# month, year, dayofweek: Functions to extract parts of a date.
# concat, date_format, format_string: Functions for string operations.
# last_day, months_between: Functions for date calculations.
from pyspark.sql.types import *
from pyspark.sql.window import Window # Imports the Window class, used to define window specifications for window functions.
from pyspark.sql import SparkSession, functions as F # Imports SparkSession (entry point for Spark SQL) and functions (shorthand alias for PySpark SQL functions).
from pyspark.ml.linalg import Vectors, VectorUDT

import pandas as pd  
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import time
import csv
from functools import reduce # Imports the reduce function, used for applying a function cumulatively to items of a sequence.
from datetime import datetime
import os # interacting with operating system

## Use the outputs of experiments 1 and 2 and the features from part 1 of experiment 3

In [3]:
# Initialize (or reuse) the Spark session.
# NOTE(review): a SparkContext was already created earlier with SparkContext.getOrCreate(),
# so the app name below may not take effect — confirm in the Spark UI.
spark = (
    SparkSession.builder
    .appName("AnomalyDetection")
    .getOrCreate()
)

# Directory holding the CSV outputs of the earlier experiments; edit it in one place only.
DATA_DIR = "/path/to/ATPSiftingAnalyticsRepo"

# Specify the path where the CSV files are stored
input_path_adj = os.path.join(DATA_DIR, "adj.csv")
input_path_rf = os.path.join(DATA_DIR, "rf_df.csv")
input_path_rf_train = os.path.join(DATA_DIR, "rf_train.csv")
input_path_rf_test = os.path.join(DATA_DIR, "rf_test.csv")

# Read the CSV files into Spark DataFrames (first row is the header, column types are inferred)
adj = spark.read.csv(input_path_adj, header=True, inferSchema=True)
rf_df = spark.read.csv(input_path_rf, header=True, inferSchema=True)
rf_train = spark.read.csv(input_path_rf_train, header=True, inferSchema=True)
rf_test = spark.read.csv(input_path_rf_test, header=True, inferSchema=True)

In [5]:
# The code below balances the classes by undersampling. This approach was later discarded in favour of adjusting class weights; it is kept for reference only.
# def get_balanced_train(orig_train, show_label_count_before = True, show_label_count_after = True):
#     # Determine the size of the minority class
#     label_counts_before = rf_train.groupBy("label").count()
#     minority_class_count = label_counts_before.agg({"count": "min"}).collect()[0][0]

#     if show_label_count_before:
#         # Show the results
#         label_counts_before.show()

#     # Sample the majority classes
#     balanced_dfs = []
#     for label in ['min', 'max', 'none']:
#         label_df = orig_train.filter(col('label') == label)
#         if label_df.count() > minority_class_count:
#             sampled_df = label_df.sample(withReplacement=False, fraction=minority_class_count / label_df.count())
#         else:
#             sampled_df = label_df
#         balanced_dfs.append(sampled_df)

#     # Combine the sampled majority classes with the minority class to create a balanced dataset
#     balanced_train = balanced_dfs[0].union(balanced_dfs[1]).union(balanced_dfs[2])

#     # Verify the new class distribution
#     label_counts_after = balanced_train.groupBy("label").count()
#     if show_label_count_after:
#         label_counts_after.show()
#     return balanced_train
# rf_balanced_train = get_balanced_train(rf_train)

#### Tune class weights manually, then train, validate and test RF/GBM models: 9-fold cross-validation with grid-search hyperparameter tuning, testing on the final 4 years. Assess performance for each class and for each model overall.

In [9]:
# Function to add per-row class weights (derived from the label) to the training data
def add_class_weights(df, class_weights=None):
    """Return ``df`` with an extra ``class_weights`` column derived from ``label``.

    The weights are used as ``weightCol`` by the weighted tree learners. They are
    tuned by hand, so they can be passed in instead of editing this function.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain a ``label`` column (e.g. 'max', 'min', 'both', 'none').
    class_weights : dict, optional
        Mapping ``label value -> weight``. Defaults to the hand-tuned weights below.

    Returns
    -------
    pyspark.sql.DataFrame
        ``df`` plus a double ``class_weights`` column. Rows whose label is not a key
        of ``class_weights`` get a null weight.
        NOTE(review): weighted Spark estimators presumably reject null weights —
        make sure every label present in the data has an entry (or fill nulls).
    """
    if class_weights is None:
        # Hand-tuned defaults; 'both' receives the largest weight.
        class_weights = {
            'max': 0.88,
            'min': 0.88,
            'both': 1.80,
            'none': 0.85,
        }

    # One WHEN branch per label, in the mapping's insertion order; float() keeps the
    # column type double even if integer weights are supplied.
    weight_expr = None
    for label_value, weight in class_weights.items():
        condition = F.col("label") == label_value
        if weight_expr is None:
            weight_expr = F.when(condition, float(weight))
        else:
            weight_expr = weight_expr.when(condition, float(weight))

    if weight_expr is None:
        # Empty mapping: every row gets a null weight of type double.
        weight_expr = F.lit(None).cast("double")

    return df.withColumn("class_weights", weight_expr)

In [10]:
# Train, tune (grid search + k-fold CV) and evaluate a class-weighted random forest or GBT model.
def _build_estimator_and_grid(model_type, weight_col="class_weights"):
    """Return ``(estimator, param_grid)`` for ``model_type``.

    Both estimators read the label from ``labelindex``, the feature vector from
    ``features`` and the per-row weight from ``weight_col``.

    Raises
    ------
    ValueError
        If ``model_type`` is neither 'random_forest' nor 'gbm'.
    """
    if model_type == 'random_forest':
        rf = RandomForestClassifier(labelCol="labelindex", featuresCol="features", weightCol=weight_col)
        # Spark exposes the "max features" knob as featureSubsetStrategy; its tree
        # estimators have no `maxFeatures` Param, so `rf.maxFeatures` raises AttributeError.
        # NOTE(review): for a multi-tree classifier 'auto' resolves to 'sqrt', so these two
        # grid values probably train identical models — consider e.g. 'onethird' / 'log2'.
        param_grid = (ParamGridBuilder()
                      .addGrid(rf.featureSubsetStrategy, ['auto', 'sqrt'])
                      .addGrid(rf.maxDepth, [3, 5, 7, 10])
                      .addGrid(rf.minInstancesPerNode, [1, 2, 4, 8, 16])
                      .build())
        return rf, param_grid

    if model_type == 'gbm':
        gbm = GBTClassifier(labelCol="labelindex", featuresCol="features", weightCol=weight_col)
        # OneVsRest only keeps the weight column (and hands it to each binary GBT) when its
        # own weightCol is set; otherwise the sub-model fits cannot find `weight_col`.
        ovr = OneVsRest(classifier=gbm, labelCol="labelindex", featuresCol="features", weightCol=weight_col)
        # Grid keys are the inner GBT's Params; OneVsRest.copy() forwards them to its classifier.
        param_grid = (ParamGridBuilder()
                      .addGrid(gbm.featureSubsetStrategy, ['auto', 'sqrt'])
                      .addGrid(gbm.maxDepth, [3, 5, 7, 10])
                      .addGrid(gbm.minInstancesPerNode, [1, 2, 4, 8, 16])
                      .addGrid(gbm.stepSize, [0.05, 0.1, 0.2])
                      .build())
        return ovr, param_grid

    raise ValueError(f"Unsupported model_type {model_type!r}; expected 'random_forest' or 'gbm'")


def _show_per_class_recall(predictions):
    """Show the label<->index mapping, the confusion matrix and the recall of each class."""
    # Mapping between original categorical labels and the StringIndexer indices
    predictions.select("label", "labelindex").distinct().orderBy("labelindex").show()

    # Confusion matrix as (actual index, predicted index, count) rows
    confusion_matrix = predictions.groupBy("labelindex", "prediction").count()
    confusion_matrix.orderBy("labelindex", "prediction").show()

    actual_positives = confusion_matrix.groupBy("labelindex").agg(F.sum("count").alias("actual_positives"))
    true_positives = (confusion_matrix
                      .filter(F.col("labelindex") == F.col("prediction"))
                      .select(F.col("labelindex").alias("label_class"), F.col("count").alias("true_positives")))

    # Left join keeps classes that were never predicted correctly; count them as 0, not null.
    recall_df = (actual_positives
                 .join(true_positives, actual_positives.labelindex == true_positives.label_class, "left")
                 .select("labelindex",
                         F.coalesce(F.col("true_positives"), F.lit(0)).alias("true_positives"),
                         "actual_positives")
                 .withColumn("recall", F.col("true_positives") / F.col("actual_positives")))
    recall_df.orderBy("labelindex").show()


def train_test_model_with_grid_search(train_df, test_df, model_type='random_forest', model_save_path=None,
                                      num_folds=9, parallelism=4):
    """Fit a class-weighted tree model with grid search + cross-validation and evaluate it on ``test_df``.

    Parameters
    ----------
    train_df, test_df : pyspark.sql.DataFrame
        Frames with ``fsym``, ``date``, ``label`` and ``returns`` columns; every other
        column is used as a feature.
        NOTE(review): VectorAssembler needs all remaining columns to be numeric — confirm.
    model_type : {'random_forest', 'gbm'}
        'random_forest' -> RandomForestClassifier; 'gbm' -> GBTClassifier wrapped in
        OneVsRest (one binary GBT per class).
    model_save_path : str, optional
        If given, the fitted CrossValidatorModel is saved there (overwriting).
    num_folds : int
        Number of cross-validation folds (default 9).
    parallelism : int
        Number of parameter combinations CrossValidator evaluates in parallel.

    Returns
    -------
    pyspark.sql.DataFrame
        Test-set predictions, including ``label``, ``labelindex`` and ``prediction``.

    Raises
    ------
    ValueError
        If ``model_type`` is not supported.
    """
    weight_col = "class_weights"
    # Identifiers, target, returns and the weight column are never model inputs.
    non_feature_cols = {'fsym', 'date', 'label', 'returns', weight_col}

    # Build the estimator first so an unsupported model_type fails before any Spark work.
    model, param_grid = _build_estimator_and_grid(model_type, weight_col)

    # Add class weights to the training data (the test data is only scored, so needs none)
    train_df = add_class_weights(train_df)

    # Fill nulls in double columns with 0. On train this also turns the null weight of any
    # unmapped label into 0; on test only the columns that exist there are filled.
    double_columns = [name for name, dtype in train_df.dtypes if dtype == 'double']
    train_df = train_df.na.fill(0, subset=double_columns)
    test_df = test_df.na.fill(0, subset=[name for name in double_columns if name in test_df.columns])

    # The weight column is a function of the label: using it as a feature leaks the target,
    # and test_df does not have it, so VectorAssembler would fail at prediction time.
    feature_cols = [name for name in train_df.columns if name not in non_feature_cols]

    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    label_indexer = StringIndexer(inputCol="label", outputCol="labelindex")
    pipeline = Pipeline(stages=[assembler, label_indexer, model])

    # Weighted F1 selects the best parameter combination during cross-validation
    evaluator_f1_cv = MulticlassClassificationEvaluator(labelCol="labelindex", predictionCol="prediction", metricName="weightedF1")

    # NOTE(review): CrossValidator assigns rows to folds at random; it does not respect time
    # order, so validation folds can precede training rows. A time-aware scheme needs foldCol
    # (Spark >= 3.1) or manually built splits.
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=param_grid,
                              evaluator=evaluator_f1_cv,
                              numFolds=num_folds,
                              parallelism=parallelism)  # Adjust parallelism based on available resources

    cvModel = crossval.fit(train_df)

    if model_save_path:
        cvModel.write().overwrite().save(model_save_path)

    predictions = cvModel.transform(test_df)

    def _evaluate(metric_name):
        # Score the test predictions with one MulticlassClassificationEvaluator metric.
        return MulticlassClassificationEvaluator(labelCol="labelindex", predictionCol="prediction",
                                                 metricName=metric_name).evaluate(predictions)

    accuracy = _evaluate("accuracy")
    precision = _evaluate("weightedPrecision")
    recall = _evaluate("weightedRecall")
    f1 = _evaluate("f1")

    print(f"Model: {model_type}")
    print(f"Accuracy: {accuracy}")
    print(f"Weighted Precision: {precision}")
    print(f"Weighted Recall: {recall}")
    print(f"F1 Score: {f1}")

    _show_per_class_recall(predictions)

    # Return predictions to use them later if needed
    return predictions

In [11]:
# Keep the returned test-set predictions for later analysis instead of discarding them.
rf_predictions = train_test_model_with_grid_search(
    rf_train, rf_test,
    model_type='random_forest',
    model_save_path="/path/to/ATPSiftingAnalyticsRepo/RF_TP_classif",
)
# The results below are shown for 3 classes; the same code can be run to show all 4.

Accuracy: 0.38899200456881783
Precision: 0.4165085701979212
Recall: 0.3889920045688178
F1 Score: 0.3801992419670922
+-----+----------+
|label|labelindex|
+-----+----------+
|  max|       1.0|
|  min|       0.0|
| none|       2.0|
+-----+----------+

+----------+----------+-----+
|labelindex|prediction|count|
+----------+----------+-----+
|       2.0|       0.0| 1185|
|       1.0|       1.0| 1343|
|       0.0|       1.0|  863|
|       2.0|       2.0| 1299|
|       1.0|       0.0| 2508|
|       2.0|       1.0|  603|
|       1.0|       2.0| 1875|
|       0.0|       0.0| 2807|
|       0.0|       2.0| 1525|
+----------+----------+-----+

+----------+--------------+----------------+-------------------+
|labelindex|true_positives|actual_positives|             recall|
+----------+--------------+----------------+-------------------+
|       0.0|          2807|            5195| 0.5403272377285852|
|       1.0|          1343|            5726|0.23454418442193503|
|       2.0|          1299|       

In [12]:
# Keep the returned test-set predictions for later analysis instead of discarding them.
gbm_predictions = train_test_model_with_grid_search(
    rf_train, rf_test,
    model_type='gbm',
    model_save_path="/path/to/ATPSiftingAnalyticsRepo/GBM_TP_classif",
)

Accuracy: 0.38092518560822386
Precision: 0.4022777710096367
Recall: 0.38092518560822386
F1 Score: 0.3843515446855992
+-----+----------+
|label|labelindex|
+-----+----------+
|  max|       1.0|
|  min|       0.0|
| none|       2.0|
+-----+----------+

+----------+----------+-----+
|labelindex|prediction|count|
+----------+----------+-----+
|       2.0|       0.0|  931|
|       1.0|       1.0| 1851|
|       0.0|       1.0| 1287|
|       1.0|       0.0| 2071|
|       2.0|       2.0| 1201|
|       2.0|       1.0|  955|
|       1.0|       2.0| 1804|
|       0.0|       0.0| 2284|
|       0.0|       2.0| 1624|
+----------+----------+-----+

+----------+--------------+----------------+-------------------+
|labelindex|true_positives|actual_positives|             recall|
+----------+--------------+----------------+-------------------+
|       0.0|          2284|            5195| 0.4396535129932628|
|       1.0|          1851|            5726| 0.3232623122598673|
|       2.0|          1201|      