In [None]:
#import libraries 
import pyspark.sql.functions as f
import pandas as pd 
import numpy as np

In [None]:
#read the input data into a Spark DataFrame named `df` (every cell below reads it)

## Data Cleaning 

## Dealing with Nulls

In [None]:
#check the missing value % in each column (fraction of rows that are null, or NaN)
# NaN only exists in float/double columns; calling isnan() on e.g. date, timestamp or
# boolean columns fails analysis, so those columns are only checked for null.
float_types = ("float", "double")
missing_df = df.select([
    (
        f.count(f.when(
            (f.isnan(c) | f.col(c).isNull()) if dtype in float_types else f.col(c).isNull(),
            1,
        ))
        / f.count(f.lit(1))
    ).alias(c)
    for c, dtype in df.dtypes
])

In [None]:
# bring the one-row summary to the driver as pandas and show it transposed:
# one row per original column reads better than a single very wide row
missing_pd = missing_df.toPandas()
missing_pd.transpose()

In [None]:
# replace nulls (and NaNs) with 0; a numeric fill value only applies to numeric
# columns, string columns keep their nulls
df = df.fillna(0)

## Dealing with categorical variables 

In [None]:
#apply one hot encoder to categorical variables
#one hot encoder cannot take string columns directly: each column is first converted
#to a numeric index with StringIndexer, and that index is then one-hot encoded

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

#categoric_features are the columns with categorical values (define before running)
indexers = [
    StringIndexer(
        inputCol=c,
        outputCol="{0}_indexed".format(c),
        # nulls (na.fill(0) does not touch string columns) and labels unseen during fit
        # get an extra index instead of raising an error
        handleInvalid="keep",
    )
    for c in categoric_features
]

encoders = [
    OneHotEncoder(
        inputCol=indexer.getOutputCol(),
        outputCol="{0}_encoded".format(indexer.getOutputCol()),
    )
    for indexer in indexers
]

#assemble all encoded columns into a single vector column "cat_features"
assembler = VectorAssembler(
    inputCols=[encoder.getOutputCol() for encoder in encoders], outputCol="cat_features"
)

pipeline = Pipeline(stages=indexers + encoders + [assembler])

df_encoded = pipeline.fit(df).transform(df)

## Dealing with Outliers (capping values at percentile bounds)

In [None]:
#calculate upper and lower bound for each column
# values below the LOWER_Q quantile or above the UPPER_Q quantile are treated as outliers
# (the cut-offs are flexible based on the dataset)
LOWER_Q, UPPER_Q = 0.03, 0.97
# approxQuantile only guarantees the returned rank is within QUANTILE_REL_ERROR * N of the
# exact rank, so it must be far smaller than LOWER_Q (0.025 would let the "3%" cut-off
# land anywhere between roughly the 0.5th and 5.5th percentile)
QUANTILE_REL_ERROR = 0.001

numeric_cols = [c for c in df.columns if c in numeric_features]  # only numeric features
# one multi-column call computes every column's quantiles in a single pass over df
quantiles = (
    df.approxQuantile(numeric_cols, [LOWER_Q, UPPER_Q], QUANTILE_REL_ERROR)
    if numeric_cols else []
)

# keys "q3"/"q97" hold the lower/upper quantile values
bounds = {}
for c, q in zip(numeric_cols, quantiles):
    if not q:  # an all-null column yields no quantiles: nothing to trim
        continue
    bounds[c] = dict(zip(["q3", "q97"], q))

#get the upper and lower bound for each column in `bounds`
for c in bounds:
    bounds[c]['lower'] = bounds[c]['q3']   # lower bound at 3%
    bounds[c]['upper'] = bounds[c]['q97']  # upper bound at 97%

#outlier indicator "<col>_out" per column: 0 within [lower, upper], 1 outside (a null
#value also gets 1). Used only to report the share of outliers in each column.
out_cols = {c: c + "_out" for c in bounds}
df_out = df.select(
    "*",
    *[
        f.when(f.col(c).between(bounds[c]['lower'], bounds[c]['upper']), 0)
        .otherwise(1)
        .alias(out_cols[c])
        for c in bounds
    ]
)

