### Creating Spark Session


In [1]:
! pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 47 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 64.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=728c157ffdef2332a142b5531645f37972d02e88d7ee1dc6f6f8930fd01edd4f
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [3]:
spark = SparkSession.builder.getOrCreate()

### Downloading Dataset

In [4]:
train_identity = spark.read.csv('/content/drive/MyDrive/DE-Project/ieee-fraud-detection/train_identity.csv', inferSchema=True, header=True)
train_identity.show()

+-------------+-----+--------+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------+-----+------+-----+--------+-----+-----+-----+-----+-----+-----+--------------------+-----+-----+-----+-----+-----+--------+----------------+--------------------+-----+---------+--------------+-----+-----+-----+-----+----------+--------------------+
|TransactionID|id_01|   id_02|id_03|id_04|id_05|id_06|id_07|id_08|id_09|id_10|id_11|   id_12|id_13| id_14|id_15|   id_16|id_17|id_18|id_19|id_20|id_21|id_22|               id_23|id_24|id_25|id_26|id_27|id_28|   id_29|           id_30|               id_31|id_32|    id_33|         id_34|id_35|id_36|id_37|id_38|DeviceType|          DeviceInfo|
+-------------+-----+--------+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------+-----+------+-----+--------+-----+-----+-----+-----+-----+-----+--------------------+-----+-----+-----+-----+-----+--------+----------------+--------------------+-----+---------+--------------+-----+-----+-----+-----+----

In [5]:
train_transaction = spark.read.csv('/content/drive/MyDrive/DE-Project/ieee-fraud-detection/train_transaction.csv', inferSchema=True, header=True)
train_transaction.show()

+-------------+-------+-------------+--------------+---------+-----+-----+-----+----------+-----+------+-----+-----+-----+-----+-------------+-------------+---+---+---+---+---+---+---+---+---+---+---+---+----+---+-----+-----+----+-----+----+----+----+----+----+-----+-----+----+----+----+-----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+----+----+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+------+-----+----+-----+-----+---

In [6]:
train = train_transaction.join(train_identity, ['TransactionID'], how = 'inner').distinct()
train.show()

+-------------+-------+-------------+--------------+---------+-----+-----+-----+----------+-----+------+-----+-----+-----+-----+-------------+-------------+---+----+---+---+---+---+---+----+---+----+---+---+----+---+-----+-----+----+-----+----+-----+----+-------------------+-------------------+----+----+-----+----+----+-----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+--

In [6]:
print((train.count(), len(train.columns)))

(144233, 434)


### Data Preprocessing

#### Important Features

In [7]:
imp_features = spark.read.csv('/content/drive/MyDrive/DE-Project/imp_features.csv', inferSchema=False, header=True)

In [8]:
imp_features = imp_features.select("0")
imp_features.show()

+--------------+
|             0|
+--------------+
|       isFraud|
| TransactionDT|
|TransactionAmt|
|     ProductCD|
|         card1|
|         card2|
|         card3|
|         card4|
|         card5|
|         card6|
| P_emaildomain|
| R_emaildomain|
|            C3|
|            C8|
|            D1|
|           V98|
|          V109|
|          V113|
|          V115|
|          V119|
+--------------+
only showing top 20 rows



In [9]:
imp_features = [row[0] for row in imp_features.collect()]

In [10]:
train_df = train.select(imp_features)
train_df.show()

+-------+-------------+--------------+---------+-----+-----+-----+----------+-----+------+-------------+-------------+---+----+-----+---+----+----+----+----+----+----+----+-----+-----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+------------------+------------------+------------------+-----------------+-----------------+------------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----------------+-----------------+-----------------+-----+-----+--------+-----+-----+-----+--------+-----+-------+--------+-----+-----+-----+-----+--------+--------------------+-----+-----+-----+-----+----------+--------------------+
|isFraud|TransactionDT|TransactionAmt|ProductCD|card1|card2|card3|    

In [11]:
print((train_df.count(), len(train_df.columns)))

(144233, 120)


#### Modified Features

In [12]:
# Column to be modified
modify_col_list = ('card6', 'P_emaildomain', 'R_emaildomain', 'id_31', 'DeviceInfo')

In [13]:
train_df.drop(*modify_col_list)

