
# <center>PySpark with one-hot encoding (OHE)</center>


## Import required functions & libraries

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, SQLTransformer,OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from IPython.display import Image
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import *
from pyspark.sql.functions import lpad
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
import pandas as pd
import xgboost as xgb
import numpy as np
import time

## Set up and configure the Spark context

In [2]:
# Reuse the SparkContext if one already exists (e.g. a pyspark-shell kernel that
# predefines `sc`); otherwise create one. Previously `sc` was only assumed to
# exist, so "Restart & Run All" on a plain Python kernel raised NameError.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

## Load the dataset

In [3]:
# Alternative source (Hive table): sqlContext.table('eaptraining_work.eap_loans')
t_import = time.time()
# Read the CSV with a header row and let Spark infer column types.
df = (sqlContext.read
      .option("header", True)
      .option("inferSchema", True)
      .csv("Delay_20k.csv"))

## Data Preparation

### Check data type & dimensions of dataset

In [4]:
# Start of the data-preparation stage (used for the timing report at the end).
t_Data_Prep = time.time()
print(type(df))
# Row count triggers a Spark job; column count comes from the schema.
n_rows, n_cols = df.count(), len(df.columns)
print(n_rows, ",", n_cols)

<class 'pyspark.sql.dataframe.DataFrame'>
20000 , 30


### Add DepDelayFlag and DepHour fields

In [5]:
def valueToCategory(value, threshold=15):
    """Map a departure delay to a binary delay flag.

    Args:
        value: the DepDelay value for one row; Spark passes None for nulls.
        threshold: delays greater than or equal to this are flagged
            (default 15, the original hard-coded cut-off).

    Returns:
        1 if value >= threshold, otherwise 0. Returns None when value is None,
        so Spark stores a null flag instead of the Python worker crashing with
        a TypeError (None >= 15 is not defined in Python 3).
    """
    if value is None:
        return None
    return 1 if value >= threshold else 0
    
# Wrap the Python flag function as a Spark UDF producing an integer column,
# then derive DepDelayFlag from DepDelay.
udfValueToCategory = udf(valueToCategory, returnType=IntegerType())
delay_flag_col = udfValueToCategory("DepDelay")
df = df.withColumn("DepDelayFlag", delay_flag_col)

In [6]:
# Left-pad the scheduled departure time to 4 characters (e.g. 930 -> "0930")
# and keep the first two characters as the hour string.
padded_dep_time = lpad(df.CRSDepTime, 4, '0')
df = df.withColumn('DepHour', padded_dep_time.substr(1, 2))

### Remove unwanted columns

In [7]:
# Spark's drop() ignores names that are not present, so this is safe to re-run.
unwanted_columns = ["X"]
df = df.drop(*unwanted_columns)
df.columns

['Year',
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'DepTime',
 'CRSDepTime',
 'ArrTime',
 'CRSArrTime',
 'UniqueCarrier',
 'FlightNum',
 'TailNum',
 'ActualElapsedTime',
 'CRSElapsedTime',
 'AirTime',
 'ArrDelay',
 'DepDelay',
 'Origin',
 'Dest',
 'Distance',
 'TaxiIn',
 'TaxiOut',
 'Cancelled',
 'CancellationCode',
 'Diverted',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay',
 'DepDelayFlag',
 'DepHour']

### Print schema

In [8]:
# Show each column with the type Spark inferred at load time (inferSchema=True).
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- C

### Print first row of data

In [9]:
# Print the first row as a text table (very wide: one column per field).
df.show(n=1)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+-------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|DepDelayFlag|DepHour|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+-------+
|2008|    3|        29|        

### Statistics of columns in data

In [10]:
# count / mean / stddev / min / max per column, collected to pandas for display.
column_summary = df.describe().toPandas()
column_summary

