CSV

In [None]:
import pandas as pd

file_path = "/home/tommy/Project/PcodeBERT/dataset/csv/base_dataset_filtered.csv"

# Read the base dataset through an explicit file handle.
with open(file_path, "r") as f:
    df = pd.read_csv(f)

# Keep only the rows whose family is benign, mirai, gafgyt, or tsunami,
# renumber the index from zero, and write the result out as the v2 dataset.
wanted_families = ["benign", "mirai", "gafgyt", "tsunami"]
df = df.loc[df["family"].isin(wanted_families)].reset_index(drop=True)
df.to_csv(
    "/home/tommy/Project/PcodeBERT/dataset/csv/base_dataset_filtered_v2.csv",
    index=False,
)


Load Data

In [None]:
import re

# Regex patterns used to preprocess operation strings.
# 1) OPCODE_PAT: captures the opcode mnemonic (a run of uppercase letters and
#    underscores) that follows either a closing parenthesis plus whitespace
#    or a "---" marker plus whitespace.
# 2) OPERAND_PAT: matches a parenthesized, comma-separated three-field operand
#    and captures the first field (the operand type) and the decimal digits of
#    the third field (presumably the operand size -- TODO confirm). The middle
#    field is matched but not captured.
OPCODE_PAT = re.compile(r"(?:\)\s+|---\s+)([A-Z_]+)")
OPERAND_PAT = re.compile(r"\(([^ ,]+)\s*,\s*[^,]*,\s*([0-9]+)\)")

def _map_operand(op_type: str) -> str:
    op_type_l = op_type.lower()
    if op_type_l == 'register':
        return "REG"
    if op_type_l == 'ram':
        return "MEM"
    if op_type_l in {'const', 'constant'}:
        return "CONST"
    if op_type_l == 'unique':
        return "UNIQUE"
    if op_type_l == 'stack':
        return "STACK"
    return "UNK"

In [None]:
import os
import json
import pickle
import random
import csv
import numpy as np
import torch 
from transformers import AutoTokenizer, RobertaForMaskedLM 
from collections import Counter


def _tokens_from_operation(operation_str: str) -> list[str]:
    if not operation_str:
        return []
    m = OPCODE_PAT.search(operation_str)
    if not m:
        return []
    opcode = m.group(1)
    tokens = [opcode]
    operands = OPERAND_PAT.findall(operation_str)
    for op_type, _ in operands:
        tokens.append(_map_operand(op_type))
    return tokens


def load_data_from_folder(folder_path, archs):
    """Collect tokenized functions from per-architecture JSON dumps.

    Walks ``folder_path`` recursively. A directory is processed only when its
    name contains one of ``archs`` (the first match in ``archs`` order wins);
    the text after the last underscore of the directory name is used as the
    file identifier. Each ``*.json`` file in such a directory is read as a
    mapping whose values are function records with ``function_name`` and
    ``instructions`` entries.

    Args:
        folder_path: Root directory to walk.
        archs: Sequence of architecture names to look for in directory names.

    Returns:
        ``{arch: {"<file_base>::<function_name>": (file_base, function_name,
        tokens)}}`` where ``tokens`` is the space-joined token string of the
        whole function. Functions without a name, without instructions, or
        without any extractable token are left out. A later record with the
        same key overwrites an earlier one.
    """
    collected = {arch: {} for arch in archs}

    for current_dir, _dirs, filenames in os.walk(folder_path):
        dir_name = os.path.basename(current_dir)
        matched_arch = next((name for name in archs if name in dir_name), None)
        if not matched_arch:
            continue

        file_base = dir_name.split("_")[-1]
        json_names = [name for name in filenames if name.endswith(".json")]
        for json_name in json_names:
            json_path = os.path.join(current_dir, json_name)
            # Best effort: any failure while reading or interpreting a file is
            # reported and the rest of that file is skipped.
            try:
                with open(json_path, 'r', encoding='utf-8') as handle:
                    functions = json.load(handle)

                for record in functions.values():
                    func_name = record.get("function_name", "").strip()
                    instructions = record.get("instructions", [])
                    if not (func_name and instructions):
                        continue

                    # Concatenate the tokens of every non-blank operation.
                    tokens: list[str] = []
                    for instruction in instructions:
                        operation = instruction.get("operation", "").strip()
                        if operation:
                            tokens += _tokens_from_operation(operation)

                    if tokens:
                        entry_key = f"{file_base}::{func_name}"
                        collected[matched_arch][entry_key] = (
                            file_base,
                            func_name,
                            " ".join(tokens),
                        )

            except Exception as e:
                print(f"{file_base} 讀取失敗，跳過: {e}")
    return collected


