# Assemblage Passtell test code

This notebook contains code to evaluate [Passtell](https://github.com/zeropointdynamics/passtell) on [Assemblage](https://assemblage-dataset.net/) data.
We will use the [Assemblage sample dataset](https://www.kaggle.com/datasets/changliuh7rfs5/assemblagedataset) for testing purposes. First, download the dataset and extract the compressed files. Then, run this notebook to obtain the CSV file for training and testing with the Passtell authors' code.

In [2]:
import functools
import hashlib
import re
import sqlite3

import capstone
import pandas as pd
import pefile
from tqdm import tqdm

In [3]:
def getmd5(s):
    """Return the hexadecimal MD5 digest of the string ``s``.

    The string is converted to bytes with ``str.encode()`` (default
    encoding) before it is hashed.
    """
    hasher = hashlib.md5()
    hasher.update(s.encode())
    return hasher.hexdigest()

In [4]:
db = sqlite3.connect('./sample.sqlite')
cursor = db.cursor()
data_list = []


@functools.lru_cache(maxsize=16)
def _load_image(binary_path):
    """Return the memory-mapped image of one sample binary as bytes.

    Parsing a PE file is far more expensive than slicing one function out of
    it, and the query below yields one row per function, so the images of the
    most recently used binaries are cached instead of re-parsing the same
    file for every row. The cache is bounded so that memory use stays flat
    no matter how many binaries the database references.
    """
    pe = pefile.PE(f"sample/sample/{binary_path}", fast_load=True)
    try:
        return pe.get_memory_mapped_image()
    finally:
        # The image has been copied out, so the file handle/mapping held by
        # pefile can be released right away.
        pe.close()


# iterate functions table, also get the associated file name, github url, toolset_version, optimization
for row in tqdm(cursor.execute('''SELECT 
        f.name AS function_name,
        b.file_name AS binary_file_name,
        b.github_url,
        b.toolset_version,
        b.optimization,
        r.start AS rva_start,
        r.end AS rva_end,
        b.path AS binary_path,
        b.platform AS platform
    FROM 
        functions f
    JOIN 
        binaries b ON f.binary_id = b.id
    LEFT JOIN 
        rvas r ON f.id = r.function_id;
    ''')):
    (function_name, binary_file_name, github_url, toolset_version,
     optimization, rva_start, rva_end, binary_path, platform) = row

    # Keep only functions whose name does not start with "_" and whose
    # toolset version starts with "v1" (described by the original author as
    # filtering out non-Windows and compiler-inserted functions).
    if function_name.startswith("_") or not toolset_version.startswith("v1"):
        continue

    # The LEFT JOIN produces NULL bounds for functions without an rvas row.
    # Slicing with None bounds would silently return the whole image instead
    # of the function body, so such rows are skipped.
    if rva_start is None or rva_end is None:
        continue

    function_bytes = _load_image(binary_path)[rva_start:rva_end]
    data_list.append([
        binary_file_name,
        getmd5(github_url),
        toolset_version,
        optimization,
        function_name,
        rva_start,
        rva_end,
        platform,
        function_bytes,
    ])

329702it [07:22, 744.58it/s] 


In [5]:
# A standalone numeric literal in an operand string: hexadecimal or decimal,
# not glued to an identifier (the "0" of "xmm0", the "8" of "r8") and not a
# scale factor (the "4" of "rcx*4").
_NUMERIC_LITERAL = re.compile(
    r"(?<![0-9A-Za-z_*])(?:0[xX][0-9a-fA-F]+|[0-9]+)(?![0-9A-Za-z_])"
)
_WIDTH_MASKS = (0xFF, 0xFFFF, 0xFFFFFFFF, 0xFFFFFFFFFFFFFFFF)


def _printed_values(value):
    """Return the non-negative integers that may stand for ``value`` in text.

    A negative operand can be printed either as a sign followed by its
    magnitude or as its two's-complement pattern truncated to the operand
    width, so both forms are accepted.
    """
    return {abs(value)} | {value & mask for mask in _WIDTH_MASKS}


def mask_operands(insn):
    """Return ``insn.op_str`` with operand values replaced by placeholders.

    Immediate operands become ``#IMM#`` and memory displacements become
    ``#MEM#``. Literals are matched by numeric value and as whole tokens, so
    hexadecimal renderings such as ``0x10`` are recognised and digits that
    belong to register names or to other literals are left untouched.
    Displacements are only looked for inside ``[...]`` and immediates only
    outside of it, which keeps ``mov dword ptr [rax], 0`` from having its
    immediate mistaken for the (unprinted) zero displacement.

    Args:
        insn: A disassembled x86 instruction exposing ``op_str`` and
            ``operands`` (capstone detail mode must be enabled by the caller).

    Returns:
        The masked operand string.
    """
    imm_values = set()
    disp_values = set()
    for op in insn.operands:
        if op.type == capstone.x86.X86_OP_IMM:  # Immediate value
            imm_values |= _printed_values(op.imm)
        elif op.type == capstone.x86.X86_OP_MEM:  # Memory address
            disp_values |= _printed_values(op.mem.disp)

    op_str = insn.op_str
    if not imm_values and not disp_values:
        return op_str

    def _mask(match):
        literal = match.group()
        if literal[:2].lower() == "0x":
            value = int(literal, 16)
        else:
            value = int(literal)
        # More "[" than "]" before the literal means it sits inside a memory
        # reference.
        start = match.start()
        in_brackets = op_str.count("[", 0, start) > op_str.count("]", 0, start)
        if in_brackets:
            return "#MEM#" if value in disp_values else literal
        return "#IMM#" if value in imm_values else literal

    return _NUMERIC_LITERAL.sub(_mask, op_str)

In [6]:
# Build one disassembler per architecture up front and reuse it for every
# function instead of constructing a new capstone.Cs object per row.
md_64 = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64)
md_32 = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_32)
# mask_operands reads insn.operands, which requires detail mode.
md_64.detail = True
md_32.detail = True

