In [1]:
import time
import multiprocessing
from scapy.utils import PcapReader, rdpcap
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP
from scapy.config import conf

conf.l2types.register(1, Ether)

In [2]:
def process_packet(pkt, index):
    """
    Обработка пакета: проверка IP-слоя и извлечение 5-tuple.
    """
    if not pkt.haslayer(IP):
        return None

    try:
        ip_layer = pkt[IP]
        src = ip_layer.src
        dst = ip_layer.dst
        proto = ip_layer.proto
        sport = pkt.sport if hasattr(pkt, "sport") else None
        dport = pkt.dport if hasattr(pkt, "dport") else None
        flow_key = (src, dst, sport, dport, proto)
        return index, flow_key
    except Exception as e:
        return None

In [3]:
# Ленивый подход
def read_pcap_lazy(file_path):
    """
    Эффективное чтение pcap-файла с помощью генератора.
    """
    with PcapReader(file_path) as pcap_reader:
        for packet in pcap_reader:
            yield packet

In [4]:
def lazy_processing(file_path, output_file):
    """
    Обработка пакетов с помощью ленивого подхода.
    """
    with open(output_file, "w") as f:
        for index, pkt in enumerate(read_pcap_lazy(file_path)):
            result = process_packet(pkt, index)
            if result:
                f.write(f"{result[0]}: {result[1]}\n")
    print(f"Lazy processing completed. Results saved to {output_file}")

In [5]:
# Обработка с использованием rdpcap
def rdpcap_processing(file_path, output_file):
    """
    Обработка пакетов с использованием scapy.rdpcap (чтение всех пакетов сразу) и tqdm.
    """
    packets = rdpcap(file_path)
    with open(output_file, "w") as f:
        for index, pkt in enumerate(packets):
            result = process_packet(pkt, index)
            if result:
                f.write(f"{result[0]}: {result[1]}\n")
    print(f"Rdpcap processing completed. Results saved to {output_file}")

In [6]:
# Параллельный подход
def worker(input_queue, output_queue):
    """
    Рабочий процесс для параллельной обработки пакетов.
    """
    while True:
        task = input_queue.get()
        if task is None:  # Сигнал завершения
            break
        index, pkt = task
        result = process_packet(pkt, index)
        if result:
            output_queue.put(result)


def parallel_processing(file_path, output_file, num_workers=4):
    """
    Обработка пакетов с использованием multiprocessing.
    """
    input_queue = multiprocessing.Queue(maxsize=1000)
    output_queue = multiprocessing.Queue()

    # Запуск рабочих процессов
    workers = []
    for _ in range(num_workers):
        p = multiprocessing.Process(target=worker, args=(input_queue, output_queue))
        p.start()
        workers.append(p)

    # Чтение пакетов и передача в input_queue
    with PcapReader(file_path) as pcap_reader:
        for index, pkt in enumerate(pcap_reader):
            input_queue.put((index, pkt))

    # Отправка сигналов завершения
    for _ in range(num_workers):
        input_queue.put(None)

    # Сохранение результатов
    with open(output_file, "w") as f:
        processed_count = 0
        while processed_count < index + 1:
            result = output_queue.get()
            f.write(f"{result[0]}: {result[1]}\n")
            processed_count += 1
    print(f"Multiprocess complete and results saved to {output_file}")

    # Ожидание завершения рабочих процессов
    for p in workers:
        p.join()

In [7]:
# Сравнение времени
if __name__ == "__main__":
    pcap_file = "/run/media/nemo/Projects/GitVerse/SIMON/pcaps/2.pcap"
    lazy_output = "lazy_output.txt"
    parallel_output = "parallel_output.txt"
    rdpcap_output = "rdpcap_output.txt"

    # Ленивый подход
    start_time = time.time()
    lazy_processing(pcap_file, lazy_output)
    lazy_time = time.time() - start_time
    print(f"Lazy processing time: {lazy_time:.2f} seconds")

    # Обработка с использованием rdpcap
    start_time = time.time()
    rdpcap_processing(pcap_file, rdpcap_output)
    rdpcap_time = time.time() - start_time
    print(f"Rdpcap processing time: {rdpcap_time:.2f} seconds")

    # Параллельный подход
    start_time = time.time()
    # parallel_processing(pcap_file, parallel_output, num_workers=4)
    parallel_time = time.time() - start_time
    print(f"Parallel processing time: {parallel_time:.2f} seconds")

Lazy processing completed. Results saved to lazy_output.txt
Lazy processing time: 9.43 seconds
Rdpcap processing completed. Results saved to rdpcap_output.txt
Rdpcap processing time: 10.40 seconds
Parallel processing time: 0.00 seconds
