In [11]:
import pandas as pd
import os
import subprocess
import platform
import glob
import time
import argparse
from pathlib import Path
import time
import psutil
import csv
from concurrent.futures import ProcessPoolExecutor
import logging
import math

In [12]:
max_memory = 0  # Highest resident memory (MB) reported so far by print_memory_usage()

def print_memory_usage():
    """Report this process's resident memory, but only when it sets a new peak.

    Reads the RSS of the current process through psutil, converts it to MB and
    compares it with the module-level ``max_memory``. When the new reading is
    higher, ``max_memory`` is raised to it and the value is printed; otherwise
    nothing happens.
    """
    global max_memory
    rss_mb = psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2
    if not rss_mb > max_memory:
        # Not a new peak: stay silent.
        return
    max_memory = rss_mb
    print(f"memory_usage: {rss_mb:.2f} MB")

In [13]:
# Configure the root logger for the notebook: INFO level and above, each
# message prefixed with a timestamp and its level name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class PrepPcap:
    """Make sure a TSV export exists for a capture file.

    Constructing an instance validates the input file, locates the tshark
    executable and, for ``.pcap`` / ``.pcapng`` inputs, converts the capture
    to a tab-separated file inside ``output_path`` unless that file is already
    there. ``.tsv`` inputs are used in place.

    Attributes:
        path: Input file as a ``Path``.
        outdir: Output directory as a ``Path``.
        tsvpath: TSV file that downstream code should read.
        tsv_time: Seconds spent in tshark, or ``None`` if no conversion ran.
        parse_type: ``"tsv"`` once preparation succeeded.
        analysis_time: Placeholder that callers fill in after feature extraction.
    """

    # tshark ``-e`` selectors for the fields exported to the TSV file.
    _FIELDS = [
        "-e", "frame.time_epoch", "-e", "frame.len", "-e", "eth.src", "-e", "eth.dst",
        "-e", "ip.src", "-e", "ip.dst", "-e", "ip.len", "-e", "tcp.srcport", "-e", "tcp.dstport",
        "-e", "udp.srcport", "-e", "udp.dstport", "-e", "arp.opcode", "-e", "arp.src.hw_mac",
        "-e", "arp.src.proto_ipv4", "-e", "arp.dst.hw_mac", "-e", "arp.dst.proto_ipv4",
        "-e", "icmp.type", "-e", "icmp.code", "-e", "ipv6.src", "-e", "ipv6.dst"
    ]

    def __init__(self, file_path, output_path):
        """Validate ``file_path`` and prepare its TSV export under ``output_path``.

        Raises:
            FileNotFoundError: The input file or the tshark executable is missing.
            ValueError: The input is neither a tsv nor a pcap/pcapng file.
            subprocess.CalledProcessError: tshark exited with an error.
        """
        self.path = Path(file_path)
        self.outdir = Path(output_path)
        self.tsvpath = self.outdir / self.path.with_suffix('.tsv').name
        self.tsv_time = None
        self.parse_type = None
        # Set by callers after feature extraction; initialised so it always exists.
        self.analysis_time = None
        self._tshark = self._get_tshark_path()

        # Prepare the environment
        self.__prep__()

    def pcap2tsv_with_tshark(self):
        """Convert ``self.path`` to ``self.tsvpath`` with tshark.

        The output is first written to a ``.part`` file next to the target and
        only moved into place when tshark succeeds. A failed run therefore
        never leaves a truncated TSV behind that a later run would mistake for
        a finished export.

        Raises:
            subprocess.CalledProcessError: tshark returned a non-zero exit code.
        """
        logging.info('Parsing with tshark...')
        start_time = time.time()
        cmd = [str(self._tshark), "-r", str(self.path), "-T", "fields"] + self._FIELDS + ["-E", "header=y", "-E", "occurrence=f"]

        # Ensure output directory exists
        self.outdir.mkdir(parents=True, exist_ok=True)

        partial_path = self.tsvpath.with_name(self.tsvpath.name + '.part')
        try:
            with open(partial_path, 'w') as output_file:
                subprocess.run(cmd, stdout=output_file, stderr=subprocess.PIPE, check=True)
            # Publish the finished file in one step.
            os.replace(partial_path, self.tsvpath)
        except subprocess.CalledProcessError as e:
            logging.error("Error occurred while executing tshark command:")
            logging.error(e.stderr.decode(errors='replace'))
            raise
        finally:
            # Only still present when something went wrong above.
            if partial_path.exists():
                partial_path.unlink()

        end_time = time.time()
        self.tsv_time = end_time - start_time
        logging.info(f"tshark parsing complete. File saved as: {self.tsvpath}")

    def _get_tshark_path(self):
        """Return the path of the tshark executable.

        On Windows the default Wireshark install location is tried first.
        Afterwards every PATH entry is searched (for ``tshark.exe`` as well as
        ``tshark`` on Windows).

        Raises:
            FileNotFoundError: No tshark executable was found, including when
                PATH is unset.
        """
        on_windows = platform.system() == 'Windows'
        if on_windows:
            default_path = Path(r'C:\Program Files\Wireshark\tshark.exe')
            if default_path.is_file():
                return default_path

        names = ('tshark.exe', 'tshark') if on_windows else ('tshark',)
        for entry in os.environ.get('PATH', '').split(os.pathsep):
            if not entry:
                continue
            for name in names:
                candidate = Path(entry) / name
                if candidate.is_file():
                    return candidate
        raise FileNotFoundError("tshark not found in PATH. Please ensure it is installed.")

    def __prep__(self):
        """Check the input file and make ``self.tsvpath`` point at a usable TSV.

        Raises:
            FileNotFoundError: ``self.path`` is not an existing file.
            ValueError: The file suffix is not ``.tsv``, ``.pcap`` or ``.pcapng``.
        """
        if not self.path.is_file():
            raise FileNotFoundError(f"File: {self.path} does not exist")

        file_type = self.path.suffix.lower()
        if file_type == '.tsv':
            # The input already is the TSV; read it where it lives instead of
            # expecting a copy inside the output directory.
            self.tsvpath = self.path
            self.parse_type = "tsv"
        elif file_type in {'.pcap', '.pcapng'}:
            if not self.tsvpath.is_file():
                self.pcap2tsv_with_tshark()
            else:
                logging.info(f"{self.tsvpath} already exists!")
            self.parse_type = "tsv"
        else:
            raise ValueError(f"File: {self.path} is not a tsv or pcap file")

