In [1]:
# Note that this code must be run from python3 or ipython3 in a session's 
# terminal, not run directly in the graphical console. See
# https://github.com/dask/dask/issues/4612

import cdsw_dask_utils
import cdsw
import pandas as pd

In [2]:
#read flights dataset
input_file = "data/1988.csv.bz2"
columns = ['Month','DayofMonth','DayOfWeek','CRSDepTime','CRSArrTime','UniqueCarrier','FlightNum',
           'Origin','Dest','Cancelled']
flight_df = pd.read_csv(input_file,sep=',',header=0, na_values='NA', usecols=columns )#\
flight_df.head()

Unnamed: 0,Month,DayofMonth,DayOfWeek,CRSDepTime,CRSArrTime,UniqueCarrier,FlightNum,Origin,Dest,Cancelled
0,1,9,6,1331,1435,PI,942,SYR,BWI,0
1,1,10,7,1331,1435,PI,942,SYR,BWI,0
2,1,11,1,1331,1435,PI,942,SYR,BWI,0
3,1,12,2,1331,1435,PI,942,SYR,BWI,0
4,1,13,3,1331,1435,PI,942,SYR,BWI,0


In [3]:
# feature engineering - Create routes
flight_df['route'] = flight_df['Origin'] + "_" + flight_df['Dest']
# show top 20 routes 
flight_df['route'].value_counts().head(20)

LAX_SFO    20750
SFO_LAX    20658
LAX_PHX    13461
PHX_LAX    13273
LAX_LAS    12175
LGA_BOS    12027
LAS_LAX    11801
SJC_LAX    11535
LAX_SJC    11292
BOS_LGA    11141
SAN_LAX    10748
ORD_EWR    10697
LGA_DCA    10500
DCA_LGA    10454
EWR_ORD    10334
MSP_ORD    10152
HOU_DAL    10143
LGA_ORD    10057
LAX_SAN    10028
ORD_LGA     9891
Name: route, dtype: int64

In [4]:
#look at their cancelations
flight_df[['route', 'Cancelled']].groupby('route')\
        .agg({'route':'size', 'Cancelled':'sum'}) \
        .rename(columns={'route':'count','Cancelled':'nb_cancelled'}) \
        .reset_index()\
        .sort_values(['count'],ascending=False)\
        .head(20)

Unnamed: 0,route,count,nb_cancelled
1920,LAX_SFO,20750,228
3375,SFO_LAX,20658,206
1911,LAX_PHX,13461,78
2870,PHX_LAX,13273,71
1894,LAX_LAS,12175,58
1956,LGA_BOS,12027,287
1840,LAS_LAX,11801,47
3432,SJC_LAX,11535,71
1921,LAX_SJC,11292,71
366,BOS_LGA,11141,243


In [5]:
#focus on 100 biggest routes 
route_lst=flight_df['route'].value_counts().head(50)
flight_df = flight_df[flight_df['route'].isin(route_lst.index)]
flight_df.head()

Unnamed: 0,Month,DayofMonth,DayOfWeek,CRSDepTime,CRSArrTime,UniqueCarrier,FlightNum,Origin,Dest,Cancelled,route
786,1,1,5,955,1035,PS,1400,LAX,SAN,0,LAX_SAN
787,1,2,6,955,1035,PS,1400,LAX,SAN,0,LAX_SAN
788,1,4,1,955,1035,PS,1400,LAX,SAN,0,LAX_SAN
789,1,5,2,955,1035,PS,1400,LAX,SAN,0,LAX_SAN
790,1,6,3,955,1035,PS,1400,LAX,SAN,1,LAX_SAN


In [6]:
# count number of null values()
nulls = flight_df.isnull().sum()
nulls

Month            0
DayofMonth       0
DayOfWeek        0
CRSDepTime       0
CRSArrTime       0
UniqueCarrier    0
FlightNum        0
Origin           0
Dest             0
Cancelled        0
route            0
dtype: int64

In [7]:
# ### encode labels 
flight_df.Cancelled = pd.Categorical(flight_df.Cancelled)
flight_df['Label'] = flight_df.Cancelled.cat.codes
flight_df.drop(['Cancelled'], axis=1, inplace=True)
flight_df.head()


