In [1]:
from utils.dataset import DatasetStack
from utils.compute import regular_feature, RegressionModel

In [2]:
datasets = DatasetStack(airport_info_path='../datasets/AIRPORTS_INFO.csv')
datasets.load('../datasets/app-dataset/full/train.csv', 'train')
datasets.load('../datasets/app-dataset/full/test.csv', 'test')

Initializing Spark...
Spark stand by (Master=local, AppName=e106e22b-b0fc-4e13-9d00-40974f18cef6)
Load datasets from ../datasets/app-dataset/full/train.csv
                     0
Spark Master       Sub
Spark APP Name     Sub
Data Path             
Data Count      121513
Load datasets from ../datasets/app-dataset/full/test.csv
                    0
Spark Master      Sub
Spark APP Name    Sub
Data Path            
Data Count      14861


In [3]:
trainset = regular_feature(datasets['train'].dataframe(), True)
testset = regular_feature(datasets['test'].dataframe(), True)

In [9]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.tuning import ParamGridBuilder

In [12]:
model = RegressionModel(DecisionTreeRegressor, trainset, testset)

In [13]:
lr = model.model
grid = ParamGridBuilder().addGrid(lr.maxDepth, [3, 5]).build()
grid
result = model.kfolds(grid, k=2)

In [14]:
result

[21.356046550443843, 19.671678483675933]

In [2]:
train_set = AirDelayDataset('../datasets/app-dataset/full/train.csv',
                            airport_data_path='../datasets/AIRPORTS_INFO.csv')

Initializing Spark...
Spark stand by (Master=local, AppName=28b5434f-007c-47ff-97ec-278c21a5d341)
Load datasets from ../datasets/app-dataset/full/train.csv
                                                     0
Spark Master                                     local
Spark APP Name    28b5434f-007c-47ff-97ec-278c21a5d341
Data Path       ../datasets/app-dataset/full/train.csv
Data Count                                      121513


In [3]:
train_set

                                                     0
Spark Master                                     local
Spark APP Name    28b5434f-007c-47ff-97ec-278c21a5d341
Data Path       ../datasets/app-dataset/full/train.csv
Data Count                                      121513

In [13]:
data = train_set.attach_airport_info('both')
dataframe = data.dataframe().select(
    'FL_DATE', 'ORIGIN_LAT', 'ORIGIN_LON', 'DEST_LAT', 'DEST_LON',
    'DEP_TIME', 'DEP_DELAY', 'OP_CARRIER', 
    'DISTANCE', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY',
    'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'ARR_DELAY'
)

In [14]:
dataframe = dataframe.fillna(0)

In [20]:
from pyspark.sql.functions import corr

In [32]:
dataframe.columns[0]

'FL_DATE'

In [35]:
cors = {}
for item in dataframe.columns:
    print(item)
    if item == 'ARR_DELAY' or item == 'FL_DATE' or item == 'OP_CARRIER':
        continue
    cor = dataframe.select(corr('ARR_DELAY', item)).toPandas().iloc[0].values[0]
    cors[item] = cor

FL_DATE
ORIGIN_LAT
ORIGIN_LON
DEST_LAT
DEST_LON
DEP_TIME
DEP_DELAY
OP_CARRIER
DISTANCE
CARRIER_DELAY
WEATHER_DELAY
NAS_DELAY
SECURITY_DELAY
LATE_AIRCRAFT_DELAY
ARR_DELAY


In [36]:
cors

{'ORIGIN_LAT': -0.0021211824507489257,
 'ORIGIN_LON': 0.01695532941141543,
 'DEST_LAT': -0.0033262318774313266,
 'DEST_LON': 0.022658593476044278,
 'DEP_TIME': 0.15723548225103148,
 'DEP_DELAY': 0.9398714422412073,
 'DISTANCE': -0.02600098737563631,
 'CARRIER_DELAY': 0.6069754707513229,
 'WEATHER_DELAY': 0.3037087221972517,
 'NAS_DELAY': 0.43254351458637247,
 'SECURITY_DELAY': 0.026983116998700082,
 'LATE_AIRCRAFT_DELAY': 0.6150973659113383}

