In [None]:
import os
from sklearn.model_selection import StratifiedKFold
import shutil
import lightgbm as lgb
import joblib
import pandas as pd
import csv
import configparser
import re
import lief
import hashlib
import json
from sklearn.feature_extraction import FeatureHasher

from sklearn.metrics import accuracy_score, roc_auc_score, confusion_matrix
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report
import numpy as np
# Seed NumPy's global RNG. This must be a call: assigning to np.random.seed
# would replace the function with the integer 42 and seed nothing.
np.random.seed(42)


In [None]:
config = configparser.ConfigParser()

# Read the configuration file. config.read() returns the list of files it
# managed to parse and silently skips missing ones, so fail here with a clear
# message rather than with a NoSectionError on the first lookup below.
if not config.read('config.ini'):
    raise FileNotFoundError("config.ini could not be read from the working directory")

at = 'adv_training'
at_type = config.getint(at, 'At_type')
original_samples = config.get(at, 'original_samples')
malware = config.get(at, 'malware_files')
goodware = config.get(at, 'goodware_files')

# Folders holding the adversarial samples, one per attack type
full_dos = config.get(at, 'full_dos')
extend_dos = config.get(at, 'extend_dos')
content_shift = config.get(at, 'content_shift')
fgsm = config.get(at, 'fgsm')
gamma = config.get(at, 'gamma')

# Output folder paths (output / models / results per attack type)
output = config.get(at, 'output')
fdos_output = config.get(at, 'fdos_output')
fdos_models = config.get(at, 'fdos_models')
fdos_results = config.get(at, 'fdos_results')
extend_output = config.get(at, 'extend_output')
extend_models = config.get(at, 'extend_models')
extend_results = config.get(at, 'extend_results')
content_output = config.get(at, 'content_output')
content_models = config.get(at, 'content_models')
content_results = config.get(at, 'content_results')
fgs_output = config.get(at, 'fgs_output')
fgs_models = config.get(at, 'fgs_models')
fgs_results = config.get(at, 'fgs_results')
gamma_output = config.get(at, 'gamma_output')
gamma_models = config.get(at, 'gamma_models')
gamma_results = config.get(at, 'gamma_results')


def _reset_dir(path):
    """Delete `path` (and everything in it) if it exists, then recreate it empty."""
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path, exist_ok=True)


# At_type -> (adversarial samples folder, output folder, models folder, results folder)
_AT_TYPE_SETTINGS = {
    1: (full_dos, fdos_output, fdos_models, fdos_results),
    2: (extend_dos, extend_output, extend_models, extend_results),
    3: (content_shift, content_output, content_models, content_results),
    4: (fgsm, fgs_output, fgs_models, fgs_results),
    5: (gamma, gamma_output, gamma_models, gamma_results),
}

# An unknown At_type would otherwise leave `adversarial`, `output_folder`, ...
# undefined and only surface as a NameError in a later cell.
if at_type not in _AT_TYPE_SETTINGS:
    raise ValueError(
        f"Unsupported At_type {at_type!r} in config.ini; "
        f"expected one of {sorted(_AT_TYPE_SETTINGS)}"
    )

adversarial, output_folder, model_folder, results_folder = _AT_TYPE_SETTINGS[at_type]

# WARNING: destructive. Any previous contents of these folders are removed so
# that each run starts from empty directories.
for _folder in (output_folder, model_folder, results_folder):
    _reset_dir(_folder)

_reset_dir(original_samples)

n_splits = 3
# Fold folder names and the CSV written into each one, derived from n_splits
folds_folders = [f"fold{fold_number}" for fold_number in range(1, n_splits + 1)]
csv_files_and_destinations = {f"{fold_name}_data.csv": fold_name for fold_name in folds_folders}


### Copy the binary files from the malware and goodware folders into the original samples folder

In [None]:
def copy_files(source_folder, original_samples):
    """Copy every entry of `source_folder` into `original_samples`.

    Each name returned by os.listdir(source_folder) is copied with
    shutil.copy to a file of the same name inside `original_samples`;
    an existing file with that name is overwritten.
    """
    for entry in os.listdir(source_folder):
        shutil.copy(
            os.path.join(source_folder, entry),
            os.path.join(original_samples, entry),
        )
    
        
# Gather the malware binaries first, then the goodware binaries, in the
# original_samples folder.
for _samples_dir in (malware, goodware):
    copy_files(_samples_dir, original_samples)


### Extract file names and labels from the malware and goodware samples

In [None]:
# Extract file names and labels from the malicious and benign samples.
# os.listdir() returns entries in arbitrary order, so sort them: the seeded
# fold split is only reproducible across runs and machines if the order of
# its input is stable.
malware_files = sorted(os.listdir(malware))
goodware_files = sorted(os.listdir(goodware))
all_files = malware_files + goodware_files
# Label convention: 1 = malware, 0 = goodware (same order as all_files)
labels = [1] * len(malware_files) + [0] * len(goodware_files)

##### Apply StratifiedKFold to split the extracted file names and labels into three folds, saved in the folders fold1, fold2 and fold3 respectively

In [None]:
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
fold_samples = [[] for _ in range(n_splits)]

# skf.split yields (train_index, test_index) pairs; the test indices of each
# split are used as one fold. Only names and labels are recorded here; no
# binary files are touched.
for fold, (_, test_index) in enumerate(skf.split(all_files, labels), 1):
    fold_samples[fold - 1] = [(all_files[i], labels[i]) for i in test_index]

    # Folder for the current fold inside the main output directory.
    # exist_ok=True keeps this cell re-runnable, matching the other cells
    # that create folders under output_folder.
    fold_folder = os.path.join(output_folder, f'fold{fold}')
    os.makedirs(fold_folder, exist_ok=True)

    # One row per sample: file name and its 0/1 label
    fold_df = pd.DataFrame(fold_samples[fold - 1], columns=['file_names', 'labels'])

    # Save the fold listing as <fold_folder>/fold<N>_data.csv
    csv_filename = f'fold{fold}_data.csv'
    csv_path = os.path.join(fold_folder, csv_filename)
    fold_df.to_csv(csv_path, index=False)

    # Per-class counts, printed to confirm the folds are stratified
    samples_with_label_1 = sum(1 for _, label in fold_samples[fold - 1] if label == 1)
    samples_with_label_0 = sum(1 for _, label in fold_samples[fold - 1] if label == 0)

    print(f"Fold {fold}: {len(test_index)} samples with label 1: {samples_with_label_1}, samples with label 0: {samples_with_label_0}")


