In [2]:
import findspark
import math
import numpy as np
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import isnan, when, count, col


findspark.init('/Users/AzeezA/opt/spark-3.2.0/')

conf = SparkConf().setMaster("local").setAppName("planes")
sc = SparkContext.getOrCreate(conf=conf)

spark = SparkSession.builder.getOrCreate()

In [3]:
planes = sc.textFile("/Users/AzeezA/Google\ Drive/Life/2021/UPM/Big\ Data/Planes/1988.csv.bz2")
# planes = sc.textFile("hdfs://localhost:9000/temp/1988.csv.bz2")  //if file exists in HDFS use this line


print()
ten_lines = planes.take(10)
for l in ten_lines:
    print(l)


Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
1988,1,9,6,1348,1331,1458,1435,PI,942,NA,70,64,NA,23,17,SYR,BWI,273,NA,NA,0,NA,0,NA,NA,NA,NA,NA
1988,1,10,7,1334,1331,1443,1435,PI,942,NA,69,64,NA,8,3,SYR,BWI,273,NA,NA,0,NA,0,NA,NA,NA,NA,NA
1988,1,11,1,1446,1331,1553,1435,PI,942,NA,67,64,NA,78,75,SYR,BWI,273,NA,NA,0,NA,0,NA,NA,NA,NA,NA
1988,1,12,2,1334,1331,1438,1435,PI,942,NA,64,64,NA,3,3,SYR,BWI,273,NA,NA,0,NA,0,NA,NA,NA,NA,NA
1988,1,13,3,1341,1331,1503,1435,PI,942,NA,82,64,NA,28,10,SYR,BWI,273,NA,NA,0,NA,0,NA,NA,NA,NA,NA
1988,1,14,4,1332,1331,1447,1435,PI,942,NA,75,64,NA,12,1,SYR,BWI,273,NA,NA,0,NA,0,NA,NA,NA,NA,NA
1988,1,15,5,1331,1331,1434,1435,PI,942,NA,63,64,NA,-1,0,SYR,BWI,273,NA,NA,0,NA,0,NA,NA,NA,NA,NA
1988,1,16,6,1327,1331,1427,

In [4]:
column_names = planes.take(1)[0].split(',')

header = planes.first()
planes_no_header = planes.filter(lambda line: line != header)

split_planes = planes_no_header.map(lambda x : x.split(','))

full_planes_df = spark.createDataFrame(split_planes, schema = column_names)
print((full_planes_df.count(), len(full_planes_df.columns)))

(5202096, 29)


In [5]:
Azeez# convert time format from HoursMinutes to Minutes ex. 1348 -> 828
time_to_min_cols = ['DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime']
for c in time_to_min_cols:
    full_planes_df = full_planes_df.withColumn(c, 
                                   ((full_planes_df[c]/100).cast('int'))*60 + (full_planes_df[c]%100))

In [6]:
print(full_planes_df.dtypes)

[('Year', 'string'), ('Month', 'string'), ('DayofMonth', 'string'), ('DayOfWeek', 'string'), ('DepTime', 'double'), ('CRSDepTime', 'double'), ('ArrTime', 'double'), ('CRSArrTime', 'double'), ('UniqueCarrier', 'string'), ('FlightNum', 'string'), ('TailNum', 'string'), ('ActualElapsedTime', 'string'), ('CRSElapsedTime', 'string'), ('AirTime', 'string'), ('ArrDelay', 'string'), ('DepDelay', 'string'), ('Origin', 'string'), ('Dest', 'string'), ('Distance', 'string'), ('TaxiIn', 'string'), ('TaxiOut', 'string'), ('Cancelled', 'string'), ('CancellationCode', 'string'), ('Diverted', 'string'), ('CarrierDelay', 'string'), ('WeatherDelay', 'string'), ('NASDelay', 'string'), ('SecurityDelay', 'string'), ('LateAircraftDelay', 'string')]


In [7]:
%%time
print(f'Number of original columns: {len(full_planes_df.columns)}')
print(full_planes_df.columns)

excluded_cols = ['ArrTime', 'ActualElapsedTime', 'AirTime', 'TaxiIn', 'Diverted',
                   'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']
int_cols = ['Year', 'DayofMonth', 'DepTime', 'CRSDepTime', 'CRSArrTime', 'FlightNum', 
            'TailNum', 'CRSElapsedTime', 'DepDelay', 'Distance', 'TaxiOut', 'Cancelled', 'CancellationCode', 'ArrDelay']
cat_cols = ['UniqueCarrier', 'Orig', 'Dest']


target_col = 'ArrDelay'

planes_df = full_planes_df.drop(*excluded_cols)

for i in int_cols:
    planes_df = planes_df.withColumn(i , planes_df[i].cast(IntegerType()))

print()
print()

print(f'Now only {len(planes_df.columns)} columns remain')
print(planes_df.columns)

full_planes_df.createOrReplaceTempView("FULL_DF")
planes_df.createOrReplaceTempView("FLIGHT_LIST")

Number of original columns: 29
['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'TailNum', 'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiIn', 'TaxiOut', 'Cancelled', 'CancellationCode', 'Diverted', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']


