## Reason for Spark over Pandas

1) A Pandas DataFrame is not distributed, whereas a Spark DataFrame is. Hence you do not get the benefit of parallel processing with a Pandas DataFrame, and processing will be slower for large amounts of data.

2) A Spark DataFrame gives you fault tolerance (it is resilient); a Pandas DataFrame does not. If your data processing is interrupted or fails partway through, Spark can regenerate the failed result set from its lineage (the DAG). Pandas has no built-in fault tolerance; you would need to implement your own framework to provide it.

3) In my experience as a Data Engineer, I’ve found that building data pipelines in Pandas often requires us to regularly increase resources to keep up with growing memory usage. In addition, we often see runtime errors caused by unexpected data types or nulls. After using Spark with Scala instead, solutions feel more robust and are easier to refactor and extend.

In [6]:
import pandas as pd
import numpy as np
from datetime import date, timedelta, datetime
import time

import pyspark # only run this after findspark.init()
from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import * 
from pyspark.sql.types import * 

In [3]:
# We use the findspark library to locate Spark on our local machine.
# NOTE: the pyspark import elsewhere in this notebook is commented as
# "only run this after findspark.init()", so this cell has to run first.
import findspark
# Path of the local Spark installation; adjust it to your machine.
findspark.init('/Applications/spark-2.4.7-bin-hadoop2.7')

In [12]:
# Create (or reuse) a SparkSession named 'Spark_Functions' with master "local[*]".
# NOTE: despite its name, `sc` is a SparkSession, not a SparkContext.
sc = SparkSession.builder.appName('Spark_Functions').master("local[*]").getOrCreate()
# The SparkContext that belongs to the session.
sparkContext=sc.sparkContext
# SQLContext bound to the same SparkContext and SparkSession.
sqlContext = SQLContext(sparkContext=sc.sparkContext, sparkSession=sc)

In [None]:
# we use the below function to find more information about the #missing values
def info_missing_table(df_pd):
    """Summarize the missing values of a pandas DataFrame, column by column.

    Prints the total number of columns and how many of them contain missing
    values, then returns the per-column summary.

    Args:
        df_pd: pandas DataFrame to inspect.

    Returns:
        DataFrame indexed by column name with the columns 'Missing Values'
        (null count) and '% of Total Values' (null percentage, rounded to one
        decimal). Only columns with at least one null are included, sorted by
        percentage in descending order.
    """
    row_count = len(df_pd)
    # Count the nulls once and derive the percentage from the same Series.
    mis_val = df_pd.isnull().sum()
    if row_count:
        mis_val_percent = 100 * mis_val / row_count
    else:
        # An empty frame has no missing values; avoid the 0/0 division.
        mis_val_percent = mis_val.astype(float)

    # Put counts and percentages side by side (columns are labelled 0 and 1).
    mis_val_table = pd.concat([mis_val, mis_val_percent], axis=1)
    mis_val_table_ren_columns = mis_val_table.rename(
        columns={0: 'Missing Values', 1: '% of Total Values'})

    # Filter on the count rather than the percentage: a NaN percentage
    # compares unequal to 0 and would wrongly keep columns without any nulls.
    has_missing = mis_val_table_ren_columns['Missing Values'] != 0
    mis_val_table_ren_columns = (
        mis_val_table_ren_columns[has_missing]
        .sort_values('% of Total Values', ascending=False)
        .round(1)
    )

    # shape[1] is the number of columns of the input; shape[0] of the summary
    # is the number of columns that have missing values.
    print("Your selected dataframe has " + str(df_pd.shape[1]) + " columns.\n"
          "There are " + str(mis_val_table_ren_columns.shape[0]) +
          " columns that have missing values.")
    return mis_val_table_ren_columns
# Build the missing-value summary for df_pd; df_pd is not created in this
# cell, so it must already exist in the notebook session.
missings = info_missing_table(df_pd)
# Echo the summary table in the notebook output.
missings

