#### In this notebook, we validate the R/Spark operations by comparing them against vanilla R operations on a smaller subset of the data

In [8]:
# Show the current working directory (auto-printed by the notebook).
getwd()
# Remove every object that ls() reports in the current environment so the
# cell starts from a clean workspace.
# NOTE(review): this also discards objects created by earlier cells.
rm(list = ls())

# Attach a package, installing it first when it cannot be attached.
#
# libraryName: package name given as a character string.
#
# Prints 'Package is loaded' once the package is attached, whether it was
# already installed or had to be installed first, and 'Package not found'
# when it still cannot be attached after the install attempt.
setupLibrary <- function(libraryName){
  if (!require(libraryName, character.only = TRUE)){
    # Spell out `dependencies` rather than relying on partial matching of `dep`.
    install.packages(libraryName, dependencies = TRUE)
    if (!require(libraryName, character.only = TRUE)){
      print('Package not found')
    } else {
      # Report success on the install path too, not only when pre-installed.
      print('Package is loaded')
    }
  } else {
    print('Package is loaded')
  }
}

# Attach the packages used by this notebook, in the original order.
invisible(lapply(c('sparklyr', 'dplyr', 'ggplot2'), setupLibrary))

# Open a Spark connection on YARN with explicit driver/executor settings.
sc <- spark_connect(
  master = 'yarn',
  config = list(
    'spark.driver.memory' = '8G',
    'spark.executor.instances' = 4,
    'spark.executor.cores' = 8,
    'spark.executor.memory' = '8G'
  )
)

# Register the flight data file as the Spark table 'airline_data'.
airline_tbl <- spark_read_csv(
  sc,
  name = 'airline_data',
  path = '/repository/airlines/data/2000.csv',
  delimiter = ','
)

# Register the carrier metadata file as the Spark table 'carrier_data'.
carrier_tbl <- spark_read_csv(
  sc,
  name = 'carrier_data',
  path = '/repository/airlines/metadata/carriers.csv',
  delimiter = ','
)

# Derive average speed (distance / air time) for flights whose departure
# delay falls strictly between 15 and 240, keeping only the two model columns.
speed_data <- airline_tbl %>%
  mutate(Dep_delay = as.numeric(DepDelay)) %>%
  # Negative departure delays are clamped to zero.
  mutate(Dep_delay = ifelse(Dep_delay < 0, 0, Dep_delay)) %>%
  mutate(
    Flight_Distance = as.numeric(Distance),
    Air_time = as.numeric(AirTime)
  ) %>%
  filter(
    !is.na(Dep_delay),
    !is.na(Flight_Distance),
    !is.na(Air_time)
  ) %>%
  filter(Dep_delay > 15, Dep_delay < 240) %>%
  mutate(Avg_speed = Flight_Distance / Air_time) %>%
  select(Dep_delay, Avg_speed)

# Regress departure delay on average speed with Spark ML.
speed_model <- speed_data %>%
  ml_linear_regression(Dep_delay ~ Avg_speed, max_iter = 100)

# Print the fitted model summary.
summary(speed_model)

# Filter records and create the target variable 'Gain'
# (departure delay minus arrival delay).
model_data <- airline_tbl %>%
  mutate(Arr_delay = as.numeric(ArrDelay)) %>%
  mutate(Dep_delay = as.numeric(DepDelay)) %>%
  mutate(Flight_Distance = as.numeric(Distance)) %>%
  # Drop rows where any quantity used by the model is missing.
  filter(!is.na(Arr_delay) & !is.na(Dep_delay) & !is.na(Flight_Distance)) %>%
  # Use the column's actual name `Year` (as in the select() below); the
  # lowercase `year` is not a column of the table and only worked through
  # lenient symbol passthrough and case-insensitive column resolution.
  filter(Year != 2008) %>%
  # Keep departure delays strictly between 15 and 240 and arrival delays
  # strictly between -60 and 360.
  filter(Dep_delay > 15 & Dep_delay < 240) %>%
  filter(Arr_delay > -60 & Arr_delay < 360) %>%
  # Attach the carrier description by matching UniqueCarrier to Code.
  left_join(carrier_tbl, by = c("UniqueCarrier" = "Code")) %>%
  mutate(Gain = Dep_delay - Arr_delay) %>%
  select(Year, Month, Arr_delay, Dep_delay, Flight_Distance, UniqueCarrier, Description, Gain)

