In [1]:
import dask
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
#Importing plot lib
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.dates as mdates
sns.set(rc={'figure.figsize':(20, 10)})
import statsmodels as sm
from statsmodels.tsa.stattools import adfuller
from numpy import log
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

In [2]:
# Creating distributed client
client = Client()

In [3]:
df = dd.read_csv('june*')
print (df)

Dask DataFrame Structure:
                 2016-06-01 00:05:01   39.364 211.62.96.220 42.219.158.212  55107  64188     UDP  .A....      0    0.1     19   3958 background
npartitions=3889                                                                                                                               
                              object  float64        object         object  int64  int64  object  object  int64  int64  int64  int64     object
                                 ...      ...           ...            ...    ...    ...     ...     ...    ...    ...    ...    ...        ...
...                              ...      ...           ...            ...    ...    ...     ...     ...    ...    ...    ...    ...        ...
                                 ...      ...           ...            ...    ...    ...     ...     ...    ...    ...    ...    ...        ...
                                 ...      ...           ...            ...    ...    ...     ...     ...    ..

In [4]:
test_df = dd.read_csv('test*')
print (test_df)

Dask DataFrame Structure:
                 2016-06-20 00:06:55    2.856 42.219.159.90 83.107.168.102  36351    445     TCP  ....S.      0    0.1      2     96 background
npartitions=1325                                                                                                                               
                              object  float64        object         object  int64  int64  object  object  int64  int64  int64  int64     object
                                 ...      ...           ...            ...    ...    ...     ...     ...    ...    ...    ...    ...        ...
...                              ...      ...           ...            ...    ...    ...     ...     ...    ...    ...    ...    ...        ...
                                 ...      ...           ...            ...    ...    ...     ...     ...    ...    ...    ...    ...        ...
                                 ...      ...           ...            ...    ...    ...     ...     ...    ..

In [5]:
df.columns=['te','td','sa','da','sp','dp','pr','flg','fwd','stos','pkt','byt','label']
test_df.columns=['te','td','sa','da','sp','dp','pr','flg','fwd','stos','pkt','byt','label']

In [6]:
df.head(10)

Unnamed: 0,te,td,sa,da,sp,dp,pr,flg,fwd,stos,pkt,byt,label
0,2016-06-01 00:05:03,39.828,42.219.158.226,71.247.111.184,80,52475,TCP,.AP.S.,0,0,57,79635,background
1,2016-06-01 00:05:04,36.128,42.219.153.155,223.80.226.127,443,54691,TCP,.AP.S.,0,0,9,2791,background
2,2016-06-01 00:05:04,36.204,223.80.226.127,42.219.153.155,54691,443,TCP,.AP.S.,0,0,13,3896,background
3,2016-06-01 00:05:04,42.452,42.219.153.7,42.187.82.40,53,53,UDP,.A....,0,0,2,175,background
4,2016-06-01 00:05:06,36.968,42.219.153.89,143.72.8.137,63532,53,UDP,.A....,0,0,3,233,background
5,2016-06-01 00:05:06,36.992,143.72.8.137,42.219.153.89,53,63532,UDP,.A....,0,0,3,281,background
6,2016-06-01 00:05:06,38.172,240.177.183.143,42.219.155.59,443,62402,TCP,.AP...,0,0,2,82,background
7,2016-06-01 00:05:06,40.648,42.219.155.59,52.76.101.190,42776,22163,UDP,.A....,0,0,4,275,background
8,2016-06-01 00:05:07,176.32,42.219.153.89,92.225.32.126,61846,443,TCP,.AP...,0,0,5,2220,background
9,2016-06-01 00:05:07,176.532,92.225.32.126,42.219.153.89,443,61846,TCP,.AP...,0,0,10,2070,background


In [7]:
# Keeping a copy
data = df
test_data = test_df

In [11]:
# Changing column data type for te
data['te'] = dd.to_datetime(data['te'])

In [None]:
data['ts'] = data['te'] - pd.to_timedelta(data['td'], unit='ms')



In [None]:
test_data['ts'] = test_data['te'] - pd.to_timedelta(data.td, unit='ms')

In [None]:
# Function to mark traffic has anomalous data when windowing
normal_traffic_type = ['background', 'blacklist']
def isAnomolus(x):
    data = ~x.isin(normal_traffic_type)
    if data.any():
        return 1
    else:
        return 0

In [None]:
# Function to mark traffic has anomalous data when windowing
normal_traffic_type = ['background', 'blacklist']
def isAnomolus_new(x):
    return x

In [None]:
isAnomolus_agg = dd.Aggregation(
    name = 'isAnomolus',
    chunk = lambda x: x.apply(lambda a: (~a.isin(normal_traffic_type)).any()),
    agg = lambda y: y.apply(lambda z: 1 if z.any() == True else 0)
)

In [None]:
# Data windowed for 5 min by adding pkt and byt, whreas label and true if any anomalous traffic exists in time window
data_win = data.groupby(pd.Grouper(key='te', freq='5T')).agg({
                                        "pkt":  "sum",
                                        "byt":  "sum",
                                        "label": isAnomolus_agg
                                    }).compute()

