In [None]:
import pefile
import os
import hashlib
import math
from torch import optim, nn
from torch.utils.data import DataLoader, Dataset
import torch
import numpy as np

from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt

In [8]:
def getNFiles(root="/", n_files=500):
    stored_files = []
    for dirpath, dirs, files in os.walk(root, topdown=False):
        for filename in files:
            if filename.endswith(".exe"):
                stored_files.append(dirpath + "/" + filename)
            
            if len(stored_files) >= n_files: return stored_files
    
    return stored_files

files = getNFiles(n_files=100)

# **Extracting file data**

In [9]:
pe = pefile.PE(files[0])
pe.full_load()

## 1.) Imported Symbols

In [10]:
def getImports(pe: pefile.PE):
    imports = []
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        for imp in entry.imports:
            imports.append(imp.name.decode("utf-8"))
    
    return imports

### 1.1) Get all imports and weight them

In [None]:
from math import log2

# Returns all imports in the training dataset
def getGlobalImpVocab(training_files: list):
    import_vocab = set()
    for file_path in training_files:
        try: 
            pe = pefile.PE(file_path)
            #pe.full_load()

            imports = getImports(pe)
            for imp in imports:
                import_vocab.add(imp)
        
            pe.close()  # free memory

        except Exception as e:
            print("Failed to read file: ", file_path, e)

    return import_vocab


def getTrainingIDF(global_import_vocab, training_files: list):
    import_count = {imp:0 for imp in global_import_vocab}
    IDF = {}

    for file_path in training_files:
        try:
            pe = pefile.PE(file_path)
            pe.full_load()

            imports = getImports(pe)
            for imp in imports:
                import_count[imp] = import_count.get(imp, 0) + 1
    
                
        except Exception as e:
            print("Failed with file: ", file_path)
            continue

    N = len(training_files)
    IDF = {imp:log2(N/import_count[imp]) for imp in import_count.keys()}
    return IDF


def get_TF_IDF(global_import_vocab, training_IDF, new_file):
    TF_IDF = {imp:0 for imp in global_import_vocab}

    try:
        pe = pefile.PE(new_file)
        pe.full_load()

        imports = getImports(pe)
        for imp in imports:
            TF_IDF[imp] += training_IDF[imp]


    except Exception as e:
        print("Failed with file: ", e)
        return None

    return TF_IDF


global_import_vocab = getGlobalImpVocab(files)
print("global_import_vocab: ", global_import_vocab)

IDF = getTrainingIDF(global_import_vocab, files)
print("IDF: ", IDF)

new_test_file = files[0]
TF_IDF = get_TF_IDF(global_import_vocab, IDF, new_test_file)
print("TF_IDF: ", TF_IDF)


Failed to read file:  /mnt/c/$RECYCLE.BIN/S-1-5-21-2210480649-2313652220-1945043235-1001/$REY8PR4/Image to Speech/Library/PackageCache/com.unity.ide.visualstudio@2.0.22/Editor/COMIntegration/Release/COMIntegration.exe 'NoneType' object has no attribute 'decode'
Failed to read file:  /mnt/c/AI_RecycleBin/{AA8E5A88-AA90-4DB6-B0A5-9F9C187FAA16}/138/XSplit.Video.Editor.exe 'NoneType' object has no attribute 'decode'
Failed to read file:  /mnt/c/AI_RecycleBin/{AA8E5A88-AA90-4DB6-B0A5-9F9C187FAA16}/144/XSplitCleanup2x64.exe 'PE' object has no attribute 'DIRECTORY_ENTRY_IMPORT'
Failed to read file:  /mnt/c/AI_RecycleBin/{AA8E5A88-AA90-4DB6-B0A5-9F9C187FAA16}/160/xsplit_updater.exe 'NoneType' object has no attribute 'decode'
Failed to read file:  /mnt/c/Program Files/7-Zip/7zFM.exe 'NoneType' object has no attribute 'decode'
Failed to read file:  /mnt/c/Program Files/7-Zip/7zG.exe 'NoneType' object has no attribute 'decode'
Failed to read file:  /mnt/c/Program Files/Angry IP Scanner/uninstall.

# Import Hash

### The import has is unique and deterministic. Basically summarizes all imports into a hash

In [11]:
def get_imp_hash(file: pefile.PE):
    return pe.get_imphash()

pe = pefile.PE(files[44])
print(get_imp_hash(pe))

