In [None]:
import os
import pefile
import math
import csv
import hashlib
from collections import Counter, defaultdict

In [None]:
# Section names that get their own presence/entropy/size/ratio feature columns.
KNOWN_SECTIONS = [
    ".text",
    ".rdata",
    ".data",
    ".idata",
    ".edata",
    ".pdata",
    ".rsrc",
    ".reloc",
    ".bss",
    ".tls",
    ".debug",
]

# DLL names are kept lower-case so they can be matched against the
# lower-cased names found in the import table.
KNOWN_DLLS = [
    "kernel32.dll",
    "advapi32.dll",
    "user32.dll",
    "gdi32.dll",
    "ntdll.dll",
    "ws2_32.dll",
    "wsock32.dll",
    "wininet.dll",
]

# API name fragments that are counted as suspicious when imported.
SUSPICIOUS_APIS = [
    "VirtualAlloc",
    "VirtualAllocEx",
    "VirtualProtect",
    "WriteProcessMemory",
    "CreateRemoteThread",
    "LoadLibrary",
    "GetProcAddress",
    "WinExec",
    "ShellExecute",
    "URLDownloadToFile",
]

In [None]:
def get_entropy(data):
    """Return the Shannon entropy of ``data`` in bits per symbol.

    ``data`` is any sized iterable of hashable symbols (the callers in this
    file pass ``bytes``). Empty or falsy input yields ``0.0``.
    """
    if not data:
        return 0.0

    total = len(data)
    bits = 0.0
    # Accumulate -p * log2(p) over the distinct symbols, one at a time.
    for occurrences in Counter(data).values():
        probability = occurrences / total
        bits -= probability * math.log2(probability)
    return bits

In [None]:
def md5_file(path, chunk_size=8192):
    """Return the hexadecimal MD5 digest of the file at ``path``.

    The file is read in blocks of ``chunk_size`` bytes so large files are
    never held in memory at once.

    Args:
        path: Path of the file to hash.
        chunk_size: Number of bytes read per iteration; must be positive.

    Returns:
        The digest as a 32-character lowercase hex string.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
        OSError: If the file cannot be opened or read.
    """
    if chunk_size <= 0:
        # read(0) returns b"" immediately, which would silently hash nothing.
        raise ValueError("chunk_size must be a positive integer")

    digest = hashlib.md5()
    with open(path, "rb") as f:
        # iter() with a sentinel stops at the first empty read (end of file).
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

In [None]:
def safe_decode(b):
    """Decode ``b`` to text without raising on bad input.

    Undecodable bytes are dropped (``errors="ignore"``). Values that cannot be
    decoded at all (for example ``str`` or ``None``, which have no usable
    ``decode``) fall back to ``str(b)``.

    Args:
        b: Usually a ``bytes`` object; any other object is tolerated.

    Returns:
        The decoded text, or ``str(b)`` when decoding is not possible.
    """
    try:
        return b.decode(errors="ignore")
    except Exception:
        # Only ordinary errors are swallowed; KeyboardInterrupt and SystemExit
        # still propagate so a long scan can be interrupted.
        return str(b)

In [None]:
def _header_features(pe):
    """Collect FILE_HEADER / OPTIONAL_HEADER fields from ``pe``.

    Every key is pre-filled with "" so each row exposes the same columns even
    when a header attribute cannot be read. Extraction is best-effort: it stops
    at the first attribute that fails and keeps what was gathered so far.
    """
    feats = dict.fromkeys((
        "TimeDateStamp", "Machine", "Characteristics", "NumberOfSections",
        "AddressOfEntryPoint", "SizeOfImage", "SizeOfHeaders", "Subsystem",
        "DllCharacteristics", "LinkerVersion", "Checksum",
    ), "")
    try:
        fh = pe.FILE_HEADER
        oh = pe.OPTIONAL_HEADER
        feats["TimeDateStamp"] = fh.TimeDateStamp
        feats["Machine"] = fh.Machine
        feats["Characteristics"] = fh.Characteristics
        feats["NumberOfSections"] = fh.NumberOfSections
        feats["AddressOfEntryPoint"] = oh.AddressOfEntryPoint
        feats["SizeOfImage"] = oh.SizeOfImage
        feats["SizeOfHeaders"] = oh.SizeOfHeaders
        feats["Subsystem"] = oh.Subsystem
        feats["DllCharacteristics"] = oh.DllCharacteristics
        feats["LinkerVersion"] = f"{oh.MajorLinkerVersion}.{oh.MinorLinkerVersion}"
        feats["Checksum"] = oh.CheckSum
    except Exception:
        # Deliberately best-effort: missing header fields stay "".
        pass
    return feats