for data in data_list:
    # Each row still has the nine fields collected from the database; the
    # second field already holds the MD5 of the GitHub URL.
    (binary_file_name, suite_hash, toolset_version, optimization,
     function_name, rva_start, _rva_end, platform, function_bytes) = data

    md = md_64 if "64" in platform else md_32

    # NOTE(review): only the masked operand string of each instruction is
    # kept; insn.mnemonic is not part of the output. Confirm this is the
    # intended content of the "insts" feature.
    insts = ";".join(
        mask_operands(insn) for insn in md.disasm(function_bytes, rva_start)
    )

    # Rewrite the row in place: drop the RVA bounds and the raw bytes, keep
    # platform, and put the joined instruction string last.
    data[:] = [
        binary_file_name,
        suite_hash,
        toolset_version,
        optimization,
        function_name,
        platform,
        insts,
    ]

In [7]:
# Replace the platform field with a trailing size field: the number of
# ";"-separated entries in the instruction string (the row's last element).
for row in data_list:
    entry_count = len(row[-1].split(";"))
    # The row ends with [..., platform, insts]; remove platform first, then
    # add the count after the instruction string.
    del row[-2]
    row.append(entry_count)

In [8]:
# Turn the collected rows into a frame with the seven dataset columns.
dataset_columns = ['filename', 'suite', 'compiler', 'opt', 'function', 'insts', 'size']
df = pd.DataFrame(data_list, columns=dataset_columns)

# Load the existing Passtell dataset and discard its saved index column.
df_original = pd.read_csv(
    '../passtell/passtell_USENIX_ATC_22/balanced_dataset.csv'
).drop(columns=['Unnamed: 0'])

# Existing rows first, new rows after, renumbered from zero.
df = pd.concat([df_original, df], ignore_index=True)

# Remove the icc rows, then write the combined dataset including its index.
not_icc = df['compiler'] != 'icc'
df = df[not_icc]
df.to_csv('balanced_dataset_new.csv', index=True)