In [69]:
import boto3
import gzip
import pandas as pd
import json
from smart_open import open
# Lift pandas' display truncation for both columns and rows so wide flow
# records are shown in full.
for _display_option in ('display.max_columns', 'display.max_rows'):
    pd.set_option(_display_option, None)

In [70]:
# S3 handles: `s3_client` is handed to smart_open below; `s3` / `bucket` are
# kept as module-level names in case other cells rely on them.
s3 = boto3.resource('s3')
s3_client = boto3.client("s3")
bucket = s3.Bucket('ml-flow-dump')

source_url = 's3://ml-flow-dump/flow-45days-2022.10.1-000284.json.gz'

# Read only the first line of the object (one JSON document per line). The
# context manager guarantees the remote stream is closed once that line has
# been fetched, instead of leaving a half-consumed iterator open.
with open(source_url, transport_params={"client": s3_client}) as flow_stream:
    json_line = next(flow_stream, None)

# Fail loudly on an empty object rather than leaving `df` undefined (or stale
# from a previous run of this cell).
if json_line is None:
    raise ValueError(f"No records found in {source_url}")

my_json = json.loads(json_line)
df = pd.json_normalize(my_json)  # nested keys are flattened to dotted column names
print(df.shape)

(1, 78)


In [71]:
# Names and count of the columns whose dtype is `object`.
object_columns = df.select_dtypes(include=['object']).columns
print(object_columns)
print(len(object_columns))

Index(['action', 'customer', 'dstip', 'dstvlan', 'flowsrcip', 'flowsrcname',
       'flowtype', 'inputalias', 'inputclasses', 'inputname', 'nexthop',
       'outputalias', 'outputclasses', 'outputname', 'payload', 'site',
       'srcip', 'srcvlan', 'tags', 'dstas.org', 'dstiprep.categories',
       'dstgeo.continentcode', 'dstgeo.countrycode', 'dstgeo.subdiso',
       'dstowneras.number', 'dstowneras.org', 'srcas.number', 'srcas.org',
       'srciprep.categories', 'srcgeo.continentcode', 'srcgeo.countrycode',
       'srcgeo.subdiso', 'srcowneras.number', 'srcowneras.org'],
      dtype='object')
34


In [72]:
# Names and count of the columns whose dtype is `bool`.
bool_columns = df.select_dtypes(include=['bool']).columns
print(bool_columns)
print(len(bool_columns))


Index(['bogondst', 'bogonsrc', 'dstinternal', 'srcinternal', 'tcpflags.urg',
       'tcpflags.ack', 'tcpflags.psh', 'tcpflags.rst', 'tcpflags.syn',
       'tcpflags.fin', 'tcpflags.ece', 'tcpflags.cwr', 'tcpflags.ns'],
      dtype='object')
13


In [73]:
# Names and count of the columns whose dtype is `float64`.
float_columns = df.select_dtypes(include=['float64']).columns
print(float_columns)
print(len(float_columns))

Index(['pbratio'], dtype='object')
1


In [65]:
# Names and count of the columns whose dtype is `int64`.
int_columns = df.select_dtypes(include=['int64']).columns
print(int_columns)
print(len(int_columns))

Index(['bits', 'bitsxrate', 'dstport', 'duration', 'end', 'flowbrate',
       'flowprate', 'flowrtime', 'flowversion', 'input', 'ipversion', 'output',
       'packets', 'packetsxrate', 'protocolint', 'samplerate', 'srcport',
       'start', 'tcpflagsint', 'timestamp', 'tos', 'dstas.number',
       'dstiprep.count', 'dstgeo.location.lat', 'dstgeo.location.lon',
       'icmp.code', 'icmp.type', 'srciprep.count', 'srcgeo.location.lat',
       'srcgeo.location.lon'],
      dtype='object')
30