def _section_features(pe):
    """Build per-section and summary section features for ``pe``.

    Produces, for every entry of KNOWN_SECTIONS, presence / entropy /
    virtual size / raw size / ratio columns, plus ``unknown_section_count``,
    ``mean_section_entropy`` and ``mean_section_ratio``.
    """
    feats = {}

    # Defaults first, so absent sections still yield a complete row.
    for known in KNOWN_SECTIONS:
        short = known.strip('.')
        feats[f"sec_present_{short}"] = 0
        feats[f"sec_entropy_{short}"] = 0.0
        feats[f"sec_virtualsize_{short}"] = 0
        feats[f"sec_rawsize_{short}"] = 0
        feats[f"sec_ratio_{short}"] = 0.0
    # Always present (0 when every section is known) so all rows share one schema.
    feats["unknown_section_count"] = 0

    # Keyed by decoded name: sections sharing a name collapse to the last one.
    section_map = {
        sec.Name.decode(errors="ignore").rstrip("\x00"): sec
        for sec in pe.sections
    }

    for name, sec in section_map.items():
        lname = name.lower()
        # Prefix match (covers exact equality too); the first hit in
        # KNOWN_SECTIONS order wins.
        matched = next((s for s in KNOWN_SECTIONS if lname.startswith(s)), None)
        if matched is None:
            feats["unknown_section_count"] += 1
            continue

        short = matched.strip('.')
        feats[f"sec_present_{short}"] = 1
        feats[f"sec_entropy_{short}"] = round(get_entropy(sec.get_data()), 3)
        feats[f"sec_virtualsize_{short}"] = sec.Misc_VirtualSize
        feats[f"sec_rawsize_{short}"] = sec.SizeOfRawData
        # Guard against division by zero for sections with no raw data.
        feats[f"sec_ratio_{short}"] = (
            round(sec.Misc_VirtualSize / sec.SizeOfRawData, 3)
            if sec.SizeOfRawData else 0.0
        )

    # Summary over every section (including duplicates and unknown names).
    entropies = []
    ratios = []
    for sec in pe.sections:
        try:
            entropies.append(get_entropy(sec.get_data()))
            if sec.SizeOfRawData:
                ratios.append(sec.Misc_VirtualSize / sec.SizeOfRawData)
        except Exception:
            # Skip sections whose data cannot be read.
            pass
    feats["mean_section_entropy"] = round(sum(entropies) / len(entropies), 3) if entropies else 0.0
    feats["mean_section_ratio"] = round(sum(ratios) / len(ratios), 3) if ratios else 0.0
    return feats


def _import_features(pe):
    """Build import-table features: counts, DLL flags and the first imports.

    Best-effort: if walking the import table fails part-way, whatever was
    counted up to that point is still reported.
    """
    import_count = 0
    suspicious_api_count = 0
    dll_presence = {dll: 0 for dll in KNOWN_DLLS}
    top_imports = []  # import names (or ordinals) in table order
    suspicious_lower = [api.lower() for api in SUSPICIOUS_APIS]

    try:
        if hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
            for entry in pe.DIRECTORY_ENTRY_IMPORT:
                dllname = safe_decode(entry.dll).lower()
                import_count += len(entry.imports)
                # Substring match against the lower-cased DLL name.
                for known in KNOWN_DLLS:
                    if known in dllname:
                        dll_presence[known] = 1
                for imp in entry.imports:
                    if imp.name:
                        iname = safe_decode(imp.name)
                        top_imports.append(iname)
                        iname_lower = iname.lower()
                        # Substring match, counted once per matching list
                        # entry: "VirtualAllocEx" matches both "VirtualAlloc"
                        # and "VirtualAllocEx" and therefore adds 2.
                        suspicious_api_count += sum(
                            1 for api in suspicious_lower if api in iname_lower
                        )
                    else:
                        # Imported by ordinal only.
                        top_imports.append(str(imp.ordinal))
    except Exception:
        pass

    feats = {
        "import_count": import_count,
        "suspicious_api_count": suspicious_api_count,
    }
    for dll, present in dll_presence.items():
        feats[f"dll_{dll.replace('.', '_')}"] = present

    # First N imports as separate columns, padded with "".
    N = 10
    for i in range(N):
        feats[f"import_{i}"] = top_imports[i] if i < len(top_imports) else ""
    return feats


def _export_count(pe):
    """Return the number of exported symbols, or 0 if none can be read."""
    try:
        if hasattr(pe, "DIRECTORY_ENTRY_EXPORT") and pe.DIRECTORY_ENTRY_EXPORT:
            return len(pe.DIRECTORY_ENTRY_EXPORT.symbols)
    except Exception:
        pass
    return 0


def _resource_totals(pe):
    """Return ``(resource_count, resource_total_size)`` for ``pe``.

    Walks the resource tree depth-first; every node exposing ``data`` counts
    as one resource. Best-effort: a failure part-way keeps the partial totals.
    """
    count = 0
    total_size = 0

    def walk(node):
        nonlocal count, total_size
        if hasattr(node, 'directory'):
            for child in node.directory.entries:
                walk(child)
        elif hasattr(node, 'data'):
            count += 1
            try:
                total_size += node.data.struct.Size
            except Exception:
                # Resource is still counted even if its size is unreadable.
                pass

    try:
        if hasattr(pe, "DIRECTORY_ENTRY_RESOURCE"):
            for res_type in pe.DIRECTORY_ENTRY_RESOURCE.entries:
                walk(res_type)
    except Exception:
        pass
    return count, total_size


