# <center> GBT using SparklyR


## Importing required functions & libraries

In [16]:
library(sparklyr)
library(dplyr)
library(DBI) #To use sequel commands
library(stringr)

## Setup and configure spark connect

In [2]:
config=spark_config()
#config$sparklyr.cores.local <- 4
config$spark.driver.cores <- 1
config$spark.driver.memory <- "2G"
config$spark.executor.cores <- 3
config$spark.executor.memory <- "6G"
config$spark.dynamicAllocation.maxExecutors <- 200

In [3]:
sc=spark_connect(master = "local", config = config)

In [4]:
print(paste("Driver cores =",sc$config$spark.driver.cores))
print(paste("Executor cores =",sc$config$spark.executor.cores))

[1] "Driver cores = 1"
[1] "Executor cores = 3"


## Import Required Dataset

In [5]:
t_import=Sys.time()
df <- read.csv("Delay_500k.csv",header = T,sep = ",")
df_tbl <- copy_to(sc,df,"df_sc")
# The copy_to function copys the local data frame to a spark data table
#iris_preview <- dbGetQuery(sc, "SELECT * FROM iris LIMIT 10")
#spark_read_csv - To upload csv file in hdfs
#flights_tbl %>% filter(dep_delay == 2) - Can use dplyr on tables in cluster

In [6]:
df_tbl

# Source:   table<df_sc> [?? x 30]
# Database: spark_connection
        X  Year Month DayofMonth DayOfWeek DepTime CRSDepTime ArrTime CRSArrTime
    <int> <int> <int>      <int>     <int>   <int>      <int>   <int>      <int>
 1 5.96e6  2008    11          9         7    1811       1755    2005       1950
 2 2.23e5  2008     1         10         4    1333       1215    1554       1436
 3 3.41e5  2008     1         18         5    1249       1218    1548       1523
 4 4.98e6  2008     9          7         7    1400       1345    1613       1603
 5 2.38e6  2008     4         11         5    1021        943    1152       1115
 6 9.22e4  2008     1         31         4    1057       1050    1216       1210
 7 1.55e6  2008     3         27         4    1746       1730    1909       1901
 8 5.43e6  2008    10         18         6    1254       1245    1552       1605
 9 2.32e6  2008     4         21         1     930        912    1144       1119
10 6.75e6  2008    12         26         5   

In [7]:
colnames(df_tbl)

## Data Preparation / Feature Engineering

### Remove the unwanted columns and creating dependent column

In [8]:
t_Data_Prep=Sys.time()
#columns required to build model
selected_columns=c("Origin", "Dest", "Distance", "Month", "DayOfWeek", "UniqueCarrier", "Dep_Hour", "DepDelay_flag")

# Create flag base on delay in departure and filter the required columns
df_tbl <- df_tbl %>% 
  mutate(DepDelay_flag=ifelse(DepDelay >= 15, 1, 0),
         Dep_Hour=ifelse(nchar(CRSDepTime)==4,substr(CRSDepTime,1,2),ifelse(nchar(CRSDepTime)==3,paste("0",substr(CRSDepTime,1,1),sep=""),"00"))) %>%
  select(selected_columns)

### Change the data type of columns to use string indexer function

In [9]:
# Casting the data type of columns
df_tbl <- df_tbl %>% 
  mutate(Dep_hour_num=as.numeric(Dep_Hour),
         Distance_num=as.numeric(Distance),
         Origin_char=as.character(Origin),
         Dest_char=as.character(Dest),
         Month_char=as.character(Month),
         DayOfWeek_char=as.character(DayOfWeek),
         UniqueCarrier_char=as.character(UniqueCarrier)) %>% 
  select(-Distance,-Dep_Hour,-Origin,-Dest,-Month,-DayOfWeek,-UniqueCarrier)

### Create pipeline for string indexer of features & label