DataFrame[isFraud: int, TransactionDT: int, TransactionAmt: double, ProductCD: string, card1: int, card2: double, card3: double, card4: string, card5: double, C3: double, C8: double, D1: double, V98: double, V109: double, V113: double, V115: double, V119: double, V122: double, V123: double, V125: double, V130: double, V136: double, V169: double, V170: double, V171: double, V172: double, V173: double, V174: double, V175: double, V176: double, V180: double, V181: double, V184: double, V185: double, V188: double, V189: double, V193: double, V194: double, V198: double, V200: double, V205: double, V206: double, V207: double, V208: double, V209: double, V210: double, V220: double, V223: double, V225: double, V226: double, V228: double, V230: double, V232: double, V237: double, V238: double, V240: double, V241: double, V243: double, V244: double, V245: double, V247: double, V249: double, V251: double, V252: double, V253: double, V255: double, V257: double, V258: double, V260: double, V261: do

In [14]:
print((train_df.count(), len(train_df.columns)))

(144233, 120)


In [14]:
# Object Feature
from pyspark.sql.types import StringType, DoubleType

object_col = [f.name for f in train_df.schema.fields if isinstance(f.dataType, StringType)]
object_col

['ProductCD',
 'card4',
 'card6',
 'P_emaildomain',
 'R_emaildomain',
 'id_12',
 'id_15',
 'id_16',
 'id_28',
 'id_29',
 'id_31',
 'id_35',
 'id_36',
 'id_37',
 'id_38',
 'DeviceType',
 'DeviceInfo']

In [16]:
len(object_col)

17

In [15]:
# Numerical Features
num_col = [f.name for f in train_df.schema.fields if not isinstance(f.dataType, StringType)]
num_col

['isFraud',
 'TransactionDT',
 'TransactionAmt',
 'card1',
 'card2',
 'card3',
 'card5',
 'C3',
 'C8',
 'D1',
 'V98',
 'V109',
 'V113',
 'V115',
 'V119',
 'V122',
 'V123',
 'V125',
 'V130',
 'V136',
 'V169',
 'V170',
 'V171',
 'V172',
 'V173',
 'V174',
 'V175',
 'V176',
 'V180',
 'V181',
 'V184',
 'V185',
 'V188',
 'V189',
 'V193',
 'V194',
 'V198',
 'V200',
 'V205',
 'V206',
 'V207',
 'V208',
 'V209',
 'V210',
 'V220',
 'V223',
 'V225',
 'V226',
 'V228',
 'V230',
 'V232',
 'V237',
 'V238',
 'V240',
 'V241',
 'V243',
 'V244',
 'V245',
 'V247',
 'V249',
 'V251',
 'V252',
 'V253',
 'V255',
 'V257',
 'V258',
 'V260',
 'V261',
 'V262',
 'V264',
 'V265',
 'V267',
 'V268',
 'V271',
 'V276',
 'V280',
 'V281',
 'V282',
 'V283',
 'V284',
 'V286',
 'V287',
 'V289',
 'V290',
 'V291',
 'V297',
 'V301',
 'V302',
 'V303',
 'V304',
 'V312',
 'V313',
 'V314',
 'V321',
 'id_01',
 'id_02',
 'id_05',
 'id_06',
 'id_11',
 'id_13',
 'id_17',
 'id_19',
 'id_20']

In [18]:
len(num_col)

103

#### Numerical Feature Imputation

In [19]:
from pyspark.ml.feature import Imputer

imputer_num = Imputer(
    inputCols = num_col,
    outputCols = num_col
).setStrategy("median")

In [20]:
train_df = imputer_num.fit(train_df).transform(train_df)
train_df.show()

+-------+-------------+--------------+---------+-----+-----+-----+----------+-----+------+-------------+-------------+---+----+-----+---+----+----+----+----+----+----+----+-----+-----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+------------------+------------------+------------------+-----------------+-----------------+------------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----------------+-----------------+-----------------+-----+-----+--------+-----+-----+-----+--------+-----+-------+--------+-----+-----+-----+-----+--------+--------------------+-----+-----+-----+-----+----------+--------------------+
|isFraud|TransactionDT|TransactionAmt|ProductCD|card1|card2|card3|    

In [22]:
train_df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in train_df.columns]).show()

+-------+-------------+--------------+---------+-----+-----+-----+-----+-----+-----+-------------+-------------+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----------+----------+
|isFraud|TransactionDT|TransactionAmt|ProductCD|card1|card2|card3|card4|card5|card6|P_emaildomain|R_emaildomain| C3| C8| D1|V98|V109|V113|V115|V119|V122|V123|V125|V130|V136|V169|V170|V171|V172|V173|V174|V175|V176|V180|V181|V184|V185|V188|V189|V193|V194|V198|V200|V205|V206|V207|V208|V209|V210|V220|V223|V225|V226|V228|V2