04a8d5893e95ecf8d9dd1d1c8967553e


# Exports

### Extracts exportable files from the executable and converts it into a md5 hash

In [33]:
def get_exports(pe):
    if not hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'):
        return []
    
    exports = []
    for e in pe.DIRECTORY_ENTRY_EXPORT.symbols:
        name = e.name.decode() if e.name else f"ord{e.ordinal}"
        #print("e.name.decode(): ", e.name.decode())
        #print("e.ordinal: ", e.ordinal)
        exports.append(name)
    
    #print("bef. exports: ", exports)
    exports_str = ",".join(sorted(exports))  # Sort for deterministic hash
    #print("aft. exports_str: ", exports_str)
    return hashlib.md5(exports_str.encode()).hexdigest()


pe = pefile.PE(files[44])
print(get_exports(pe))

21530e4a259160afbcb6c4ca0e315c3e


# Digital Signature

### Checks if the file has a signature.

### 

In [13]:
def check_digital_signature(pe):
    cert_dir = pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']]
    return cert_dir.VirtualAddress != 0 and cert_dir.Size != 0

pe = pefile.PE(files[44])
print(check_digital_signature(pe))

True


# Timestamp

In [14]:
def get_timestamp(pe):
    return pe.FILE_HEADER.TimeDateStamp

pe = pefile.PE(files[44])
print(get_timestamp(pe))

1744771084


In [15]:
def has_debug_info(pe):
    return hasattr(pe, 'DIRECTORY_ENTRY_DEBUG')

pe = pefile.PE(files[44])
print(has_debug_info(pe))

True


# Entropy

#### The entropy of a files tells us how encrypted a file might be. The entropy is calculated using "shannon entropy" returning an entropy between 0 - 8 for each byte. If the entropy is high, like 6.5 - 8, it can be suspicious, indicating a lot of encryption is done in the file.

In [None]:
def calc_section_entropy(pe):
    section_entropy = {
        ".text": 0, # Code
        ".rdata": 0, # Read‑Only Data
        ".rodata": 0, # Read‑Only Data
        ".idata": 0, # Import Table
        ".edata": 0, # Export Table
        ".rsrc": 0, # Resources
        ".reloc": 0, # Base Relocations
        ".tls": 0, # Thread‑Local Storage
        ".pdata": 0, # Exception Unwind Data
        ".xdata": 0, # Exception Unwind Data

    }
    entropies = []
    for section in pe.sections:
        data = section.get_data()
        section_name = section.Name.decode().strip('\x00')

        # Skip section if not one of the already defined (ADD CUSTOM SECTIONS IN FUTURE)
        if section_name not in section_entropy.keys(): 
            continue

        if not data:
            continue
        entropy = 0
        if data:
            occurences = [0]*256
            for byte in data:
                occurences[byte] += 1
            for count in occurences:
                if count == 0:
                    continue
                p_x = count / len(data)
                entropy -= p_x * math.log2(p_x)
        
        section_entropy[section_name] = round(entropy, 3)
        #entropies.append((section.Name.decode().strip('\x00'), round(entropy, 3)))

    return section_entropy

################################################
# Sections are important due to the possibility
# of malicious software hiding embedded data in them.
# If a section has abnormally high entropy it 
# can be a red flag!!!
################################################
def extract_all_sections(files: list):
    sections = set()
    for file in files:
        pe = pefile.PE(file)
        for section in pe.sections:
            sections.add(section.Name.decode().strip('\x00'))

    return sections

pe = pefile.PE(files[44])
print(calc_section_entropy(pe))

#print(extract_all_sections(files))

{'.text': 6.604, '.rdata': 6.109, '.rodata': 4.322, '.idata': 0, '.edata': 0, '.rsrc': 2.209, '.reloc': 5.46, '.tls': 0.244, '.pdata': 6.208, '.xdata': 0}


# Feature Extraction

In [38]:
def md5_to_byte_vector(md5_str):
    """
    Converts a 32‑char hex MD5 string into a length‑16 float32 vector in [0,1].
    """

    if isinstance(md5_str, list):
        return np.zeros((16,))

    # 1) hex → raw bytes (length 16)
    raw = bytes.fromhex(md5_str)
    # 2) bytes → uint8 array → float32 → normalize
    vec  = np.frombuffer(raw, dtype=np.uint8).astype(np.float32) / 255.0
    return vec  # shape (16,)

