## Produce Partial Flows based on Flow Duration

In [1]:
# Directory that receives the generated per-duration CSV files.
CSV_DIR = "datasets"
# Directory holding the input capture; the file is looked up as rd<Day>.pcap.
INPUT_DIR = "PCAP/deduplicated_reordered"
# Capture day to process; used in both the input and the output file names.
DAY = "wednesday"

In [2]:
import os
import sys
from datetime import timedelta
import time
import logging
from nfstream import NFPlugin, NFStreamer
import nfstream
# from labeller import cicids2017
import hashlib


# set up logging
def setup_logging(log_filename="generate-n-fd-flows.log"):
    """Send bare log messages (INFO and above) to a fresh log file and stdout.

    Args:
        log_filename: Path of the log file. Any existing content is discarded
            before logging is configured.

    Note:
        ``logging.basicConfig`` does nothing when the root logger already has
        handlers, so calling this a second time in the same interpreter only
        empties the file.
    """
    # Opening for writing and closing straight away truncates a previous log.
    open(log_filename, "w").close()

    file_handler = logging.FileHandler(log_filename)
    console_handler = logging.StreamHandler(sys.stdout)
    logging.basicConfig(
        level=logging.INFO,
        format="%(message)s",
        handlers=[file_handler, console_handler],
    )

def consistent_hash(value):
    """Return the SHA-256 hex digest of a string.

    Unlike the built-in ``hash``, the result does not vary between
    interpreter runs.

    Args:
        value: Text to hash; it is encoded with the default (UTF-8) codec.

    Returns:
        The 64-character lowercase hexadecimal digest.
    """
    hasher = hashlib.sha256()
    hasher.update(value.encode())
    return hasher.hexdigest()
    

class PayloadManager(NFPlugin):
    """Accumulate per-direction payload byte counts on ``flow.udps``.

    Packets with ``direction == 0`` count towards ``src2dst_payload`` and
    packets with ``direction == 1`` towards ``dst2src_payload``.
    """

    @staticmethod
    def _split_by_direction(packet):
        """Return ``(src2dst, dst2src)`` payload contributions of one packet."""
        size = packet.payload_size
        direction = packet.direction
        forward = size if direction == 0 else 0
        backward = size if direction == 1 else 0
        return forward, backward

    def on_init(self, packet, flow):
        # The first packet seeds both counters.
        forward, backward = self._split_by_direction(packet)
        flow.udps.src2dst_payload = forward
        flow.udps.dst2src_payload = backward

    def on_update(self, packet, flow):
        # Every later packet adds to the counter matching its direction.
        forward, backward = self._split_by_direction(packet)
        flow.udps.src2dst_payload += forward
        flow.udps.dst2src_payload += backward


class FlowExpirationManager(NFPlugin):
    """Expire a flow as soon as a packet carrying a TCP RST or FIN is seen."""

    @staticmethod
    def _expire_on_teardown(packet, flow):
        """Mark ``flow`` for expiration when ``packet`` has RST or FIN set."""
        teardown_seen = packet.rst or packet.fin
        if teardown_seen:
            flow.expiration_id = -1

    def on_init(self, packet, flow):
        # The very first packet of a flow may already be a RST/FIN.
        self._expire_on_teardown(packet, flow)

    def on_update(self, packet, flow):
        # Apply the same policy to every subsequent packet.
        self._expire_on_teardown(packet, flow)


class FlowLabelManager(NFPlugin):
    """Attach a label to each flow when it expires and drop payload counters.

    NOTE(review): ``cicids2017`` is not imported in this file as shown (its
    import is commented out), so enabling this plugin raises ``NameError``
    until that import is restored.
    """

    def __init__(self, day):
        # Day identifier forwarded to the labelling function.
        self.day = day

    def on_expire(self, flow):
        # Compute the label first, then store it and remove the helper fields.
        label = cicids2017(self.day, flow, label_reverse=True, signal_reverse=False)
        flow.udps.label = label
        self.cleanup_payload(flow)

    @staticmethod
    def cleanup_payload(flow):
        """Delete the per-direction payload counters from ``flow.udps`` if set."""
        for attr_name in ("src2dst_payload", "dst2src_payload"):
            if hasattr(flow.udps, attr_name):
                delattr(flow.udps, attr_name)