Unnamed: 0,summary,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,...,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,DepDelayFlag,DepHour
0,count,20000.0,20000.0,20000.0,20000.0,20000.0,20000.0,20000.0,20000.0,20000,...,20000.0,20000,20000.0,20000.0,20000.0,20000.0,20000.0,20000.0,20000.0,20000.0
1,mean,2008.0,6.064,15.6512,3.9635,1516.20085,1467.7472,1609.9583855602907,1633.8333,,...,0.00035,,0.00295,19.12344236760125,3.7969626168224297,14.982165109034268,0.1129283489096573,24.74618380062305,0.6851,14.4004
2,stddev,0.0,3.4653289057781,8.805697903958512,1.9985413326656316,453.25061016512575,426.463336690314,547.7696403543224,466.3067525894143,,...,0.0187054803399896,,0.0542350861733307,41.85493154128528,21.683215301413732,32.77319990903296,3.0435253392568837,41.30050344087439,0.4644876504697092,4.261336385383045
3,min,2008.0,1.0,1.0,1.0,1.0,10.0,1.0,1.0,9E,...,0.0,A,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,max,2008.0,12.0,31.0,7.0,2400.0,2359.0,,2400.0,YV,...,1.0,N,1.0,,,,,,1.0,23.0


### Select target variable

In [11]:
# Binary target column created earlier from DepDelay via valueToCategory.
target_variable="DepDelayFlag"

### Select categorical columns

In [12]:
# Candidate categorical predictors. The target appears in the list too and is
# filtered out below so it never leaks into the features.
# NOTE(review): Distance is an integer column; one-hot encoding it creates one
# dummy per distinct distance value. Confirm that is intended.
categorical_columns = ["Origin", "Dest", "Distance", "Month", "DayOfWeek",
                       "UniqueCarrier", "DepHour", "DepDelayFlag"]
categorical_columns = list(filter(lambda name: name != target_variable,
                                  categorical_columns))
print(categorical_columns)

['Origin', 'Dest', 'Distance', 'Month', 'DayOfWeek', 'UniqueCarrier', 'DepHour']


### Convert categorical columns to dummy columns using one-hot encoding

In [13]:
# Output column names for the one-hot vectors: <column>_t.
categorical_columns_out = [name + "_t" for name in categorical_columns]
print(categorical_columns_out)

# StringIndexer maps each category to a numeric index (<column>_tmp);
# OneHotEncoder then expands that index into a vector, keeping every category
# (dropLast=False).
indexers = [StringIndexer(inputCol=name, outputCol=name + '_tmp')
            for name in categorical_columns]

encoders = [OneHotEncoder(dropLast=False, inputCol=name + "_tmp", outputCol=encoded)
            for name, encoded in zip(categorical_columns, categorical_columns_out)]

# Interleave as indexer, encoder, indexer, encoder, ... so each encoder runs
# after the indexer that produces its input.
stages = []
for indexer, encoder in zip(indexers, encoders):
    stages.extend([indexer, encoder])

['Origin_t', 'Dest_t', 'Distance_t', 'Month_t', 'DayOfWeek_t', 'UniqueCarrier_t', 'DepHour_t']


### Update the list of final columns

In [14]:
# Only the one-hot encoded columns are fed to the model. This binds the same
# list object as categorical_columns_out (an alias, not a copy).
updated_columns=categorical_columns_out 
print(updated_columns)

['Origin_t', 'Dest_t', 'Distance_t', 'Month_t', 'DayOfWeek_t', 'UniqueCarrier_t', 'DepHour_t']


### Group predictors as "features" and response as "label"

In [15]:
# Define Assembler & String Indexer stages for creating 'Features' and 'Labels'
assembler_features = VectorAssembler(inputCols=updated_columns, outputCol="features")
labelIndexer = StringIndexer(inputCol=target_variable, outputCol="label")
stages += [assembler_features, labelIndexer]

### Chain all stages into a single Spark ML `Pipeline`

In [16]:
# One Pipeline runs indexers -> encoders -> assembler -> label stage in order.
pipeline = Pipeline().setStages(stages)

### Fit the pipeline, transform the data, and cache the result for reuse

In [17]:
# Fit every stage on the full dataset, then apply the fitted pipeline.
pipeline_model = pipeline.fit(df)
allData = pipeline_model.transform(df)
# Mark for caching; the DataFrame repr below is the cell's displayed output.
allData.cache()

DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: int, CRSDepTime: int, ArrTime: string, CRSArrTime: int, UniqueCarrier: string, FlightNum: int, TailNum: string, ActualElapsedTime: string, CRSElapsedTime: string, AirTime: string, ArrDelay: string, DepDelay: int, Origin: string, Dest: string, Distance: int, TaxiIn: string, TaxiOut: string, Cancelled: int, CancellationCode: string, Diverted: int, CarrierDelay: string, WeatherDelay: string, NASDelay: string, SecurityDelay: string, LateAircraftDelay: string, DepDelayFlag: int, DepHour: string, Origin_tmp: double, Origin_t: vector, Dest_tmp: double, Dest_t: vector, Distance_tmp: double, Distance_t: vector, Month_tmp: double, Month_t: vector, DayOfWeek_tmp: double, DayOfWeek_t: vector, UniqueCarrier_tmp: double, UniqueCarrier_t: vector, DepHour_tmp: double, DepHour_t: vector, features: vector, label: double]