Unnamed: 0,Month,DayofMonth,DayOfWeek,CRSDepTime,CRSArrTime,UniqueCarrier,FlightNum,Origin,Dest,route,Label
786,1,1,5,955,1035,PS,1400,LAX,SAN,LAX_SAN,0
787,1,2,6,955,1035,PS,1400,LAX,SAN,LAX_SAN,0
788,1,4,1,955,1035,PS,1400,LAX,SAN,LAX_SAN,0
789,1,5,2,955,1035,PS,1400,LAX,SAN,LAX_SAN,0
790,1,6,3,955,1035,PS,1400,LAX,SAN,LAX_SAN,1


In [8]:
import numpy as np
airport_list = np.unique(np.concatenate((np.sort(flight_df['Origin'].unique()),
                                         np.sort(flight_df['Dest'].unique())))
                        )
airport_list

array(['ATL', 'BOS', 'DAL', 'DCA', 'DEN', 'DFW', 'DTW', 'EWR', 'HOU',
       'IAH', 'LAS', 'LAX', 'LGA', 'MCO', 'MIA', 'MSP', 'ORD', 'PDX',
       'PHX', 'SAN', 'SAT', 'SEA', 'SFO', 'SJC', 'STL'], dtype=object)

In [9]:
from sklearn.preprocessing import LabelEncoder
# Categorical columns ['UniqueCarrier','Origin','Dest', 'route']

# encode airlines 
le_carrier = LabelEncoder()
flight_df['Carrier_encoded'] = le_carrier.fit_transform(flight_df['UniqueCarrier'])

# Encode airports : Using same encoder for both origin and dest ( consistent encoding of airports )
le_airport = LabelEncoder()
le_airport.fit(airport_list)
flight_df['Origin_encoded'] = le_airport.transform(flight_df['Origin'])
flight_df['Dest_encoded'] = le_airport.transform(flight_df['Dest'])

# Encode routes 
le_route = LabelEncoder()
flight_df['route_encoded'] = le_route.fit_transform(flight_df['route'])


# Viewing few rows of make and its encoded columns
flight_df[['UniqueCarrier','Carrier_encoded','Origin','Origin_encoded',
           'Dest', 'Dest_encoded', 'route', 'route_encoded' ]].sample(20)


Unnamed: 0,UniqueCarrier,Carrier_encoded,Origin,Origin_encoded,Dest,Dest_encoded,route,route_encoded
1426235,HP,5,PHX,18,LAX,11,PHX_LAX,36
2250435,WN,13,SAN,19,PHX,18,SAN_PHX,39
4507648,PI,8,MCO,13,MIA,14,MCO_MIA,25
4423454,US,12,LAS,10,LAX,11,LAS_LAX,14
45708,UA,11,SEA,21,LAX,11,SEA_LAX,42
2823202,CO,2,ORD,16,EWR,7,ORD_EWR,30
3117843,WN,13,HOU,8,DAL,2,HOU_DAL,12
4428033,WN,13,HOU,8,DAL,2,HOU_DAL,12
1073082,CO,2,ORD,16,EWR,7,ORD_EWR,30
1155985,DL,3,SEA,21,LAX,11,SEA_LAX,42


In [10]:
flight_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 487253 entries, 786 to 5201496
Data columns (total 15 columns):
Month              487253 non-null int64
DayofMonth         487253 non-null int64
DayOfWeek          487253 non-null int64
CRSDepTime         487253 non-null int64
CRSArrTime         487253 non-null int64
UniqueCarrier      487253 non-null object
FlightNum          487253 non-null int64
Origin             487253 non-null object
Dest               487253 non-null object
route              487253 non-null object
Label              487253 non-null int8
Carrier_encoded    487253 non-null int64
Origin_encoded     487253 non-null int64
Dest_encoded       487253 non-null int64
route_encoded      487253 non-null int64
dtypes: int64(10), int8(1), object(4)
memory usage: 56.2+ MB


In [11]:
# # 2. Build a classification model using MLLib
# ### Split Test/Train

from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(flight_df.drop(['UniqueCarrier','Origin','Dest','route'],axis=1),
                                                    flight_df['Label'], 
                                                    test_size=0.4, 
                                                    random_state=100)
