# Check Processed Files in Different Modes
In order to run this notebook on newly-created data again, `vnet_preprocessing.py` file needs to be modified as:
- Do not drop `FLOW_END_MILLISECONDS`, `FLOW_START_MILLISECONDS`, `IPV4_SRC_ADDR`, `IPV6_SRC_ADDR`, `IPV4_DST_ADDR`,
    `IPV6_DST_ADDR` columns
- Create columns `PRETOCEL`, `L4_SRC_PORTE`, and `L4_DST_PORTE` before applying its mapping function as:
```
data['PRETOCEL'] = data['PROTOCOL']
data['L4_SRC_PORTE'] = data['L4_SRC_PORT']
data['L4_DST_PORTE'] = data['L4_DST_PORT']
```

The above changes allows the produced data to be sortable, and thus comparable in order to determine if the produced results are equal or not.

In [1]:
import pandas as pd
import numpy as np

In [2]:
WINDOW_FEATURES = ['flows_total', 'flows_concurrent_max', 'flows_itimes_avg', 'flows_itimes_std', 'flows_itimes_min',
    'flows_itimes_max', 'dur_total', 'dur_avg', 'dur_std', 'pkt_total', 'ppf_avg', 'ppf_std', 'bytes_total', 'bpf_avg',
    'bpf_std', 'bpp_avg', 'bpp_std', 'bps_avg', 'bps_std', 'pps_avg', 'pps_std', 'port_src_uniq_cnt',
    'port_src_entropy', 'proto_all_or', 'flag_syn_ratio', 'flag_ack_ratio', 'flag_fin_ratio', 'ip_dst_uniq',
    'hdr_payload_ratio_avg', 'ttl_std', 'window_active_ratio', 'flows_total_std', 
    'flows_concurrent_max_std', 'flows_itimes_avg_std', 'flows_itimes_std_std', 'flows_itimes_min_std',
    'flows_itimes_max_std', 'dur_total_std', 'dur_avg_std', 'dur_std_std', 'pkt_total_std', 'ppf_avg_std',
    'ppf_std_std', 'bytes_total_std', 'bpf_avg_std', 'bpf_std_std', 'bps_avg_std', 'bps_std_std', 'bpp_avg_std',
    'bpp_std_std', 'port_src_uniq_cnt_std',	'port_src_entropy_std', 'flag_syn_ratio_std', 'flag_ack_ratio_std',
    'flag_fin_ratio_std', 'ip_dst_uniq_std', 'hdr_payload_ratio_avg_std', 'ttl_std_std']

WINDOW_NORAND = ['flows_total', 'flows_concurrent_max', 'flows_itimes_avg', 'flows_itimes_std', 'flows_itimes_min',
    'flows_itimes_max', 'dur_total', 'dur_avg', 'dur_std', 'pkt_total', 'ppf_avg', 'ppf_std', 'bytes_total', 'bpf_avg',
    'bpf_std', 'bpp_avg', 'bpp_std', 'bps_avg', 'bps_std', 'pps_avg', 'pps_std', 'port_src_uniq_cnt', 'proto_all_or',
    'flag_syn_ratio', 'flag_ack_ratio', 'flag_fin_ratio', 'ip_dst_uniq', 'hdr_payload_ratio_avg', 'ttl_std',
    'window_active_ratio', 'flows_total_std', 'flows_concurrent_max_std',
    'flows_itimes_avg_std', 'flows_itimes_std_std', 'flows_itimes_min_std', 'flows_itimes_max_std', 'dur_total_std',
    'dur_avg_std', 'dur_std_std', 'pkt_total_std', 'ppf_avg_std', 'ppf_std_std', 'bytes_total_std', 'bpf_avg_std',
    'bpf_std_std', 'bps_avg_std', 'bps_std_std', 'bpp_avg_std', 'bpp_std_std', 'port_src_uniq_cnt_std',
    'flag_syn_ratio_std', 'flag_ack_ratio_std', 'flag_fin_ratio_std', 'ip_dst_uniq_std', 'hdr_payload_ratio_avg_std',
    'ttl_std_std']

FLOAT_ORIG_COLS = ['IN_BPS', 'IN_BPP', 'IN_PPS', 'OUT_BPS', 'OUT_BPP', 'OUT_PPS']

In [3]:
orig_serial   = pd.read_csv('../09_sub_serial.csv')
orig_parallel = pd.read_csv('../09_sub_par1.csv')

In [4]:
list(orig_serial.columns)

