In [1]:
import glob
import pandas as pd
import numpy as np
import os
from matplotlib import pyplot as plt
import dask.dataframe as dd
import pickle
from dask.distributed import Client
client = Client(processes=False)
client

0,1
Client  Scheduler: inproc://192.168.1.2/20780/1  Dashboard: http://192.168.1.2:8787/status,Cluster  Workers: 1  Cores: 12  Memory: 15.37 GiB


In [2]:
f = 25 # Hz
E = 500 # Number of directed edges kept

In [3]:
print(f'Number of Files: {len(glob.glob("./SplitRead/*.csv"))}')

Number of Files: 536


In [4]:
df = dd.read_csv("./SplitRead/*.csv", parse_dates=['time']).repartition(npartitions=30)
df['timeId'] = ((df['time'] - df['time'].min()).apply(lambda x: x.total_seconds()*f, meta=('time', 'float64'))).astype(np.int32)
df = client.persist(df)

In [5]:
link_count = df.groupby(['src', 'dst']).size().compute().sort_values(ascending=False)
top_links = link_count.iloc[:E]



In [6]:
ip_pairs = dict.fromkeys(top_links.index)
series_ip_id = pd.Series(sum(list(ip_pairs.keys()), tuple())).sort_values().drop_duplicates().reset_index(drop=True)
ip_id_map = {ip_: id_ for ip_, id_ in zip(series_ip_id.values, series_ip_id.index)}

In [7]:
filt_df = df[df.apply(lambda x: (x['src'], x['dst']) in ip_pairs, axis=1, meta=('pandas.Series', 'bool'))].drop(columns='time').groupby(['src', 'dst', 'timeId']).sum().reset_index() # .compute()
filt_df['src'] = filt_df['src'].apply(lambda x: ip_id_map[x], meta=('string', 'int32'))
filt_df['dst'] = filt_df['dst'].apply(lambda x: ip_id_map[x], meta=('string', 'int32')) 
filt_df = filt_df.compute()
filt_df





Unnamed: 0,src,dst,timeId,len
0,0,160,82,60
1,0,160,83,621
2,0,160,84,386
3,0,160,85,304
4,0,160,87,520
...,...,...,...,...
4140740,590,186,22491,1080
4140741,590,186,22492,1080
4140742,590,186,22493,2160
4140743,590,186,22494,1080


In [12]:
filt_df

Unnamed: 0,src,dst,timeId,len
0,0,160,82,60
1,0,160,83,621
2,0,160,84,386
3,0,160,85,304
4,0,160,87,520
...,...,...,...,...
4140740,590,186,22491,1080
4140741,590,186,22492,1080
4140742,590,186,22493,2160
4140743,590,186,22494,1080


In [8]:
filt_df.to_csv('temporal_traffic_data.csv', index=False)

In [9]:
import pickle

# Store data (serialize)
with open('ip_pairs.pickle', 'wb') as file:
    pickle.dump(ip_pairs, file, protocol=pickle.HIGHEST_PROTOCOL)

with open('ip_id_map.pickle', 'wb') as file:
    pickle.dump(ip_id_map, file, protocol=pickle.HIGHEST_PROTOCOL)

In [10]:
# Load data (deserialize)
with open('ip_pairs.pickle', 'rb') as file:
    ip_pairs = pickle.load(file)
    
with open('ip_id_map.pickle', 'rb') as file:
    ip_id_map = pickle.load(file)

{('202.186.214.78', '133.219.145.254'): None,
 ('203.208.249.123', '204.194.81.240'): None,
 ('203.208.247.215', '210.163.118.145'): None,
 ('133.215.167.161', '13.154.26.52'): None,
 ('204.194.81.240', '203.208.249.123'): None,
 ('210.163.118.145', '203.208.247.215'): None,
 ('172.165.148.129', '150.142.186.149'): None,
 ('170.119.252.128', '163.93.231.20'): None,
 ('146.73.113.127', '163.93.232.221'): None,
 ('104.237.126.155', '163.93.238.55'): None,
 ('210.120.135.176', '150.142.249.138'): None,
 ('163.93.58.182', '60.113.18.119'): None,
 ('146.73.113.127', '163.93.238.55'): None,
 ('202.179.47.17', '13.154.24.120'): None,
 ('131.141.180.211', '144.115.18.160'): None,
 ('65.246.214.32', '203.208.241.240'): None,
 ('131.141.180.211', '144.115.72.5'): None,
 ('150.98.36.147', '163.93.141.84'): None,
 ('202.179.47.17', '52.178.185.38'): None,
 ('223.195.33.20', '150.142.249.138'): None,
 ('150.98.36.147', '163.93.155.151'): None,
 ('175.154.230.26', '150.142.249.138'): None,
 ('126.26

In [None]:
N = len(ip_id_map)
T = filt_df['timeId'].max() + 1
F = 2
data_matrix = np.zeros((N, T, F))
data_matrix.shape

In [None]:
for srcId, dstId, timeId, size in filt_df[['src', 'dst', 'timeId', 'len']].values:
    data_matrix[srcId, timeId, 0] += size
    data_matrix[dstId, timeId, 1] += size

In [None]:
fig, [ax1, ax2] = plt.subplots(2, figsize=(16, 10), dpi=150)
ax1.plot(np.log2(1 + data_matrix[:15, :, 0].T), ',', alpha=.75)
ax2.plot(np.log2(1 + data_matrix[:15, :, 1].T), ',', alpha=.75)
plt.show()


In [None]:
fig, [ax1, ax2] = plt.subplots(2, figsize=(16, 10))
ax1.plot(np.log2(1 + data_matrix[:5, :, 0].T), ',', alpha=.25)
ax2.plot(np.log2(1 + data_matrix[:5, :, 1].T), ',', alpha=.25)
plt.show()

fig, [ax1, ax2] = plt.subplots(2, figsize=(16, 10))
ax1.plot(np.log2(1 + data_matrix[-5:, :, 0].T), ',', alpha=.25)
ax2.plot(np.log2(1 + data_matrix[-5:, :, 1].T), ',', alpha=.25)
plt.show()

In [None]:
plt.scatter(data_matrix.std(axis=1)[:, 0] + 1, data_matrix.std(axis=1)[:, 1] + 1, 
            c=(data_matrix.std(axis=1) > 0).sum(axis=-1) * (data_matrix.std(axis=1)[:, 0] > 0))
plt.yscale('log')
plt.xscale('log')