In [None]:
def count_missings(spark_df):
    """Count the null values of every column of a Spark DataFrame.

    All counts are computed in one aggregation, i.e. a single pass over the
    data instead of one Spark job per column.

    Args:
        spark_df: Spark DataFrame to inspect.

    Returns:
        List of (column name, null count) tuples in column order, containing
        only the columns that have at least one null.
    """
    # Local import: the notebook star-imports pyspark.sql.functions, which
    # shadows builtins; a namespaced alias keeps this function unambiguous.
    from pyspark.sql import functions as F

    names = spark_df.columns
    if not names:
        return []

    # when(...) without otherwise() yields NULL for non-matching rows and
    # count() skips NULLs, so each expression counts the nulls of one column.
    # Positional aliases avoid problems with unusual column names.
    null_exprs = [
        F.count(F.when(spark_df[name].isNull(), 1)).alias("nulls_{0}".format(pos))
        for pos, name in enumerate(names)
    ]
    counts = spark_df.agg(*null_exprs).first()

    # Keep only the columns that actually have missing values.
    return [(name, nulls) for name, nulls in zip(names, counts) if nulls != 0]

In [None]:
# (column name, null count) pairs for the columns of new_df that contain
# nulls; new_df is not created in this cell and must already exist.
miss_counts = count_missings(new_df)
# Echo the result in the notebook output.
miss_counts

In [None]:
# Split the columns that contain missing values into categorical and
# numerical ones, based on their Spark dtype.

list_cols_miss = [entry[0] for entry in miss_counts]
df_miss = new_df.select(*list_cols_miss)

# Categorical columns: every column whose dtype starts with 'string'.
catcolums_miss = [name for name, dtype in df_miss.dtypes if dtype.startswith('string')]
print("cateogrical columns_miss:", catcolums_miss)

# Numerical columns: every column whose dtype starts with 'int' or 'double'.
numcolumns_miss = [name for name, dtype in df_miss.dtypes if dtype.startswith(('int', 'double'))]
print("numerical columns_miss:", numcolumns_miss)

In [None]:
from pyspark.sql.functions import rank, sum ,col, mean, round

def impute_data(miss_counts, df=None):
    """Fill missing values: mode for string columns, rounded mean for int/double columns.

    Args:
        miss_counts: iterable of (column name, null count) pairs, e.g. the
            result of count_missings(); only the column names are used.
        df: Spark DataFrame to impute. When omitted, the notebook-global
            ``new_df`` is used, so ``impute_data(miss_counts)`` keeps working.

    Returns:
        A new Spark DataFrame with the listed string and int/double columns
        filled. Listed columns of any other type are left untouched, as are
        columns for which no fill value can be computed.
    """
    # Local import with an alias: the notebook imports `sum` and `round` from
    # pyspark.sql.functions, shadowing the builtins of the same name.
    from pyspark.sql import functions as F

    if df is None:
        # Read the global instead of assigning to it: assigning `new_df`
        # inside the function made it a local and raised UnboundLocalError.
        df = new_df

    dtype_of = dict(df.dtypes)
    cols_with_nulls = [entry[0] for entry in miss_counts]

    # String columns are treated as categorical.
    catcolums_miss = [name for name in cols_with_nulls
                      if dtype_of[name].startswith('string')]
    # int and double columns are treated as numerical.
    numcolumns_miss = [name for name in cols_with_nulls
                       if dtype_of[name].startswith(('int', 'double'))]

    # Rows without any null; the mode of each categorical column is taken
    # from these rows, computed once before anything is filled.
    df_Nomiss = df.na.drop()

    # Categorical columns: fill with the most frequent value.
    for name in catcolums_miss:
        top_row = (df_Nomiss.groupBy(name).count()
                   .orderBy(F.col("count").desc())
                   .first())
        if top_row is None:
            # No complete rows at all, so there is no mode to fill with.
            continue
        mode = top_row[0]
        print(name, mode)  # column name and its most frequent category
        df = df.na.fill({name: mode})

    # Numerical columns: fill with the mean rounded to the nearest integer.
    for name in numcolumns_miss:
        meanvalue = df.select(F.round(F.mean(name))).first()[0]
        if meanvalue is None:
            # The column holds only nulls, so there is no mean to fill with.
            continue
        print(name, meanvalue)
        df = df.na.fill({name: meanvalue})

    return df