#### StringIndexing Object Features

In [24]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").setHandleInvalid("skip").fit(train_df) for column in  object_col]

pipeline = Pipeline(stages=indexers)
train_df = pipeline.fit(train_df).transform(train_df)
train_df.show()

+-------+-------------+--------------+---------+-----+-----+-----+----------+-----+------+-------------+-------------+---+---+----+---+----+----+----+----+----+----+----+-----+-----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+------------------+------------------+-----------------+-----------------+-----------------+------------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----------------+-----------------+-----------------+-----+-----+--------+-----+-----+-----------------+--------+-----+-----+--------+-----+-----+-----+-----+--------+--------------------+-----+-----+-----+-----+----------+--------------------+---------------+-----------+-----------+-------------------+----

In [None]:
object_col1 = (col for col in object_col)

train_df = train_df.drop(*object_col1)
train_df.show()

In [None]:
print((train_df.count(), len(train_df.columns)))

#### Object Feature Imputation

In [None]:
object_col_index = [col+"_index" for col in object_col]
imputer_col = Imputer(inputCols = object_col_index, outputCols = object_col_index).setStrategy("mode")
train_df = imputer_num.fit(train_df).transform(train_df)
train_df.show()

In [None]:
print(len(train_df.columns))

In [None]:
train_df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in train_df.columns]).show()

#### Saving Imputed Data

In [None]:
train_df.write.options(header='True', delimiter=',').csv("/content/drive/MyDrive/DE-Project/train_imputed.csv")

In [16]:
train_df = spark.read.csv('/content/drive/MyDrive/DE-Project/train_imputed.csv', inferSchema=True, header=True)
train_df.show()

+-------+-------------+--------------+-----+-----+-----+-----+---+-----+-----+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+--------+-----+-----+-----------------+-----+-----+-----+-----+---------------+-----------+-----------+-------------------+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----------------+----------------+
|isFraud|TransactionDT|TransactionAmt|card1|card2|card3|card5| C3|   C8|   D1|V98|V109|V113|V115|V119|V122|V123|V125|V130|V136|V169|V170|V171|V172|V173|V174|V175|V176|V180|V181|V184|V185|V188|V189|V1

In [17]:
object_col_index = [col+"_index" for col in object_col]

#### SMOTE

In [18]:
# Code Source: https://gist.github.com/hwang018/420e288021e9bdacd133076600a9ea8c

import random
import numpy as np
from functools import reduce
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.functions import rand,col,when,concat,substring,lit,udf,lower,sum as ps_sum,count as ps_count,row_number
from pyspark.sql.window import *
from pyspark.sql import DataFrame
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler,BucketedRandomProjectionLSH,VectorSlicer,StringIndexer
from pyspark.sql.window import Window
from pyspark.ml.linalg import Vectors,VectorUDT,SparseVector,DenseVector
from pyspark.sql.functions import array, create_map, struct

class SmoteConfig():
  def __init__(self, seed, bucketLength, k, multiplier):
    self.seed = seed
    self.bucketLength = bucketLength
    self.k = k
    self.multiplier = multiplier

############################## spark smote oversampling ##########################
#for categorical columns, must take its stringIndexed form (smote should be after string indexing, default by frequency)

def pre_smote_df_process(df,num_cols,cat_cols,target_col,index_suffix="_index"):
    '''
    string indexer and vector assembler
    inputs:
    * df: spark df, original
    * num_cols: numerical cols to be assembled
    * cat_cols: categorical cols to be stringindexed
    * target_col: prediction target
    * index_suffix: will be the suffix after string indexing
    output:
    * vectorized: spark df, after stringindex and vector assemble, ready for smote
    '''
    if(df.select(target_col).distinct().count() != 2):
        raise ValueError("Target col must have exactly 2 classes")
        
    if target_col in num_cols:
        num_cols.remove(target_col)

    # only assembled numeric columns into features
    assembler = VectorAssembler(inputCols = num_cols, outputCol = 'features')
    # index the string cols, except possibly for the label col
    assemble_stages = [StringIndexer(inputCol=column, outputCol=column+index_suffix).fit(df) for column in list(set(cat_cols)-set([target_col]))]
    # add the stage of numerical vector assembler
    assemble_stages.append(assembler)
    pipeline = Pipeline(stages=assemble_stages)
    pos_vectorized = pipeline.fit(df).transform(df)
    
    # drop original num cols and cat cols
    drop_cols = num_cols+cat_cols
    
    keep_cols = [a for a in pos_vectorized.columns if a not in drop_cols]
    
    vectorized = pos_vectorized.select(*keep_cols).withColumn('label',pos_vectorized[target_col]).drop(target_col)
    
    return vectorized