#indicator column names, taken from out_cols rather than matching '_out' anywhere in
#the name (which would also pick up original columns such as "time_out")
outliers = list(out_cols.values())

#percent of outliers in each column = mean of its 0/1 indicator
pd_out = df_out.agg({x: "mean" for x in outliers}).toPandas() if outliers else pd.DataFrame()
display(pd_out.T)

#cap values above the upper bound / below the lower bound at that bound, in one select.
#Column names and order are unchanged; capped columns are cast back to their original
#type because the bounds come back from approxQuantile as doubles.
col_types = {field.name: field.dataType for field in df.schema.fields}
df = df.select([
    f.when(f.col(c) > bounds[c]['upper'], bounds[c]['upper'])
    .when(f.col(c) < bounds[c]['lower'], bounds[c]['lower'])
    .otherwise(f.col(c))
    .cast(col_types[c])
    .alias(c)
    if c in bounds else f.col(c)
    for c in df.columns
])

#save table once, after every column has been capped
df.write.mode('overwrite').saveAsTable('table_name')


**End of Data Cleaning**

## Feature Engineering 


## Log Transformation 

In [None]:
#log transformation can't be applied on 0s and negative values, so every numeric column
#is first shifted so that its minimum becomes 1:
#    x' = log(x - min(x) + 1)
#the shifted values are >= 1, so the result is >= 0 and never null. The shift is applied
#to every numeric column, including columns that are already positive.
#NOTE: min(x) is taken over the whole df; to avoid leakage compute it on the training
#split only and reuse the same values for test/validation data.

#dictionary of the minimum value of each numeric column, from a single aggregation
d = (
    df.agg(*[f.min(c).alias(c) for c in numeric_features]).first().asDict()
    if numeric_features else {}
)

#replace each numeric column by its shifted log; one select keeps the column order and
#avoids building a deep plan with one withColumn per column
df_log = df.select([
    f.log(f.col(c) - d[c] + 1).alias(c) if c in d else f.col(c)
    for c in df.columns
])

## MinMax Scaler 

In [None]:
#minmax scaler rescales each feature to the range [0, 1] using its min and max
#define `coltoscale` (list of column names to scale) before running this cell
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

#assemble the selected columns into one "features" vector column
assembler = VectorAssembler().setInputCols(coltoscale).setOutputCol("features")
df_transformed = assembler.transform(df_log)  # the dataset after applying log transformation

scaler = MinMaxScaler(min=0.0, max=1.0, inputCol="features", outputCol="scaledFeatures")
#NOTE: min/max are learned from all rows; fit on the training split to avoid leakage
scalerModel = scaler.fit(df_transformed.select("features"))
scaled_df = scalerModel.transform(df_transformed)

## Standard Scaler

In [None]:
#standard scaler transforms a dataset of vector rows, dividing each feature by its
#standard deviation (unit variance). With withMean=False the data is NOT centered, which
#keeps sparse vectors sparse; set withMean=True to also shift each feature to 0 mean
#(the output then becomes dense).
#works best when the features are roughly normally distributed
#define `coltoscale` (list of column names to scale) before running this cell

from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

assembler = VectorAssembler().setInputCols(coltoscale).setOutputCol("features")
df_transformed = assembler.transform(df_log)
scalerModel = scaler.fit(df_transformed.select("features"))
scaledData = scalerModel.transform(df_transformed)

## Robust Scaler

In [None]:
#robust scaler transforms a dataset of vector rows, scaling each feature by a quantile
#range (here 3%-97%; the default is the 25%-75% IQR). It is similar to standard scaler,
#but the median and the quantile range are used instead of the mean and standard
#deviation, which makes it robust to outliers. withCentering=False: the median is not
#subtracted, features are only divided by their quantile range. Requires Spark >= 3.0.
#define `coltoscale` (list of column names to scale) before running this cell
from pyspark.ml import Pipeline
from pyspark.ml.feature import RobustScaler, VectorAssembler