In [126]:
# One-hot encoding sample data.
# NOTE(review): the original heading said "Pandas", but the frame built here
# is a Spark DataFrame (parallelize(...).toDF(...)).
# Still needs to drop one value per encoded column (see encoded_drop1 below).

# Columns to expand into indicator columns.
list_of_pivot_cols = ['TYPE','CODE']
# Columns used as grouping and join keys.
list_of_keys = ['ID','TYPE','CODE']

# Small sample DataFrame with the columns ID, TYPE and CODE.
Spark_DF = sparkContext.parallelize([(1,'A','X1'),
                         (2,'B','X2'),
                         (3,'B','X3'),
                         (1,'B','X3'),
                         (2,'C','X2'),
                         (3,'C','X2'),
                         (1,'C','X1'),
                         (1,'B','X1')]).toDF(['ID','TYPE','CODE'])

#Helper function to recursively join a list of dataframes
#Can be simplified if you only need two columns

class Spark_One_Hot_Encoding():
    """Encode columns of a Spark DataFrame with groupBy().pivot().count().

    Each pivot column is expanded into one column per distinct value, named
    ``e<pivot column>_<value>``. The cell values are row counts per key
    combination, so they are 0/1 indicators only when every key combination
    occurs once in the input.

    The constructor stores its arguments and the methods read them from the
    instance; previously the arguments were discarded and the methods relied
    on globals (pivot_cols, keys, join_all) left over from other cells.

    Args:
        list_of_keys: column names used for grouping and as join keys.
        list_of_pivot_cols: column names to expand into encoded columns.
        Spark_DF: the Spark DataFrame to encode.
    """

    def __init__(self, list_of_keys, list_of_pivot_cols, Spark_DF):
        self.list_of_keys = list(list_of_keys)
        self.list_of_pivot_cols = list(list_of_pivot_cols)
        self.Spark_DF = Spark_DF

    @staticmethod
    def join_all(dfs, list_of_keys):
        """Inner-join every DataFrame in ``dfs`` on ``list_of_keys``.

        Raises:
            ValueError: if ``dfs`` is empty.
        """
        if not dfs:
            raise ValueError("join_all() needs at least one DataFrame")
        # Fold from the right, giving dfs[0] JOIN (dfs[1] JOIN (...)), the
        # same nesting as the recursive formulation, without recursion.
        joined = dfs[-1]
        for frame in reversed(dfs[:-1]):
            joined = frame.join(joined, on=list_of_keys, how='inner')
        return joined

    def _pivot_frames(self):
        """Return one pivoted, renamed and zero-filled DataFrame per pivot column."""
        key_count = len(self.list_of_keys)
        frames = []
        for pivot_col in self.list_of_pivot_cols:
            pivoted = self.Spark_DF.groupBy(self.list_of_keys).pivot(pivot_col).count()
            # The key columns keep their names; the pivoted value columns get
            # an "e<pivot column>_" prefix.
            new_names = pivoted.columns[:key_count] + [
                "e{0}_{1}".format(pivot_col, value)
                for value in pivoted.columns[key_count:]
            ]
            # Combinations that never occur come back as null; store them as 0.
            frames.append(pivoted.toDF(*new_names).fillna(0))
        return frames

    def encoded_df(self):
        """Return the DataFrame with all encoded columns joined on the keys."""
        return self.join_all(self._pivot_frames(), self.list_of_keys)

    def encoded_drop1(self):
        """Return the encoded DataFrame without the first encoded column of each pivot column.

        Returns:
            Tuple (encoded DataFrame after the drop, list of dropped column names).
        """
        key_count = len(self.list_of_keys)
        frames = self._pivot_frames()
        # The first encoded column of each frame sits right after the keys;
        # frames without any encoded column contribute nothing.
        drop_columns_list = [
            frame.columns[key_count]
            for frame in frames
            if len(frame.columns) > key_count
        ]
        new_df_drop_1 = self.join_all(frames, self.list_of_keys).drop(*drop_columns_list)
        return new_df_drop_1, drop_columns_list