In [14]:
def save_to_csv(df, output_file, verbose=True):
    """Write ``df`` to ``output_file`` as CSV without the index.

    Missing parent directories are created first. A failure while writing is
    logged and swallowed, so the caller keeps running.

    Args:
        df: DataFrame to save.
        output_file: Destination path; may be a bare file name.
        verbose: When true, log the destination after a successful write.
    """
    # os.makedirs('') raises FileNotFoundError, so only create a directory when
    # the path actually has a directory component.
    parent_dir = os.path.dirname(output_file)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    try:
        df.to_csv(output_file, index=False)
        if verbose:
            logging.info(f"Result saved to {output_file}")
    except Exception as e:
        logging.error(f"Error saving file {output_file}: {type(e).__name__} - {e}")

In [15]:
def get_subdirectories(directory, recursive):
    """Return the directories whose files should be processed.

    Without ``recursive`` the result is just ``[directory]``. With it, the
    result is the list of immediate child directories of ``directory`` (the
    directory itself is not included and deeper levels are not visited).
    """
    if not recursive:
        return [directory]

    children = (os.path.join(directory, entry) for entry in os.listdir(directory))
    return [child for child in children if os.path.isdir(child)]

In [16]:
def extract_features(tsv_file, output_file='output.csv', save_interval=100000, chunksize=50000):
    """Stream a tab-separated packet file and write one flow snapshot per packet.

    The input is read in chunks of ``chunksize`` rows. Every row is assigned to
    a flow identified by (source IP, destination IP, source MAC, destination
    MAC). The flow's running state is updated with the row and a copy of that
    state is queued for output, so the output has one row per input row.

    Columns written (the keys of the per-flow dict below): Src_ip, Dst_ip,
    Src_eth, Dst_eth, frame_len, TCP, UDP, ICMP, Total, last_time, Frequency.

    Args:
        tsv_file: Path of the tab-separated input. It must provide the columns
            read below (frame.time_epoch, frame.len, eth.src, eth.dst,
            tcp.dstport, udp.dstport, icmp.type and the ip/ipv6/arp address
            columns).
        output_file: CSV file to create; an existing file is overwritten.
        save_interval: Number of queued snapshots that triggers a write.
        chunksize: Rows per chunk passed to ``pd.read_csv``.

    Returns:
        Elapsed wall-clock time of the whole call, in seconds.
    """
    start_time = time.time()
    print("Reading file")

    # flow_stats: running state per flow key.
    # flow_stats_list: snapshots waiting to be written.
    # NOTE(review): flow_stats is never pruned, so it grows with the number of
    # distinct flows seen in the file.
    flow_stats = {}
    flow_stats_list = []

    # Open output file for writing
    with open(output_file, mode='w', newline='') as f:
        # Created lazily on the first flush so the header comes from real data.
        writer = None

        # Read file in chunks
        for chunk in pd.read_csv(tsv_file, sep='\t', low_memory=False, chunksize=chunksize):
            for i, row in chunk.iterrows():
                frame_time = row['frame.time_epoch']
                # Rows with no IPv4, IPv6 or ARP address keep empty-string addresses.
                src_ip, dst_ip = '', ''

                # Pick the address pair: IPv4 first, then IPv6, then ARP protocol addresses
                if pd.notna(row.get('ip.src')):
                    src_ip, dst_ip = row['ip.src'], row['ip.dst']
                elif pd.notna(row.get('ipv6.src')):
                    src_ip, dst_ip = row['ipv6.src'], row['ipv6.dst']
                elif pd.notna(row.get('arp.src.proto_ipv4')):
                    src_ip, dst_ip = row['arp.src.proto_ipv4'], row['arp.dst.proto_ipv4']
                    #protocol = 'ARP'

                src_eth, dst_eth = row['eth.src'], row['eth.dst']

                # Flow identity: address pair plus MAC pair (ports are not part of the key)
                flow_key = (src_ip, dst_ip, src_eth, dst_eth) #####different from transformer
                

                ##########################################################################################
                # Initialize or update flow statistics
                if flow_key not in flow_stats:
                    flow_stats[flow_key] = {
                        'Src_ip': src_ip,  #index
                        'Dst_ip': dst_ip,
                        'Src_eth': src_eth, 
                        'Dst_eth': dst_eth,
                        'frame_len':0,
                        'TCP':0,
                        'UDP':0,
                        'ICMP':0,
                        'Total': 0,
                        'last_time': frame_time, 
                        'Frequency':0
                    }

                # These four fields are overwritten with the current packet's values
                # (they are not counters): frame length, TCP/UDP destination port, ICMP type.
                flow_stats[flow_key]['frame_len'] = row['frame.len']
                flow_stats[flow_key]['TCP'] = row['tcp.dstport']
                flow_stats[flow_key]['UDP'] = row['udp.dstport']
                flow_stats[flow_key]['ICMP'] = row['icmp.type']
                #flow_stats[flow_key]['ARP'] = row['arp.opcode']
                #flow_stats[flow_key][protocol] += 1


                # Calculate Frequency for the current flow.
                # time_interval is the gap to this flow's previous packet; it is 0 for
                # the first packet of a flow because last_time starts at frame_time.
                time_interval = float(frame_time) - float(flow_stats[flow_key]['last_time'])
                # Uses Total as it was BEFORE this packet is counted; falls back to 1
                # when the gap is not positive.
                frequency = flow_stats[flow_key]['Total'] / time_interval if time_interval > 0 else 1
                # Total is a decayed packet count: the old value is scaled by
                # 2 ** (-Lambda * time_interval) and the current packet adds 1.
                Lambda=2
                factor = math.pow(2, (-Lambda * time_interval))
                flow_stats[flow_key]['Total'] = flow_stats[flow_key]['Total']*factor+1
                
                flow_stats[flow_key]['Frequency'] = frequency
                flow_stats[flow_key]['last_time'] = frame_time
                ##########################################################################################


                # Create a snapshot of the current flow stats (a copy, so later
                # updates to the flow do not change rows already queued)
                current_stats = flow_stats[flow_key].copy()
                flow_stats_list.append(current_stats)

                # Write batch to file every save_interval rows
                if len(flow_stats_list) >= save_interval:
                    df_batch = pd.DataFrame(flow_stats_list)
                    flow_stats_list.clear()  # Clear the temporary storage

                    # Only write to CSV if the batch is not empty
                    if not df_batch.empty:
                        if writer is None:
                            # Initialize DictWriter and write header once
                            writer = csv.DictWriter(f, fieldnames=df_batch.columns)
                            writer.writeheader()
                        writer.writerows(df_batch.to_dict(orient='records'))
                        print_memory_usage()

        # Write any remaining data (fewer than save_interval snapshots are left)
        if flow_stats_list:
            df_batch = pd.DataFrame(flow_stats_list)
            if writer is None:
                writer = csv.DictWriter(f, fieldnames=df_batch.columns)
                writer.writeheader()
            writer.writerows(df_batch.to_dict(orient='records'))
    
    end_time = time.time()
    analysis_time = end_time - start_time
    print(f"Data saved to {output_file}. Processed in {analysis_time:.2f} seconds.")
    
    return analysis_time

