In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

 # Install dependencies and libraries

In [6]:
# Install dependencies
!pip install lief==0.12.0
!pip install deap
!pip install tqdm
!pip install python-magic

# Install ML-Pentest Lib
!pip install ml-pentest

# Install EMBER augmented version
!pip install ember-mivia

Collecting lief==0.12.0
  Downloading lief-0.12.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.2/4.2 MB[0m [31m13.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: lief
Successfully installed lief-0.12.0
Collecting deap
  Downloading deap-1.4.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (135 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.4/135.4 kB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: deap
Successfully installed deap-1.4.1
Collecting python-magic
  Downloading python_magic-0.4.27-py2.py3-none-any.whl (13 kB)
Installing collected packages: python-magic
Successfully installed python-magic-0.4.27
Collecting ml-pentest
  Downloading ml_pentest-0.0.1.tar.gz (57.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.9/57.9 MB[0m [31m9.6 MB/s[0m eta

# Import and model definition

In [7]:
from ml_pentest.attacks.blackbox.genetic_attack.GAMMA.gamma_section_injection import GammaSectionInjection
from ml_pentest.attacks.blackbox.genetic_attack.GAMMA.attack_utils import create_section_population_from_folder

import os
import torch
import lief
import numpy as np
import random

## Defining custom Wrapper

In [8]:
from ml_pentest.models.wrappers.model_wrapper import ModelWrapper
from ml_pentest.models.feature_extractor.pe_format.ember_feature_extractor import get_ember_features_from_bytes

def remove_features(features):
    """
    Drop two index ranges from a feature sequence.

    The positions 626..636 and 647..656 (inclusive) are removed; everything
    else is kept in its original order.

    Args:
        features: A sliceable sequence that supports concatenation with `+`.

    Returns:
        The concatenation of features[:626], features[637:647] and features[657:].
    """
    leading_part = features[:626]
    kept_middle = features[637:647]
    trailing_part = features[657:]

    return leading_part + kept_middle + trailing_part

class MyWrapper(ModelWrapper):
    """
    ModelWrapper implementation for the classifier loaded in this notebook.

    Raw executable bytes are converted to EMBER features, filtered with
    `remove_features`, and scored with the wrapped model's `predict_proba`.

    Methods:
        __init__(model: object) -> None:
            Stores the model through the ModelWrapper constructor.
        classify_sample(sample: bytes) -> float:
            Classifies a single sample and returns the malware-class probability.

    Note:
        The wrapped model must expose a `predict_proba` method that accepts a
        2-D array of feature rows and returns one row of class probabilities
        per input row.
    """
    def __init__(self, model):
        """
        Initializes a MyWrapper object.

        Args:
            model (object): The classifier to query; it must provide `predict_proba`.
        """
        super().__init__(model)

    def classify_sample(self, sample):
        """
        Classifies a single sample.
        This method includes all pre-processing steps needed to prepare the sample for the model.

        Args:
            sample (bytes): A byte array that represents an executable file.

        Returns:
            float: The classification probability, ranging from 0 to 1, where 1 represents the malware class.
        """
        ember_features = remove_features(get_ember_features_from_bytes(sample))
        # predict_proba is called with a batch that contains this single feature row.
        batch = np.array([ember_features])
        # NOTE(review): self._model is presumably set by ModelWrapper.__init__ - confirm.
        probabilities = self._model.predict_proba(batch)

        # Row 0 is the only sample; column 1 is read as the malware-class score.
        return probabilities[0][1].item()

Loading the model and instantiating the wrapper:

In [3]:
import gdown

# Google Drive file id of the pickled classifier.
# A dedicated name is used so the built-in `id` is not shadowed.
model_file_id = '1u4AhwQgwNk-0F7GONRL2LXdrDiFz3Ao2'
gdown.download(id=model_file_id, output="voting_classifier.pkl")

Downloading...
From: https://drive.google.com/uc?id=1u4AhwQgwNk-0F7GONRL2LXdrDiFz3Ao2
To: /content/voting_classifier.pkl
100%|██████████| 10.9M/10.9M [00:00<00:00, 23.2MB/s]


'voting_classifier.pkl'

In [9]:
import pickle

model_path = "voting_classifier.pkl"

# SECURITY: pickle.load can execute arbitrary code embedded in the file.
# Only load a model file that comes from a source you trust.
with open(model_path, 'rb') as file:
    my_model = pickle.load(file)

# Wrap the classifier so it can be queried with raw executable bytes.
model_wrapper = MyWrapper(my_model)

# Load the data

## Sorel-20M test set malware

Loading the raw malware samples of the test set:

In [10]:
# Google Drive file id (not a URL) of the test-set archive.
test_set_file_id = '1-CNx_k1yEJ-RDC94nXUMlhtc1FQH7lzt'
gdown.download(id=test_set_file_id, output="test_set.tar.gz")

Downloading...
From (original): https://drive.google.com/uc?id=1-CNx_k1yEJ-RDC94nXUMlhtc1FQH7lzt
From (redirected): https://drive.google.com/uc?id=1-CNx_k1yEJ-RDC94nXUMlhtc1FQH7lzt&confirm=t&uuid=6ece22d0-7e81-4016-96ee-16286170cb07
To: /content/test_set.tar.gz
100%|██████████| 2.52G/2.52G [00:47<00:00, 53.1MB/s]


'test_set.tar.gz'

In [11]:
!mkdir -p test
!tar -zxf 'test_set.tar.gz' -C /content/test

## RUN THIS SECTION ONLY THE FIRST TIME
## Load the initial input sample that we want to obfuscate

In [None]:
# NOTE(review): samples are read from '/content/malware', while the test-set archive
# is extracted under /content/test - confirm which directory holds the samples.
malware_directory = '/content/malware'

malwares = dict()
max_num_samples = 100

# List the directory once and visit every file at most once, in random order.
# This still yields a random subset, but it terminates even when fewer than
# max_num_samples files are classified as malware.
candidate_names = os.listdir(malware_directory)
for malware_name in random.sample(candidate_names, len(candidate_names)):
  if len(malwares) >= max_num_samples:
    break

  # input sample that we want to obfuscate
  path = os.path.join(malware_directory, malware_name)
  try:
    with open(path, "rb") as file_handle:
      code = file_handle.read()
    classification_results = model_wrapper.classify_sample(code)
  except Exception as error:
    # Best effort: a sample that cannot be read or analysed is skipped, but reported.
    print(f"Skipping {malware_name}: {error}")
    continue

  # keep only files classified as malware (classification result >= 0.5)
  if classification_results >= 0.5:
    malwares[malware_name] = np.frombuffer(code, dtype=np.uint8)
    print(f"Malware correctly classified:\n\t{malware_name}")
    print("Number of malwares added:", len(malwares))

if len(malwares) < max_num_samples:
  print(f"Only {len(malwares)} of the requested {max_num_samples} samples were found in {malware_directory}.")

In [None]:
# The expected count is taken from max_num_samples instead of being repeated as a literal.
print(f"Number of malwares in dictionary (expected {max_num_samples}):", len(malwares))

Saving malwares in a folder for future access:

In [None]:
import os
import shutil
malware_path = "/content/malware"
destination_path = "path/to/save/folder"

# shutil.copy treats a non-existent destination as a file name, so every sample would
# overwrite the same file; the destination directory has to exist before copying.
os.makedirs(destination_path, exist_ok=True)

for malware_name in malwares.keys():
  print(f"Copying {malware_name} to folder.")
  shutil.copy(os.path.join(malware_path, malware_name), destination_path)

In [None]:
!zip -r 'path/to/save/malware_to_obfuscate.zip' destination_path

Check that all files are classified as malware:

In [None]:
incorrect_samples = 0
failed_samples = 0
destination_path = "path/to/save/folder"
# NOTE(review): file names are listed from destination_path but the bytes are read
# from malware_directory - confirm that both folders hold the same files.
malware_directory = "/content/test/malware"

for malware_name in os.listdir(destination_path):
  path = os.path.join(malware_directory, malware_name)

  try:
    with open(path, "rb") as file_handle:
      code = file_handle.read()
    classification_results = model_wrapper.classify_sample(code)
  except Exception as error:
    # Failures are counted: a sample that could not be analysed was not verified.
    print(f"Exception during the analysis of {malware_name}: {error}")
    failed_samples += 1
    continue

  if classification_results < 0.5:
    print(f"Incorrect classification, model classified {malware_name} sample as benign with a probability of {classification_results}")
    incorrect_samples += 1
  else:
    print(f"{malware_name} classified as malware correctly")

print(f"Number of incorrect classification (expected 0): {incorrect_samples}")
print(f"Number of samples that could not be analysed (expected 0): {failed_samples}")

# Gamma Attack

## Extract the benign section to inject

In [24]:
id = '1DRH8KDmf-oTm6Xd7rYcbKFeGXBQrgRpS'

gdown.download(id=id, output ='malware_to_obfuscate.zip')

!unzip malware_to_obfuscate.zip

Downloading...
From (original): https://drive.google.com/uc?id=1DRH8KDmf-oTm6Xd7rYcbKFeGXBQrgRpS
From (redirected): https://drive.google.com/uc?id=1DRH8KDmf-oTm6Xd7rYcbKFeGXBQrgRpS&confirm=t&uuid=5b0eb8bc-ce34-457f-8c99-e19a9fdba8fa
To: /content/malware_to_obfuscate.zip
100%|██████████| 112M/112M [00:04<00:00, 24.0MB/s]


Archive:  malware_to_obfuscate.zip
   creating: content/drive/Shareddrives/AFC/gamma_handcrafted/malwares_to_obfuscate/
  inflating: content/drive/Shareddrives/AFC/gamma_handcrafted/malwares_to_obfuscate/14649a48ea276bfbb2e5c9ce912c3e232483c582d884e175ce7bf8b8337d48f9  
  inflating: content/drive/Shareddrives/AFC/gamma_handcrafted/malwares_to_obfuscate/2d9294da989eef84609a549bcc41a6efe135a90da2b4a95c3d9b2adafa02d813  
  inflating: content/drive/Shareddrives/AFC/gamma_handcrafted/malwares_to_obfuscate/493f031eba201838fa24a184e93125dd1409987e4859cbbc066b2ce9b2cafde7  
  inflating: content/drive/Shareddrives/AFC/gamma_handcrafted/malwares_to_obfuscate/b889e0e8212eb10c6debfed2c47a37984eab990658c082b17c1a88680ba10507  
  inflating: content/drive/Shareddrives/AFC/gamma_handcrafted/malwares_to_obfuscate/0926c24a0653319193377d6e742a3d25fd5b91d24578a999962262502b6cc974  
  inflating: content/drive/Shareddrives/AFC/gamma_handcrafted/malwares_to_obfuscate/3c2b3d01388be86ec88f262e7dfb53079eb12109b

In [26]:
benign_path = '/content/test/benign'
malware_path = '/content/content/drive/Shareddrives/AFC/gamma_handcrafted/malwares_to_obfuscate'

# Each population gets its own cache file. With a shared cache file the second call
# may simply load the population written by the first one, making both populations
# identical (assumes the helper reuses an existing cache file - TODO confirm).

# Section population with how_many = 25
section_population_25, _ = create_section_population_from_folder(
    benign_path, how_many = 25, sections_to_extract=['.data','.rdata', '.idata', '.rodata'],
    cache_file='/content/section_population_25.pkl')

# Section population with how_many = 50
section_population_50, _ = create_section_population_from_folder(
    benign_path, how_many = 50, sections_to_extract=['.data','.rdata', '.idata', '.rodata'],
    cache_file='/content/section_population_50.pkl')
print("Section extracted")

# Attack parameters
# NOTE(review): 10e-3 equals 1e-2 (0.01), not 10^-3; if powers of ten 10^-3 ... 10^-9
# were intended, the literals should be 1e-3, 1e-5, 1e-7, 1e-9. Values left unchanged.
lambda_values = [10e-3, 10e-5, 10e-7, 10e-9]
query_values = [20, 60, 120, 300]
POPULATION_SIZE = 20

Section extracted


## Create the attack

In [22]:
from ml_pentest.attacks.blackbox.genetic_attack.GAMMA.attack_utils import generate_adv_samples_from_folder

def gamma_attack(base_path, section_population):
    """
    Run the GAMMA section-injection attack for every (lambda, query budget) pair.

    For each combination of the notebook globals `lambda_values` and `query_values`,
    a GammaSectionInjection attack is built and passed to
    generate_adv_samples_from_folder together with the samples in `malware_path`.
    The function also reads the globals `POPULATION_SIZE` and `model_wrapper`.

    Output layout below `base_path`:
        samples/<lambda>/<query budget>/          destination folder of each run
        results/results_<lambda>_<query budget>.json   result file of each run

    Args:
        base_path: Root folder under which `samples` and `results` are created.
        section_population: Section population handed to GammaSectionInjection.
    """
    samples_root = os.path.join(base_path, 'samples')
    results_root = os.path.join(base_path, 'results')
    # exist_ok=True replaces the check-then-create pattern and makes re-runs safe.
    os.makedirs(samples_root, exist_ok=True)
    os.makedirs(results_root, exist_ok=True)

    for lambda_value in lambda_values:
        os.makedirs(os.path.join(samples_root, str(lambda_value)), exist_ok=True)
        for query_budget in query_values:
            destination_folder = os.path.join(samples_root, str(lambda_value), str(query_budget))
            os.makedirs(destination_folder, exist_ok=True)

            print(destination_folder, base_path)
            print("Lambda: ", lambda_value, "Query budget: ", query_budget)
            attack = GammaSectionInjection(section_population=section_population, model_wrapper=model_wrapper,
                                           population_size=POPULATION_SIZE, lambda_value=lambda_value, iterations=100,
                                           debug=False, hard_label=False, query_budget=query_budget,
                                           stagnation=5)
            result_file = os.path.join(results_root, 'results_'+str(lambda_value)+'_'+str(query_budget)+'.json')
            generate_adv_samples_from_folder(source_folder=malware_path,
                                             destination_folder=destination_folder, gamma_attack=attack,
                                             model=model_wrapper, result_file=result_file)

In [None]:
# Run the GAMMA attack once per section population:
# first with how_many = 25, then with how_many = 50.
for attack_root, population in (
    ('/content/attack_results_25', section_population_25),
    ('/content/attack_results_50', section_population_50),
):
    gamma_attack(attack_root, population)

## Evaluate the attack

In [29]:
# Result folders written by the two attack runs.
base_path = '/content/attack_results_25/results'
variant_path = '/content/attack_results_50/results'

# Display label -> result folder, used by the comparison plots.
models = {
    "how_many = 25": base_path,
    "how_many = 50": variant_path,
}

In [None]:
from ml_pentest.attack_reports.blackbox.genetic_attack.GAMMA.analize_results import compute_mean_times, print_gamma_results, plot_detection_rate, plot_injected_bytes, plot_heatmap, plot_detection_rate_vs_query_budget, plot_avg_injected_bytes_vs_query_budget, plot_gamma_attack
import os
import numpy as np

# Score every original (non-obfuscated) sample to obtain the baseline.
malware_files = os.listdir(malware_path)
class_results = []

for f in malware_files:
    with open(os.path.join(malware_path, f), 'rb') as file:
        file_bytes = file.read()
    class_results.append(model_wrapper.classify_sample(file_bytes))

# NOTE(review): this is the mean of the raw scores, not the fraction of samples
# scored >= 0.5 - confirm which of the two print_gamma_results expects.
avg_detection_rate = np.mean(class_results)

# NOTE(review): the classifier attacked in this notebook is loaded from
# voting_classifier.pkl - confirm that 'MalConv2' is the intended plot label.
model_name='MalConv2'

print("============================ GAMMA Results ============================")

print("===== how_many = 25 =====")
for lambda_value in lambda_values:
    print_gamma_results(base_path, lambda_value, query_values, avg_detection_rate = avg_detection_rate)

print("===== how_many = 50 =====")
for lambda_value in lambda_values:
    print_gamma_results(variant_path, lambda_value, query_values, avg_detection_rate = avg_detection_rate)


save_path = '/content/attack_results/plots'
os.makedirs(save_path, exist_ok=True)

# Plotting graphs for how_many = 25 and how_many = 50.
# Each configuration is plotted from its OWN results folder; previously the
# how_many = 50 plots were also built from base_path (the how_many = 25 results).
# The second argument is left exactly as it was: a '25' / '50' folder below base_path.
for results_path, plots_subdir in ((base_path, '25'), (variant_path, '50')):
    plots_dir = os.path.join(base_path, plots_subdir)
    plot_detection_rate(results_path, plots_dir, query_values, lambda_values, 'detection_rate_gamma.png', model_name=model_name)
    plot_injected_bytes(results_path, plots_dir, query_values, lambda_values, 'GAMMA', 'gamma_injected_bytes.png', model_name=model_name,  x_range_kb=None)
    plot_heatmap(results_path, plots_dir, query_values, lambda_values, 'heatmap.png')

# Plotting graphs to compare two attacks
plot_gamma_attack(base_path, variant_path, save_path, query_values, lambda_values, 'gamma_attack.png', model_name=model_name)
plot_detection_rate_vs_query_budget(models, query_values, lambda_values, save_path, 'detection_vs_query.png')
plot_avg_injected_bytes_vs_query_budget(models, query_values, lambda_values, save_path, file_name='avg_injected_bytes_vs_query_budget.png', lower_limit=None, upper_limit=None)