#### Copy the binary files from the original samples folder into the folders fold1, fold2 and fold3, using the file names listed in each fold's CSV

In [None]:
# Copy binary files into fold1, fold2, fold3.
# The files belonging to each fold are read from that fold's CSV.

# List all files in the source folder. The listing does not depend on the
# fold, so it is taken once instead of once per CSV.
original_files = os.listdir(original_samples)

for csv_file, corresponding_folder in csv_files_and_destinations.items():
    fold_file_path = os.path.join(output_folder, corresponding_folder, csv_file)

    # Read the fold's CSV and pull out the file names
    df = pd.read_csv(fold_file_path)
    fold_file_names = df['file_names'].tolist()

    # Create <fold>/<fold>_binary_files inside the existing fold folder
    new_folder_name = f"{corresponding_folder}_binary_files"
    full_destination_path = os.path.join(output_folder, corresponding_folder, new_folder_name)
    os.makedirs(full_destination_path, exist_ok=True)

    # Copy (not move) each listed file; the originals stay in original_samples
    for file_name in fold_file_names:
        source_path = os.path.join(original_samples, file_name)
        destination_path = os.path.join(full_destination_path, file_name)
        shutil.copy(source_path, destination_path)
        print(f"Copied {file_name} to {destination_path}")


##### Copy the original binary files from fold1, fold2 and fold3 into the modified_fold1, modified_fold2 and modified_fold3 folders respectively

In [None]:
# Names of the binary-file sub-folders inside fold1, fold2 and fold3
binary_files_folders = ["fold1_binary_files", "fold2_binary_files", "fold3_binary_files"]

# For every fold, mirror its binaries into
# <output_folder>/modified_<fold>/modified_<fold>_binary_files
for folds_folder, binary_files_folder in zip(folds_folders, binary_files_folders):
    modified_folder = "modified_" + folds_folder
    modified_folder_binary = modified_folder + "_binary_files"

    modified_folder_path = os.path.join(output_folder, modified_folder)
    modified_folder_binary_path = os.path.join(modified_folder_path, modified_folder_binary)

    # Where the copies go (created on demand) ...
    destination_folder = modified_folder_binary_path
    os.makedirs(destination_folder, exist_ok=True)

    # ... and where the fold's original binaries currently live
    source_folder = os.path.join(output_folder, folds_folder, binary_files_folder)

    # Copy every file of the fold into the modified folder
    files_to_copy = os.listdir(source_folder)
    for file_name in files_to_copy:
        source_path = os.path.join(source_folder, file_name)
        destination_path = os.path.join(destination_folder, file_name)
        shutil.copy(source_path, destination_path)
        print(f"Copied {file_name} to {destination_path}")


##### Replace each binary file in the modified folders with its adversarial counterpart when a file of the same name exists in the adversarial folder; otherwise keep the original

In [None]:
# List of modified folders
modified_folders = ["modified_fold1", "modified_fold2", "modified_fold3"]

# List the adversarial folder once. A set gives constant-time membership
# tests instead of re-reading the directory for every single file.
adversarial_files = set(os.listdir(adversarial))

# Iterate through each modified folder
for modified_folder_name in modified_folders:
    modified_folder_path = os.path.join(output_folder, modified_folder_name)
    modified_binary_folder_name = f"{modified_folder_name}_binary_files"
    modified_binary_folder_path = os.path.join(modified_folder_path, modified_binary_folder_name)

    # List files in the modified binary folder
    modified_binary_files = os.listdir(modified_binary_folder_path)

    for modified_file in modified_binary_files:
        # Overwrite the original with the adversarial version when a file of
        # the same name exists in the adversarial folder
        if modified_file in adversarial_files:
            modified_file_path = os.path.join(modified_binary_folder_path, modified_file)
            adversarial_file_path = os.path.join(adversarial, modified_file)

            print("modified_file_path", modified_file_path)
            print("adversarial_file_path", adversarial_file_path)

            shutil.copy2(adversarial_file_path, modified_file_path)
            print(f"Replaced {modified_file} with the file from the 'adversarial' folder.")
        else:
            # No adversarial counterpart: the original binary is kept as is
            print(f"No replacement found for {modified_file} in the 'adversarial' folder.")


###### Read the file names of each fold and, for every file that also exists in the adversarial folder, copy the adversarial version into the new folders adv_fold1, adv_fold2 and adv_fold3

In [None]:
# List the adversarial folder once. A set gives constant-time membership
# tests instead of re-reading the directory for every single file.
adversarial_files = set(os.listdir(adversarial))

# Iterate through each fold folder
for csv_file, corresponding_folder in csv_files_and_destinations.items():
    csv_file_path = os.path.join(output_folder, corresponding_folder, csv_file)

    # Destination: <output_folder>/adv_<fold>/adv_<fold>_binary_files
    adv_folder = f"adv_{corresponding_folder}"
    adv_folder_binary = f"{adv_folder}_binary_files"

    adv_folder_path = os.path.join(output_folder, adv_folder)
    adv_files_path = os.path.join(adv_folder_path, adv_folder_binary)
    os.makedirs(adv_files_path, exist_ok=True)

    # Read the file names of this fold from its CSV
    df = pd.read_csv(csv_file_path)
    file_names = df['file_names'].tolist()
    for fold_file in file_names:
        # Only files that have an adversarial version are collected
        if fold_file in adversarial_files:
            source_path = os.path.join(adversarial, fold_file)

            # Copy (not move) the adversarial binary into adv_fold1/2/3
            destination_path = os.path.join(adv_files_path, fold_file)
            shutil.copy2(source_path, destination_path)

            print(f"Copied {fold_file} from 'adversarial' folder to 'adv_{corresponding_folder}'")
        else:
            # Report the file that is missing, not the destination folder
            print(f"No matching file found for {fold_file} in the 'adversarial' folder.")


### Code to extract features from Windows PE files

In [None]:

#!/usr/bin/python
''' Extracts some basic features from PE files. Many of the features
implemented have been used in previously published works. For more information,
check out the following resources:
* Schultz, et al., 2001: http://128.59.14.66/sites/default/files/binaryeval-ieeesp01.pdf
* Kolter and Maloof, 2006: http://www.jmlr.org/papers/volume7/kolter06a/kolter06a.pdf
* Shafiq et al., 2009: https://www.researchgate.net/profile/Fauzan_Mirza/publication/242084613_A_Framework_for_Efficient_Mining_of_Structural_Information_to_Detect_Zero-Day_Malicious_Portable_Executables/links/0c96052e191668c3d5000000.pdf
* Raman, 2012: http://2012.infosecsouthwest.com/files/speaker_materials/ISSW2012_Selecting_Features_to_Classify_Malware.pdf
* Saxe and Berlin, 2015: https://arxiv.org/pdf/1508.03096.pdf
It may be useful to do feature selection to reduce this set of features to a meaningful set
for your modeling problem.
'''