['IN_BYTES',
 'IN_PKTS',
 'TCP_FLAGS',
 'IPV4_SRC_ADDR',
 'IPV6_SRC_ADDR',
 'IPV4_DST_ADDR',
 'IPV6_DST_ADDR',
 'OUT_BYTES',
 'OUT_PKTS',
 'MIN_IP_PKT_LEN',
 'MAX_IP_PKT_LEN',
 'MIN_TTL',
 'MAX_TTL',
 'FLOW_START_MILLISECONDS',
 'FLOW_END_MILLISECONDS',
 'SRC_FRAGMENTS',
 'DST_FRAGMENTS',
 'CLIENT_TCP_FLAGS',
 'SERVER_TCP_FLAGS',
 'SRC_TO_DST_AVG_THROUGHPUT',
 'DST_TO_SRC_AVG_THROUGHPUT',
 'NUM_PKTS_UP_TO_128_BYTES',
 'NUM_PKTS_128_TO_256_BYTES',
 'NUM_PKTS_256_TO_512_BYTES',
 'NUM_PKTS_512_TO_1024_BYTES',
 'NUM_PKTS_1024_TO_1514_BYTES',
 'NUM_PKTS_OVER_1514_BYTES',
 'LONGEST_FLOW_PKT',
 'SHORTEST_FLOW_PKT',
 'RETRANSMITTED_IN_PKTS',
 'RETRANSMITTED_OUT_PKTS',
 'OOORDER_IN_PKTS',
 'OOORDER_OUT_PKTS',
 'DURATION_IN',
 'DURATION_OUT',
 'TCP_WIN_MIN_IN',
 'TCP_WIN_MAX_IN',
 'TCP_WIN_MSS_IN',
 'TCP_WIN_SCALE_IN',
 'TCP_WIN_MIN_OUT',
 'TCP_WIN_MAX_OUT',
 'TCP_WIN_MSS_OUT',
 'TCP_WIN_SCALE_OUT',
 'SRC_TO_DST_IAT_MIN',
 'SRC_TO_DST_IAT_MAX',
 'SRC_TO_DST_IAT_AVG',
 'SRC_TO_DST_IAT_STDDEV',
 '

### Modify DataFrames to Make Them Comparable

In [5]:
# Drop columns affected by randomness
modif_serial   = orig_serial.drop(columns=['port_src_entropy', 'port_src_entropy_std'])
modif_parallel = orig_parallel.drop(columns=['port_src_entropy', 'port_src_entropy_std'])

# Round float columns in the original data in order to be comparable
modif_serial[FLOAT_ORIG_COLS] = modif_serial[FLOAT_ORIG_COLS].round(5)
modif_parallel[FLOAT_ORIG_COLS] = modif_parallel[FLOAT_ORIG_COLS].round(5)

In [6]:
SORTBY = ['FLOW_END_MILLISECONDS', 'FLOW_START_MILLISECONDS', 'IPV4_SRC_ADDR', 'IPV6_SRC_ADDR', 'IPV4_DST_ADDR',
    'IPV6_DST_ADDR', 'PRETOCEL', 'L4_SRC_PORTE', 'L4_DST_PORTE', 'Label']

In [7]:
## Sort the frames in order to be comparable
serial_sorted = modif_serial.sort_values(by=SORTBY, axis=0, kind='mergesort', ignore_index=True)
parallel_sorted = modif_parallel.sort_values(by=SORTBY, axis=0, kind='mergesort', ignore_index=True)

### Determine Equality Between Serial and Wrapper Parallelism

In [8]:
# Determine if all entries within the sorted dataframes are equal -- whether the dataframes are equal
#((serial_sorted == parallel_sorted).sum() == len(serial_sorted)).sum() == len(serial_sorted.columns)
serial_sorted.equals(parallel_sorted)

False

In [9]:
# Determine which columns within the dataframes are different
differences = ((serial_sorted == parallel_sorted) == False).any()
diff_cols = list(differences[differences].index)
print(diff_cols)

['pps_std']


In [10]:
# Which cols are not affected
print(list(set(WINDOW_NORAND) - set(diff_cols)))

['proto_all_or', 'flows_itimes_max_std', 'flag_syn_ratio', 'pkt_total_std', 'bpp_avg', 'ppf_avg', 'flag_syn_ratio_std', 'bytes_total', 'flag_ack_ratio', 'flag_fin_ratio', 'hdr_payload_ratio_avg', 'flows_itimes_avg_std', 'flows_total', 'bpp_std', 'bpf_avg_std', 'ttl_std_std', 'flows_itimes_std', 'bpf_std', 'ppf_avg_std', 'pps_avg', 'flows_itimes_min_std', 'ip_dst_uniq', 'bpf_std_std', 'dur_std', 'port_src_uniq_cnt', 'flows_total_std', 'bps_avg_std', 'bytes_total_std', 'ip_dst_uniq_std', 'hdr_payload_ratio_avg_std', 'dur_std_std', 'flows_itimes_std_std', 'bpp_avg_std', 'bps_avg', 'dur_total', 'flag_ack_ratio_std', 'pkt_total', 'ppf_std', 'bpp_std_std', 'bps_std', 'flows_itimes_avg', 'dur_avg', 'dur_total_std', 'ppf_std_std', 'flows_concurrent_max_std', 'ttl_std', 'dur_avg_std', 'bps_std_std', 'window_active_ratio', 'port_src_uniq_cnt_std', 'flows_itimes_max', 'flows_itimes_min', 'flows_concurrent_max', 'bpf_avg', 'flag_fin_ratio_std']


OK, so essentially only 1 windowing column is affected, inspect manually.
### Manual Inspection

In [11]:
serial_winds   = serial_sorted[WINDOW_NORAND]
parallel_winds = parallel_sorted[WINDOW_NORAND]

In [12]:
# Determine which rows are different
diff_rows_idx = (serial_winds == parallel_winds).sum(axis=1) != len(WINDOW_NORAND)
diff_rows_lst = diff_rows_idx[diff_rows_idx].index
diff_rows_lst[:10]

Int64Index([290502, 290571, 403402, 442955, 453035, 494020, 513285, 531151,
            537681],
           dtype='int64')

In [13]:
# Whats the portion of different columns
len(diff_rows_lst) / len(serial_winds)

9.000009000009e-06

In [14]:
# Determine which exact are different for a given case
diff_cols = (serial_winds.iloc[diff_rows_lst[0]] == parallel_winds.iloc[diff_rows_lst[0]]) == False
serial_winds.iloc[diff_rows_lst[0]][diff_cols]

pps_std    3.832965e-16
Name: 290502, dtype: float64

In [15]:
parallel_winds.iloc[diff_rows_lst[0]][diff_cols]

pps_std    3.832965e-16
Name: 290502, dtype: float64

In [16]:
# Determine how many windower rows are all 0s (serial vs. parallel)
zero_rows_ser = ((serial_winds == 0).sum(axis=1) == len(WINDOW_NORAND)).sum()
zero_rows_ser

329117

In [17]:
# Determine how many windower rows are all 0s (serial vs. parallel)
zero_rows_par = ((parallel_winds == 0).sum(axis=1) == len(WINDOW_NORAND)).sum()
zero_rows_par

329117

### Let's Round It

In [18]:
# Lets round them on 5
serial_winds_rounded = serial_winds.round(5)
parallel_winds_rounded = parallel_winds.round(5)

In [19]:
# Determine if all entries within the sorted dataframes are equal -- whether the dataframes are equal
((serial_winds_rounded == parallel_winds_rounded).sum() == len(serial_winds_rounded)).sum() == len(serial_winds_rounded.columns)

True

In [20]:
# Recompute the number of different rows and determine relative count
diff_rows_idx = (serial_winds_rounded == parallel_winds_rounded).sum(axis=1) != len(WINDOW_NORAND)
diff_rows_lst = diff_rows_idx[diff_rows_idx].index
len(diff_rows_lst) / len(serial_winds_rounded)

0.0

In [21]:
# Determine which columns within the dataframes are different
differences = ((serial_winds_rounded == parallel_winds_rounded) == False).any()
diff_cols = list(differences[differences].index)
diff_cols

[]

### Add Parallel For 16 Processes

In [22]:
orig_serial     = pd.read_csv('../09_sub_serial_mergesort.csv')
orig_parallel1   = pd.read_csv('../09_sub_par1_mergesort.csv')
orig_parallel16 = pd.read_csv('../09_sub_par16_mergesort.csv')

In [23]:
def preprocess_df(data: pd.DataFrame) -> pd.DataFrame:
    # Perform unified data preprocessing as before to make it comparable
    data = data.drop(columns=['port_src_entropy', 'port_src_entropy_std'])
    data[FLOAT_ORIG_COLS] = data[FLOAT_ORIG_COLS].round(5)
    data[WINDOW_NORAND] = data[WINDOW_NORAND].round(5)
    data = data.sort_values(by=SORTBY, axis=0, kind='mergesort', ignore_index=True)

    return data

In [24]:
preproc_serial     = preprocess_df(orig_serial)
preproc_parallel1  = preprocess_df(orig_parallel1)
preproc_parallel16 = preprocess_df(orig_parallel16)

### Compare Them

In [25]:
preproc_serial.equals(preproc_parallel1)

True

In [26]:
preproc_serial.equals(preproc_parallel16)

True

In [27]:
preproc_parallel1.equals(preproc_parallel16)

True

In [28]:
# Determine which columns within the dataframes are different
differences = ((preproc_parallel1 == preproc_parallel16) == False).any()
diff_cols = list(differences[differences].index)
print(diff_cols)

[]


In [29]:
# Print columns that are not affected
print(set(preproc_parallel1.columns) - set(diff_cols))

{'flag_syn_ratio', 'IPV6_DST_ADDR', 'hdr_payload_ratio_avg', 'PROTOCOL_ICMP6', 'SRC_TO_DST_IAT_AVG', 'flows_total', 'bpf_avg_std', 'TCP_WIN_MIN_OUT', 'L4_SRC_PORT_DATA', 'L4_DST_PORT_DATA', 'ICMP_TYPE_ECHO_REQUEST', 'MIN_IP_PKT_LEN', 'flows_itimes_min_std', 'FLAG_CWR', 'PROTOCOL_GRE', 'OUT_BPP', 'NUM_PKTS_512_TO_1024_BYTES', 'RETRANSMITTED_IN_PKTS', 'SRC_TO_DST_IAT_MAX', 'ip_dst_uniq_std', 'SERVER_FLAG_URG', 'L4_DST_PORT_PLAYSTATION', 'MIN_TTL', 'CLIENT_FLAG_PSH', 'bpp_avg_std', 'OUT_PPS', 'PROTOCOL_ICMP', 'ICMP_TYPE_OTHER', 'SERVER_FLAG_ECE', 'SERVER_FLAG_FIN', 'TCP_WIN_MSS_IN', 'DURATION_IN', 'LONGEST_FLOW_PKT', 'DST_TO_SRC_IAT_AVG', 'ppf_std_std', 'NUM_PKTS_UP_TO_128_BYTES', 'flows_concurrent_max_std', 'L4_DST_PORT_SHELL', 'SHORTEST_FLOW_PKT', 'TCP_WIN_MAX_IN', 'window_active_ratio', 'port_src_uniq_cnt_std', 'L4_SRC_PORT_TLS', 'IN_BPP', 'SERVER_FLAG_CWR', 'MAX_TTL', 'proto_all_or', 'flows_itimes_max_std', 'FLAG_FIN', 'IPV4_SRC_ADDR', 'pkt_total_std', 'L4_DST_PORT_QUERY', 'flag_ack_r

In [30]:
preproc_parallel1 == preproc_parallel16

Unnamed: 0,IN_BYTES,IN_PKTS,TCP_FLAGS,IPV4_SRC_ADDR,IPV6_SRC_ADDR,IPV4_DST_ADDR,IPV6_DST_ADDR,OUT_BYTES,OUT_PKTS,MIN_IP_PKT_LEN,...,L4_DST_PORT_PLAYSTATION,L4_DST_PORT_CHAT,L4_DST_PORT_QUERY,L4_DST_PORT_DYNAMIC,ICMP_TYPE_OTHER,ICMP_TYPE_ECHO_REPLY,ICMP_TYPE_DEST_UNREACHABLE,ICMP_TYPE_REDIRECT,ICMP_TYPE_ECHO_REQUEST,ICMP_TYPE_TIME_EXCEEDED
0,True,True,True,True,True,True,True,True,True,True,...,True,True,True,True,True,True,True,True,True,True
1,True,True,True,True,True,True,True,True,True,True,...,True,True,True,True,True,True,True,True,True,True
2,True,True,True,True,True,True,True,True,True,True,...,True,True,True,True,True,True,True,True,True,True
3,True,True,True,True,True,True,True,True,True,True,...,True,True,True,True,True,True,True,True,True,True
4,True,True,True,True,True,True,True,True,True,True,...,True,True,True,True,True,True,True,True,True,True
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
999994,True,True,True,True,True,True,True,True,True,True,...,True,True,True,True,True,True,True,True,True,True
999995,True,True,True,True,True,True,True,True,True,True,...,True,True,True,True,True,True,True,True,True,True
999996,True,True,True,True,True,True,True,True,True,True,...,True,True,True,True,True,True,True,True,True,True
999997,True,True,True,True,True,True,True,True,True,True,...,True,True,True,True,True,True,True,True,True,True


In [31]:
# Determine which rows are different
diff_rows_idx = (preproc_parallel1 == preproc_parallel16).sum(axis=1) != len(preproc_parallel1.columns)
diff_rows_lst = diff_rows_idx[diff_rows_idx].index
diff_rows_lst[:10]

Int64Index([], dtype='int64')

In [32]:
len(diff_rows_lst) / len(preproc_parallel16)

0.0

Voilla, all versions (serial, parallel with 1 worker and parallel with 16 workers) are equivalent!