# Description
Process ET-BERT data for fine-tuning.
Input: labeled pcap files (label.pcap) -> output: train/validation/test.tsv

In [1]:
import os
import logging
import scapy.all as scapy
import random
import binascii
import csv
import numpy as np
import pandas as pd
from multiprocessing import Pool, cpu_count

# Relative paths used below (e.g. the log file) are resolved against this
# working directory.
os.chdir('LLM4Traffic/tool/Data-Process')

# Log to a file (truncated on every run because of mode='w') and to the
# console at the same time.
log_handlers = [
    logging.FileHandler('logs/et-bert_process_pkt.log', mode='w'),
    logging.StreamHandler(),
]

# force=True replaces any handlers already attached to the root logger.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=log_handlers,
    force=True,
)

# Root logger, shared by every cell below.
logger = logging.getLogger()

FileNotFoundError: [Errno 2] No such file or directory: 'LLM4Traffic/tool/Data-Process'

In [29]:
def cut(obj, sec):
    """Split *obj* into consecutive slices.

    *obj* is first sliced into pieces of length ``sec``. If the length of the
    first piece is not a multiple of 4, *obj* is sliced again using a width
    of ``sec + (len(first_piece) % 4)`` and that second result is returned.

    If the first piece cannot be measured (for example *obj* is empty, so
    there is no first piece), ``"cut datagram error!"`` is printed and the
    initial slicing is returned unchanged.
    """
    chunks = [obj[start:start + sec] for start in range(0, len(obj), sec)]

    try:
        remainder = len(chunks[0]) % 4
    except Exception:
        print("cut datagram error!")
        return chunks

    if remainder:
        width = sec + remainder
        chunks = [obj[start:start + width] for start in range(0, len(obj), width)]

    return chunks

def bigram_generation(packet_datagram, packet_len = 64, flag=True):
    """Build a space-separated string of overlapping bigram tokens.

    *packet_datagram* is split with ``cut(packet_datagram, 1)``; every pair of
    adjacent chunks is concatenated into one token. At most *packet_len*
    tokens are produced, and every token (including the last) is followed by
    a single space. An input that yields fewer than two chunks gives ``''``.

    *flag* is accepted but not used by this function.
    """
    chunks = cut(packet_datagram, 1)

    tokens = []
    # Pair each chunk with its successor; the final chunk has no successor
    # and therefore never starts a token.
    for position, (current, following) in enumerate(zip(chunks, chunks[1:]), start=1):
        if position > packet_len:
            break
        tokens.append(current + following + ' ')

    return ''.join(tokens)

In [46]:
# Which dataset and which processing level to convert.
dataset, level = 'tls', 'polished'

# Input pcap tree and output directory for the ET-BERT files.
dataset_path = f'LLM4Traffic/pipeline/{level}/{dataset}'
output_path = f'LLM4Traffic/code/ET-BERT/data_{level}/{dataset}'

# Create the output directory (and any missing parents) up front.
os.makedirs(output_path, exist_ok=True)

In [35]:
# Packet-level processing only.
# Split type: train, test, or val.
# Input file: a pcap file.
# Generates the ET-BERT dataset from pcap and parquet files.

def clean_packet(packet):
    """Strip link/network layers from *packet* and zero its transport ports.

    Steps, in order:
      1. If an Ethernet layer is present, continue with its payload.
      2. If an IP layer is present, continue with its payload; otherwise, if
         an IPv6 layer is present, continue with that layer's payload.
      3. If a UDP layer is present, set its ``sport`` and ``dport`` to 0;
         otherwise, if a TCP layer is present, do the same on the TCP layer.

    The port assignment is made on the layer object itself. Returns the
    packet that remains after steps 1 and 2.
    """
    if packet.haslayer(scapy.Ether):
        packet = packet[scapy.Ether].payload

    # Only the first matching network layer is removed (IP takes priority).
    for network_layer in (scapy.IP, 'IPv6'):
        if packet.haslayer(network_layer):
            packet = packet[network_layer].payload
            break

    # Only the first matching transport layer is touched (UDP takes priority).
    for transport_layer in (scapy.UDP, scapy.TCP):
        if packet.haslayer(transport_layer):
            segment = packet[transport_layer]
            segment.sport = 0
            segment.dport = 0
            break

    return packet

def get_feature_packet(packet, payload_length):
    """Return the bigram-token string for *packet*.

    A copy of *packet* is serialised to bytes and hex-encoded; the first
    8 hex characters (4 bytes) are dropped, and the rest is tokenised by
    ``bigram_generation`` with at most *payload_length* tokens.
    """
    raw_bytes = bytes(packet.copy())
    # Skip the leading 4 bytes. NOTE(review): for a TCP/UDP segment these
    # would be the source/destination ports — confirm the input always starts
    # at the transport header.
    hex_text = raw_bytes.hex()[8:]
    return bigram_generation(hex_text, packet_len=payload_length, flag=True)

def save_to_tsv(dataset_file, output_path, type):
    """Write *dataset_file* (an iterable of rows) to ``<output_path>/<type>.tsv``.

    Rows are written tab-separated with the ``csv`` module; an existing file
    at that path is overwritten.
    """
    tsv_path = f"{output_path}/{type}.tsv"
    # newline='' lets the csv module control line endings itself.
    with open(tsv_path, 'w', newline='') as handle:
        csv.writer(handle, delimiter='\t').writerows(dataset_file)