def load_pretrained_model():
    """Load the pretrained model and its tokenizer.

    Both are read from the fixed pretrain output directory. The model is
    moved to CUDA when available (otherwise CPU) and switched to eval mode.

    Returns:
        A ``(model, tokenizer, device)`` tuple.
    """
    model_path = "/home/tommy/Project/PcodeBERT/outputs/models/pretrain"
    print(f"Loading model from: {model_path}")

    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = RobertaForMaskedLM.from_pretrained(model_path)

    # Prefer the GPU whenever torch can see one.
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")

    model.to(device)
    model.eval()
    print(f"Model loaded successfully on device: {device}")

    return model, tokenizer, device

def get_sentence_embedding(sentence, model, tokenizer, device):
    """Compute the embedding of a single sentence.

    Args:
        sentence: Text to embed; it is truncated to at most 512 tokens.
        model: Model that is called with ``output_hidden_states=True``.
        tokenizer: Tokenizer that turns ``sentence`` into tensor inputs.
        device: Device the tokenizer outputs are moved to before the forward pass.

    Returns:
        A NumPy array: the last-layer hidden state at sequence position 0
        (the leading special token) for the first, and only, batch row.
    """
    encoded = tokenizer(sentence, return_tensors="pt", truncation=True, padding=True, max_length=512)
    encoded = {name: tensor.to(device) for name, tensor in encoded.items()}

    with torch.no_grad():
        # Call the full model (not model.roberta) and request every layer's
        # hidden states so the last one can be read below.
        outputs = model(**encoded, output_hidden_states=True)
        last_layer = outputs.hidden_states[-1]
        # Position 0 of the last layer serves as the sentence representation.
        first_token = last_layer[:, 0, :]
        batch_embeddings = first_token.cpu().numpy()

    return batch_embeddings[0]


def extract_and_vectorize_with_bert():
    """Build x86_64/arm_32 aligned embedding pairs and save them to disk.

    Steps:
      1. Load the pretrained model and tokenizer.
      2. Load the tokenized functions of every input folder.
      3. Keep only the keys present for mips_32, arm_32 and x86_64 in every
         folder, shuffle them, and split them evenly across the folders
         (integer division, so a remainder is not assigned to any folder).
      4. Embed the x86_64 and arm_32 token strings of each key; every pair
         is stored with label 1.
      5. Write the vectors to a pickle file and the token strings to a CSV
         file inside the output directory.
    """
    input_folders = [
        "/home/tommy/Project/PcodeBERT/outputs/align_sentences"
    ]
    output_path = "/home/tommy/Project/PcodeBERT/outputs/alignment_vector"
    archs = ["mips_32", "arm_32", "x86_64"]

    print("Loading BERT model...")
    model, tokenizer, device = load_pretrained_model()

    # Load every source folder once.
    print("Loading data from folders...")
    arch_datasets = [load_data_from_folder(folder, archs) for folder in input_folders]

    # Intersect the keys shared by all three architectures across all sources.
    print("Finding common keys...")
    per_source_keys = [
        set(dataset["mips_32"]) & set(dataset["arm_32"]) & set(dataset["x86_64"])
        for dataset in arch_datasets
    ]
    common_keys = None
    for key_set in per_source_keys:
        if common_keys is None:
            common_keys = key_set
        else:
            common_keys &= key_set

    common_keys = list(common_keys)
    random.shuffle(common_keys)

    # Split the shuffled keys evenly between the sources.
    n = len(input_folders)
    chunk_size = len(common_keys) // n
    key_groups = []
    for group_idx in range(n):
        start = group_idx * chunk_size
        key_groups.append(common_keys[start:start + chunk_size])

    print("Generating embeddings...")
    samples = []
    for dataset, keys in zip(arch_datasets, key_groups):
        total = len(keys)
        for position, key in enumerate(keys, start=1):
            if position % 100 == 0:
                print(f"Processing item {position}/{total}...")

            x86_op = dataset["x86_64"][key][2]
            arm_op = dataset["arm_32"][key][2]

            vec_x = get_sentence_embedding(x86_op, model, tokenizer, device)
            vec_a = get_sentence_embedding(arm_op, model, tokenizer, device)

            samples.append((x86_op, arm_op, vec_x, vec_a, 1))

    print("Saving results...")
    os.makedirs(output_path, exist_ok=True)

    # Vectors and label go to the pickle file.
    pk_file = os.path.join(output_path, "train_arm_vector_mix_bert.pickle")
    vector_rows = [(vx, va, lbl) for _x_op, _a_op, vx, va, lbl in samples]
    with open(pk_file, "wb") as f:
        pickle.dump(vector_rows, f)

    # Token strings and label go to the CSV file.
    csv_file = os.path.join(output_path, "train_arm_op_mix_bert.csv")
    with open(csv_file, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["x86_op", "arm_op", "label"])
        writer.writerows(
            [x_op, a_op, lbl] for x_op, a_op, _vx, _va, lbl in samples
        )

    print(f"\n已生成 {pk_file} & {csv_file}，共 {len(samples)} 筆樣本")


# Run the extraction/vectorization pipeline only when executed as a script.
if __name__ == "__main__":
    extract_and_vectorize_with_bert()