In [0]:
### imports
from pyspark.sql import SparkSession
from pyspark.sql.types import *

from pyspark.ml.feature import VectorAssembler, StandardScaler, BucketedRandomProjectionLSH, VectorSlicer
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import array, create_map, struct, monotonically_increasing_id, row_number
from pyspark.ml.clustering import KMeans
import random
import numpy as np
from functools import reduce
from pyspark.sql import DataFrame

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import MinMaxScaler

from pyspark.sql.functions import count, col

In [0]:
import time
start_time = time.time()

### Preprocessing and SMOTE helper functions

#### Preprocess function to convert dataframe columns to vector

In [0]:
def preprocess(spark_df,sel_fetures, target_column):
    assembler = VectorAssembler(inputCols = sel_fetures, outputCol = 'features')
    processedDF = assembler.transform(spark_df).withColumnRenamed(target_column,'label').select("features","label")
    return processedDF

#### Function to get the label counts

In [0]:
def getLabelCounts(dfvectorized):
    label_counts = dfvectorized.groupBy("label").count().collect()
    label_counts_dict = {}
    maj_count = 0

    for r in label_counts:
        label_counts_dict[r["label"]] = r["count"]
        maj_count = max(maj_count,r["count"])
        
    return maj_count, label_counts_dict

#### SMOTE function to implement smote in the train dataset

In [0]:
def SMOTE(dfvectorized, label_counts_dict, maj_count):
    res = []

    for label, instances in label_counts_dict.items():

        if maj_count-instances == 0: continue

        dataInput_min = dfvectorized[dfvectorized['label'] == label]

        # LSH, bucketed random projection
        brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", seed=41, 
                                          bucketLength=100)
        # smote only applies on existing minority instances    
        model = brp.fit(dataInput_min)
        model.transform(dataInput_min)

        # here distance is calculated from brp's param inputCol
        self_join_w_distance = model.approxSimilarityJoin(dataInput_min, dataInput_min, float("inf"), distCol="EuclideanDistance")

        # remove self-comparison (distance 0)
        self_join_w_distance = self_join_w_distance.filter(self_join_w_distance.EuclideanDistance > 0)

        over_original_rows = Window.partitionBy("datasetA").orderBy("EuclideanDistance")

        self_similarity_df = self_join_w_distance.withColumn("r_num", F.row_number().over(over_original_rows))

        self_similarity_df_selected = self_similarity_df.filter(self_similarity_df.r_num <= 5)

        over_original_rows_no_order = Window.partitionBy('datasetA')




        # two udf for vector add and subtract, subtraction include a random factor [0,1]
        subtract_vector_udf = F.udf(lambda arr: random.uniform(0, 1)*(arr[0]-arr[1]), VectorUDT())
        add_vector_udf = F.udf(lambda arr: arr[0]+arr[1], VectorUDT())

        # retain original columns
        original_cols = dataInput_min.columns



        df_random_sel = self_similarity_df_selected.withColumn("rand", F.rand()).withColumn('max_rand', F.max('rand').over(over_original_rows_no_order))\
                                .where(F.col('rand') == F.col('max_rand')).drop(*['max_rand','rand','r_num'])

        ### repeatign data
        df_random_sel = df_random_sel.withColumn("row_id", row_number().over(Window.orderBy(monotonically_increasing_id())))

        toAdd = maj_count - instances ## majority class count minus current class count
        print(f"Current class:{label}, instances:{instances}, toAdd:{toAdd}")
        rand_rownum = list(np.random.choice(a=instances,size=toAdd,replace=True))
        rand_rownum = [int(r) for r in rand_rownum]
        ## Creatign a data frame with number of required rows
        df_n = spark.createDataFrame(zip(rand_rownum),["row_no"])

        ##mJoining both
        df_random_sel_n = df_n.join(df_random_sel, df_n.row_no == df_random_sel.row_id, "inner")

        # create synthetic feature numerical part
        df_vec_diff = df_random_sel_n.select('*', subtract_vector_udf(F.array('datasetA.features', 'datasetB.features')).alias('vec_diff'))
        df_vec_modified = df_vec_diff.select('*', add_vector_udf(F.array('datasetA.features', 'vec_diff')).alias('features')).withColumn('label',F.lit(label))
        
        df_vec_modified = df_vec_modified.drop(*['row_no','row_id','datasetA','datasetB','vec_diff','EuclideanDistance'])

        res.append(df_vec_modified)



    dfunion = reduce(DataFrame.unionAll, res)

    # union synthetic instances with original full (both minority and majority) df
    oversampled_df = dfunion.union(dfvectorized.select(dfunion.columns))
    return oversampled_df