def _string_features(data):
    """Count NUL-delimited strings in ``data`` and those with suspicious keywords.

    A string is a NUL-separated run of at least 4 bytes that, decoded as UTF-8
    with undecodable bytes dropped, contains at least one alphanumeric
    character.
    """
    printable_strings = []
    try:
        for part in data.split(b'\x00'):
            if len(part) >= 4:
                text = part.decode('utf-8', errors='ignore')
                if any(c.isalnum() for c in text):
                    printable_strings.append(text)
    except Exception:
        pass

    suspicious_kw = ["http://", "https://", "hkey_local_machine", "cmd.exe", "powershell", ".exe", ".dll"]
    return {
        "strings_count": len(printable_strings),
        "suspicious_string_count": sum(
            1 for s in printable_strings if any(k in s.lower() for k in suspicious_kw)
        ),
    }


def extract_features(file_path, label):
    """Extract a flat feature dictionary from the PE file at ``file_path``.

    The row contains file-level data (MD5, size, entropy), header fields,
    a TLS-directory flag, section features, import/export/resource features,
    string counts and the supplied ``label``. Every successful call returns
    the same set of keys, so rows can be written to one CSV schema.

    Args:
        file_path: Path of the PE file to analyse.
        label: Value stored unchanged under the ``"label"`` key.

    Returns:
        A ``dict`` of features, or ``None`` (after printing a message) when
        the file cannot be parsed as a PE.

    Raises:
        OSError: If the file cannot be re-read after parsing succeeded.
    """
    try:
        pe = pefile.PE(file_path, fast_load=True)
    except Exception as e:
        print(f"[!] Cannot parse PE {file_path}: {e}")
        return None

    try:
        try:
            # fast_load skips the data directories; parse only the ones used below.
            pe.parse_data_directories(directories=[
                pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'],
                pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
                pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE'],
                pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_TLS']
            ])
        except Exception as e:
            print(f"[!] Cannot parse PE {file_path}: {e}")
            return None

        # File-level
        with open(file_path, "rb") as f:
            data = f.read()

        feats = {}
        feats["MD5"] = md5_file(file_path)
        feats["FileSize"] = len(data)
        feats["FileEntropy"] = round(get_entropy(data), 3)

        # Header
        feats.update(_header_features(pe))

        # Set when pefile exposed a TLS directory for this file.
        feats["Has_TLS"] = 1 if hasattr(pe, "DIRECTORY_ENTRY_TLS") else 0

        # Sections, imports, exports, resources, strings
        feats.update(_section_features(pe))
        feats.update(_import_features(pe))
        feats["export_count"] = _export_count(pe)
        res_count, res_total_size = _resource_totals(pe)
        feats["resource_count"] = res_count
        feats["resource_total_size"] = res_total_size
        feats.update(_string_features(data))

        # Label
        feats["label"] = label
        return feats
    finally:
        # Release the file mapping held by pefile on every exit path.
        pe.close()

In [None]:
def scan_and_save(dataset_root, out_csv):
    """Extract features from every ``.exe``/``.dll`` under ``dataset_root`` into a CSV.

    Files are labelled ``"Benign"`` when their directory path contains
    "benign" (case-insensitive) and ``"Malware"`` otherwise. Files for which
    ``extract_features`` returns nothing are skipped.

    Args:
        dataset_root: Directory that is walked recursively.
        out_csv: Path of the CSV file to write (overwritten if it exists).

    Returns:
        None. Prints a summary line; when no sample could be processed a
        message is printed and no file is written.
    """
    rows = []
    for root, _, files in os.walk(dataset_root):
        # labeling logic: you can refine (multi-class) based on folder structure
        label = "Benign" if "benign" in root.lower() else "Malware"
        for fn in files:
            if not fn.lower().endswith((".exe", ".dll")):
                continue
            feats = extract_features(os.path.join(root, fn), label)
            if feats:
                rows.append(feats)

    if not rows:
        print("No samples processed.")
        return

    # Use the union of all keys: taking only the first row's keys makes
    # DictWriter raise ValueError as soon as a later row has an extra field.
    # Sorting keeps the column order deterministic.
    fieldnames = sorted(set().union(*rows))
    with open(out_csv, "w", newline="", encoding="utf-8") as fout:
        # restval fills any field a row does not have with an empty cell.
        writer = csv.DictWriter(fout, fieldnames=fieldnames, restval="")
        writer.writeheader()
        writer.writerows(rows)
    print(f"[+] Saved {len(rows)} samples to {out_csv}")

In [None]:
if __name__ == "__main__":
    # Walk the dataset folder and write one CSV row per PE sample.
    dataset_dir = "Dataset"   # change if needed
    output_file = "pe_features_extended.csv"
    scan_and_save(dataset_root=dataset_dir, out_csv=output_file)