In [15]:
dataframe.show()

+-------------------+------------------+-------------------+-----------------+-------------------+--------+---------+----------+--------+-------------+-------------+---------+--------------+-------------------+---------+
|            FL_DATE|        ORIGIN_LAT|         ORIGIN_LON|         DEST_LAT|           DEST_LON|DEP_TIME|DEP_DELAY|OP_CARRIER|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|ARR_DELAY|
+-------------------+------------------+-------------------+-----------------+-------------------+--------+---------+----------+--------+-------------+-------------+---------+--------------+-------------------+---------+
|2009-01-01 00:00:00|29.984399795532227| -95.34140014648438|        32.115004|        -110.938053|  1750.0|      0.0|        XE|   936.0|          0.0|          0.0|      0.0|           0.0|                0.0|    -16.0|
|2009-01-01 00:00:00|           41.9786|           -87.9048|        33.938801|         -81.119499|  1421.0|     -1.0

In [70]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import month, dayofmonth

In [54]:

date = dataframe.select('FL_DATE',
                 month(dataframe['FL_DATE']).alias('FL_DATE_MONTH'),
                 dayofmonth(dataframe['FL_DATE']).alias('FL_DATE_DAY'))
dataframe_date = dataframe.join(date, on='FL_DATE', how='left_outer')

In [65]:
evaluator_rmse = RegressionEvaluator(predictionCol='prediction', labelCol='ARR_DELAY', metricName='rmse')
evaluator_r2 = RegressionEvaluator(predictionCol='prediction', labelCol='ARR_DELAY', metricName='r2')

In [55]:
dataframe_date.show()

+-------------------+------------------+------------------+---------+-----------+--------+---------+----------+--------+-------------+-------------+---------+--------------+-------------------+---------+-------------+-----------+
|            FL_DATE|        ORIGIN_LAT|        ORIGIN_LON| DEST_LAT|   DEST_LON|DEP_TIME|DEP_DELAY|OP_CARRIER|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|ARR_DELAY|FL_DATE_MONTH|FL_DATE_DAY|
+-------------------+------------------+------------------+---------+-----------+--------+---------+----------+--------+-------------+-------------+---------+--------------+-------------------+---------+-------------+-----------+
|2009-01-01 00:00:00|29.984399795532227|-95.34140014648438|32.115004|-110.938053|  1750.0|      0.0|        XE|   936.0|          0.0|          0.0|      0.0|           0.0|                0.0|    -16.0|            1|          1|
|2009-01-01 00:00:00|29.984399795532227|-95.34140014648438|32.115004|-110.938053

In [103]:
from utils.compute import regular_feature

In [104]:
df = regular_feature(dataframe)

In [105]:
df.show()

+-------------------+----------+-----------+---------+-----------+--------+---------+----------+--------+-------------+-------------+---------+--------------+-------------------+---------+-------------+-----------+----------------+--------------------+--------------------+
|            FL_DATE|ORIGIN_LAT| ORIGIN_LON| DEST_LAT|   DEST_LON|DEP_TIME|DEP_DELAY|OP_CARRIER|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|ARR_DELAY|FL_DATE_MONTH|FL_DATE_DAY|OP_CARRIER_INDEX|           features_|            features|
+-------------------+----------+-----------+---------+-----------+--------+---------+----------+--------+-------------+-------------+---------+--------------+-------------------+---------+-------------+-----------+----------------+--------------------+--------------------+
|2009-03-28 00:00:00| 33.942501|-118.407997|47.449162|-122.311134|  1828.0|     -2.0|        AS|   954.0|          0.0|          0.0|      0.0|           0.0|                0.0|

In [108]:
model = DecisionTreeRegressor(featuresCol='features', labelCol='ARR_DELAY', maxDepth=5)

In [109]:
model = model.fit(df)

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_0ff1069ea206, depth=5, numNodes=63, numFeatures=15

In [None]:
evaluator_rmse.evaluate(pred)

11.70849776457867

In [None]:
evaluator_r2.evaluate(pred)

0.9125372771683421

In [None]:
RegressionEvaluator.evaluate()

In [None]:
def feature_filter(dataset):
    