In [75]:
# Columns to remove from the raw flow record, kept as three hand-maintained
# lists plus two pattern-based selections. They are concatenated and dropped
# in a later cell.
l1 = [
    'flowsrcname', 'site', 'flowtype', 'inputname', 'tags', 'action',
    'customer', 'dstas.org',
    'dstiprep.categories', 'dstgeo.continentcode', 'dstgeo.countrycode',
    'dstgeo.subdiso', 'srcas.org', 'srciprep.categories',
    'srcgeo.continentcode', 'srcgeo.countrycode', 'srcgeo.subdiso',
    'dstiprep.count', 'tos',
    'bogondst', 'bogonsrc', 'dstinternal', 'input', 'output',
    'srciprep.count', 'srcinternal',
]

l2 = [
    'dstvlan', 'inputalias', 'inputclasses', 'outputalias', 'outputclasses',
    'outputname', 'payload', 'dstowneras.org', 'srcvlan',
    'dstowneras.number', 'srcas.number', 'srcowneras.number',
    'srcowneras.org', 'dstas.number',
]

l3 = ['flowrtime', 'end', 'flowversion', 'ipversion', 'samplerate']

# Every column whose name contains 'tcp' / 'icm' (substring match, so this
# picks up the dotted `tcpflags.*` / `icmp.*` columns as well as `tcpflagsint`).
tcp = [name for name in df.columns if 'tcp' in name]
icm = [name for name in df.columns if 'icm' in name]

In [76]:
def replace_boolean(data):
    """Convert boolean values in ``data`` to the integers 1 and 0.

    Columns with a boolean dtype are cast to an integer dtype; in ``object``
    columns only the individual ``True``/``False`` entries are converted and
    every other value is left as-is. Columns of any other dtype are untouched.

    Each converted column is assigned back with ``data[col] = ...`` rather
    than calling ``replace(..., inplace=True)`` on ``data[col]``: mutating a
    column selection in place is deprecated in pandas 2.x and does not update
    the parent frame when Copy-on-Write is enabled.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame to convert. It is modified in place.

    Returns
    -------
    pandas.DataFrame
        The same ``data`` object, for call chaining.
    """
    for col in data.columns:
        series = data[col]
        if pd.api.types.is_bool_dtype(series.dtype):
            # Nullable "boolean" columns may hold <NA>, which only the
            # nullable integer dtype can represent.
            is_nullable = pd.api.types.is_extension_array_dtype(series.dtype)
            data[col] = series.astype("Int64" if is_nullable else "int64")
        elif pd.api.types.is_object_dtype(series.dtype):
            # Mixed column: convert only genuine booleans, keep everything else.
            data[col] = series.map(
                lambda value: int(value) if pd.api.types.is_bool(value) else value
            )
    return data

In [77]:
# Combine every drop list into one, remove those columns from the raw frame,
# then run the remaining columns through the boolean -> integer conversion.
l = [*l1, *l2, *l3, *tcp, *icm]
df_out = replace_boolean(df.drop(columns=l))
print(df_out.shape)
df_out.head()

(1, 21)


Unnamed: 0,bits,bitsxrate,dstip,dstport,duration,flowbrate,flowprate,flowsrcip,nexthop,packets,packetsxrate,pbratio,protocolint,srcip,srcport,start,timestamp,dstgeo.location.lat,dstgeo.location.lon,srcgeo.location.lat,srcgeo.location.lon
0,818736,818736,10.40.4.244,2055,1012,818736,74,10.2.6.1,216.51.124.93,74,74,9e-05,17,10.2.6.1,50632,1664593150520,1664593167000,0,0,0,0


In [78]:
# Print the dtype of every column currently in df_out.
print(df_out.dtypes)

bits                     int64
bitsxrate                int64
dstip                   object
dstport                  int64
duration                 int64
flowbrate                int64
flowprate                int64
flowsrcip               object
nexthop                 object
packets                  int64
packetsxrate             int64
pbratio                float64
protocolint              int64
srcip                   object
srcport                  int64
start                    int64
timestamp                int64
dstgeo.location.lat      int64
dstgeo.location.lon      int64
srcgeo.location.lat      int64
srcgeo.location.lon      int64
dtype: object