# Build the encoder once and call its methods on the instance.
one_hot = Spark_One_Hot_Encoding(list_of_keys, list_of_pivot_cols, Spark_DF)
one_hot.encoded_df().show()

# a: encoded DataFrame after the drop, b: names of the dropped columns.
a, b = one_hot.encoded_drop1()
a.show()
b

+---+----+----+-------+-------+-------+--------+--------+--------+
| ID|TYPE|CODE|eTYPE_A|eTYPE_B|eTYPE_C|eCODE_X1|eCODE_X2|eCODE_X3|
+---+----+----+-------+-------+-------+--------+--------+--------+
|  1|   A|  X1|      1|      0|      0|       1|       0|       0|
|  2|   C|  X2|      0|      0|      1|       0|       1|       0|
|  3|   B|  X3|      0|      1|      0|       0|       0|       1|
|  2|   B|  X2|      0|      1|      0|       0|       1|       0|
|  3|   C|  X2|      0|      0|      1|       0|       1|       0|
|  1|   B|  X3|      0|      1|      0|       0|       0|       1|
|  1|   B|  X1|      0|      1|      0|       1|       0|       0|
|  1|   C|  X1|      0|      0|      1|       1|       0|       0|
+---+----+----+-------+-------+-------+--------+--------+--------+

+---+----+----+-------+-------+--------+--------+
| ID|TYPE|CODE|eTYPE_B|eTYPE_C|eCODE_X2|eCODE_X3|
+---+----+----+-------+-------+--------+--------+
|  1|   A|  X1|      0|      0|       0|     

['eTYPE_A', 'eCODE_X1']

In [85]:
combined

[DataFrame[ID: bigint, TYPE: string, CODE: string, eTYPE_A: bigint, eTYPE_B: bigint, eTYPE_C: bigint],
 DataFrame[ID: bigint, TYPE: string, CODE: string, eCODE_X1: bigint, eCODE_X2: bigint, eCODE_X3: bigint]]

In [99]:
# Columns to expand into indicator columns.
list_of_pivot_cols = ['TYPE','CODE']
# Columns used as grouping and join keys.
list_of_keys = ['ID','TYPE','CODE']

# Small sample DataFrame with the columns ID, TYPE and CODE.
Spark_DF = sparkContext.parallelize([(1,'A','X1'),
                         (2,'B','X2'),
                         (3,'B','X3'),
                         (1,'B','X3'),
                         (2,'C','X2'),
                         (3,'C','X2'),
                         (1,'C','X1'),
                         (1,'B','X1')]).toDF(['ID','TYPE','CODE'])

def join_all(dfs, list_of_keys):
    """Inner-join every DataFrame in ``dfs`` on ``list_of_keys``.

    The joins are nested from the right: dfs[0] JOIN (dfs[1] JOIN (...)).
    A list with a single DataFrame is returned as that DataFrame, unjoined.
    """
    # Start with the last frame and wrap it with each earlier frame in turn.
    joined = dfs[-1]
    for frame in reversed(dfs[:-1]):
        joined = frame.join(joined, on = list_of_keys, how = 'inner')
    return joined
dfs = []
combined = []          # one pivoted DataFrame per pivot column
first_new_names = []   # the renamed column list of each pivoted DataFrame