# The installed LIEF version decides which API variants the extractors use.
# LIEF_MAJOR / LIEF_MINOR stay strings; they are converted with int() on use.
LIEF_MAJOR, LIEF_MINOR, _ = lief.__version__.split('.')
# Exports are objects with a .name attribute from LIEF 0.10 on
LIEF_EXPORT_OBJECT = (int(LIEF_MAJOR), int(LIEF_MINOR)) >= (0, 10)
# The binary exposes `has_signatures` (plural) from LIEF 0.11 on
LIEF_HAS_SIGNATURE = (int(LIEF_MAJOR), int(LIEF_MINOR)) >= (0, 11)


class FeatureType(object):
    ''' Common interface for one group of features.

    Subclasses override `name` and `dim` and implement `raw_features` and
    `process_raw_features`.
    '''

    name = ''
    dim = 0

    def __repr__(self):
        return f'{self.name}({self.dim})'

    def raw_features(self, bytez, lief_binary):
        ''' Produce a JSON-able description of the file; must be overridden '''
        raise NotImplementedError

    def process_raw_features(self, raw_obj):
        ''' Turn the output of `raw_features` into a feature vector; must be overridden '''
        raise NotImplementedError

    def feature_vector(self, bytez, lief_binary):
        ''' Compute the feature vector straight from the sample by chaining
        `raw_features` and `process_raw_features`. Override only when fusing
        the two steps is significantly faster. '''
        raw_obj = self.raw_features(bytez, lief_binary)
        return self.process_raw_features(raw_obj)


class ByteHistogram(FeatureType):
    ''' Byte histogram over the entire binary file.

    The raw features are the 256 byte-value counts; the feature vector is
    those counts divided by their sum.
    '''

    name = 'histogram'
    dim = 256

    def __init__(self):
        super().__init__()

    def raw_features(self, bytez, lief_binary):
        ''' Count the occurrences of each byte value (0-255) in `bytez` '''
        counts = np.bincount(np.frombuffer(bytez, dtype=np.uint8), minlength=256)
        return counts.tolist()

    def process_raw_features(self, raw_obj):
        ''' Normalize the raw counts to a float32 vector that sums to 1 '''
        counts = np.array(raw_obj, dtype=np.float32)
        total = counts.sum()
        # An empty file has an all-zero histogram. Dividing by 1 keeps it at
        # zeros instead of producing NaN (0/0), the same guard that
        # StringExtractor.process_raw_features applies to its histogram.
        divisor = total if total > 0 else 1.0
        return counts / divisor


class ByteEntropyHistogram(FeatureType):
    ''' 2d byte/entropy histogram based loosely on (Saxe and Berlin, 2015).
    This roughly approximates the joint probability of byte value and local entropy.
    See Section 2.1.1 in https://arxiv.org/pdf/1508.03096.pdf for more info.

    `window` is the length of each block of bytes and `step` the distance
    between the starts of consecutive blocks.
    '''

    name = 'byteentropy'
    dim = 256

    def __init__(self, step=1024, window=2048):
        super().__init__()
        self.window = window
        self.step = step

    def _entropy_bin_counts(self, block):
        ''' Return (entropy bin 0-15, 16-bin coarse byte histogram) for one block '''
        # coarse histogram, 16 bytes per bin
        c = np.bincount(block >> 4, minlength=16)  # 16-bin histogram
        # note: always divided by the window size, even for a shorter block
        p = c.astype(np.float32) / self.window
        wh = np.where(c)[0]
        H = np.sum(-p[wh] * np.log2(
            p[wh])) * 2  # * x2 b.c. we reduced information by half: 256 bins (8 bits) to 16 bins (4 bits)

        Hbin = int(H * 2)  # up to 16 bins (max entropy is 8 bits)
        if Hbin == 16:  # handle entropy = 8.0 bits
            Hbin = 15

        return Hbin, c

    def raw_features(self, bytez, lief_binary):
        ''' Accumulate the coarse byte histogram of every block into the row
        selected by that block's entropy bin; returns the flattened 16x16 counts '''
        # `np.int` was only an alias of the builtin int and no longer exists in
        # current NumPy; the builtin gives the same dtype.
        output = np.zeros((16, 16), dtype=int)
        a = np.frombuffer(bytez, dtype=np.uint8)
        if a.shape[0] < self.window:
            # input shorter than one window: treat it as a single block
            Hbin, c = self._entropy_bin_counts(a)
            output[Hbin, :] += c
        else:
            # strided trick from here: http://www.rigtorp.se/2011/01/01/rolling-statistics-numpy.html
            shape = a.shape[:-1] + (a.shape[-1] - self.window + 1, self.window)
            strides = a.strides + (a.strides[-1],)
            blocks = np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)[::self.step, :]

            # from the blocks, compute histogram
            for block in blocks:
                Hbin, c = self._entropy_bin_counts(block)
                output[Hbin, :] += c

        return output.flatten().tolist()

    def process_raw_features(self, raw_obj):
        ''' Normalize the raw counts to a float32 vector that sums to 1 '''
        counts = np.array(raw_obj, dtype=np.float32)
        total = counts.sum()
        # Empty input gives all-zero counts; divide by 1 so the result is
        # zeros rather than NaN (0/0), the same guard that
        # StringExtractor.process_raw_features applies to its histogram.
        divisor = total if total > 0 else 1.0
        return counts / divisor