del flight_df # free up some memory

In [12]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

# ### parameters for grid search
param_numTrees = list(range(2,4,2))
print(param_numTrees)
param_maxDepth = list(range(4,6,2))
print(param_maxDepth)

rfc = RandomForestClassifier(random_state=10, n_jobs=-1)

GS_params = { 
    'n_estimators': param_numTrees,
    'max_depth' : param_maxDepth
}

CV_rfc = GridSearchCV(estimator=rfc, 
                      param_grid=GS_params, 
                      cv= 3,
                      verbose = 1,
                      n_jobs=-1)

[2]
[4]


In [14]:
CV_rfc.fit(X_train, y_train)

Fitting 3 folds for each of 1 candidates, totalling 3 fits


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done   3 out of   3 | elapsed:    2.0s finished


GridSearchCV(cv=3, error_score='raise-deprecating',
             estimator=RandomForestClassifier(bootstrap=True, class_weight=None,
                                              criterion='gini', max_depth=None,
                                              max_features='auto',
                                              max_leaf_nodes=None,
                                              min_impurity_decrease=0.0,
                                              min_impurity_split=None,
                                              min_samples_leaf=1,
                                              min_samples_split=2,
                                              min_weight_fraction_leaf=0.0,
                                              n_estimators='warn', n_jobs=-1,
                                              oob_score=False, random_state=10,
                                              verbose=0, warm_start=False),
             iid='warn', n_jobs=-1,
             param_grid={'max_

In [None]:
# ### Show Best Parameters 
print(CV_rfc.best_params_)

In [13]:
# Run a Dask cluster with three workers and return an object containing
# a description of the cluster. 
# 
# Using helper library 
#
# Note that the scheduler will run in the current session, and the Dask
# dashboard will become available in the nine-dot menu at the upper
# right corner of the CDSW app.

cluster = cdsw_dask_utils.run_dask_cluster(
  n=2, \
  cpu=1, \
  memory=3, \
  nvidia_gpu=0
)

Waiting for Dask scheduler to become ready...
Dask scheduler is ready
IDs ['77xmjvdpodlp87jo', '4lkgtrkgjcjrhece']


In [14]:
#Get the Dask Scheduler UI
import os 
engine_id = os.environ.get('CDSW_ENGINE_ID')
cdsw_domain = os.environ.get('CDSW_DOMAIN')

from IPython.core.display import HTML
HTML('<a  target="_blank" rel="noopener noreferrer" href="http://read-only-{}.{}">http://read-only-{}.{}</a>'
     .format(engine_id,cdsw_domain,engine_id,cdsw_domain))

In [15]:
# #### Connect a Dask client to the scheduler address in the cluster

from dask.distributed import Client
client = Client(cluster["scheduler_address"])

In [17]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

# ### parameters for grid search
param_numTrees = list(range(2,18,2))
print(param_numTrees)
param_maxDepth = list(range(4,20,2))
print(param_maxDepth)

rfc = RandomForestClassifier(random_state=10, n_jobs=-1)

GS_params = { 
    'n_estimators': param_numTrees,
    'max_depth' : param_maxDepth
}

CV_rfc = GridSearchCV(estimator=rfc, 
                      param_grid=GS_params, 
                      cv= 3,
                      verbose = 1,
                      n_jobs=-1)

[2, 4, 6, 8, 10, 12, 14, 16]
[4, 6, 8, 10, 12, 14, 16, 18]


In [18]:
# ### Fit Model with Dask
from joblib import Parallel, parallel_backend
with parallel_backend('dask'):
  CV_rfc.fit(X_train, y_train)

Fitting 3 folds for each of 64 candidates, totalling 192 fits


[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 16 concurrent workers.
[Parallel(n_jobs=-1)]: Done  18 tasks      | elapsed:   11.2s
[Parallel(n_jobs=-1)]: Done 192 out of 192 | elapsed:  1.3min finished


In [19]:
# ### Show Best Parameters 
print(CV_rfc.best_params_)

{'max_depth': 4, 'n_estimators': 2}


In [None]:
## print dask cluster config 

import json
print(json.dumps(client.scheduler_info(), indent=4))