#the scaler reads a single vector column, so all columns are assembled into "features"
assembler = VectorAssembler(inputCols=coltoscale, outputCol="features")
scaler = RobustScaler(inputCol="features", outputCol="scaledFeatures",
                      withScaling=True, withCentering=False, lower=0.03, upper=0.97)
pipeline = Pipeline(stages=[assembler, scaler])
scalerModel = pipeline.fit(df)

#transform each feature to have unit quantile range
scaledData = scalerModel.transform(df)


## Preparing labels 

In [None]:
#label indexer: encode the string "label" column as numeric "label_indexed"
#(default ordering: the most frequent label gets index 0.0)
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

labels = StringIndexer(outputCol="label_indexed", inputCol="label")
pipeline = Pipeline(stages=[labels])
label_model = pipeline.fit(df)
df = label_model.transform(df)

## Train/Test Split

In [None]:
#split dataset into 70% training and 30% testing
#randomSplit without a seed picks a new random split on every run; a fixed seed makes
#re-runs of the notebook reproducible
SEED = 42
(trainingData, testData) = df.randomSplit([0.7, 0.3], seed=SEED)

In [None]:
#dealing with imbalanced data: rebalance the TRAINING split only; test/validation data
#must keep the real class distribution
LABEL_COL = "label_indexed"
MINORITY_LABEL = 1.0  # TODO: set to the index of the under-represented class