class PacketCountManager(NFPlugin):
    """Expire a flow once it holds ``max_packets`` packets (both directions).

    The check uses ``>=`` and also runs on the first packet. With a strict
    ``==`` test done only in ``on_update``, a limit of 1 could never trigger,
    because the counter is already 2 by the time ``on_update`` first runs.
    """

    def __init__(self, max_packets):
        # Packet count at which a flow is cut.
        self.max_packets = max_packets

    def on_init(self, packet, flow):
        # Covers max_packets <= 1, where the first packet already fills the flow.
        self._expire_if_full(flow)

    def on_update(self, packet, flow):
        self._expire_if_full(flow)

    def _expire_if_full(self, flow):
        """Mark ``flow`` for expiration when it has reached the packet limit."""
        if flow.bidirectional_packets >= self.max_packets:
            flow.expiration_id = -1  # Mark for expiration in NFStream

class FlowDurationManager(NFPlugin):
    """Cut a flow once its bidirectional duration reaches a fixed limit.

    The check runs when a packet updates the flow, so the packet that crosses
    the limit is still part of the expired flow.
    """

    def __init__(self, max_duration_ms):
        # Duration limit, in milliseconds.
        self.max_duration_ms = max_duration_ms

    def on_update(self, packet, flow):
        limit_reached = flow.bidirectional_duration_ms >= self.max_duration_ms
        if limit_reached:
            # -1 marks the flow for expiration in NFStream.
            flow.expiration_id = -1

class HashManager(NFPlugin):
    """Tag each flow with a hash of its first packet's 5-tuple and start time."""

    def on_init(self, packet, flow):
        # Key = src ip/port, dst ip/port, protocol and the flow's first-seen
        # timestamp, joined with "-" and hashed with SHA-256.
        key_fields = (
            packet.src_ip,
            packet.src_port,
            packet.dst_ip,
            packet.dst_port,
            packet.protocol,
            flow.bidirectional_first_seen_ms,
        )
        flow_key = "-".join(str(field) for field in key_fields)
        flow.udps.flow_key_hash = consistent_hash(flow_key)
        

def _build_flow_duration_streamer(input_file, bpf_filter, max_duration_ms):
    """Create the NFStreamer that cuts flows at ``max_duration_ms`` milliseconds."""
    return NFStreamer(
        source=input_file,
        decode_tunnels=False,          # Default: True
        bpf_filter=bpf_filter,         # Default: None
        promiscuous_mode=True,         # Default: True
        snapshot_length=1536,          # Default: 1536
        idle_timeout=60,               # Default: 120
        active_timeout=18000,          # Default: 1800
        accounting_mode=1,             # Default: 0
        udps=[                         # Default: None
            FlowExpirationManager(),
            # PayloadManager(),
            # FlowLabelManager(day.capitalize()),
            HashManager(),
            # PacketCountManager(n),
            FlowDurationManager(max_duration_ms),
        ],
        n_dissections=0,               # Default: 20
        statistical_analysis=True,     # Default: False
        splt_analysis=20,              # Default: 0
        n_meters=1,                    # Default: 0
        performance_report=0,          # Default: 0
    )


def _select_flows_near_duration(df, target_ms, tolerance):
    """Return rows whose bidirectional duration is within ±tolerance of target_ms."""
    lower_bound = target_ms * (1 - tolerance)
    upper_bound = target_ms * (1 + tolerance)
    durations = df["bidirectional_duration_ms"]
    return df[(durations >= lower_bound) & (durations <= upper_bound)]