def process_file(path, class_name, payload_length):
    """Convert every packet of one pcap file into ET-BERT dataset rows.

    Args:
        path: Path of the capture relative to the module-level
            ``dataset_path``, without the ``.pcap`` suffix. The file read is
            ``f"{dataset_path}/{path}.pcap"``.
        class_name: Label for every packet in this file. It is stored as
            ``int(class_name)`` in the TSV rows and unchanged in the label
            list.
        payload_length: Maximum number of bigram tokens per packet, forwarded
            to ``get_feature_packet``.

    Returns:
        A tuple ``(dataset_file, dataset_numpy, dataset_label)``:
          * ``dataset_file``: TSV rows, starting with the header row
            ``["label", "text_a"]`` followed by one ``[label, text]`` row per
            packet.
          * ``dataset_numpy``: the feature string of every packet.
          * ``dataset_label``: ``class_name`` repeated once per packet.
    """
    dataset_file = [["label", "text_a"]]
    dataset_numpy = []
    dataset_label = []

    # PcapReader yields packets one at a time; the context manager closes the
    # underlying file even if a packet fails to process.
    with scapy.PcapReader(f"{dataset_path}/{path}.pcap") as pkts:
        for pkt in pkts:
            feature_packet = get_feature_packet(clean_packet(pkt), payload_length)
            dataset_file.append([int(class_name), feature_packet])
            dataset_numpy.append(feature_packet)
            dataset_label.append(class_name)

    return dataset_file, dataset_numpy, dataset_label

def _collect_pcap_directory(directory, relative_dir, payload_length):
    """Process every ``.pcap`` file directly inside *directory*.

    Files are visited in sorted name order and labelled 0, 1, 2, ... in that
    order, so the same set of file names always maps to the same class ids
    regardless of the order the filesystem lists them in.

    Args:
        directory: Directory to scan.
        relative_dir: The same directory expressed relative to the dataset
            root, as expected by ``process_file``.
        payload_length: Maximum number of bigram tokens per packet.

    Returns:
        ``(rows, payloads, labels, class_count)`` where ``rows`` holds a
        single header row followed by all data rows, ``payloads`` and
        ``labels`` are the concatenated per-file lists, and ``class_count`` is
        the number of pcap files processed.
    """
    rows, payloads, labels = [], [], []
    class_id = 0
    suffix = '.pcap'

    for file_name in sorted(os.listdir(directory)):
        logger.info(f"Start processing {directory}/{file_name}")

        # Match on the real extension: a substring test would also accept
        # names such as "x.pcapng", whose stem would then be cut incorrectly.
        if not file_name.endswith(suffix):
            continue

        stem = file_name[:-len(suffix)]
        file_rows, file_payloads, file_labels = process_file(
            f"{relative_dir}/{stem}", class_id, payload_length
        )

        # Every per-file result starts with a header row; keep only the first.
        rows.extend(file_rows if class_id == 0 else file_rows[1:])
        payloads.extend(file_payloads)
        labels.extend(file_labels)
        class_id += 1

    return rows, payloads, labels, class_id


def generate_dataset(dataset_path, output_path, payload_length):
    """Convert a tree of pcap files into ET-BERT TSV and NPY files.

    Expected input layout under *dataset_path*:
      * ``test/<class>.pcap``
      * ``<split>/<folder>/<class>.pcap`` for every other split directory.

    Output:
      * For ``test``: ``<output_path>/test.tsv``,
        ``<output_path>/x_payload_test.npy`` and
        ``<output_path>/y_label_test.npy``.
      * For every other split: ``<output_path>/<split>/<folder>.tsv``,
        ``<output_path>/<split>/x_payload_<folder>.npy`` and
        ``<output_path>/<split>/y_label_<folder>.npy``.

    Output directories are created when missing. Entries that are not
    directories where a directory is expected are skipped with a warning.
    The number of classes found in each processed directory is printed.

    NOTE: ``process_file`` resolves capture paths against the module-level
    ``dataset_path``, so the *dataset_path* argument should be the same value.

    Args:
        dataset_path: Root directory of the input pcap tree.
        output_path: Root directory for the generated files.
        payload_length: Maximum number of bigram tokens per packet.
    """
    for split in sorted(os.listdir(f'{dataset_path}')):
        split_dir = f"{dataset_path}/{split}"
        logger.info(f"Start processing {split_dir}")

        if not os.path.isdir(split_dir):
            logger.warning(f"Skipping {split_dir}: not a directory")
            continue

        if split == 'test':
            rows, payloads, labels, class_count = _collect_pcap_directory(
                split_dir, split, payload_length
            )
            print(class_count)

            os.makedirs(output_path, exist_ok=True)
            save_to_tsv(rows, output_path, split)
            np.save(f"{output_path}/x_payload_{split}.npy", payloads)
            np.save(f"{output_path}/y_label_{split}.npy", labels)
            continue

        # Per-split output directory; it is not created anywhere else.
        split_output = f"{output_path}/{split}"
        os.makedirs(split_output, exist_ok=True)

        for folder in sorted(os.listdir(split_dir)):
            folder_dir = f"{split_dir}/{folder}"
            logger.info(f"Start processing {folder_dir}")

            if not os.path.isdir(folder_dir):
                logger.warning(f"Skipping {folder_dir}: not a directory")
                continue

            rows, payloads, labels, class_count = _collect_pcap_directory(
                folder_dir, f"{split}/{folder}", payload_length
            )
            print(class_count)

            save_to_tsv(rows, split_output, folder)
            np.save(f"{split_output}/x_payload_{folder}.npy", payloads)
            np.save(f"{split_output}/y_label_{folder}.npy", labels)
    

In [47]:

# Expected input layout: dataset/type(train, test, validation)/.pcap
generate_dataset(
    dataset_path=dataset_path,
    output_path=output_path,
    payload_length=128,
)

logger.info('Finish')