class SectionInfo(FeatureType):
    ''' Information about section names, sizes and entropy.  Uses hashing trick
    to summarize all this section info into a feature vector.

    The vector holds 5 general counts followed by five 50-bin hashed groups:
    section sizes, section entropies, section virtual sizes, the entry
    section name, and the entry section's characteristics.
    '''

    name = 'section'
    dim = 5 + 50 + 50 + 50 + 50 + 50

    def __init__(self):
        super().__init__()

    @staticmethod
    def _properties(s):
        ''' Names of the section's characteristics with the enum prefix stripped '''
        return [str(c).split('.')[-1] for c in s.characteristics_lists]

    def raw_features(self, bytez, lief_binary):
        ''' Return {"entry": <entry section name>, "sections": [<per-section dict>, ...]} '''
        if lief_binary is None:
            return {"entry": "", "sections": []}

        # properties of entry point, or if invalid, the first executable section

        try:
            if int(LIEF_MAJOR) > 0 or (int(LIEF_MAJOR) == 0 and int(LIEF_MINOR) >= 12):
                section = lief_binary.section_from_rva(lief_binary.entrypoint - lief_binary.imagebase)
                if section is None:
                    raise lief.not_found
                entry_section = section.name
            else: # lief < 0.12
                entry_section = lief_binary.section_from_offset(lief_binary.entrypoint).name
        except lief.not_found:
            # bad entry point, let's find the first executable section
            entry_section = ""
            for s in lief_binary.sections:
                if lief.PE.SECTION_CHARACTERISTICS.MEM_EXECUTE in s.characteristics_lists:
                    entry_section = s.name
                    break

        raw_obj = {"entry": entry_section}
        raw_obj["sections"] = [{
            'name': s.name,
            'size': s.size,
            'entropy': s.entropy,
            'vsize': s.virtual_size,
            'props': self._properties(s)
        } for s in lief_binary.sections]
        return raw_obj

    def process_raw_features(self, raw_obj):
        ''' Build the float32 feature vector from the output of `raw_features` '''
        sections = raw_obj['sections']
        general = [
            len(sections),  # total number of sections
            # number of sections with zero size
            sum(1 for s in sections if s['size'] == 0),
            # number of sections with an empty name
            sum(1 for s in sections if s['name'] == ""),
            # number of RX
            sum(1 for s in sections if 'MEM_READ' in s['props'] and 'MEM_EXECUTE' in s['props']),
            # number of W
            sum(1 for s in sections if 'MEM_WRITE' in s['props'])
        ]
        # gross characteristics of each section
        section_sizes = [(s['name'], s['size']) for s in sections]
        section_sizes_hashed = FeatureHasher(50, input_type="pair").transform([section_sizes]).toarray()[0]
        section_entropy = [(s['name'], s['entropy']) for s in sections]
        section_entropy_hashed = FeatureHasher(50, input_type="pair").transform([section_entropy]).toarray()[0]
        section_vsize = [(s['name'], s['vsize']) for s in sections]
        section_vsize_hashed = FeatureHasher(50, input_type="pair").transform([section_vsize]).toarray()[0]
        # The entry name used to be passed as a bare string sample, which the
        # hasher iterated character by character. Newer scikit-learn releases
        # reject a bare string sample with a ValueError, so pass the
        # characters as an explicit list: this hashes the same tokens as
        # before and therefore keeps the feature values unchanged.
        entry_name_hashed = FeatureHasher(50, input_type="string").transform([list(raw_obj['entry'])]).toarray()[0]
        characteristics = [p for s in sections for p in s['props'] if s['name'] == raw_obj['entry']]
        characteristics_hashed = FeatureHasher(50, input_type="string").transform([characteristics]).toarray()[0]

        return np.hstack([
            general, section_sizes_hashed, section_entropy_hashed, section_vsize_hashed, entry_name_hashed,
            characteristics_hashed
        ]).astype(np.float32)


class ImportsInfo(FeatureType):
    ''' Imported libraries and functions taken from the import address
    table. The total number of imported functions is not stored here; it is
    part of GeneralFileInfo.
    '''

    name = 'imports'
    dim = 1280

    def __init__(self):
        super().__init__()

    def raw_features(self, bytez, lief_binary):
        ''' Map each imported library name to the list of its imported entries '''
        imports = {}
        if lief_binary is None:
            return imports

        for lib in lief_binary.imports:
            # a library may be listed more than once: keep extending its list
            functions = imports.setdefault(lib.name, [])

            for entry in lib.entries:
                if entry.is_ordinal:
                    functions.append("ordinal" + str(entry.ordinal))
                else:
                    # Names are clipped to 10000 characters: anything beyond is
                    # assumed to add little discriminatory power, and clipping
                    # limits the dataset size
                    functions.append(entry.name[:10000])

        return imports

    def process_raw_features(self, raw_obj):
        ''' Hash library names (256 bins) and "library:function" names (1024 bins) '''
        # unique, lower-cased library names
        libraries = list({lib_name.lower() for lib_name in raw_obj.keys()})
        libraries_hashed = FeatureHasher(256, input_type="string").transform([libraries]).toarray()[0]

        # one token such as "kernel32.dll:CreateFileMappingA" per imported function
        imports = []
        for lib_name, entries in raw_obj.items():
            prefix = lib_name.lower() + ':'
            imports.extend(prefix + entry for entry in entries)
        imports_hashed = FeatureHasher(1024, input_type="string").transform([imports]).toarray()[0]

        # libraries alone first, then the fully-qualified function names
        return np.hstack([libraries_hashed, imports_hashed]).astype(np.float32)


class ExportsInfo(FeatureType):
    ''' Names of the exported functions. The total number of exported
    functions is not stored here; it is part of GeneralFileInfo.
    '''

    name = 'exports'
    dim = 128

    def __init__(self):
        super().__init__()

    def raw_features(self, bytez, lief_binary):
        ''' Return the exported function names, each clipped to 10000 characters '''
        if lief_binary is None:
            return []

        # Clipping assumes diminishing returns on the discriminatory power of
        # exports beyond the first 10000 characters, and limits the dataset size
        exported = lief_binary.exported_functions
        if not LIEF_EXPORT_OBJECT:
            # export is a string (LIEF 0.9.0 and earlier)
            return [export[:10000] for export in exported]
        # export is an object with .name attribute (0.10.0 and later)
        return [export.name[:10000] for export in exported]

    def process_raw_features(self, raw_obj):
        ''' Hash the export names into a 128-bin float32 vector '''
        hasher = FeatureHasher(128, input_type="string")
        return hasher.transform([raw_obj]).toarray()[0].astype(np.float32)