In [10]:
# Using pipeline to streamline the process for features
pipeline_transformation <- ml_pipeline(sc,uid="pipeline_transformation")
pipeline_transformation <- ft_string_indexer(pipeline_transformation,input_col="Origin_char",output_col="Origin_char_id")
pipeline_transformation <- ft_string_indexer(pipeline_transformation,input_col="Dest_char",output_col="Dest_char_id")
pipeline_transformation <- ft_string_indexer(pipeline_transformation,input_col="Month_char",output_col="Month_char_id")
pipeline_transformation <- ft_string_indexer(pipeline_transformation,input_col="DayOfWeek_char",output_col="DayOfWeek_char_id")
pipeline_transformation <- ft_string_indexer(pipeline_transformation,input_col="UniqueCarrier_char",
                                             output_col="UniqueCarrier_char_id")

features_vec <-  c("Origin_char_id","Dest_char_id","Month_char_id","DayOfWeek_char_id",
                   "UniqueCarrier_char_id","Distance_num","Dep_hour_num")
pipeline_transformation <- ft_vector_assembler(pipeline_transformation,input_col=features_vec,output_col="features")

#Using pipeline to streamline label columns
label_col <- "DepDelay_flag"
pipeline_transformation <- ft_string_indexer(pipeline_transformation,input_col="DepDelay_flag",output_col="label")

### Transform the data using pipeline and cache to use in future

In [11]:
# Fit & transform data using pipeline and cahce to use in subsequent steps
all_data <- ml_fit_and_transform(pipeline_transformation,df_tbl)

sdf_register(all_data,"all_data_sc")
tbl_cache(sc,"all_data_sc")

# Source:   table<all_data_sc> [?? x 15]
# Database: spark_connection
   DepDelay_flag Dep_hour_num Distance_num Origin_char Dest_char Month_char
           <dbl>        <dbl>        <dbl> <chr>       <chr>     <chr>     
 1             1           17         1056 FLL         STL       11        
 2             1           12          472 DEN         OMA       1         
 3             1           12          813 AUS         ATL       1         
 4             1           13          264 ORD         CVG       9         
 5             1            9          445 ATL         SRQ       4         
 6             0           10          491 MCI         BNA       1         
 7             1           17          370 BOS         BWI       3         
 8             0           12          862 LAX         DEN       10        
 9             1            9          679 SFO         SEA       4         
10             1           12          403 ATL         MCO       12        
# ... with more ro

## Model Development

### Split the dataset into train & test

In [12]:
t_model=Sys.time()
partition <- all_data %>% sdf_partition(train = 0.75, test = 0.25, seed = 8585)

train_tbl <- partition$train
test_tbl <- partition$test

### Fit training data into model

In [13]:
model_gbm <- ml_gbt_classifier(x = train_tbl,
                               formula=NULL,
                               type = "classification",
                               features_col = "features",
                               label_col = "label",
                               loss_type = "logistic",
                               max_bins = 300L,
                               max_depth = 7L,
                               max_iter = 60L,
                               step_size = 0.2,
                               subsampling_rate = 0.8,
                               seed = 123,
                               max_memory_in_mb = 256L,
                               feature_subset_strategy = "0.8")

### Predict using test data & check AUC value

In [14]:
predict_score <- ml_predict(model_gbm,test_tbl)
Auc <- ml_binary_classification_eval(predict_score,label_col="label",prediction_col="prediction",metric_name = "areaUnderROC")
print(paste("Test Area Under ROC:",Auc))
t_end=Sys.time()

[1] "Test Area Under ROC: 0.61460917165327"


## Exporting results

In [15]:
Overall_time <- t_end-t_import
Data_Extraction_time <- t_Data_Prep-t_import
Data_Preparation_time <- t_model-t_Data_Prep
Model_time <- t_end-t_model
units(Overall_time) <- "secs"
units(Data_Extraction_time) <- "secs"
units(Data_Preparation_time) <- "secs"
units(Model_time) <- "secs"

Accuracy <- Auc

result_df <- cbind(Overall_time,Data_Extraction_time,Data_Preparation_time,Model_time,Accuracy)

write.csv(result_df,"Results_500k_v3.csv")