# Iterate over the names defined in this cell (list_of_pivot_cols and
# list_of_keys); `pivot_cols` and `keys` are not defined here.
for pivot_col in list_of_pivot_cols:
    pivotDF = Spark_DF.groupBy(list_of_keys).pivot(pivot_col).count()
    # The key columns keep their names; the pivoted value columns get an
    # "e_<pivot column>_" prefix.
    new_names = pivotDF.columns[:len(list_of_keys)] + [
        "e_{0}_{1}".format(pivot_col, c) for c in pivotDF.columns[len(list_of_keys):]
    ]
    first_new_names.append(new_names)
    # Combinations that never occur come back as null; store them as 0.
    df = pivotDF.toDF(*new_names).fillna(0)
    combined.append(df)

# Join the per-column frames back together on the key columns.
new_df = join_all(combined, list_of_keys)
new_df.show()

+---+----+----+--------+--------+--------+---------+---------+---------+
| ID|TYPE|CODE|e_TYPE_A|e_TYPE_B|e_TYPE_C|e_CODE_X1|e_CODE_X2|e_CODE_X3|
+---+----+----+--------+--------+--------+---------+---------+---------+
|  1|   A|  X1|       1|       0|       0|        1|        0|        0|
|  2|   C|  X2|       0|       0|       1|        0|        1|        0|
|  3|   B|  X3|       0|       1|       0|        0|        0|        1|
|  2|   B|  X2|       0|       1|       0|        0|        1|        0|
|  3|   C|  X2|       0|       0|       1|        0|        1|        0|
|  1|   B|  X3|       0|       1|       0|        0|        0|        1|
|  1|   B|  X1|       0|       1|       0|        1|        0|        0|
|  1|   C|  X1|       0|       0|       1|        1|        0|        0|
+---+----+----+--------+--------+--------+---------+---------+---------+



In [63]:
#Column Weights
# adding the new column weights and fill it with ratios
from pyspark.sql.functions import when
ratio = 0.91
def weight_balance(labels):
    """Return a Column that is `ratio` where labels == 1 and 1 - ratio otherwise."""
    is_positive = labels == 1
    return when(is_positive, ratio).otherwise(1 - ratio)
# Add the per-row weight, derived from the 'label' column, as column 'weights'.
new_df = new_df.withColumn(
    'weights',
    weight_balance(col('label')),
)

NameError: name 'new_df' is not defined

## 3. Feature Engineering

PySpark has strong built-in support for feature engineering, so we do not need to do much work to extract features:

1. Apply StringIndexer() to assign indices to each category in our categorical columns
2. Apply OneHotEncoderEstimator() to convert categorical columns to onehot encoded vectors
3. Apply VectorAssembler() to create a feature vector from all categorical and numerical features and we call the final vector as “features”

In [None]:
# We use the OneHotEncoderEstimator from Spark ML to convert each categorical
# feature into a one-hot vector. Next, VectorAssembler combines the resulting
# one-hot vectors and the numerical features into a single vector column.
# Every step of the process is appended to the `stages` list.
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
stages = []
for categoricalCol in cat_cols:
    # Index the categories first, then one-hot encode the index column.
    stringIndexer = StringIndexer(
        inputCol=categoricalCol,
        outputCol=categoricalCol + 'Index',
    )
    encoder = OneHotEncoderEstimator(
        inputCols=[stringIndexer.getOutputCol()],
        outputCols=[categoricalCol + "classVec"],
    )
    stages.extend([stringIndexer, encoder])

# Encoded categorical vectors first, followed by the numerical columns.
assemblerInputs = [name + "classVec" for name in cat_cols] + num_cols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [None]:
from pyspark.ml import Pipeline
# Remember the column names before the pipeline adds its output columns.
cols = new_df.columns
# Chain every stage collected in `stages` into one pipeline.
pipeline = Pipeline(stages = stages)
# Fit the pipeline on new_df, then apply the fitted model to the same data.
# NOTE(review): the fit happens before the train/test split further down.
pipelineModel = pipeline.fit(new_df)
new_df = pipelineModel.transform(new_df)