class GeneralFileInfo(FeatureType):
    ''' File-level facts: sizes, import/export/symbol counts and has_* flags '''

    name = 'general'
    dim = 10

    def __init__(self):
        super().__init__()

    def raw_features(self, bytez, lief_binary):
        ''' Return the ten general fields; without a parsed binary only
        `size` is filled in and every other field is 0 '''
        file_size = len(bytez)
        if lief_binary is None:
            return {
                'size': file_size,
                'vsize': 0,
                'has_debug': 0,
                'exports': 0,
                'imports': 0,
                'has_relocations': 0,
                'has_resources': 0,
                'has_signature': 0,
                'has_tls': 0,
                'symbols': 0
            }

        # which attribute carries the signature flag depends on the LIEF version
        if LIEF_HAS_SIGNATURE:
            signed = lief_binary.has_signatures
        else:
            signed = lief_binary.has_signature

        return {
            'size': file_size,
            'vsize': lief_binary.virtual_size,
            'has_debug': int(lief_binary.has_debug),
            'exports': len(lief_binary.exported_functions),
            'imports': len(lief_binary.imported_functions),
            'has_relocations': int(lief_binary.has_relocations),
            'has_resources': int(lief_binary.has_resources),
            'has_signature': int(signed),
            'has_tls': int(lief_binary.has_tls),
            'symbols': len(lief_binary.symbols),
        }

    def process_raw_features(self, raw_obj):
        ''' Lay the ten fields out in a fixed order as a float32 vector '''
        ordered_keys = (
            'size', 'vsize', 'has_debug', 'exports', 'imports',
            'has_relocations', 'has_resources', 'has_signature', 'has_tls',
            'symbols',
        )
        return np.asarray([raw_obj[key] for key in ordered_keys], dtype=np.float32)


class HeaderFileInfo(FeatureType):
    ''' Machine, architecture, OS, linker and other information extracted from
    the COFF header and the optional header '''

    name = 'header'
    dim = 62

    # Numeric optional-header fields, in the order they enter the feature vector
    _OPTIONAL_NUMERIC_FIELDS = (
        'major_image_version',
        'minor_image_version',
        'major_linker_version',
        'minor_linker_version',
        'major_operating_system_version',
        'minor_operating_system_version',
        'major_subsystem_version',
        'minor_subsystem_version',
        'sizeof_code',
        'sizeof_headers',
        'sizeof_heap_commit',
    )

    def __init__(self):
        super().__init__()

    def raw_features(self, bytez, lief_binary):
        ''' Return {'coff': {...}, 'optional': {...}}; every field keeps its
        empty/zero default when no parsed binary is available '''
        coff = {'timestamp': 0, 'machine': "", 'characteristics': []}
        optional = {'subsystem': "", 'dll_characteristics': [], 'magic': ""}
        optional.update(dict.fromkeys(self._OPTIONAL_NUMERIC_FIELDS, 0))
        raw_obj = {'coff': coff, 'optional': optional}
        if lief_binary is None:
            return raw_obj

        def short_name(value):
            # keep only the part after the last '.' of the value's string form
            return str(value).split('.')[-1]

        header = lief_binary.header
        coff['timestamp'] = header.time_date_stamps
        coff['machine'] = short_name(header.machine)
        coff['characteristics'] = [short_name(c) for c in header.characteristics_list]

        optional_header = lief_binary.optional_header
        optional['subsystem'] = short_name(optional_header.subsystem)
        optional['dll_characteristics'] = [
            short_name(c) for c in optional_header.dll_characteristics_lists
        ]
        optional['magic'] = short_name(optional_header.magic)
        for field in self._OPTIONAL_NUMERIC_FIELDS:
            optional[field] = getattr(optional_header, field)
        return raw_obj

    def process_raw_features(self, raw_obj):
        ''' Timestamp, five 10-bin hashed groups and eleven numeric fields as float32 '''
        coff = raw_obj['coff']
        optional = raw_obj['optional']

        def hashed(tokens):
            return FeatureHasher(10, input_type="string").transform([tokens]).toarray()[0]

        parts = [
            coff['timestamp'],
            hashed([coff['machine']]),
            hashed(coff['characteristics']),
            hashed([optional['subsystem']]),
            hashed(optional['dll_characteristics']),
            hashed([optional['magic']]),
        ]
        parts.extend(optional[field] for field in self._OPTIONAL_NUMERIC_FIELDS)
        return np.hstack(parts).astype(np.float32)


class StringExtractor(FeatureType):
    ''' Statistics about the printable strings found in the raw byte stream '''

    name = 'strings'
    dim = 1 + 1 + 1 + 96 + 1 + 1 + 1 + 1 + 1

    def __init__(self):
        super().__init__()
        # every run of 5 or more consecutive bytes in the range 0x20 - 0x7f
        self._allstrings = re.compile(b'[\x20-\x7f]{5,}')
        # occurrences of the string 'C:\' (any case); the path itself is not extracted
        self._paths = re.compile(b'c:\\\\', re.IGNORECASE)
        # occurrences of http:// or https:// (any case); the URL itself is not extracted
        self._urls = re.compile(b'https?://', re.IGNORECASE)
        # occurrences of the prefix HKEY_; the registry name itself is not extracted
        self._registry = re.compile(b'HKEY_')
        # crude evidence of an MZ header (dropper?) somewhere in the byte stream
        self._mz = re.compile(b'MZ')

    def raw_features(self, bytez, lief_binary):
        ''' Count strings, paths, URLs, registry prefixes and MZ markers, and
        describe the character distribution of the printable strings '''
        allstrings = self._allstrings.findall(bytez)
        if not allstrings:
            avlength = 0
            c = np.zeros((96,), dtype=np.float32)
            H = 0
            csum = 0
        else:
            # average string length
            avlength = sum(len(s) for s in allstrings) / len(allstrings)
            # shift the characters 0x20 - 0x7f down to the integers 0-95, inclusive
            as_shifted_string = [b - 0x20 for b in b''.join(allstrings)]
            c = np.bincount(as_shifted_string, minlength=96)  # histogram count
            # entropy of the character distribution over the printable strings
            csum = c.sum()
            p = c.astype(np.float32) / csum
            wh = np.where(c)[0]
            H = np.sum(-p[wh] * np.log2(p[wh]))

        def occurrences(pattern):
            return len(pattern.findall(bytez))

        return {
            'numstrings': len(allstrings),
            'avlength': avlength,
            'printabledist': c.tolist(),  # histogram is stored non-normalized
            'printables': int(csum),
            'entropy': float(H),
            'paths': occurrences(self._paths),
            'urls': occurrences(self._urls),
            'registry': occurrences(self._registry),
            'MZ': occurrences(self._mz)
        }

    def process_raw_features(self, raw_obj):
        ''' Flatten the raw features into a float32 vector, normalizing the
        character histogram by the number of printable characters '''
        printables = raw_obj['printables']
        # avoid dividing by zero when no printable characters were found
        if printables > 0:
            hist_divisor = float(printables)
        else:
            hist_divisor = 1.0
        normalized_dist = np.asarray(raw_obj['printabledist']) / hist_divisor
        return np.hstack([
            raw_obj['numstrings'],
            raw_obj['avlength'],
            printables,
            normalized_dist,
            raw_obj['entropy'],
            raw_obj['paths'],
            raw_obj['urls'],
            raw_obj['registry'],
            raw_obj['MZ'],
        ]).astype(np.float32)


