In [1]:
import warnings

import scapy.plist

warnings.filterwarnings("ignore")

import logging
from utils.logger_config import LoggerCustom

from typing import List


# Import the necessary libraries to read .pcap files
from scapy.all import *
from scapy.layers.inet import IP
from scapy.layers.inet import TCP
from scapy.layers.inet import UDP


from src.utils.utils import get_prefix

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import fnmatch
import os
import re



In [9]:
logger = LoggerCustom.get_logger("Data Processing", level=logging.INFO)

class DataProcessor():
    def __init__(self):
        pass

    # ----- PUBLIC METHODS ----- #
    def run(self, filepath: str, provider: str = None):
        logger.info("Starting Data processing...")
        files = self.__get_pcap_file_list(filepath, provider=provider)
        for file in files:
            logger.debug(f"Processing file: {file.replace(filepath + '/', '')}")
            # df = self.read_pcap_file(file)
            # logger.info(f"File {file} processed")
            # logger.info(f"Saving file {file} to csv")
            # df.to_csv(f"{file}.csv")
        logger.info(f"Data processing finished, processed {len(files):,} files")


    def read_pcap_file(self, file_path: str) -> pd.DataFrame:
        """
        Read the pcap file and return a dataframe
        """
        packets: scapy.plist.PacketList = rdpcap(file_path)
        df = pd.DataFrame(columns=['src', 'dst', 'sport', 'dport', 'proto', 'size', 'time'])
        for packet in packets:

            if IP in packet:
                src = packet[IP].src
                dst = packet[IP].dst
                proto = packet[IP].proto
                size = packet[IP].len
                time = packet.time

                if TCP in packet:
                    s_port = packet[TCP].sport
                    d_port = packet[TCP].dport
                elif UDP in packet:
                    s_port = packet[UDP].sport
                    d_port = packet[UDP].dport
                else:
                    s_port = 0
                    d_port = 0
                df = df.append({'packet': packet, 'src': src, 'dst': dst, 'sport': s_port, 'dport': d_port, 'proto': proto, 'size': size, 'time': time}, ignore_index=True)
        return df

    # ----- PRIVATE METHODS ----- #
    def __get_pcap_file_list(self, directory: str, provider:str=None) -> List[str]:
        """
        Get the list of files in the given path
        """

        # Default regex is all .pcap files
        file_list: List[str] = []
        regex: str = self.__generate_regex(domain_name=provider)
        logger.info(f"Regex used: {regex}")
        for root, dirs, files in os.walk(directory):
            for file in files:
                if re.match(regex, file):
                    file_list.append(os.path.join(root, file))

        return file_list


    @functools.cache
    def __generate_regex(self, domain_name: str = None):
        # Default regex to get all .pcap files
        subdomain: str = r"\w+"
        domain_name: str = domain_name or r"\w+"
        top_level_domain: str = r"\w+"
        unknown: str = r"[\d]+"
        address: str = r"[\d\.]+"

        return fr"({subdomain}).({domain_name}).{top_level_domain}_({unknown})_({address}).pcap"



dp = DataProcessor()
dp.run(f"{get_prefix(True)}data/PCAP", provider="google")

[INFO] Logger: <Starting Data processing...>
[INFO] Logger: <Regex used: (\w+).(google).\w+_([\d]+)_([\d\.]+).pcap>
[INFO] Logger: <Data processing finished, processed 33 files>