# Fit a linear model
ml1 <- model_data %>%
  ml_linear_regression(Gain ~ Flight_Distance + Dep_delay + UniqueCarrier)

# Summarize the linear model
summary(ml1)

# Close the Spark connection now that both models have been summarized.
spark_disconnect(sc)

[1] "Package is loaded"
[1] "Package is loaded"
[1] "Package is loaded"


* No rows dropped by 'na.omit' call


Call: ml_linear_regression(., Dep_delay ~ Avg_speed, max_iter = 100)

Deviance Residuals: (approximate):
   Min     1Q Median     3Q    Max 
-42.88 -28.86 -14.62  14.49 188.30 

Coefficients:
             Estimate Std. Error t value  Pr(>|t|)    
(Intercept) 48.262704   0.224112 215.351 < 2.2e-16 ***
Avg_speed    0.687947   0.032449  21.201 < 2.2e-16 ***
---
Signif. codes:  0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1

R-Squared: 0.0004256
Root Mean Squared Error: 40.64


* No rows dropped by 'na.omit' call


Call: ml_linear_regression(., Gain ~ Flight_Distance + Dep_delay + UniqueCarrier)

Deviance Residuals: (approximate):
     Min       1Q   Median       3Q      Max 
-304.394   -5.291    2.790    9.585   73.520 

Coefficients:
                    Estimate  Std. Error  t value  Pr(>|t|)    
(Intercept)      -2.7315e+00  6.5526e-02 -41.6857 < 2.2e-16 ***
Flight_Distance   2.5345e-03  3.2594e-05  77.7584 < 2.2e-16 ***
Dep_delay        -1.2193e-02  4.3357e-04 -28.1222 < 2.2e-16 ***
UniqueCarrier_AQ  1.9953e+00  7.1489e-01   2.7910  0.005254 ** 
UniqueCarrier_AS  2.2168e+00  1.1133e-01  19.9120 < 2.2e-16 ***
UniqueCarrier_CO  1.1380e+00  9.1918e-02  12.3801 < 2.2e-16 ***
UniqueCarrier_DL  7.7404e-01  7.1103e-02  10.8863 < 2.2e-16 ***
UniqueCarrier_HP  1.6162e+00  9.4862e-02  17.0371 < 2.2e-16 ***
UniqueCarrier_NW  2.6628e+00  8.2218e-02  32.3866 < 2.2e-16 ***
UniqueCarrier_TW  7.1625e-01  9.9641e-02   7.1883 6.566e-13 ***
UniqueCarrier_UA -9.7159e-01  6.5288e-02 -14.8816 < 2.2e-16 ***
UniqueC

In [11]:
# Copy the flight data file from HDFS into the working directory; the
# captured command output is auto-printed by the notebook.
system2(
  command = 'hdfs',
  args = c('dfs', '-get', '/repository/airlines/data/2000.csv'),
  stdout = TRUE,
  stderr = TRUE
)
print('===================')

# Read the local copy of the flight data, keeping strings as character.
airline_tbl <- read.csv(file = '2000.csv', stringsAsFactors = FALSE)

# Copy the carrier metadata file from HDFS and read it the same way.
system2(
  command = 'hdfs',
  args = c('dfs', '-get', '/repository/airlines/metadata/carriers.csv'),
  stdout = TRUE,
  stderr = TRUE
)
carrier_tbl <- read.csv(file = 'carriers.csv', stringsAsFactors = FALSE)


# Derive average speed (distance / air time) for flights whose departure
# delay falls strictly between 15 and 240, keeping only the two model columns.
speed_data <- airline_tbl %>%
  filter(!is.na(DepDelay), !is.na(Distance), !is.na(AirTime)) %>%
  mutate(
    # Negative departure delays are clamped to zero.
    Dep_delay = ifelse(DepDelay < 0, 0, DepDelay),
    Flight_Distance = Distance,
    Air_time = AirTime
  ) %>%
  filter(Dep_delay > 15, Dep_delay < 240) %>%
  transmute(Dep_delay, Avg_speed = Flight_Distance / Air_time)