In [None]:
data_win

In [None]:
# Data windowed for 5 min by adding pkt and byt, whreas label and true if any anomalous traffic exists in time window
data_win_ts = data.groupby(pd.Grouper(key='ts', freq='5T')).agg({
                                        "pkt":  "sum",
                                        "byt":  "sum",
                                        "label": isAnomolus_agg
                                    }).compute()

In [None]:
# Data windowed for 5 min by adding pkt and byt, whreas label and true if any anomalous traffic exists in time window
test_data_win = test_data.groupby(pd.Grouper(key='te', freq='5T')).agg({
                                        "pkt":  "sum",
                                        "byt":  "sum",
                                        "label": isAnomolus_agg
                                    }).compute()

In [None]:
test_data_win

In [None]:
# Data windowed for 5 min by adding pkt and byt, whreas label and true if any anomalous traffic exists in time window
test_data_win_ts = test_data.groupby(pd.Grouper(key='ts', freq='5T')).agg({
                                        "pkt":  "sum",
                                        "byt":  "sum",
                                        "label": isAnomolus_agg
                                    }).compute()

In [None]:
data_win_anomaly = data_win[data_win['label'] == 1]

In [None]:
data_win_anomaly

### Taking only normal traffic(including blacklisted) 
The idea is to forecats the normal traffic, and compare with anomalous traffic for any deviation. Hence getting normal traffic windowed for each 5min which will be used for forecasting.

In [None]:
# Filter normal traffic
data_normal_traffic = data[data['label'].isin(normal_traffic_type)] 

In [None]:
test_data_normal_traffic = test_data[test_data['label'].isin(normal_traffic_type)] 

In [None]:
# Window normal traffic for 5min window
data_normal_traffic_win = data_normal_traffic.groupby(pd.Grouper(key='te', freq='5T')).agg({
                                        "pkt":  "sum",
                                        "byt":  "sum"
                                    }).compute()

In [None]:
# Window normal traffic for 5min window
test_data_normal_traffic_win = test_data_normal_traffic.groupby(pd.Grouper(key='te', freq='5T')).agg({
                                        "pkt":  "sum",
                                        "byt":  "sum"
                                    }).compute()

In [None]:
# Window normal traffic for 5min window
data_normal_traffic_win_ts = data_normal_traffic.groupby(pd.Grouper(key='ts', freq='5T')).agg({
                                        "pkt":  "sum",
                                        "byt":  "sum"
                                    }).compute()

In [None]:
# Window normal traffic for 5min window
test_data_normal_traffic_win_ts = test_data_normal_traffic.groupby(pd.Grouper(key='ts', freq='5T')).agg({
                                        "pkt":  "sum",
                                        "byt":  "sum"
                                    }).compute()

In [None]:
data_win.to_csv('data_win.csv')

In [None]:
data_normal_traffic_win.to_csv('data_normal_traffic_win.csv')

In [None]:
test_data_win.to_csv('test_data_win.csv')

In [None]:
test_data_normal_traffic_win.to_csv('test_data_normal_traffic_win.csv')

In [None]:
data_win_ts.to_csv('data_win_ts.csv')

In [None]:
data_normal_traffic_win_ts.to_csv('data_normal_traffic_win_ts.csv')

In [None]:
test_data_win_ts.to_csv('test_data_win_ts.csv')

In [None]:
test_data_normal_traffic_win_ts.to_csv('test_data_normal_traffic_win_ts.csv')

In [None]:
plt.figure()
plt.plot(data_normal_traffic_win.index, data_normal_traffic_win.byt)
plt.show()
plt.close()

In [None]:
merged_data = pd.merge(data_win, data_normal_traffic_win, on=['te'], how='inner')

In [None]:
merged_data['byt_diff'] = merged_data.byt_x - merged_data.byt_y

In [None]:
plt.rc('font', size=12)
fig, ax = plt.subplots(figsize=(20, 6))
myFmt = mdates.DateFormatter('%Y-%M-%D')
ax.xaxis.set_major_formatter(myFmt)
ax.plot(merged_data.byt_diff, label='Difference')
plt.show()
plt.close()

In [None]:
plt.close()

### EDA for windowed data

In [None]:
plt.hist(data_win.label)
plt.show()
plt.close()

In [None]:
plt.scatter(merged_data.label, merged_data.byt_diff)
plt.show()
plt.close()

In [None]:
merged_data[(merged_data.byt_diff == 0) & (merged_data.label == 1)].count()

In [None]:
merged_data[merged_data.byt_diff !=0].byt_diff.nsmallest(40)

In [None]:
data['anomaly'] = data['label'].apply(lambda x: 0 if x in(normal_traffic_type) else 1)

In [None]:
protocols = ['TCP', 'UDP', 'ICMP', 'GRE', 'ESP', 'RSVP', 'IPv6', 'IPIP', '255','nan']
data['protocol'] = data['pr'].apply(lambda x: protocols.index(x) if x in protocols else -1)

In [None]:
#data['pr'].unique().compute()

In [None]:
#data['flg'].unique().compute()

In [None]:
plt.close()