## Model Development

### Convert the features column to a pandas DataFrame

In [None]:
# Collect the transformed Spark data to the driver as pandas (20k rows here;
# this only works while the data fits in driver memory).
df = allData.toPandas()

# Each 'features' cell holds a Spark ML vector. Densify them all and stack them
# into one (n_rows, n_features) array in a single pass. The previous version
# grew a DataFrame with pd.concat once per row, which is quadratic, and relied
# on Series.as_matrix(), which was removed in pandas 1.0.
features = np.vstack([vec.toArray() for vec in df['features']])

# One named column per expanded one-hot slot: V0, V1, ...
columns_list = ["V" + str(num) for num in range(features.shape[1])]

# Rows = observations, columns = V0..Vk; row order matches df (and so y).
data_feature_t = pd.DataFrame(features, columns=columns_list)
# Features-as-rows layout, kept for compatibility with the original variable.
data_feature = data_feature_t.transpose()

# Target as an (n_rows, 1) NumPy array.
y = df['label'].values.reshape(-1, 1)

### Split the dataset into train & test

In [None]:
# Hold out 25% of rows for testing; the fixed seed makes the split reproducible.
split_parts = train_test_split(data_feature_t, y,
                               test_size=.25,
                               random_state=42)
x_train, x_test, y_train, y_test = split_parts

### Fit the model on the training data and evaluate it on the test set

In [None]:
# Start of the modelling stage; the results cell reports
# Data_Preparation = t_model - t_Data_Prep and Model = t_end - t_model.
t_model = time.time()

params = {
    'nthread': -1,
    'seed': 42,
    # Without an explicit objective, xgb.train fits a regression model. Ask for
    # positive-class probabilities so `y_pred > 0.5` is a class decision.
    'objective': 'binary:logistic',
    'colsample_bytree': 0.8,
    'subsample': 0.8,
    'learning_rate': 0.2,
    'max_depth': 7,
}
num_boost_round = 100  # number of boosting rounds; tune as needed

# DataFrame.as_matrix() was removed in pandas 1.0. y_train / y_test are NumPy
# arrays (y was built from .values), which never had .as_matrix().
# np.asarray accepts both DataFrames and arrays.
dtrain = xgb.DMatrix(np.asarray(x_train), label=np.asarray(y_train).ravel())
dtest = xgb.DMatrix(np.asarray(x_test), label=np.asarray(y_test).ravel())

# Train the booster and predict. The original cell never trained a model, so
# y_pred was undefined.
model = xgb.train(params, dtrain, num_boost_round=num_boost_round)
y_pred = model.predict(dtest)

y_true = np.asarray(y_test).ravel().astype(int)
y_hat = (y_pred > 0.5).astype(int)
# labels=[0, 1] keeps the matrix 2x2 even if a class is absent from the split.
cm = confusion_matrix(y_true, y_hat, labels=[0, 1])
# Accuracy = correct predictions (diagonal) / all predictions.
predict_accuracy_on_test_set = np.trace(cm) / cm.sum()
t_end = time.time()

## Export results

In [20]:
# Wall-clock duration (seconds, from time.time()) of each notebook stage.
# t_model must be assigned by an earlier cell at the start of model training.
Overall_time = t_end - t_import
Data_Extraction = t_Data_Prep - t_import
Data_Preparation = t_model - t_Data_Prep
Model = t_end - t_model

# NOTE(review): 'Data_Prepartion' is misspelled, but it is a column name in the
# exported workbook; left unchanged so the output schema stays the same.
timings = {
    'Overall_time': [Overall_time],
    'Data_Extraction_time': [Data_Extraction],
    'Data_Prepartion': [Data_Preparation],
    'Model_time': [Model],
    'Accuracy': predict_accuracy_on_test_set,
}
result_df = pd.DataFrame(timings)
result_df.to_excel('Results_XGB.xlsx')