class DataDirectories(FeatureType):
    ''' Extracts size and virtual address of the first 15 data directories '''

    name = 'datadirectories'
    dim = 15 * 2

    def __init__(self):
        super(FeatureType, self).__init__()
        # One entry per output slot; only the length of this list is used when
        # building the vector (slots are filled by position, not by name).
        self._name_order = [
            "EXPORT_TABLE",
            "IMPORT_TABLE",
            "RESOURCE_TABLE",
            "EXCEPTION_TABLE",
            "CERTIFICATE_TABLE",
            "BASE_RELOCATION_TABLE",
            "DEBUG",
            "ARCHITECTURE",
            "GLOBAL_PTR",
            "TLS_TABLE",
            "LOAD_CONFIG_TABLE",
            "BOUND_IMPORT",
            "IAT",
            "DELAY_IMPORT_DESCRIPTOR",
            "CLR_RUNTIME_HEADER",
        ]

    def raw_features(self, bytez, lief_binary):
        ''' List name, size and RVA of every data directory reported by LIEF.

        Returns an empty list when ``lief_binary`` is None. ``bytez`` is not used.
        '''
        if lief_binary is None:
            return []

        return [
            {
                "name": str(directory.type).replace("DATA_DIRECTORY.", ""),
                "size": directory.size,
                "virtual_address": directory.rva,
            }
            for directory in lief_binary.data_directories
        ]

    def process_raw_features(self, raw_obj):
        ''' Pack (size, virtual_address) pairs into a float32 vector.

        Entry ``i`` of ``raw_obj`` fills positions ``2*i`` and ``2*i + 1``; entries
        beyond the number of known directory slots are ignored and missing ones
        stay zero.
        '''
        slot_count = len(self._name_order)
        features = np.zeros(2 * slot_count, dtype=np.float32)
        # zip stops at whichever is shorter: the slot range or the raw entries
        for slot, entry in zip(range(slot_count), raw_obj):
            features[2 * slot] = entry["size"]
            features[2 * slot + 1] = entry["virtual_address"]
        return features


class PEFeatureExtractor(object):
    ''' Extract useful features from a PE file, and return as a vector of fixed size.

    Parameters
    ----------
    feature_version : int
        1 or 2. Version 2 additionally appends the ``DataDirectories`` features.
        Any other value raises ``Exception``.
    print_feature_warning : bool
        When true, print a warning if the installed LIEF version differs from the
        one the given feature version was computed with.
    features_file : str
        Optional path to a JSON file with a ``"features"`` list of feature-type
        names to enable (unknown names are ignored). When the path does not
        exist, every known feature type is enabled.
    '''

    # LIEF exception classes that indicate an unparsable file. They are looked up by
    # name at call time because an installed LIEF build may not define all of them;
    # referencing a missing one directly would raise AttributeError on every call.
    _LIEF_ERROR_NAMES = ('bad_format', 'bad_file', 'pe_error', 'parser_error', 'read_out_of_bound')

    def __init__(self, feature_version=2, print_feature_warning=True, features_file=''):
        available = {
            'ByteHistogram': ByteHistogram(),
            'ByteEntropyHistogram': ByteEntropyHistogram(),
            'StringExtractor': StringExtractor(),
            'GeneralFileInfo': GeneralFileInfo(),
            'HeaderFileInfo': HeaderFileInfo(),
            'SectionInfo': SectionInfo(),
            'ImportsInfo': ImportsInfo(),
            'ExportsInfo': ExportsInfo()
        }

        if os.path.exists(features_file):
            with open(features_file, encoding='utf8') as f:
                selection = json.load(f)
            # keep the order given in the file; silently skip names we do not know
            self.features = [available[name] for name in selection['features'] if name in available]
        else:
            self.features = list(available.values())

        if feature_version == 1:
            if print_feature_warning and not lief.__version__.startswith("0.8.3"):
                print("WARNING: EMBER feature version 1 were computed using lief version 0.8.3-18d5b75")
                print(f"WARNING:   lief version {lief.__version__} found instead. There may be slight inconsistencies")
                print("WARNING:   in the feature calculations.")
        elif feature_version == 2:
            self.features.append(DataDirectories())
            if print_feature_warning and not lief.__version__.startswith("0.9.0"):
                print("WARNING: EMBER feature version 2 were computed using lief version 0.9.0-")
                print(f"WARNING:   lief version {lief.__version__} found instead. There may be slight inconsistencies")
                print("WARNING:   in the feature calculations.")
        else:
            raise Exception(f"EMBER feature version must be 1 or 2. Not {feature_version}")
        # total vector length is the sum of the enabled feature types' widths
        self.dim = sum(fe.dim for fe in self.features)

    def raw_features(self, bytez):
        ''' Return a dict with the file's sha256 plus each feature type's raw features.

        If LIEF fails to parse ``bytez`` with one of its parse errors (or a
        ``RuntimeError``), the feature types receive ``lief_binary=None``. Any
        other exception propagates to the caller.
        '''
        lief_errors = tuple(
            getattr(lief, error_name)
            for error_name in self._LIEF_ERROR_NAMES
            if hasattr(lief, error_name)
        ) + (RuntimeError,)
        try:
            lief_binary = lief.PE.parse(list(bytez))
        except lief_errors:
            # unparsable as PE: fall back to byte-only features
            lief_binary = None

        features = {"sha256": hashlib.sha256(bytez).hexdigest()}
        features.update({fe.name: fe.raw_features(bytez, lief_binary) for fe in self.features})
        return features

    def process_raw_features(self, raw_obj):
        ''' Concatenate every feature type's processed vector into one float32 array. '''
        feature_vectors = [fe.process_raw_features(raw_obj[fe.name]) for fe in self.features]
        return np.hstack(feature_vectors).astype(np.float32)

    def feature_vector(self, bytez):
        ''' Convenience wrapper: raw bytes in, fixed-size float32 feature vector out. '''
        return self.process_raw_features(self.raw_features(bytez))


##### Features are extracted from the binary files saved in the original folders, modified_folders and adv_folders, and saved in the corresponding folders

In [None]:
# Helper that strips the directory part from a path
def extract_file_name(file_path):
    """Return the final component of ``file_path`` (everything after the last separator)."""
    _, file_name = os.path.split(file_path)
    return file_name