# Regress departure delay on average speed with base R's lm().
speed_model <- lm(formula = Dep_delay ~ Avg_speed, data = speed_data)

# Print the fitted model summary.
summary(speed_model)

# Filter records and create the target variable 'Gain'
# (departure delay minus arrival delay).
model_data <- airline_tbl %>%
  # Check the columns this model actually uses: ArrDelay, not AirTime
  # (AirTime only matters for the speed model).
  filter(!is.na(ArrDelay) & !is.na(DepDelay) & !is.na(Distance)) %>%
  # Same year exclusion as the Spark version of this pipeline, so the two
  # pipelines apply the same record filters.
  filter(Year != 2008) %>%
  mutate(Arr_delay = ArrDelay) %>%
  # Negative departure delays are clamped to zero.
  mutate(Dep_delay = ifelse(DepDelay < 0, 0, DepDelay)) %>%
  mutate(Flight_Distance = Distance) %>%
  # Keep departure delays strictly between 15 and 240 and arrival delays
  # strictly between -60 and 360.
  filter(Dep_delay > 15 & Dep_delay < 240) %>%
  filter(Arr_delay > -60 & Arr_delay < 360) %>%
  # Attach the carrier description by matching UniqueCarrier to Code.
  left_join(carrier_tbl, by = c("UniqueCarrier" = "Code")) %>%
  mutate(Gain = Dep_delay - Arr_delay) %>%
  select(Year, Month, Arr_delay, Dep_delay, Flight_Distance, UniqueCarrier, Description, Gain)

# Fit a linear model
ml1 <- lm(Gain ~ Flight_Distance + Dep_delay + UniqueCarrier, data = model_data)

# Summarize the linear model
summary(ml1)

“running command ''hdfs' dfs -get /repository/airlines/data/2000.csv 2>&1' had status 1”



“running command ''hdfs' dfs -get /repository/airlines/metadata/carriers.csv 2>&1' had status 1”


Call:
lm(formula = Dep_delay ~ Avg_speed, data = speed_data)

Residuals:
    Min      1Q  Median      3Q     Max 
-137.72  -28.81  -14.57   14.49  188.70 

Coefficients:
            Estimate Std. Error t value Pr(>|t|)    
(Intercept) 48.26270    0.22411   215.4   <2e-16 ***
Avg_speed    0.68795    0.03245    21.2   <2e-16 ***
---
Signif. codes:  0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1

Residual standard error: 40.64 on 1055701 degrees of freedom
Multiple R-squared:  0.0004256,	Adjusted R-squared:  0.0004246 
F-statistic: 449.5 on 1 and 1055701 DF,  p-value: < 2.2e-16



Call:
lm(formula = Gain ~ Flight_Distance + Dep_delay + UniqueCarrier, 
    data = model_data)

Residuals:
    Min      1Q  Median      3Q     Max 
-330.20   -5.32    2.70    9.54   82.50 

Coefficients:
                  Estimate Std. Error t value Pr(>|t|)    
(Intercept)     -2.732e+00  6.553e-02 -41.686  < 2e-16 ***
Flight_Distance  2.534e-03  3.259e-05  77.758  < 2e-16 ***
Dep_delay       -1.219e-02  4.336e-04 -28.122  < 2e-16 ***
UniqueCarrierAQ  1.995e+00  7.149e-01   2.791  0.00525 ** 
UniqueCarrierAS  2.217e+00  1.113e-01  19.912  < 2e-16 ***
UniqueCarrierCO  1.138e+00  9.192e-02  12.380  < 2e-16 ***
UniqueCarrierDL  7.740e-01  7.110e-02  10.886  < 2e-16 ***
UniqueCarrierHP  1.616e+00  9.486e-02  17.037  < 2e-16 ***
UniqueCarrierNW  2.663e+00  8.222e-02  32.387  < 2e-16 ***
UniqueCarrierTW  7.162e-01  9.964e-02   7.188 6.56e-13 ***
UniqueCarrierUA -9.716e-01  6.529e-02 -14.882  < 2e-16 ***
UniqueCarrierUS -6.463e-01  7.220e-02  -8.953  < 2e-16 ***
UniqueCarrierWN  3.306e+00  