def smote(vectorized_sdf,smote_config):
    '''
    contains logic to perform smote oversampling, given a spark df with 2 classes
    inputs:
    * vectorized_sdf: cat cols are already stringindexed, num cols are assembled into 'features' vector
      df target col should be 'label'
    * smote_config: config obj containing smote parameters
    output:
    * oversampled_df: spark df after smote oversampling
    '''
    dataInput_min = vectorized_sdf[vectorized_sdf['label'] == 1]
    dataInput_maj = vectorized_sdf[vectorized_sdf['label'] == 0]
    
    # LSH, bucketed random projection
    brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",seed=smote_config.seed, bucketLength=smote_config.bucketLength)
    # smote only applies on existing minority instances    
    model = brp.fit(dataInput_min)
    model.transform(dataInput_min)

    # here distance is calculated from brp's param inputCol
    self_join_w_distance = model.approxSimilarityJoin(dataInput_min, dataInput_min, float("inf"), distCol="EuclideanDistance")

    # remove self-comparison (distance 0)
    self_join_w_distance = self_join_w_distance.filter(self_join_w_distance.EuclideanDistance > 0)

    over_original_rows = Window.partitionBy("datasetA").orderBy("EuclideanDistance")

    self_similarity_df = self_join_w_distance.withColumn("r_num", F.row_number().over(over_original_rows))

    self_similarity_df_selected = self_similarity_df.filter(self_similarity_df.r_num <= smote_config.k)

    over_original_rows_no_order = Window.partitionBy('datasetA')

    # list to store batches of synthetic data
    res = []
    
    # two udf for vector add and subtract, subtraction include a random factor [0,1]
    # subtract_vector_udf = F.udf(lambda arr: random.uniform(0, 1)*(arr[0]-arr[1]), VectorUDT())
    # add_vector_udf = F.udf(lambda arr: arr[0]+arr[1], VectorUDT())
    
    @F.udf(returnType=VectorUDT())
    def subtract_vector_udf(arr):
        # Must decorate func as udf to ensure that its callback form is the arg to df iterator construct
        a = arr[0]
        b = arr[1]
        if isinstance(a, SparseVector):
            a = a.toArray()
        if isinstance(b, SparseVector):
            b = b.toArray()
        array_ = a - b
        return random.uniform(0, 1) * Vectors.dense(array_)

    @F.udf(returnType=VectorUDT())
    def add_vector_udf(arr):
        # Must decorate func as udf to ensure that its callback form is the arg to df iterator construct
        a = arr[0]
        b = arr[1]
        if isinstance(a, SparseVector):
            a = a.toArray()
        if isinstance(b, SparseVector):
            b = b.toArray()
        array_ = a + b
        return Vectors.dense(array_)

    # retain original columns
    original_cols = dataInput_min.columns
    
    for i in range(smote_config.multiplier):
        print("generating batch %s of synthetic instances"%i)
        # logic to randomly select neighbour: pick the largest random number generated row as the neighbour
        df_random_sel = self_similarity_df_selected.withColumn("rand", F.rand()).withColumn('max_rand', F.max('rand').over(over_original_rows_no_order))\
                            .where(F.col('rand') == F.col('max_rand')).drop(*['max_rand','rand','r_num'])
        # create synthetic feature numerical part
        df_vec_diff = df_random_sel.select('*', subtract_vector_udf(F.array('datasetA.features', 'datasetB.features')).alias('vec_diff'))
        df_vec_modified = df_vec_diff.select('*', add_vector_udf(F.array('datasetA.features', 'vec_diff')).alias('features'))
        
        # for categorical cols, either pick original or the neighbour's cat values
        for c in original_cols:
            # randomly select neighbour or original data
            col_sub = random.choice(['datasetA','datasetB'])
            val = "{0}.{1}".format(col_sub,c)
            if c != 'features':
                # do not unpack original numerical features
                df_vec_modified = df_vec_modified.withColumn(c,F.col(val))
        
        # this df_vec_modified is the synthetic minority instances,
        df_vec_modified = df_vec_modified.drop(*['datasetA','datasetB','vec_diff','EuclideanDistance'])
        
        res.append(df_vec_modified)
    
    dfunion = reduce(DataFrame.unionAll, res)
    # union synthetic instances with original full (both minority and majority) df
    oversampled_df = dfunion.union(vectorized_sdf.select(dfunion.columns))
    
    return oversampled_df

