In [None]:
#imports
import dask
import dask.dataframe as dd
from dask.distributed import Client, progress
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
import joblib
import pandas as pd
import numpy as np
import dask_ml
#prefs: show every column of wide frames.
#The fully-qualified key is required: a bare 'max_columns' also matches
#'styler.render.max_columns' in newer pandas, and set_option then raises OptionError.
pd.set_option('display.max_columns', None)


In [None]:
#close any dask client left over from an earlier run before a new one is opened.
#The name is looked up explicitly, so a fresh kernel (no `client` yet) is a silent no-op
#rather than an error being raised and its message printed.
previous_client = globals().get('client')
if previous_client is not None:
    try:
        previous_client.close()
        print("closed existing connection, ", previous_client)
    except Exception as e:
        #best effort: report a failed close but keep the notebook running
        print(e)
    

In [None]:
#open a fresh local dask cluster: 2 workers with 2 threads each, memory_limit='8GB'
client = Client(memory_limit='8GB', threads_per_worker=2, n_workers=2)
display(client)

In [None]:
#load the network-flow feature table (CICFlowMeter output, per the file name) as a lazy dask dataframe;
#the path lives in one constant so another capture day can be swapped in easily
DATA_PATH = "ids-data/Thursday-01-03-2018_TrafficForML_CICFlowMeter.csv"
data = dd.read_csv(DATA_PATH)

#display(data.head())


In [None]:
#parse the day/month/year timestamp strings and store them as int64 epoch values so the column is numeric.
#Guarded on dtype: without it, re-running this cell would hand the already-converted
#int64 column back to to_datetime with a string format and fail.
if not pd.api.types.is_integer_dtype(data['Timestamp'].dtype):
    data['Timestamp'] = dd.to_datetime(data.Timestamp, format='%d/%m/%Y %H:%M:%S').astype(np.int64)


In [None]:
#for each column from 'Dst Port' to 'Idle Min', report whether any value exceeds the float32 maximum.
#+inf also compares greater, so infinities are flagged too; NaN compares False and is NOT flagged.
#NOTE(review): presumably this guards against float32 conversion in the sklearn tree fit — confirm
tmp_data = data.loc[:, 'Dst Port':'Idle Min']
too_big = (tmp_data > np.finfo(np.float32).max).any()
#the fully-qualified key is required: a bare 'max_rows' also matches 'styler.render.max_rows'
#in newer pandas, and option_context then raises OptionError
with pd.option_context('display.max_rows', None):
    display(too_big.compute())

In [None]:
#get labels for decision tree presentation, note the use of unique later 
#the 'Label' column holds the classification target (used as Y further down)
labels = data.Label
labels

In [None]:
# Decide which features we want to include in our analysis:
# every tab-separated column name below, minus the timestamp and the two per-second flow rates
# (presumably dropped because they can hold values too large for float32 — confirm with the check above)
possible_features = 'Dst Port	Protocol	Timestamp	Flow Duration	Tot Fwd Pkts	Tot Bwd Pkts	TotLen Fwd Pkts	TotLen Bwd Pkts	Fwd Pkt Len Max	Fwd Pkt Len Min	Fwd Pkt Len Mean	Fwd Pkt Len Std	Bwd Pkt Len Max	Bwd Pkt Len Min	Bwd Pkt Len Mean	Bwd Pkt Len Std	Flow Byts/s	Flow Pkts/s	Flow IAT Mean	Flow IAT Std	Flow IAT Max	Flow IAT Min	Fwd IAT Tot	Fwd IAT Mean	Fwd IAT Std	Fwd IAT Max	Fwd IAT Min	Bwd IAT Tot	Bwd IAT Mean	Bwd IAT Std	Bwd IAT Max	Bwd IAT Min	Fwd PSH Flags	Bwd PSH Flags	Fwd URG Flags	Bwd URG Flags	Fwd Header Len	Bwd Header Len	Fwd Pkts/s	Bwd Pkts/s	Pkt Len Min	Pkt Len Max	Pkt Len Mean	Pkt Len Std	Pkt Len Var	FIN Flag Cnt	SYN Flag Cnt	RST Flag Cnt	PSH Flag Cnt	ACK Flag Cnt	URG Flag Cnt	CWE Flag Count	ECE Flag Cnt	Down/Up Ratio	Pkt Size Avg	Fwd Seg Size Avg	Bwd Seg Size Avg	Fwd Byts/b Avg	Fwd Pkts/b Avg	Fwd Blk Rate Avg	Bwd Byts/b Avg	Bwd Pkts/b Avg	Bwd Blk Rate Avg	Subflow Fwd Pkts	Subflow Fwd Byts	Subflow Bwd Pkts	Subflow Bwd Byts	Init Fwd Win Byts	Init Bwd Win Byts	Fwd Act Data Pkts	Fwd Seg Size Min	Active Mean	Active Std	Active Max	Active Min	Idle Mean	Idle Std	Idle Max	Idle Min'.split('\t')

exclude_features = ('Timestamp', 'Flow Byts/s', 'Flow Pkts/s')

# keep the original column order, skipping anything in the exclusion tuple
features = list(filter(lambda name: name not in exclude_features, possible_features))


In [None]:
#get dataframe of just features
#select only the chosen feature columns (in list order) into the model input frame
X = data[features]
X.head()