def get_features(file):
    pe = pefile.PE(file)
    pe.full_load()
    features = {
        "import_hash": md5_to_byte_vector(get_imp_hash(pe)),
        "export_hash": md5_to_byte_vector(get_exports(pe)),
        "has_digital_signature": check_digital_signature(pe),
        "timestamp": get_timestamp(pe),
        "has_debug_info": has_debug_info(pe),
        **calc_section_entropy(pe) # Dict unpacking
    }

    return features

def get_flattened_features(features: dict):
    flattenend = []
    for key, item in features.items():
        if isinstance(item, np.ndarray):
            flattenend.extend(list(item))
        elif isinstance(item, list):
            flattenend.extend(list(item))
        else:
            flattenend.append(item)
    
    return np.array(flattenend)

features_dict = get_features(files[44])
print("features_dict: \n", features_dict)
flattened_features = get_flattened_features(features_dict)
print("flattened_features: \n", flattened_features)
print("len(flattened_features): ", len(flattened_features))

features_dict: 
 {'import_hash': array([0.01568628, 0.65882355, 0.8352941 , 0.5372549 , 0.24313726,
       0.58431375, 0.9254902 , 0.972549  , 0.8509804 , 0.8666667 ,
       0.11372549, 0.10980392, 0.5372549 , 0.40392157, 0.33333334,
       0.24313726], dtype=float32), 'export_hash': array([0.12941177, 0.3254902 , 0.05490196, 0.2901961 , 0.14509805,
       0.5686275 , 0.3764706 , 0.6862745 , 0.7372549 , 0.7137255 ,
       0.76862746, 0.7921569 , 0.05490196, 0.19215687, 0.36078432,
       0.24313726], dtype=float32), 'has_digital_signature': True, 'timestamp': 1744771084, 'has_debug_info': True, '.text': 6.604, '.rdata': 6.109, '.rodata': 4.322, '.idata': 0, '.edata': 0, '.rsrc': 2.209, '.reloc': 5.46, '.tls': 0.244, '.pdata': 6.208, '.xdata': 0}
flattened_features: 
 [1.56862754e-02 6.58823550e-01 8.35294127e-01 5.37254930e-01
 2.43137255e-01 5.84313750e-01 9.25490201e-01 9.72549021e-01
 8.50980401e-01 8.66666675e-01 1.13725491e-01 1.09803922e-01
 5.37254930e-01 4.03921574e-01 3.333333

# Autoencoder

In [35]:
class AutoEncoder(nn.Module):

    def __init__(self, input_dims, hidden_dims=(128, 64, 32)):
        super().__init__()

        self.encoder = nn.Sequential()

        prev_dims = input_dims
        for dim in hidden_dims:
            self.encoder.append(nn.Linear(prev_dims, dim))
            self.encoder.append(nn.BatchNorm1d(dim))
            self.encoder.append(nn.ReLU(inplace=True))
            prev_dims = dim
        
        self.decoder = nn.Sequential()

        for dim in hidden_dims[::-1]:
            self.decoder.append(nn.Linear(prev_dims, dim))
            self.decoder.append(nn.BatchNorm1d(dim))
            self.decoder.append(nn.ReLU(inplace=True))
            prev_dims = dim

    def forward(self, x):
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)

        return latent, reconstructed

# Dataset

In [44]:
class CustomDataset(Dataset):

    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx]

dset = CustomDataset(files)
train_data = DataLoader(dset, batch_size=16, shuffle=True)

# Training

In [45]:
input_dims = len(get_flattened_features(get_features(files[1])))
hidden_dims = (256, 128, 64)
AE = AutoEncoder(input_dims, hidden_dims)
optimizer = optim.Adam(AE.parameters(), lr=0.001, weight_decay=0.02)
loss_fn = nn.MSELoss()

losses = []
EPOCHS = 50
for epoch in range(EPOCHS):
    for batch_idx, batch in enumerate(train_data):
        file = batch

        # Features into model
        features_dict = get_features(files)
        flattened_features = get_flattened_features(features_dict)

        prediction = AE(flattened_features)

        loss = loss_fn(flattened_features, prediction)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        losses.append(loss.item())


plt.figure(figsize=(10,8))
plt.plot([e for e in range(EPOCHS)], losses)
plt.show()

        



TypeError: stat: path should be string, bytes, os.PathLike or integer, not list