#### VPC Flow Logs De-Duplicator: Cut Flow Log Cost Down to Size.
This notebook ingests VPC flow logs from a folder into a dataframe and de-duplicates them. VPC flow logs resemble netflow in that each flow has source and destination IPs and ports and a protocol. It tends to compress even better than netflow because a given TCP session will produce many vpc flow logs records for the session where only the byte and packet counts are different.

These can be de-duplicated into one record with a sum of bytes and packets and a start / end timestamp. With compression ratios as high as 30x, the cost of storage and processing can be massively reduced, making vpc flow logs more palatable to organizations where the processing and storage cost deters collection and analysis of this data. In addition, this can be done in the VPC where the flow logs reside, eliminating the cost associated with exporting them into a third party SaaS tool. 

In [None]:
pd.set_option('display.max_rows', 10)
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 50)
pd.set_option('display.expand_frame_repr', False)

In [None]:
# Define the folder name to ingest vpc flow logs

root_dir = r"2024"


dataframes = []


for subdir, dirs, files in os.walk(root_dir):
    for file in files:
        
        if file.endswith(".log"):
            file_path = os.path.join(subdir, file)
            
            df = pd.read_csv(file_path, delimiter=' ')  
            dataframes.append(df)


vflows = pd.concat(dataframes, ignore_index=True)

In [30]:
# Optionally import flows from a csv file 


In [32]:
# Process duplicate flow

duplicate_rows = vflows.duplicated(
    subset=['account-id', 'interface-id', 'srcaddr', 'dstaddr', 'srcport', 'dstport', 'protocol', 'action'], 
    keep=False
)
duplicates = vflows[duplicate_rows]

duplicate_counts = duplicates.groupby(
    ['account-id', 'interface-id', 'srcaddr', 'dstaddr', 'dstport', 'srcport', 'protocol', 'action']
).agg(
    count=('account-id', 'size'),       
    total_bytes=('bytes', 'sum'),      
    total_packets=('packets', 'sum')   
).reset_index()

duplicate_counts = duplicate_counts.sort_values(by='count', ascending=True)


In [34]:
# Report output stats
if duplicate_counts.shape[0] > 0:  
    ratio = vflows.shape[0] / duplicate_counts.shape[0]
    print(f"The raw flow logs contained {vflows.shape[0]:,} records.")
    print(f"They deduped down to: {duplicate_counts.shape[0]:,} records.")
    print(f"The compression ratio of raw to de-duplicated records is {ratio:,.2f}!")
else:
    print("No deduplicated records to calculate a ratio.")

size_raw = vflows.memory_usage(deep=True).sum()
size_deduped = duplicate_counts.memory_usage(deep=True).sum()

size_difference = size_raw - size_deduped
single_row_counts = vflows.groupby(
    ['account-id', 'interface-id', 'srcaddr', 'dstaddr', 'dstport', 'srcport', 'protocol', 'action']
).size().reset_index(name='row_count')

print(f"The size of the raw DataFrame is {size_raw:,} bytes.")
print(f"The size of the de-duplicated DataFrame is {size_deduped:,} bytes.")
print(f"The memory saved is {size_difference:,} bytes!")
print("Long live the fighters!")

The raw flow logs contained 10,692,493 records.
They deduped down to: 382,608 records.
The compression ratio of raw to de-duplicated records is 27.95!
The size of the raw DataFrame is 7,170,529,574 bytes.
The size of the de-duplicated DataFrame is 239,448,265 bytes.
The memory saved is 6,931,081,309 bytes!
Long live the fighters!


In [46]:
# Count runts consisting of a single flow by action
# With Internet facing IPs, there will tend to be many single flow connections 
# due to scanning and enumeration activity. These are often low impact.
single_row_counts = vflows.groupby(
    ['account-id', 'interface-id', 'srcaddr', 'dstaddr', 'dstport', 'srcport', 'protocol', 'action']
).size().reset_index(name='row_count')

single_row_groups = single_row_counts[single_row_counts['row_count'] == 1]
action_counts = single_row_groups['action'].value_counts().reset_index()
action_counts.columns = ['action', 'count']
action_counts['count'] = action_counts['count'].replace(',', '', regex=True).astype(int)
action_counts['count'] = action_counts['count'].apply(lambda x: f"{x:,}")

total_excluded = single_row_groups.shape[0]
print(f"{total_excluded:,} single flow runts were excluded")
print("These break down as follows:")
print(action_counts)

8,761,159 single flow runts were excluded
These break down as follows:
   action      count
0  REJECT  6,233,838
1  ACCEPT  2,527,321


In [None]:
# Cast the fields post-processing to save time
duplicate_counts['bytes'] = pd.to_numeric(duplicate_counts['bytes'], errors='coerce')
duplicate_counts['packets'] = pd.to_numeric(duplicate_counts['packets'], errors='coerce')
duplicate_counts['start'] = pd.to_datetime(duplicate_counts['start'], unit='s')
duplicate_counts['end'] = pd.to_datetime(duplicate_counts['end'], unit='s')

In [None]:
# optionally output to csv for sharing
ddf.to_csv('ddf.csv', index=False)

In [None]:
# Optionally output to excel for sharing
duplicate_counts.to_excel('dedup.xlsx', index=False)