In [19]:
smote_config = SmoteConfig(48, 100, 4, 3)

vectorized_sdf= pre_smote_df_process(train_df, num_col, object_col_index, "isFraud")
oversampled_df = smote(vectorized_sdf, smote_config)

generating batch 0 of synthetic instances
generating batch 1 of synthetic instances
generating batch 2 of synthetic instances


In [20]:
oversampled_df.show()

+--------------------+-----------------+-----------------+-----------------+----------------------+-------------------------+---------------------+-----------------+-----------------+----------------------+-----------------+-------------------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----+
|            features|card4_index_index|id_28_index_index|id_38_index_index|DeviceInfo_index_index|P_emaildomain_index_index|ProductCD_index_index|id_35_index_index|id_29_index_index|DeviceType_index_index|id_31_index_index|R_emaildomain_index_index|id_15_index_index|id_12_index_index|card6_index_index|id_37_index_index|id_36_index_index|id_16_index_index|label|
+--------------------+-----------------+-----------------+-----------------+----------------------+-------------------------+---------------------+-----------------+-----------------+----------------------+-----------------+-------------------------+-----------------+--

In [21]:
object_col_index2 = [col+"_index" for col in object_col_index]

#### OneHot Encoding

In [22]:
from pyspark.ml.feature import OneHotEncoder

encoders = [OneHotEncoder(inputCol=column, outputCol=column+"_encode") for column in object_col_index2]

pipeline = Pipeline(stages=encoders)
encoded_df = pipeline.fit(oversampled_df).transform(oversampled_df)
encoded_df.show()

+--------------------+-----------------+-----------------+-----------------+----------------------+-------------------------+---------------------+-----------------+-----------------+----------------------+-----------------+-------------------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----+----------------------------+------------------------+------------------------+--------------------------------+--------------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+-----------------------------+-----------------------------+
|            features|card4_index_index|id_28_index_index|id_38_index_index|DeviceInfo_index_index|P_emaildomain_index_index|ProductCD_index_index|id_35_index_index|id_29_index_ind

In [23]:
object_col_encode = [col+"_encode" for col in object_col_index2]

encoded_df = encoded_df.drop(*object_col_index2)
encoded_df.show()

+--------------------+-----+----------------------------+------------------------+------------------------+--------------------------------+--------------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+-----------------------------+-----------------------------+
|            features|label|ProductCD_index_index_encode|card4_index_index_encode|card6_index_index_encode|P_emaildomain_index_index_encode|R_emaildomain_index_index_encode|id_12_index_index_encode|id_15_index_index_encode|id_16_index_index_encode|id_28_index_index_encode|id_29_index_index_encode|id_31_index_index_encode|id_35_index_index_encode|id_36_index_index_encode|id_37_index_index_encode|id_38_index_index_encode|DeviceType_index_index_encode|DeviceInfo_index_index_encode|
+--------------------+-----+----

#### Scaling

In [24]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

for i in list(set(encoded_df.columns)-set(['label'])):
    # VectorAssembler Transformation - Converting column to vector type
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")

    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")

    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting pipeline on dataframe
    encoded_df = pipeline.fit(encoded_df).transform(encoded_df)

encoded_df.show()

+--------------------+-----+----------------------------+------------------------+------------------------+--------------------------------+--------------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+-----------------------------+-----------------------------+----------------------------------+------------------------------------+-----------------------------+-------------------------------+-----------------------------+-------------------------------+-----------------------------+-------------------------------+---------------------------------+-----------------------------------+-----------------------------+-------------------------------+--------------------+--------------------+-----------------------------+-------------------------------+-----------------------

In [25]:
object_col_scaled = [col+"_Scaled" for col in object_col_encode]

object_col_vect = [col+"_Vect" for col in object_col_encode]
encoded_df = encoded_df.drop(*object_col_encode)
encoded_df = encoded_df.drop(*object_col_vect)
encoded_df = encoded_df.drop("features_Vect")
encoded_df = encoded_df.drop("features")
encoded_df.show()