In [17]:
def process_chunk(chunk, prev_chunk, next_chunk, n,select_n=5):
    """Attach same-source neighbour rows to every row of ``chunk``.

    ``prev_chunk`` and ``next_chunk`` supply the rows just before and after
    ``chunk`` so that rows near the chunk edges still see their neighbours.
    For each row of ``chunk``, up to ``n`` rows are scanned in each direction
    and the first ``select_n`` rows with the same ``Src_eth`` are kept.

    Output layout per row: ``select_n`` backward slots, the current row, then
    ``select_n`` forward slots, each slot holding all input columns. The
    nearest earlier match sits in ``backward_{select_n}_*`` and older matches
    in lower-numbered slots; the nearest later match sits in ``forward_1_*``.
    Unfilled slots contain ``None``.

    Args:
        chunk: Rows to produce output for; must contain a ``Src_eth`` column.
        prev_chunk: Rows immediately preceding ``chunk`` (may be empty).
        next_chunk: Rows immediately following ``chunk`` (may be empty).
        n: Maximum number of rows scanned in each direction.
        select_n: Number of context slots per direction.

    Returns:
        DataFrame with ``len(chunk)`` rows and
        ``(2 * select_n + 1) * len(columns)`` columns.
    """
    # Empty frames carry no rows; leaving them out avoids pandas' deprecated
    # handling of empty entries in concat.
    frames = [frame for frame in (prev_chunk, chunk, next_chunk) if len(frame)]
    data = pd.concat(frames or [chunk]).reset_index(drop=True)
    data_len = len(data)
    chunk_len = len(chunk)

    cols = data.columns
    new_columns = (
        [f"backward_{i+1}_{col}" for i in range(select_n) for col in cols] +
        [f"current_{col}" for col in cols] +
        [f"forward_{i+1}_{col}" for i in range(select_n) for col in cols]
    )

    # Look the MAC column up once instead of building a row Series per comparison.
    src_eth = data['Src_eth'].to_numpy()

    context_data = []
    m = len(prev_chunk)  # offset of the first row of `chunk` inside `data`

    for index in range(m, m + chunk_len):
        row = data.iloc[index]
        current_src_eth = src_eth[index]

        backward_context = [dict.fromkeys(cols, None) for _ in range(select_n)]
        forward_context = [dict.fromkeys(cols, None) for _ in range(select_n)]

        back_count = 0
        for i in range(1, n + 1):
            j = index - i
            if j < 0 or back_count >= select_n:
                break
            if src_eth[j] == current_src_eth:
                # Fill from the last slot towards the first so the nearest
                # match ends up adjacent to the current row.
                backward_context[select_n - 1 - back_count] = data.iloc[j].to_dict()
                back_count += 1

        forward_count = 0
        for i in range(1, n + 1):
            j = index + i
            if j >= data_len or forward_count >= select_n:
                break
            if src_eth[j] == current_src_eth:
                forward_context[forward_count] = data.iloc[j].to_dict()
                forward_count += 1

        combined_context = []
        for b_context in backward_context:
            combined_context.extend(b_context.values())
        combined_context.extend(row.values)
        for f_context in forward_context:
            combined_context.extend(f_context.values())

        context_data.append(combined_context)

    return pd.DataFrame(context_data, columns=new_columns)