In [None]:
# Keep the assembled 'features' column plus the columns saved in `cols`.
selectedCols = ['features']+cols
new_df = new_df.select(selectedCols)

# New dataset after feature engineering:
# show the first 5 rows as a pandas DataFrame.
pd.DataFrame(new_df.take(5), columns=new_df.columns)

In [1]:
## Train/Test Split
# Split the data into training and testing sets with weights 0.80 / 0.20,
# using a fixed seed (42).
train, test = new_df.randomSplit([0.80, 0.20], seed = 42)
# Print the number of rows in each split.
print(train.count())
print(test.count())

## Modeling

##### (1) Logistic Regression 

In [31]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# NOTE(review): LR_model is not defined anywhere in the visible part of this
# notebook; the cell that fits the logistic regression seems to be missing.
predictions_LR = LR_model.transform(test)
# Evaluator created with its default settings.
evaluator = BinaryClassificationEvaluator()
# Print the area under the ROC curve for the test-set predictions.
print("Test SET ROC: " + str(evaluator.evaluate(predictions_LR, {evaluator.metricName: "areaUnderROC"})))

Test SET ROC: 0.7193116947055981


##### (2) Gradient Boosting

In [32]:
from pyspark.ml.classification import GBTClassifier
# Gradient-boosted trees classifier with maxIter=15; every other setting is
# left at its default.
gbt = GBTClassifier(maxIter=15)
# Fit on the training split and predict on the test split.
GBT_Model = gbt.fit(train)
gbt_predictions = GBT_Model.transform(test)
# Fresh evaluator; it is not used in this cell.
evaluator = BinaryClassificationEvaluator()

Test_SET (Area Under ROC): 0.7323118190985889


In [33]:
# Print the area under the ROC curve for the GBT test-set predictions.
print("Test SET ROC: " + str(evaluator.evaluate(gbt_predictions, {evaluator.metricName: "areaUnderROC"})))

Test SET ROC: 0.7323118190985882


Let's take the Gradient Boosting model and apply hyper-parameter tuning using grid search, then run cross-validation to further improve the performance of GBT.

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Grid of 3 x 2 x 2 = 12 parameter combinations for the GBT estimator.
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4, 6])
             .addGrid(gbt.maxBins, [20, 30])
             .addGrid(gbt.maxIter, [10, 15])
             .build())
# 5-fold cross-validation over the grid, scored with `evaluator`.
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# Run cross validations.
cvModel = cv.fit(train)
# Predict on the test split with the cross-validated model and score the
# predictions with the evaluator's configured metric.
gbt_cv_predictions = cvModel.transform(test)
evaluator.evaluate(gbt_cv_predictions)

In [2]:
## Modeling

from pyspark.ml.classification import GBTClassifier
# Gradient-boosted trees classifier with maxIter=15; every other setting is
# left at its default.
gbt = GBTClassifier(maxIter=15)
# Fit on the training split and predict on the test split.
GBT_Model = gbt.fit(train)
gbt_predictions = GBT_Model.transform(test)
# NOTE: BinaryClassificationEvaluator is not imported in this cell; it relies
# on the import made in an earlier cell of the notebook.
evaluator = BinaryClassificationEvaluator()

In [None]:
# Print the area under the ROC curve for the GBT test-set predictions.
print("Test SET ROC: " + str(evaluator.evaluate(gbt_predictions, {evaluator.metricName: "areaUnderROC"})))

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Grid of 3 x 2 x 2 = 12 parameter combinations for the GBT estimator.
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4, 6])
             .addGrid(gbt.maxBins, [20, 30])
             .addGrid(gbt.maxIter, [10, 15])
             .build())
# 5-fold cross-validation over the grid, scored with `evaluator`.
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# Run cross validations.
cvModel = cv.fit(train)
# Predict on the test split with the cross-validated model and score the
# predictions with the evaluator's configured metric.
gbt_cv_predictions = cvModel.transform(test)
evaluator.evaluate(gbt_cv_predictions)