+-----+------------------------------------+-------------------------------+-------------------------------+-------------------------------+-----------------------------------+-------------------------------+--------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+---------------------------------------+-------------------------------+-------------------------------+---------------------------------------+------------------------------------+
|label|DeviceType_index_index_encode_Scaled|id_12_index_index_encode_Scaled|card4_index_index_encode_Scaled|id_37_index_index_encode_Scaled|ProductCD_index_index_encode_Scaled|id_36_index_index_encode_Scaled|     features_Scaled|id_38_index_index_encode_Scaled|id_35_index_index_encode_Scaled|id_15_index_index_encode_Scaled|card6_index_index_encode_Scaled|id_29_index_index_encode_Scaled|id_28_in

In [26]:
object_col_scaled

['ProductCD_index_index_encode_Scaled',
 'card4_index_index_encode_Scaled',
 'card6_index_index_encode_Scaled',
 'P_emaildomain_index_index_encode_Scaled',
 'R_emaildomain_index_index_encode_Scaled',
 'id_12_index_index_encode_Scaled',
 'id_15_index_index_encode_Scaled',
 'id_16_index_index_encode_Scaled',
 'id_28_index_index_encode_Scaled',
 'id_29_index_index_encode_Scaled',
 'id_31_index_index_encode_Scaled',
 'id_35_index_index_encode_Scaled',
 'id_36_index_index_encode_Scaled',
 'id_37_index_index_encode_Scaled',
 'id_38_index_index_encode_Scaled',
 'DeviceType_index_index_encode_Scaled',
 'DeviceInfo_index_index_encode_Scaled']

In [None]:
# from pyspark.ml import Pipeline
# from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# categorical_columns= ['Gender', 'Age', 'Occupation', 'City_Category','Marital_Status']

# # The index of string vlaues multiple columns
# indexers = [
#     StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
#     for c in categorical_columns
# ]

# # The encode of indexed vlaues multiple columns
# encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
#             outputCol="{0}_encoded".format(indexer.getOutputCol())) 
#     for indexer in indexers
# ]

# # Vectorizing encoded values
# assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders],outputCol="features")

# pipeline = Pipeline(stages=indexers + encoders+[assembler])
# model=pipeline.fit(data_df)
# transformed = model.transform(data_df)

### Modelling

In [27]:
feature_cols = list(set(encoded_df.columns)-set(['label']))

assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
assembled_df = assembler.transform(encoded_df)
assembled_df.show()

+-----+------------------------------------+-------------------------------+-------------------------------+-------------------------------+-----------------------------------+-------------------------------+--------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+---------------------------------------+-------------------------------+-------------------------------+---------------------------------------+------------------------------------+--------------------+
|label|DeviceType_index_index_encode_Scaled|id_12_index_index_encode_Scaled|card4_index_index_encode_Scaled|id_37_index_index_encode_Scaled|ProductCD_index_index_encode_Scaled|id_36_index_index_encode_Scaled|     features_Scaled|id_38_index_index_encode_Scaled|id_35_index_index_encode_Scaled|id_15_index_index_encode_Scaled|card6_index_index_encode_Scaled|id_29_index_index_e

In [28]:
df = assembled_df.select(["features", "label"])
df.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(2033,[0,2,3,4,5,...|    1|
|(2033,[0,1,3,4,5,...|    1|
|(2033,[0,1,3,4,5,...|    1|
|(2033,[0,1,3,4,5,...|    1|
|(2033,[0,1,3,4,5,...|    1|
|(2033,[0,2,3,4,5,...|    1|
|(2033,[0,1,3,4,5,...|    1|
|(2033,[0,1,3,4,5,...|    1|
|(2033,[0,1,3,4,5,...|    1|
|(2033,[0,2,3,4,5,...|    1|
|(2033,[0,1,3,4,5,...|    1|
|(2033,[0,1,3,4,5,...|    1|
|(2033,[0,2,3,4,5,...|    1|
|(2033,[0,1,3,4,5,...|    1|
|(2033,[0,1,3,4,5,...|    1|
|(2033,[0,1,3,4,5,...|    1|
|(2033,[0,1,3,4,5,...|    1|
|(2033,[0,2,3,4,5,...|    1|
|(2033,[0,1,3,4,5,...|    1|
|(2033,[0,1,3,4,5,...|    1|
+--------------------+-----+
only showing top 20 rows



In [29]:
train, test = df.randomSplit([0.6, 0.4], seed=42)

In [30]:
x_test = test.select('features')
y_test = test.select('label')

In [31]:
from pyspark.ml.classification import RandomForestClassifier
spark.conf.set("spark.sql.codegen.wholestage", False)

rf = RandomForestClassifier(featuresCol="features", labelCol='label').fit(train)

Py4JJavaError: ignored

In [None]:
pred = rf.transform(x_test)

In [None]:
pred