def process_files_in_directory(input_dir: str, day: str, output_dir: str, Ns: list):
    """Generate one CSV of duration-limited flows per value in ``Ns``.

    For every ``n`` the capture ``<input_dir>/rd<Day>.pcap`` is replayed through
    NFStream with flows cut once they reach ``n`` milliseconds. Only flows whose
    duration lies within ±20% of ``n`` are kept and written to
    ``<output_dir>/<day>_fd_<n>.csv``.

    Args:
        input_dir: Directory containing the input PCAP.
        day: Capture day; capitalised for the input name, used verbatim for
            the output name.
        output_dir: Existing directory that receives the CSV files.
        Ns: Flow-duration limits in milliseconds.

    Returns:
        None. If the input PCAP is missing, an error is logged once and
        nothing is processed.
    """
    bpf_filter = "ip and (ip proto \\tcp or \\udp)"  # only ipv4 tcp and udp traffic to capture
    tolerance = 0.20

    # The input path does not depend on n, so resolve and validate it once.
    input_file = os.path.join(input_dir, f"rd{day.capitalize()}.pcap")
    if not os.path.isfile(input_file):
        logging.error(f"Input PCAP not found, nothing to process: {input_file}")
        return

    for n in Ns:
        output_name = f"{day}_fd_{n}.csv"
        output_file = os.path.join(output_dir, output_name)

        logging.info(f"----- FD={n}ms -----")

        start = time.time()
        streamer = _build_flow_duration_streamer(input_file, bpf_filter, n)

        # Convert the stream to a DataFrame
        df = streamer.to_pandas(columns_to_anonymize=[])
        elapsed = timedelta(seconds=time.time() - start)

        # NOTE(review): NFStreamer.to_pandas appears to return None (not an
        # empty frame) when no flows were produced -- confirm for the
        # installed NFStream version. Guard so len() below cannot crash.
        if df is None:
            logging.info("NFStream generated flows: 0")
            logging.warning(f"No flows generated for FD={n}ms; {output_name} not written")
            logging.info("\n")
            continue

        logging.info(f"NFStream generated flows: {len(df)}")
        # Only visible when the logger is configured at DEBUG level.
        logging.debug(f"Time required to generate flows: {elapsed}")

        # Keep rows where 'bidirectional_duration_ms' is within the tolerance band
        df_filtered = _select_flows_near_duration(df, n, tolerance)
        logging.info(f"Number of flows with duration around {n/1000} seconds (±{tolerance*100}%): {len(df_filtered)}")

        # Rename on a new frame rather than mutating the filtered slice in place.
        df_out = df_filtered.rename(columns={
            # "udps.label": "label",
            "udps.flow_key_hash": "flow_key_hash",
        })
        df_out.to_csv(output_file, index=False)

        logging.info(f"Flows stored as: {output_name}")
        logging.info("\n")


if __name__ == "__main__":
    # Flow-duration limits, in milliseconds; one CSV is produced per value.
    Ns = [5, 10, 50, 100, 150, 300, 500, 1000, 5000, 10000, 15000, 20000]
    input_dir, output_dir, day = INPUT_DIR, CSV_DIR, DAY

    # The CSV writer does not create directories, so make sure it exists.
    os.makedirs(output_dir, exist_ok=True)

    setup_logging()

    # Record the NFStream version so runs can be compared later.
    logging.info(f"Generating flows with NFStream: {nfstream.__version__}")
    logging.info("\n")

    process_files_in_directory(input_dir, day, output_dir, Ns)

Generating flows with NFStream: 6.5.4a


----- FD=5ms -----
NFStream generated flows: 2568515
Number of flows with duration around 0.005 seconds (±20.0%): 225783
Flows stored as: wednesday_fd_5.csv


----- FD=10ms -----
NFStream generated flows: 2418505
Number of flows with duration around 0.01 seconds (±20.0%): 226310
Flows stored as: wednesday_fd_10.csv


----- FD=50ms -----
NFStream generated flows: 1982856
Number of flows with duration around 0.05 seconds (±20.0%): 149071
Flows stored as: wednesday_fd_50.csv


----- FD=100ms -----
NFStream generated flows: 1849162
Number of flows with duration around 0.1 seconds (±20.0%): 110163
Flows stored as: wednesday_fd_100.csv


----- FD=150ms -----
NFStream generated flows: 1731757
Number of flows with duration around 0.15 seconds (±20.0%): 165271
Flows stored as: wednesday_fd_150.csv


----- FD=300ms -----
NFStream generated flows: 1613325
Number of flows with duration around 0.3 seconds (±20.0%): 58211
Flows stored as: wednesday_fd_300.csv