majority_df = trainingData.filter(f.col(LABEL_COL) != MINORITY_LABEL)
minority_df = trainingData.filter(f.col(LABEL_COL) == MINORITY_LABEL)
n_minority = minority_df.count()
#how many times more rows the other classes have than the minority class (at least 1)
ratio = max(1, majority_df.count() // n_minority) if n_minority else 1

#oversampling: repeat every minority row `ratio` times. explode() emits one row per
#array element, so the array needs `ratio` elements (range(0) would emit no rows at all)
a = range(ratio)
oversample_df = minority_df.withColumn("dummy", f.explode(f.array([f.lit(x) for x in a]))).drop('dummy')
df_done = majority_df.unionByName(oversample_df)

#undersampling: keep ~1/ratio of the majority rows (without replacement) plus all minority rows
undersample_df = majority_df.sample(False, 1.0 / ratio, seed=42).unionByName(minority_df)


## Random Forest Model 

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#placeholder hyper-parameters (Spark's defaults); pick real values via hyperparameter tuning
NUM_TREES = 20
MAX_DEPTH = 5

rf = RandomForestClassifier(featuresCol="scaledFeatures", labelCol='label_indexed',
                            numTrees=NUM_TREES, maxDepth=MAX_DEPTH, seed=42)

#chain random forest in a pipeline
pipeline = Pipeline(stages=[rf])

#train model on the training split only (swap in df_done to train on balanced data);
#fitting on the full df would put the test rows into the training set
rfmodel = pipeline.fit(trainingData)

#make predictions on the train and test sets
trainPredictions = rfmodel.transform(trainingData)
testPredictions = rfmodel.transform(testData)

#apply the same steps above on the validation (out of time test set)
#the validation set should be held out of the training and balancing




## Model Evaluation 


In [None]:
#evaluate the multiclass model with the same four metrics, first on the test set and
#then on the train set (a large gap between them suggests over-fitting)
evaluator = MulticlassClassificationEvaluator(labelCol="label_indexed")
metric_specs = [
    ("F1-Score", "f1"),
    ("Precision", "weightedPrecision"),
    ("Recall", "weightedRecall"),
    ("Accuracy", "accuracy"),
]
for predictions in (testPredictions, trainPredictions):
    for metric_title, metric_key in metric_specs:
        print(metric_title, evaluator.evaluate(predictions, {evaluator.metricName: metric_key}))

In [None]:
#get classification report 

#read labels and predictions with ONE query so row i of y_true and y_pred is the same
#record; two separate queries give no guarantee that rows come back in the same order
results_pd = spark.sql("""select label_indexed, prediction from saved_results_in_table""").toPandas()
y_true = results_pd["label_indexed"]
y_pred = results_pd["prediction"]

#classification report by class
from sklearn.metrics import classification_report
NUM_CLASSES = 4  # number of classes, change as it fits
target_names = ["class{}".format(i) for i in range(NUM_CLASSES)]
#explicit labels keep the report aligned with target_names even if a class never occurs
print(classification_report(y_true, y_pred, labels=list(range(NUM_CLASSES)),
                            target_names=target_names))

#get roc_auc score
from sklearn.metrics import roc_curve, auc, roc_auc_score
from sklearn.preprocessing import LabelBinarizer 

#one-vs-rest ROC AUC for multiclass labels
def multiclass_roc_auc_score(y_test, y_pred, average="macro"):
    """One-vs-rest ROC AUC for multiclass labels.

    Both inputs are one-hot encoded with a LabelBinarizer fitted on ``y_test``
    and the resulting indicator matrices are passed to ``roc_auc_score``.

    NOTE(review): ``y_pred`` is hard class predictions, not scores, so each
    class's "AUC" reduces to (TPR + TNR) / 2. A probability-based ROC AUC needs
    predicted class probabilities instead.

    Args:
        y_test: true labels.
        y_pred: predicted labels.
        average: how per-class scores are averaged; "macro" weights every class
            equally, which is why it is the default under class imbalance.

    Returns:
        The averaged ROC AUC score.
    """
    binarizer = LabelBinarizer().fit(y_test)
    return roc_auc_score(binarizer.transform(y_test), binarizer.transform(y_pred),
                         average=average)

#NOTE: computed from hard predictions (see multiclass_roc_auc_score), so this is not a
#probability-based ROC AUC
print('ROC AUC Score:' , multiclass_roc_auc_score(y_true, y_pred))


#get confusion matrix (rows = true class, columns = predicted class)
from sklearn.metrics import confusion_matrix
#class indices derived from target_names instead of a second hard-coded class count
class_labels = list(range(len(target_names)))
cnf_matrix = confusion_matrix(y_true, y_pred, labels=class_labels)

pd.DataFrame(cnf_matrix, index=target_names, columns=target_names)

In [None]:
#save the model to use later
#`write` is a method: write() returns the model's MLWriter, and overwrite() lets it
#replace an existing model at model_path
model_path = "///"  # TODO: set the model output path

rfmodel.write().overwrite().save(model_path)

## Hyperparameter Tuning 

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#same feature/label columns as the random forest model above
rf = RandomForestClassifier(featuresCol="scaledFeatures", labelCol="label_indexed", seed=42)
evaluator = MulticlassClassificationEvaluator(labelCol="label_indexed", predictionCol="prediction",
                                              metricName="f1")

#values for the model to test and pick the best based on performance (change as it fits);
#every combination is trained numFolds times, so the grid size drives the run time
NUM_TREES_GRID = [20, 50, 100]
MAX_DEPTH_GRID = [5, 10, 15]
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, NUM_TREES_GRID)
             .addGrid(rf.maxDepth, MAX_DEPTH_GRID)
             .build())

crossval = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=4,  # change the value based on the number of folds
    seed=42,
)

#tune on the training split only; the test split must stay unseen for the final evaluation
model = crossval.fit(trainingData)
bestmodel = model.bestModel


#get best params (f1: higher is better, hence argmax over the fold-averaged metrics)
model.getEstimatorParamMaps()[np.argmax(model.avgMetrics)]

## Feature Importance 

In [None]:
#importance of each feature in the fitted forest, the last stage of the pipeline model;
#entries follow the order of the features in the input vector
rf_stage = rfmodel.stages[-1]
rf_stage.featureImportances

#try techniques like SHAP values, which are more reliable than impurity-based feature importance