def save_to_csv_partial(df, output_file, mode='a'):
    """Write ``df`` to ``output_file``, emitting the header only for a new file.

    When the file already exists the rows are written with ``mode`` and no
    header. When it does not exist the file is created in write mode with a
    header row, regardless of ``mode``. The index is never written.
    """
    is_new_file = not os.path.exists(output_file)
    df.to_csv(
        output_file,
        mode='w' if is_new_file else mode,
        header=is_new_file,
        index=False,
    )


def extract_context_parallel(file_path, output_file, chunk_size=100000, n=10):
    """Build the context CSV for ``file_path`` using a pool of worker processes.

    The input CSV is read in chunks of ``chunk_size`` rows. Each chunk is
    handed to ``process_chunk`` together with the last ``n`` rows of the
    previous chunk and the first ``n`` rows of the next one. Results are
    appended to ``output_file`` in the order the chunks were read.

    Args:
        file_path: CSV file to read.
        output_file: CSV file to create; an existing file is deleted first.
        chunk_size: Rows per chunk.
        n: Rows of neighbouring chunks passed as context, and the scan
            distance forwarded to ``process_chunk``.
    """
    if os.path.exists(output_file):
        os.remove(output_file)
        print(f"Delete {output_file}")

    prev_chunk = pd.DataFrame()
    with pd.read_csv(file_path, chunksize=chunk_size) as reader:
        with ProcessPoolExecutor() as executor:
            futures = []
            # next() with a default returns None at the end of the file, so no
            # StopIteration handling is needed.
            current_chunk = next(reader, None)

            while current_chunk is not None:
                next_chunk = next(reader, None)
                if next_chunk is not None:
                    next_chunk_context = next_chunk.iloc[:n]
                else:
                    next_chunk_context = pd.DataFrame()

                futures.append(executor.submit(process_chunk, current_chunk, prev_chunk, next_chunk_context, n))

                prev_chunk = current_chunk.tail(n)
                current_chunk = next_chunk

            for position, future in enumerate(futures):
                # result() blocks until the worker is done; consuming in
                # submission order keeps the output rows in input order.
                save_to_csv_partial(future.result(), output_file, mode='a')
                # Drop our reference so the finished result can be released
                # instead of staying alive until every chunk is written.
                futures[position] = None