# Column names for the feature CSV
def _lief_feature_header():
    """Return the 2381 feature column names.

    Columns are numbered continuously from 1 across the groups, e.g. ``hist_1`` ..
    ``hist_256``, ``byte_257`` .. ``byte_512``, and so on up to ``directories_2381``.
    """
    layout = (
        ("hist", 256),
        ("byte", 256),
        ("string", 104),
        ("gen", 10),
        ("head", 62),
        ("section", 255),
        ("imports", 1280),
        ("exports", 128),
        ("directories", 30),
    )
    header = []
    for prefix, width in layout:
        start = len(header) + 1
        header.extend(f"{prefix}_{idx}" for idx in range(start, start + width))
    return header


# Function to extract features and save to CSV
def extract_and_save_features(input_folder, output_csv, folder_name):
    """Extract a feature vector for every binary of one fold and write them to a CSV.

    Binaries are read from ``<input_folder>/<folder_name>_binary_files``. The CSV at
    ``output_csv`` gets one row per file: the feature columns followed by an
    ``f_name`` column holding the file name.

    Parameters
    ----------
    input_folder : str
        Folder that contains the ``<folder_name>_binary_files`` sub-folder.
    output_csv : str
        Destination CSV path (overwritten if it exists).
    folder_name : str
        Fold name used to derive the binary sub-folder name.

    Raises
    ------
    ValueError
        If a feature vector's length does not match the header width.
    """
    feature_extractor = PEFeatureExtractor()
    binary_folder_path = os.path.join(input_folder, folder_name + "_binary_files")

    vectorized_list = []
    fname_list = []
    # os.listdir gives entries in arbitrary order; sort so the CSV rows are reproducible
    for f_name in sorted(os.listdir(binary_folder_path)):
        path = os.path.join(binary_folder_path, f_name)
        if not os.path.isfile(path):
            # sub-directories cannot be opened as binaries
            continue
        with open(path, "rb") as file_handle:
            bytez = file_handle.read()
        vectorized_list.append(feature_extractor.feature_vector(bytez))
        fname_list.append(f_name)

    header = _lief_feature_header()
    if vectorized_list:
        # stacking keeps the extractor's float32 dtype instead of widening the values
        feature_matrix = np.vstack(vectorized_list)
    else:
        feature_matrix = np.empty((0, len(header)), dtype=np.float32)

    # build the frame once and write it once (no write / re-read / re-write round trip)
    df = pd.DataFrame(feature_matrix, columns=header)
    df["f_name"] = fname_list
    df.to_csv(output_csv, index=False)

# Fold variants to process: original, modified and adversarial copies of folds 1-3
folders_to_process = [
    f"{variant_prefix}fold{fold_number}"
    for variant_prefix in ("", "modified_", "adv_")
    for fold_number in (1, 2, 3)
]

# For each variant, write <variant>/<variant>_lief.csv next to its binaries folder
for folder_name in folders_to_process:
    folder_path = os.path.join(output_folder, folder_name)
    output_csv_path = os.path.join(folder_path, folder_name + "_lief.csv")
    extract_and_save_features(folder_path, output_csv_path, folder_name)


#### The fold data files (file names and labels) are copied from fold1, fold2, fold3 and pasted into the corresponding modified_folders and adv_folders

In [None]:
for folder_name in folds_folders:
    # Labels CSV that belongs to the original fold
    source_csv_path = os.path.join(output_folder, folder_name, f"{folder_name}_data.csv")

    # Folders that need their own copy of the labels
    modified_folder_path = os.path.join(output_folder, f"modified_{folder_name}")
    adv_folder_path = os.path.join(output_folder, f"adv_{folder_name}")

    # Copy (not move) the CSV into the modified folder; the source stays in place
    # because the adv copy below and later cells still read it.
    modified_csv_path = os.path.join(modified_folder_path, f"{folder_name}_data.csv")
    shutil.copy2(source_csv_path, modified_csv_path)
    print(f"Copied {source_csv_path} to {modified_csv_path}")

    # Copy the same CSV into the adv folder
    adv_csv_path = os.path.join(adv_folder_path, f"{folder_name}_data.csv")
    shutil.copy2(source_csv_path, adv_csv_path)
    print(f"Copied {source_csv_path} to {adv_csv_path}")


#### Read labels from the folds and assign them to the lief_features files in the corresponding modified_folders and adv_folders

In [None]:

# Attach labels to a features CSV
def merge_and_save(file_labels, files_features, output_csv):
    """Join labels onto feature rows by file name and write the result to ``output_csv``.

    ``file_labels`` must have ``file_names`` and ``labels`` columns; ``files_features``
    must have an ``f_name`` column. Rows are matched with an inner join, so feature
    rows without a label are dropped. Both name columns are removed from the output,
    leaving the feature columns followed by ``labels``.
    """
    labels_data = pd.read_csv(file_labels)
    features_data = pd.read_csv(files_features)

    labelled = (
        features_data
        .merge(
            labels_data[['file_names', 'labels']],
            left_on='f_name',
            right_on='file_names',
            how='inner',
        )
        # the join keys are no longer needed once the rows are matched
        .drop(columns=['file_names', 'f_name'])
    )

    labelled.to_csv(output_csv, index=False)

# (labels CSV, features CSV, output CSV) for every fold variant; the adversarial
# variants write to AT_fold<N>.csv instead of <variant>_updated.csv
files_to_process = [
    (
        f"fold{fold_number}_data.csv",
        f"{features_prefix}fold{fold_number}_lief.csv",
        output_template.format(fold_number),
    )
    for features_prefix, output_template in (
        ("", "fold{}_updated.csv"),
        ("modified_", "modified_fold{}_updated.csv"),
        ("adv_", "AT_fold{}.csv"),
    )
    for fold_number in (1, 2, 3)
]

# Label each features file; all three files of a triple live in the same sub-folder,
# whose name is the features file name up to "_lief"
for fold_data_csv, fold_csv, output_csv in files_to_process:
    subfolder_name = os.path.basename(fold_csv).partition('_lief')[0]
    subfolder_path = os.path.join(output_folder, subfolder_name)
    labels_file_path = os.path.join(subfolder_path, fold_data_csv)
    features_file_path = os.path.join(subfolder_path, fold_csv)
    output_csv_path = os.path.join(subfolder_path, output_csv)
    merge_and_save(labels_file_path, features_file_path, output_csv_path)


#### lief_features files from fold1, fold2, fold3 are combined with the lief_features files of the adv_folders to create the adversarial training files