Now only 19 columns remain
['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'TailNum', 'CRSElapsedTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiOut', 'Cancelled', 'CancellationCode']
CPU times: user 17.9 ms, sys: 5.45 ms, total: 23.3 ms
Wall time: 405 ms


### Circular Transformation (Angular Distance)
The following cells transforms specific columns so that the circular nature of their value is preserved. For exmaple, when days of the week are encoded as 1-7 where 1=Monday and 7=Sunday, the difference between Monday and Sunday is falsly inflated. Baded on this encoding, we woudl say Monday(1) and Thursday (4) are close togehter, when this is not actually true. To account for this, we calculate the sin and cosine of these columns to create a 'circular' encoding. 
This trnasofrmation is applied to the Months column, DayOfWeek column, as well as the columns that represent a specific time of the day (DepTime, CRSDepTime, CRSArrTime). 

In [19]:
from pyspark.sql.functions import sin, cos
from pyspark.sql.types import IntegerType

# Transform Month and DayOfWeek Column to circular transformation so that Mon-Sun (1,7) is closer than Mon-Thurs (1,4)
date_cols = ['Month', 'DayOfWeek']
for dc in date_cols:
    planes_df = planes_df.withColumn(dc, planes_df[dc].cast(IntegerType()))
    if dc == 'Month':
        planes_df = planes_df.withColumn(dc + '_sin', sin((2*np.pi/12)*planes_df[dc]))
        planes_df = planes_df.withColumn(dc + '_cos', cos((2*np.pi/12)*planes_df[dc]))
    elif dc == 'DayOfWeek':
        planes_df = planes_df.withColumn(dc + '_sin', sin((2*np.pi/7)*planes_df[dc]))
        planes_df = planes_df.withColumn(dc + '_cos', cos((2*np.pi/7)*planes_df[dc]))
        
time_cols = ['DepTime', 'CRSDepTime', 'CRSArrTime']
mins_in_day = 24*60
for tc in time_cols:
    planes_df = planes_df.withColumn(tc, planes_df[tc].cast(IntegerType()))
    planes_df = planes_df.withColumn(tc + '_sin', sin((2*np.pi/mins_in_day)*planes_df[tc]))
    planes_df = planes_df.withColumn(tc + '_cos', cos((2*np.pi/mins_in_day)*planes_df[tc]))
        
# planes_df.select(date_cols).show(n=10)
# planes_df.drop(*date_cols)
planes_df.select(['Month', 'DayOfWeek','Month_sin', 'Month_cos', 'DayOfWeek_sin', 'DayOfWeek_cos']).show(n=10)
planes_df.show(n=1, vertical = True)

+-----+---------+-------------------+------------------+--------------------+--------------------+
|Month|DayOfWeek|          Month_sin|         Month_cos|       DayOfWeek_sin|       DayOfWeek_cos|
+-----+---------+-------------------+------------------+--------------------+--------------------+
|    1|        6|0.49999999999999994|0.8660254037844387| -0.7818314824680299|  0.6234898018587334|
|    1|        7|0.49999999999999994|0.8660254037844387|-2.44929359829470...|                 1.0|
|    1|        1|0.49999999999999994|0.8660254037844387|  0.7818314824680298|  0.6234898018587336|
|    1|        2|0.49999999999999994|0.8660254037844387|  0.9749279121818236|-0.22252093395631434|
|    1|        3|0.49999999999999994|0.8660254037844387| 0.43388373911755823|  -0.900968867902419|
|    1|        4|0.49999999999999994|0.8660254037844387|  -0.433883739117558| -0.9009688679024191|
|    1|        5|0.49999999999999994|0.8660254037844387| -0.9749279121818236| -0.2225209339563146|
|    1|   

### Linear Regression Model

In [29]:
%%time
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler

#no circular standardization of Month, Day, and time columns
# training_cols = ['Month', 'DayOfWeek', 'DayofMonth', 'DepTime', 'CRSDepTime', 'CRSArrTime', 'CRSElapsedTime',
#                 'DepDelay', 'Distance']
training_cols = ['Month_sin', 'Month_cos', 'DayOfWeek_sin', 'DayOfWeek_cos', 'DayofMonth', 'DepTime_sin', 'DepTime_cos', 'CRSDepTime_sin', 'CRSDepTime_cos', 'CRSArrTime_sin', 'CRSArrTime_cos', 'CRSElapsedTime',
                'DepDelay', 'Distance']

target_col = 'ArrDelay'


features = planes_df.select(training_cols + [target_col] )
outputCol = 'features'

df_va = VectorAssembler(inputCols = training_cols, outputCol = outputCol, handleInvalid="skip")
features = df_va.transform(features)


features = features.na.drop(subset = training_cols + [target_col, outputCol])

model_data = features.select([outputCol, target_col])

# # # Use scaler of choice; here Standard scaler is used
scalar = StandardScaler(inputCol="features", outputCol="scaled_vectored_features")
scaled_features = scalar.fit(model_data.select('features'))
scaled_model_data = scaled_features.transform(model_data)


model_data.limit(10).show()


+--------------------+--------+
|            features|ArrDelay|
+--------------------+--------+
|[0.49999999999999...|      23|
|[0.49999999999999...|       8|
|[0.49999999999999...|      78|
|[0.49999999999999...|       3|
|[0.49999999999999...|      28|
|[0.49999999999999...|      12|
|[0.49999999999999...|      -1|
|[0.49999999999999...|      -8|
|[0.49999999999999...|       5|
|[0.49999999999999...|      44|
+--------------------+--------+

CPU times: user 45.3 ms, sys: 16.7 ms, total: 62 ms
Wall time: 1min 46s


In [32]:
scaled_model_data = scaled_model_data.withColumn('features', scaled_model_data['scaled_vectored_features'])
scaled_model_data = scaled_model_data.drop('scaled_vectored_features')
scaled_model_data.limit(10).show()

+--------------------+--------+
|            features|ArrDelay|
+--------------------+--------+
|[0.70690757706162...|      23|
|[0.70690757706162...|       8|
|[0.70690757706162...|      78|
|[0.70690757706162...|       3|
|[0.70690757706162...|      28|
|[0.70690757706162...|      12|
|[0.70690757706162...|      -1|
|[0.70690757706162...|      -8|
|[0.70690757706162...|       5|
|[0.70690757706162...|      44|
+--------------------+--------+



In [26]:
print(scaled_model_data.take(1))
print(type(model_data))
print(type(scaled_features))

[Row(features=DenseVector([0.5, 0.866, -0.7818, 0.6235, 9.0, -0.454, -0.891, -0.3867, -0.9222, -0.6259, -0.7799, 64.0, 17.0, 273.0]), ArrDelay=23, scaled_vectored_features=DenseVector([0.7069, 1.2252, -1.1017, 0.8852, 1.0225, -0.6379, -1.5505, -0.5398, -1.6388, -1.01, -1.1644, 1.0381, 0.8206, 0.5446]))]
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.ml.feature.StandardScalerModel'>


#### Regression with features not scaled

In [128]:
%%time
from pyspark.ml.regression import GeneralizedLinearRegression

(train_data, test_data) = model_data.randomSplit([0.8, 0.2])

glr=GeneralizedLinearRegression(labelCol=target_col,family="gaussian",maxIter=10,regParam=0.3)
model = glr.fit(train_data)

print("Coefficients: ", model.coefficients)
print("Intercept: ", model.intercept)

print(model.summary)

Coefficients:  [0.5738143759813829,1.0049912250367061,0.2812057295831131,-1.0372128999973347,-0.010034714061350222,0.006149261310895182,-0.006608570549335716,0.0010592355870998505,-0.07909467272539676,0.8974097655567339,0.008462300271836594]
Intercept:  3.318032071420901
Coefficients:
       Feature Estimate Std Error   T Value P Value
   (Intercept)   3.3180    0.0300  110.6891  0.0000
     Month_sin   0.5738    0.0094   61.1696  0.0000
     Month_cos   1.0050    0.0094  107.0469  0.0000
 DayOfWeek_sin   0.2812    0.0093   30.0978  0.0000
 DayOfWeek_cos  -1.0372    0.0094 -110.1567  0.0000
    DayofMonth  -0.0100    0.0008  -13.3223  0.0000
       DepTime   0.0061    0.0001   61.7413  0.0000
    CRSDepTime  -0.0066    0.0001  -66.2558  0.0000
    CRSArrTime   0.0011    0.0000   26.6471  0.0000
CRSElapsedTime  -0.0791    0.0005 -165.8729  0.0000
      DepDelay   0.8974    0.0003 2748.3446  0.0000
      Distance   0.0085    0.0001  144.4421  0.0000

(Dispersion parameter for gaussian fa

#### Site with really good examples https://towardsdatascience.com/building-a-linear-regression-with-pyspark-and-mllib-d065c3ba246a

In [17]:
from pyspark.ml.regression import LinearRegression

(train_data, test_data) = model_data.randomSplit([0.8, 0.2])

lr = LinearRegression(featuresCol = 'features', labelCol=target_col, maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_data)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [0.2492365894055577,0.6650634453347885,0.0,-0.7006800013585207,0.0,0.0,0.0,0.0,0.0,-0.0838721302436192,0.0,-0.008883669460361569,0.9064825195671519,0.0]
Intercept: 1.4865691775513472
RMSE: 13.445672
r2: 0.666233


#### Regression with Scaled Features

In [34]:
%%time
from pyspark.ml.regression import LinearRegression

(train_data, test_data) = scaled_model_data.randomSplit([0.8, 0.2])

lr = LinearRegression(featuresCol = 'features', labelCol=target_col, maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_data)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [0.19159575926939018,0.4771867766725499,0.0,-0.4970160380872573,0.0,-0.0025395739719759466,0.0,0.0,0.0,-0.041840849953451206,0.0,-0.5196905110401441,18.609427045352216,0.0]
Intercept: 1.4909126321537371
RMSE: 13.600124
r2: 0.660929
CPU times: user 38.1 ms, sys: 16.2 ms, total: 54.3 ms
Wall time: 3min 50s