In [18]:
def process_pcap_files(pcap_files, outdir, debug):
    """Run the preparation and context pipeline for every file in ``pcap_files``.

    For each file: prepare the TSV export, derive the ``<name>_feature.csv``
    and ``<name>_context.csv`` paths inside ``outdir``, then build the context
    file. In normal mode the feature file is extracted first and a completion
    line is printed per file. In debug mode feature extraction is skipped (the
    feature CSV is read as it is on disk) and the total elapsed time is
    printed at the end.
    """
    if debug:
        print("Debug mode enabled. Skipping detailed processing.")
        started_at = time.time()

    for pcap_path in pcap_files:
        print(f"\nStart {pcap_path}")
        prepared = PrepPcap(pcap_path, outdir)
        print_memory_usage()

        stem = os.path.splitext(os.path.basename(pcap_path))[0]
        feature_csv = os.path.join(outdir, f'{stem}_feature.csv')
        context_csv = os.path.join(outdir, f'{stem}_context.csv')

        if not debug:
            prepared.analysis_time = extract_features(prepared.tsvpath, feature_csv)
        print("***************")
        extract_context_parallel(feature_csv, context_csv, 100000, 10)
        print_memory_usage()

        if not debug:
            print(f'Process {pcap_path} finished!')

    if debug:
        finished_at = time.time()
        print(finished_at - started_at)