In [None]:
# One entry per fold: the clean labelled features ("source") are appended to the
# adversarial labelled features ("dest")
concatenation_pairs = [
    {
        "source_subfolder": f"fold{fold_number}",
        "source_file": f"fold{fold_number}_updated.csv",
        "dest_subfolder": f"adv_fold{fold_number}",
        "dest_file": f"AT_fold{fold_number}.csv",
    }
    for fold_number in (1, 2, 3)
]

for pair in concatenation_pairs:
    # Resolve both input files
    source_path = os.path.join(output_folder, pair["source_subfolder"], pair["source_file"])
    dest_path = os.path.join(output_folder, pair["dest_subfolder"], pair["dest_file"])

    # Load the clean rows, then the adversarial rows
    source_df = pd.read_csv(source_path)
    dest_df = pd.read_csv(dest_path)

    # Adversarial rows first, clean rows after, with a fresh 0..n-1 index
    concatenated_df = pd.concat([dest_df, source_df], axis=0, ignore_index=True)

    # Written next to the adversarial file as <dest_file stem>_updated.csv
    updated_file_name = pair["dest_file"].replace('.csv', '_updated.csv')
    output_csv_path = os.path.join(output_folder, pair["dest_subfolder"], updated_file_name)
    concatenated_df.to_csv(output_csv_path, index=False)

    print(f"Concatenated and saved files in {pair['source_subfolder']} with {pair['dest_subfolder']} and saved to {output_csv_path}")


In [None]:
# Sanity check: show which output directory the cells below read from
# (a bare last expression in a cell is displayed as its output).
output_folder

In [None]:
def _read_fold_csv(relative_path):
    """Load the CSV found at ``relative_path`` beneath ``output_folder``."""
    return pd.read_csv(os.path.join(output_folder, relative_path))


# Clean folds
fold1_updated = _read_fold_csv('fold1/fold1_updated.csv')
fold2_updated = _read_fold_csv('fold2/fold2_updated.csv')
fold3_updated = _read_fold_csv('fold3/fold3_updated.csv')
# Modified folds
modified_fold1_updated = _read_fold_csv('modified_fold1/modified_fold1_updated.csv')
modified_fold2_updated = _read_fold_csv('modified_fold2/modified_fold2_updated.csv')
modified_fold3_updated = _read_fold_csv('modified_fold3/modified_fold3_updated.csv')
# Adversarial-training folds (adversarial rows + clean rows)
AT_fold1_updated = _read_fold_csv('adv_fold1/AT_fold1_updated.csv')
AT_fold2_updated = _read_fold_csv('adv_fold2/AT_fold2_updated.csv')
AT_fold3_updated = _read_fold_csv('adv_fold3/AT_fold3_updated.csv')

### training and evaluating the model with Trial1, Trial2 and Trial3

In [None]:

def _split_features_labels(frame):
    """Return ``(features, labels)``: all columns but the last, and the last column."""
    return frame.iloc[:, :-1], frame.iloc[:, -1]


# List of datasets for each trial:
# (training frame names, clean test frame, modified test frame, AT test frame).
# The names are resolved through globals(), so the frames must already be loaded
# under exactly these variable names.
trials = [
    (['fold1_updated', 'fold2_updated'], 'fold3_updated', 'modified_fold3_updated', 'AT_fold3_updated'),
    (['fold1_updated', 'fold3_updated'], 'fold2_updated', 'modified_fold2_updated', 'AT_fold2_updated'),
    (['fold2_updated', 'fold3_updated'], 'fold1_updated', 'modified_fold1_updated', 'AT_fold1_updated'),
    (['modified_fold1_updated', 'modified_fold2_updated'], 'fold3_updated', 'modified_fold3_updated', 'AT_fold3_updated'),
    (['modified_fold1_updated', 'modified_fold3_updated'], 'fold2_updated', 'modified_fold2_updated', 'AT_fold2_updated'),
    (['modified_fold2_updated', 'modified_fold3_updated'], 'fold1_updated', 'modified_fold1_updated', 'AT_fold1_updated'),
    (['AT_fold1_updated', 'AT_fold2_updated'], 'fold3_updated', 'modified_fold3_updated', 'AT_fold3_updated'),
    (['AT_fold1_updated', 'AT_fold3_updated'], 'fold2_updated', 'modified_fold2_updated', 'AT_fold2_updated'),
    (['AT_fold2_updated', 'AT_fold3_updated'], 'fold1_updated', 'modified_fold1_updated', 'AT_fold1_updated'),
]

# Iterate through each trial
for i, trial in enumerate(trials, start=1):
    train_datasets, test_fold, test_modified_fold, test_AT_fold = trial

    # Concatenate training datasets
    training_data = pd.concat([globals()[dataset] for dataset in train_datasets])

    # Features are every column except the last; the last column holds the labels
    x_train, y_train = _split_features_labels(training_data)
    x_test_fold, y_test_fold = _split_features_labels(globals()[test_fold])
    x_test_modified_fold, y_test_modified_fold = _split_features_labels(globals()[test_modified_fold])
    x_test_AT_fold, y_test_AT_fold = _split_features_labels(globals()[test_AT_fold])

    # Train the model and persist it
    model = lgb.LGBMClassifier()
    model.fit(x_train, y_train)
    train_set_name = '+'.join(train_datasets)
    model_filename = f"trial_{i}_train_{train_set_name}_model.pkl"
    model_path = os.path.join(model_folder, model_filename)
    joblib.dump(model, model_path)

    # Evaluate performance on the three test sets
    test_sets = (
        ('fold', x_test_fold, y_test_fold),
        ('modified_fold', x_test_modified_fold, y_test_modified_fold),
        ('AT_fold', x_test_AT_fold, y_test_AT_fold),
    )
    for test_set_name, x_test, y_test in test_sets:
        # predict() returns the original class labels. argmax over predict_proba only
        # yields class *indices*, which match the labels only when they are 0..n-1.
        y_predicted = model.predict(x_test)

        # Evaluate performance
        confusion_mat = confusion_matrix(y_test, y_predicted)
        accuracy = accuracy_score(y_test, y_predicted)
        classification_rep = classification_report(y_test, y_predicted)

        # Generate result filename
        result_filename = f"trial_{i}_train_{train_set_name}_test_{test_set_name}"

        # Write results to file
        result_path = os.path.join(results_folder, result_filename)
        with open(result_path, "w") as text_file:
            text_file.write(f'Accuracy: {accuracy}')
            text_file.write(f'\nClassification: {classification_rep}')
            text_file.write(f'\nConfusion: {confusion_mat}')