#### Main function to implement SMOTE which uses the helper functions

In [0]:
def genSyntheticData(spark_df, sel_features, target_column):
    print("Sending the data to preprocess step")
    dfvectorized = preprocess(spark_df,sel_features, target_column)
    print("Getting required label statistics")
    maj_count, label_counts_dict = getLabelCounts(dfvectorized)
    print("Calling the SMOTE method")
    oversampled_df = SMOTE(dfvectorized, label_counts_dict, maj_count)
    return oversampled_df
    
    
    
    

In [0]:
sc = spark.sparkContext

### Importing the dataset

In [0]:
filepath = "/mnt/team16/covtype_final.csv"
df = spark.read\
    .format('csv')\
    .option('header', True)\
    .load(filepath)

df.cache()

Out[10]: DataFrame[Index: string, Elevation: string, Aspect: string, Slope: string, Horizontal_Distance_To_Hydrology: string, Vertical_Distance_To_Hydrology: string, Horizontal_Distance_To_Roadways: string, Hillshade_9am: string, Hillshade_Noon: string, Hillshade_3pm: string, Horizontal_Distance_To_Fire_Points: string, Wilderness_Area_0: string, Wilderness_Area_1: string, Wilderness_Area_2: string, Wilderness_Area_3: string, Soil_Type_0: string, Soil_Type_1: string, Soil_Type_2: string, Soil_Type_3: string, Soil_Type_4: string, Soil_Type_5: string, Soil_Type_6: string, Soil_Type_7: string, Soil_Type_8: string, Soil_Type_9: string, Soil_Type_10: string, Soil_Type_11: string, Soil_Type_12: string, Soil_Type_13: string, Soil_Type_14: string, Soil_Type_15: string, Soil_Type_16: string, Soil_Type_17: string, Soil_Type_18: string, Soil_Type_19: string, Soil_Type_20: string, Soil_Type_21: string, Soil_Type_22: string, Soil_Type_23: string, Soil_Type_24: string, Soil_Type_25: string, Soil_Type

### selecting the final columns for the model

In [0]:
selectedcols = ['Elevation',
 'Aspect',
 'Slope']

In [0]:
df = df.select( selectedcols + ['Cover_Type',])

### Converting the string values to integertype

In [0]:
df = df.select([F.col(c).cast("integer") for c in df.columns])

### Normalise function which uses min-max normalisation

In [0]:
def normalisation(df, inputCols):
    from pyspark.ml.feature import VectorAssembler
    
    inputCols = inputCols
    outputCol = 'features'
    output = 'Cover_Type'
    #inputCols.remove(output)
    assemblers = [VectorAssembler(inputCols = [col],
                                  outputCol = col+'_vec',
                                  handleInvalid="keep") for col in inputCols]

    featureScaler = [MinMaxScaler(inputCol = col + '_vec', 
                                  outputCol = col + '_scaled', 
                                  min = 0.0, 
                                  max = 1.0) for col in inputCols]

    pipeline_scaler = Pipeline(stages = assemblers + featureScaler)
    scaled_fit_df = pipeline_scaler.fit(df)
    scaled_df = scaled_fit_df.transform(df)

    scaled_df.show(5)
    
    return(scaled_df)

### Function for Oversampling using Random Oversampling Algorithm

In [0]:
def oversample(df_oversample):
    from pyspark.sql.functions import count, col
    distinct_values = df_oversample.groupBy("Cover_Type")\
        .agg(count("*").alias("count"))\
        .orderBy(col("Cover_Type").asc())
    distinct_values.show()

    majority = distinct_values.orderBy(col('count').desc()).limit(1).collect()
    print(majority)

    majority_class = df_oversample.filter(col("Cover_Type") == 2)

    inputCols = df_oversample.columns
    inputCols.remove('Cover_Type')

    classes = [i for i in range(1,8) if i!= majority[0][0]]
    print(classes)

    majority_count = df_oversample.filter(df_oversample['Cover_Type'] == majority[0][0]).count()

    minority_upsampled = []
    for i in classes:
        minority_class = df_oversample.filter(col("Cover_Type") == i)

        minority_upsampled.append(minority_class.sample(True, majority_count / minority_class.count(), seed=20))

    print("Upsampled Minority", len(minority_upsampled))

    upsampled_data = majority_class
    for upsample in minority_upsampled:
        upsampled_data = upsampled_data.union(upsample)

    upsampled_data.groupBy("Cover_Type").count().show()

    print(upsampled_data.show(10)) 

    return(upsampled_data)


### Function for Undersampling using Random Undersampling Algorithm

In [0]:
def undersample(df_undersample):
    from pyspark.sql.functions import count, col 
    distinct_values = df_undersample.groupBy("Cover_Type")\
        .agg(count("*").alias("count"))\
        .orderBy(col("Cover_Type").asc())
    distinct_values.show()

    minority = distinct_values.orderBy(col('count').asc()).limit(1).collect()
    print(minority)

    minority_class = df_undersample.filter(col("Cover_Type") == 4)

    inputCols = df_undersample.columns
    inputCols.remove('Cover_Type')

    classes = [i for i in range(1, 8) if i != minority[0][0]]
    print(classes)

    minority_count = df_undersample.filter(df_undersample['Cover_Type'] == minority[0][0]).count()

    majority_undersampled = []
    for i in classes:
        majority_class = df_undersample.filter(col("Cover_Type") == i)

        majority_undersampled.append(majority_class.sample(True, minority_count / majority_class.count(), seed=20))

    print("Undersampled Majority", len(majority_undersampled))

    undersampled_data = minority_class
    for undersample in majority_undersampled:
        undersampled_data = undersampled_data.union(undersample)

    undersampled_data.groupBy("Cover_Type").count().show()
    print(undersampled_data.show(10))
    return (undersampled_data)


### Function for converting the dataset column values in form of vector with the column names as features, label
### where features is the features and label is the output column (Cover_Type)

In [0]:
def processed_vector(df):
  from pyspark.ml.feature import VectorAssembler

  sel_fetures = ["Elevation","Aspect","Slope"]
  target_column = "Cover_Type"
  assembler = VectorAssembler(inputCols = sel_fetures, outputCol = 'features')
  processedDF = assembler.transform(df).withColumnRenamed(target_column,'label').select("features","label")
  return processedDF

### Normalising the dataset using the normalise helper function

In [0]:
inputCols = ["Elevation", "Aspect", "Slope"]
df = normalisation(df, inputCols)

+---------+------+-----+----------+-------------+----------+---------+--------------------+--------------------+--------------------+
|Elevation|Aspect|Slope|Cover_Type|Elevation_vec|Aspect_vec|Slope_vec|    Elevation_scaled|       Aspect_scaled|        Slope_scaled|
+---------+------+-----+----------+-------------+----------+---------+--------------------+--------------------+--------------------+
|     2596|    51|    3|         5|     [2596.0]|    [51.0]|    [3.0]|[0.36868434217108...|[0.14166666666666...|[0.04545454545454...|
|     2590|    56|    2|         5|     [2590.0]|    [56.0]|    [2.0]|[0.36568284142071...|[0.15555555555555...|[0.03030303030303...|
|     2804|   139|    9|         2|     [2804.0]|   [139.0]|    [9.0]|[0.47273636818409...|[0.3861111111111111]|[0.13636363636363...|
|     2785|   155|   18|         2|     [2785.0]|   [155.0]|   [18.0]|[0.4632316158079039]|[0.4305555555555556]|[0.2727272727272727]|
|     2595|    45|    2|         5|     [2595.0]|    [45.0]|  

### Removing the intermediate vector columns created during normalisation

In [0]:
for col in df.columns:
    if col in inputCols or "_vec" in col:
        df = df.drop(col)

### Converting the vectorised scaled columns to Doubletype column

In [0]:
udf1 = F.udf(lambda x: float(x[0]), DoubleType())
for col in df.columns:
    if "_scaled" in col:
        df = df.withColumn(col,udf1(col).alias(col))

### Renaming the scaled columns to the original names

In [0]:
df = df.withColumnRenamed('Elevation_scaled','Elevation')
df = df.withColumnRenamed('Slope_scaled','Slope')
df = df.withColumnRenamed('Aspect_scaled','Aspect')

### Splitting the dataset in test and train using 90-10 split

In [0]:
trainDF, testDF = df.randomSplit([.9,.1], seed=41)
print(f"Training data examples: {trainDF.count()} \nTesting data examples: {testDF.count()}")

Training data examples: 523029 
Testing data examples: 57983


In [0]:
trainDF.show(5)

+----------+------------------+-------------------+--------------------+
|Cover_Type|         Elevation|             Aspect|               Slope|
+----------+------------------+-------------------+--------------------+
|         1|0.3586793396698349|0.11666666666666667| 0.30303030303030304|
|         1|0.3601800900450225| 0.6861111111111111|0.030303030303030304|
|         1|0.3621810905452726|0.08611111111111111| 0.30303030303030304|
|         1|0.3666833416708354|0.10555555555555556| 0.33333333333333337|
|         1|0.3696848424212106|                0.1| 0.15151515151515152|
+----------+------------------+-------------------+--------------------+
only showing top 5 rows



### Resampling the train dataframe to hold 15% of the original data

In [0]:
trainDFs = trainDF.sample(fraction = 0.15)

In [0]:
trainDFs.count()

Out[25]: 78753

### Function for model train and cross-validation which returns F1 score and Accuracy

In [0]:
def model_train_validation(train, test):
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.classification import MultilayerPerceptronClassifier
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.feature import MinMaxScaler
    import pyspark.sql.functions as F

    from pyspark.sql.functions import count, col


    VectorAssembler1 = VectorAssembler(inputCols=train.columns,
                                       outputCol='features')

    evaluator1 = MulticlassClassificationEvaluator(metricName='f1',
                                                  predictionCol='prediction',
                                                  labelCol='label')

    evaluator2 = MulticlassClassificationEvaluator(metricName='accuracy',
                                                  predictionCol='prediction',
                                                  labelCol='label')

    lr = LogisticRegression(labelCol='label')

    paramGrid = ParamGridBuilder()\
        .addGrid(lr.maxIter, [10])\
        .build()

    cv1 = CrossValidator(estimator=lr,
                        estimatorParamMaps=paramGrid,
                        evaluator=evaluator1)
    
    cv2 = CrossValidator(estimator=lr,
                        estimatorParamMaps=paramGrid,
                        evaluator=evaluator2)

    pipeline1 = Pipeline(stages=[cv1])
    pipeline2 = Pipeline(stages=[cv2])
    
    
    pipelineModel1 = pipeline1.fit(train)
    pipelineModel2 = pipeline2.fit(train)

    predictions1 = pipelineModel1.transform(test)
    predictions2 = pipelineModel2.transform(test)
    
    f1 = evaluator1.evaluate(predictions1)
    accuracy = evaluator2.evaluate(predictions2)

    print("F1:", f1)
    print("Accuracy:", accuracy)
    return f1,accuracy


### Converting the test dataframe as vectors

In [0]:
testDF = processed_vector(testDF)

### Building Base Model using train dataframe

#### Converting train dataframe as vectors

In [0]:
baseDF = processed_vector(trainDF)

#### Building Base Model

In [0]:
f1,accuracy = model_train_validation(baseDF, testDF)
print("F1 of Base Model: ", f1)
print("Accuracy of Base Model: ", accuracy)

F1: 0.6428498239005291
Accuracy: 0.6700929582808755
F1 of Base Model:  0.6428498239005291
Accuracy of Base Model:  0.6700929582808755


### Building Oversampled Dataset and model

#### Oversampling train dataframe

In [0]:
oversampled_df = oversample(trainDF)

+----------+------+
|Cover_Type| count|
+----------+------+
|         1|190826|
|         2|254855|
|         3| 32280|
|         4|  2461|
|         5|  8569|
|         6| 15604|
|         7| 18434|
+----------+------+

[Row(Cover_Type=2, count=254855)]
[1, 3, 4, 5, 6, 7]
Upsampled Minority 6
+----------+------+
|Cover_Type| count|
+----------+------+
|         2|254855|
|         1|254886|
|         3|254706|
|         4|254664|
|         5|254936|
|         6|254566|
|         7|255257|
+----------+------+

+----------+-------------------+--------------------+-------------------+
|Cover_Type|          Elevation|              Aspect|              Slope|
+----------+-------------------+--------------------+-------------------+
|         2|0.15507753876938468|  0.9694444444444444|0.24242424242424243|
|         2|0.16258129064532265|0.016666666666666666| 0.3181818181818182|
|         2| 0.2276138069034517|   0.913888888888889|0.19696969696969696|
|         2|0.24712356178089043| 0.16944

#### Converting oversampled dataframe as vectors

In [0]:
oversampled_df = processed_vector(oversampled_df)

#### Building Oversampled Model

In [0]:
f1,accuracy = model_train_validation(oversampled_df, testDF)
print("F1 of Oversampled Model: ", f1)
print("Accuracy of Oversampled Model: ", accuracy)

F1: 0.5210508298119556
Accuracy: 0.47933014849179933
F1 of Oversampled Model:  0.5210508298119556
Accuracy of Oversampled Model:  0.47933014849179933


### Building Undersampled Dataset and model

#### Undersampling train dataset

In [0]:
undersampled_df = undersample(trainDF)

+----------+------+
|Cover_Type| count|
+----------+------+
|         1|190826|
|         2|254855|
|         3| 32280|
|         4|  2461|
|         5|  8569|
|         6| 15604|
|         7| 18434|
+----------+------+

[Row(Cover_Type=4, count=2461)]
[1, 2, 3, 5, 6, 7]
Undersampled Majority 6
+----------+-----+
|Cover_Type|count|
+----------+-----+
|         4| 2461|
|         1| 2475|
|         6| 2521|
|         7| 2559|
|         2| 2448|
|         3| 2472|
|         5| 2452|
+----------+-----+

+----------+-------------------+-------------------+-------------------+
|Cover_Type|          Elevation|             Aspect|              Slope|
+----------+-------------------+-------------------+-------------------+
|         4|0.06503251625812906| 0.8388888888888889| 0.2727272727272727|
|         4| 0.0655327663831916|0.35000000000000003|0.15151515151515152|
|         4|0.06603301650825412| 0.9361111111111111|0.24242424242424243|
|         4|0.06803401700850424|0.14444444444444446|0.33

#### Converting undersampled dataframe as vectors

In [0]:
undersampled_df = processed_vector(undersampled_df)

#### Building Undersampled Model

In [0]:
f1,accuracy = model_train_validation(undersampled_df, testDF)
print("F1 of Undersampled Model: ", f1)
print("Accuracy of Undersampled Model: ", accuracy)

F1: 0.5173117385094204
Accuracy: 0.47713985133573633
F1 of Undersampled Model:  0.5173117385094204
Accuracy of Undersampled Model:  0.47713985133573633


### Applying SMOTE Algorithm on the train dataframe using SMOTE main function

In [0]:
oversampled_smote_df = genSyntheticData(trainDFs, ["Elevation","Aspect","Slope"], "Cover_Type")

Sending the data to preprocess step
Getting required label statistics
Calling the SMOTE method
Current class:1, instances:28712, toAdd:9771
Current class:6, instances:2293, toAdd:36190
Current class:3, instances:4834, toAdd:33649
Current class:5, instances:1261, toAdd:37222
Current class:7, instances:2812, toAdd:35671
Current class:4, instances:358, toAdd:38125


In [0]:
oversampled_smote_df.show(5, truncate=False)

+-------------------------------------------------------------+-----+
|features                                                     |label|
+-------------------------------------------------------------+-----+
|[0.43649575001829805,0.10648056269985189,0.22895455844417517]|1    |
|[0.45704754059548397,0.3934882903124631,0.12593286401474832] |1    |
|[0.455991811366895,0.18615873306922134,0.14637806108626422]  |1    |
|[0.514903424319858,0.0,0.026238688242995627]                 |1    |
|[0.5144575878190347,0.2655822005423943,0.3135636954904397]   |1    |
+-------------------------------------------------------------+-----+
only showing top 5 rows



In [0]:
oversampled_smote_df.groupBy("label").count().show()

#### Building SMOTE Model

In [0]:
f1,accuracy = model_train_validation(oversampled_smote_df, testDF)
print("F1 of SMOTE Model: ", f1)
print("Accuracy of SMOTE Model: ", accuracy)

In [0]:
end_time = time.time()

In [0]:
print("--- %s seconds ---" % (end_time - start_time))

### Plotting Graphs

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

In [0]:
pd_oversampled_df = oversampled_df.toPandas()

pd_oversampled_df['Elevation'] = pd_oversampled_df["features"].apply(lambda X:X[0])
pd_oversampled_df['Aspect'] = pd_oversampled_df["features"].apply(lambda X:X[1])

plt.figure(figsize=(10,6))
sns.scatterplot(x='Elevation', y='Aspect',data=pd_oversampled_df, hue='Cover_Type')
plt.show()

In [0]:
pd_undersampled_df = undersampled_df.toPandas()

pd_undersampled_df['Elevation'] = pd_undersampled_df["features"].apply(lambda X:X[0])
pd_undersampled_df['Aspect'] = pd_undersampled_df["features"].apply(lambda X:X[1])

plt.figure(figsize=(10,6))
sns.scatterplot(x='Elevation', y='Aspect',data=pd_undersampled_df, hue='Cover_Type')
plt.show()

In [0]:
pd_smotesampled_df = oversampled_smote_df.toPandas()

In [0]:
pd_smotesampled_df['Elevation'] = pd_smotesampled_df["features"].apply(lambda X:X[0])
pd_smotesampled_df['Aspect'] = pd_smotesampled_df["features"].apply(lambda X:X[1])


plt.figure(figsize=(10,6))
sns.scatterplot(x='Elevation', y='Aspect',data=pd_smotesampled_df, hue='label')
plt.show()

In [0]:
pd_base_df = trainDF.toPandas()

plt.figure(figsize=(10,6))
sns.scatterplot(x='Elevation', y='Aspect',data=pd_base_df, hue='Cover_Type')
plt.show()