In [79]:
# Mark integer-coded columns as non-numeric by casting them to the generic
# `object` dtype (note: this is `object`, not the pandas `category` dtype).
# NOTE(review): latitude/longitude are included in this list — confirm that
# treating coordinates as categorical is intended.
categorical_cols = [
    'srcport', 'dstport', 'protocolint',
    'dstgeo.location.lat', 'dstgeo.location.lon',
    'srcgeo.location.lat', 'srcgeo.location.lon',
]
df_out[categorical_cols] = df_out[categorical_cols].astype(object)
df_out.dtypes

bits                     int64
bitsxrate                int64
dstip                   object
dstport                 object
duration                 int64
flowbrate                int64
flowprate                int64
flowsrcip               object
nexthop                 object
packets                  int64
packetsxrate             int64
pbratio                float64
protocolint             object
srcip                   object
srcport                 object
start                    int64
timestamp                int64
dstgeo.location.lat     object
dstgeo.location.lon     object
srcgeo.location.lat     object
srcgeo.location.lon     object
dtype: object

# Feature Engineering

In [80]:
# Derive the gap between the record's `timestamp` and the flow's `start`,
# then discard the two absolute time columns it was computed from.
# NOTE(review): both look like epoch milliseconds — confirm the unit.
df_out['timeDelta'] = df_out["timestamp"].sub(df_out["start"])
df_out.drop(columns=["timestamp", "start"], inplace=True)
df_out

Unnamed: 0,bits,bitsxrate,dstip,dstport,duration,flowbrate,flowprate,flowsrcip,nexthop,packets,packetsxrate,pbratio,protocolint,srcip,srcport,dstgeo.location.lat,dstgeo.location.lon,srcgeo.location.lat,srcgeo.location.lon,timeDelta
0,818736,818736,10.40.4.244,2055,1012,818736,74,10.2.6.1,216.51.124.93,74,74,9e-05,17,10.2.6.1,50632,0,0,0,0,16480


We replace the `duration`, `packets`, and `bits` columns with a single ratio: the average number of bytes per packet (`bits / 8 / packets`) divided by `duration + 1`, i.e. bytes per packet per unit of `duration`.

In [81]:
# Collapse volume and duration into one feature: average bytes per packet
# (bits / 8 / packets) divided by (duration + 1).
# Every operand is read from df_out (the original mixed in the raw `df`), so
# the result does not depend on df and df_out sharing the same index.
# NOTE(review): the `+ 1` presumably guards against zero-duration flows; a
# record with packets == 0 would still yield inf/NaN — confirm this cannot occur.
df_out['transferred_ratio'] = (df_out['bits'] / 8 / df_out['packets']) / (df_out['duration'] + 1)
# `columns=` already selects the axis, so the redundant `axis=1` is omitted.
df_out.drop(columns=['duration', 'packets', 'bits'], inplace=True)
df_out

Unnamed: 0,bitsxrate,dstip,dstport,flowbrate,flowprate,flowsrcip,nexthop,packetsxrate,pbratio,protocolint,srcip,srcport,dstgeo.location.lat,dstgeo.location.lon,srcgeo.location.lat,srcgeo.location.lon,timeDelta,transfered_ratio
0,818736,10.40.4.244,2055,818736,74,10.2.6.1,216.51.124.93,74,9e-05,17,10.2.6.1,50632,0,0,0,0,16480,1.365252


**preferred columns for netflow analysis**:

cat: ['Source_IP', 'Destination_IP', 'Source_Port', 'Destination_Port', 'Protocol', 'Flag', 'Service_Type']

num: ['Duration', 'Packets', 'Bytes', 'timeDelta']

In [82]:
df_out.dtypes

bitsxrate                int64
dstip                   object
dstport                 object
flowbrate                int64
flowprate                int64
flowsrcip               object
nexthop                 object
packetsxrate             int64
pbratio                float64
protocolint             object
srcip                   object
srcport                 object
dstgeo.location.lat     object
dstgeo.location.lon     object
srcgeo.location.lat     object
srcgeo.location.lon     object
timeDelta                int64
transfered_ratio       float64
dtype: object