In [None]:
#setup plot
#matplotlib is needed for the tree plot; show the figure size currently configured
import matplotlib.pyplot as plt
default_fig_size = plt.rcParams.get('figure.figsize')
print(default_fig_size)

In [None]:
#setup figure size
#enlarge every subsequent figure to 20 x 20 (presumably so the full decision tree stays legible)
fig_size = plt.rcParams["figure.figsize"]
fig_size[0], fig_size[1] = 20, 20
plt.rcParams["figure.figsize"] = fig_size

In [None]:
#classification target: the lazy 'Label' series selected earlier
Y = labels

In [None]:
from sklearn import tree
#single decision tree; the fixed random_state makes repeated fits on the same data give the same tree
clf = tree.DecisionTreeClassifier(random_state=0)

In [None]:
from dask_ml.model_selection import KFold as K
from dask_ml.model_selection import train_test_split


In [None]:
#hold out 10% of the rows for testing; random_state fixes the split so the
#confusion matrix and scores are reproducible across runs
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.1, random_state=0)





In [None]:
from sklearn.metrics import ConfusionMatrixDisplay

#fit on the training split, then plot the confusion matrix for the held-out split.
#ConfusionMatrixDisplay.from_estimator replaces plot_confusion_matrix, which newer scikit-learn removed.
clf.fit(X_train, y_train)
cm_display = ConfusionMatrixDisplay.from_estimator(clf, X_test, y_test)

In [None]:

def doKFolds(X, Y, n_splits=5, estimator=None):
    """Run K-fold cross-validation of a classifier on dask collections.

    The inputs are converted to dask arrays, because dask-ml's ``KFold.split``
    is called on arrays here rather than on dataframes. For each fold the
    estimator is fitted on the training rows and scored on the held-out rows.

    Parameters
    ----------
    X : dask DataFrame
        Feature matrix, one row per sample.
    Y : dask Series
        Target labels, aligned row-for-row with ``X``.
    n_splits : int, default 5
        Number of folds passed to ``KFold``.
    estimator : object with ``fit`` and ``score``, optional
        Model to cross-validate; defaults to the notebook-level ``clf``.
        It is refitted in place on every fold, so afterwards it holds the
        model trained on the last fold.

    Returns
    -------
    list of float
        ``estimator.score`` on each held-out fold, in fold order.
    """
    model = clf if estimator is None else estimator

    #lengths=True computes the partition sizes, giving the arrays known chunks
    #so they can be indexed by position below
    X_arr = X.to_dask_array(lengths=True)
    Y_arr = Y.to_dask_array(lengths=True)

    scores = []
    for train_idx, test_idx in K(n_splits=n_splits).split(X_arr, Y_arr):
        #the fold indices are positions, so index the arrays by position instead of
        #using label-based X.loc (the CSV-derived index is likely not unique across
        #partitions); np.asarray materialises the indices if they are still lazy
        train_idx = np.asarray(train_idx)
        test_idx = np.asarray(test_idx)

        #fit the fold's features against its labels (not against the features themselves)
        model.fit(X_arr[train_idx].compute(), Y_arr[train_idx].compute())
        scores.append(model.score(X_arr[test_idx].compute(), Y_arr[test_idx].compute()))

    return scores


doKFolds(X, Y)

In [None]:
%%timeit -n 10
#time with just pandas
with joblib.parallel_backend('threading'):
     clf.fit(x, y)


In [None]:
%%timeit -n 10
#time with dask: X and Y are still lazy dask collections here, so each fit presumably
#also pays for reading the CSV and converting it to in-memory arrays
#NOTE(review): DecisionTreeClassifier exposes no n_jobs, so the joblib backend may not change anything — confirm
with joblib.parallel_backend('dask'):
     clf.fit(X, Y)


#clf = clf.fit(X, Y)

In [None]:
#fit the tree on the full feature set and labels (both still dask collections)
clf.fit(X, Y)


In [None]:
%%timeit -n 10
#time with dask: even with the threading backend, X and Y are still dask dataframes,
#so the input data is still produced by dask's engine on every fit
with joblib.parallel_backend('threading'):
     clf.fit(X, Y)


#clf = clf.fit(X, Y)

In [None]:
#cache results note you have to assign in X and Y
#cache X and Y in cluster memory; persist hands back new collections, so the names must be rebound
X, Y = client.persist([X, Y])

In [None]:
%%timeit -n 10
#time with dask after cache: X and Y were persisted in the previous cell, so each fit
#should read from cluster memory instead of re-reading the CSV
with joblib.parallel_backend('dask'):
     clf.fit(X, Y)

In [None]:
%%timeit -n 10
#time with just pandas once X and Y are cached
with joblib.parallel_backend('threading'):
     clf.fit(x, y)

In [None]:
#draw the fitted tree. Class names must be a list of strings indexed like the tree's classes,
#so they are taken from clf.classes_ (the labels seen during fit); the previous
#`class_names=sorted` passed the builtin function itself. The result goes to its own name
#so it does not overwrite `x`.
tree_annotations = tree.plot_tree(
    clf,
    rounded=True,
    filled=True,
    class_names=[str(c) for c in clf.classes_],
    feature_names=features,
)