In [19]:
def parser(argv=None):
    """Parse the run options.

    Args:
        argv: List of command-line style arguments. When omitted, the
            notebook's built-in configuration
            ``['-d', 'Data/kitsune', '-a', '-b']`` is used, so calling
            ``parser()`` behaves as before. Pass ``sys.argv[1:]`` to read the
            real command line when running as a script.

    Returns:
        ``argparse.Namespace`` with ``directory``, ``analysis``, ``recursive``
        and ``debug``.
    """
    arg_parser = argparse.ArgumentParser(description="Process PCAP files in a directory.")

    # Main directory argument
    arg_parser.add_argument(
        '-d', '--directory',
        required=True,
        help="Specify the directory containing PCAP files."
    )

    # Analysis options
    arg_parser.add_argument(
        '-a', '--analysis',
        action='store_true',
        help="Generate a summary analysis of the PCAP files."
    )
    arg_parser.add_argument(
        '-r', '--recursive',
        action='store_true',
        help="Recursively traverse subdirectories for PCAP files."
    )

    # Debugging options
    arg_parser.add_argument(
        '-b', '--debug',
        action='store_true',
        help="Enable debug mode for additional output."
    )

    if argv is None:
        # Default configuration used when run from the notebook.
        argv = ['-d', 'Data/kitsune', '-a', '-b']
    return arg_parser.parse_args(argv)

In [20]:
def main():
    """Process every configured directory.

    For each directory returned by ``get_subdirectories``: collect the
    ``*.pcap*`` files (or, if there are none, the ``*.tsv`` files), create the
    ``output`` sub-directory when it is missing, and hand the files to
    ``process_pcap_files``. Directories without any matching file are reported
    and skipped.
    """
    args = parser()
    debug = args.debug

    for subdir in get_subdirectories(args.directory, args.recursive):
        outdir = os.path.join(subdir, 'output')

        # Prefer captures; fall back to TSV exports when no capture is present.
        input_files = (
            glob.glob(os.path.join(subdir, '*.pcap*'))
            or glob.glob(os.path.join(subdir, '*.tsv'))
        )
        if not input_files:
            print(f"Directory '{subdir}' has no files!\n")
            continue

        if not os.path.exists(outdir):
            os.makedirs(outdir)
            print(f"Directory '{outdir}' created.")

        print(f"Starting analysis for directory '{subdir}'")
        process_pcap_files(input_files, outdir, debug)

# Run the pipeline when this cell/module is executed directly (not on import).
if __name__ == "__main__":
    main()

2024-11-19 16:49:45,744 - INFO - Data/kitsune/output/Video_Injection_pcap.tsv already exists!


Starting analysis for directory 'Data/kitsune'
Debug mode enabled. Skipping detailed processing.

Start Data/kitsune/Video_Injection_pcap.pcapng
memory_usage: 259.89 MB
***************


2024-11-19 16:54:30,797 - INFO - Data/kitsune/output/OS_Scan_pcap.tsv already exists!


memory_usage: 320.86 MB

Start Data/kitsune/OS_Scan_pcap.pcapng
***************


2024-11-19 16:58:21,888 - INFO - Data/kitsune/output/Fuzzing_pcap.tsv already exists!


memory_usage: 336.39 MB

Start Data/kitsune/Fuzzing_pcap.pcapng
***************


2024-11-19 17:03:00,100 - INFO - Data/kitsune/output/Active_Wiretap_pcap.tsv already exists!



Start Data/kitsune/Active_Wiretap_pcap.pcapng
***************


2024-11-19 17:07:40,767 - INFO - Data/kitsune/output/SSDP_Flood_pcap.tsv already exists!


memory_usage: 358.27 MB

Start Data/kitsune/SSDP_Flood_pcap.pcap
***************


2024-11-19 17:14:52,671 - INFO - Data/kitsune/output/ARP_MitM_pcap.tsv already exists!


memory_usage: 533.01 MB

Start Data/kitsune/ARP_MitM_pcap.pcapng
***************


2024-11-19 17:19:58,648 - INFO - Data/kitsune/output/SSL_Renegotiation_pcap.tsv already exists!



Start Data/kitsune/SSL_Renegotiation_pcap.pcap
***************


2024-11-19 17:24:22,760 - INFO - Data/kitsune/output/Mirai_pcap.tsv already exists!



Start Data/kitsune/Mirai_pcap.pcap
***************


2024-11-19 17:26:59,717 - INFO - Data/kitsune/output/SYN_DoS_pcap.tsv already exists!



Start Data/kitsune/SYN_DoS_pcap.pcap
***************
2531.4536652565002
