In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from typing import List, Tuple
from enum import Enum
from typing import Union
import torch
from torch import Tensor
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, Subset
import numpy as np
import sklearn
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier
from sklearn.svm import SVC
from sklearn.metrics import confusion_matrix, roc_auc_score
from pathlib import Path
import pickle

In [None]:
# Show the installed scikit-learn version.
sklearn.__version__

'1.2.2'

In [None]:
# Type alias for inputs that may be either a NumPy array or a PyTorch tensor.
TorchOrNumpy = Union[np.ndarray, torch.Tensor]

In [None]:
class BlackBoxDetector:
    """
    Black box detector that intends to mimic an antivirus/anti-malware program that detects whether
    a specific program is either malware or benign.

    The detector wraps a single scikit-learn classifier selected through
    :class:`BlackBoxDetector.Type`.  ``fit`` must be called before ``predict``.
    """
    class Type(Enum):
        """Supported learners.  Each member's value is an unfitted prototype estimator."""
        DecisionTree = DecisionTreeClassifier()
        LogisticRegression = LogisticRegression(solver='lbfgs', max_iter=int(1e6))
        MultiLayerPerceptron = MLPClassifier()
        RandomForest = RandomForestClassifier(n_estimators=100)
        SVM = SVC(gamma="auto",probability=True)

        @staticmethod
        def names():
            r""" Builds the list of all enum names """
            return [c.name for c in BlackBoxDetector.Type]

        @staticmethod
        def get_from_name(name):
            """
            Look up an enum member by its name.

            :param name: Member name, e.g. ``"RandomForest"``.
            :return: The matching :class:`BlackBoxDetector.Type` member.
            :raises ValueError: If no member has that name.
            """
            for c in BlackBoxDetector.Type:
                if c.name == name:
                    return c
            # Build the message eagerly: the previous version passed printf-style
            # arguments that were never formatted and read ``__class__.name``,
            # which raises AttributeError on an Enum class.
            raise ValueError(
                f"Unknown enum \"{name}\" for class \"{BlackBoxDetector.Type.__name__}\""
            )

    def __init__(self, learner_type: 'BlackBoxDetector.Type'):
        """
        :param learner_type: Which learner to use; its prototype estimator is cloned so
                             detectors never share fitted state through the enum value.
        """
        self.type = learner_type
        self._model = sklearn.clone(self.type.value)
        # True until fit() has completed; predict() refuses to run while it is set.
        self.training = True

    @staticmethod
    def _to_numpy(data):
        """Return ``data`` as a NumPy array when it is a tensor, otherwise unchanged."""
        if isinstance(data, torch.Tensor):
            # detach + cpu so CUDA tensors and tensors tracking gradients convert too.
            return data.detach().cpu().numpy()
        return data

    def fit(self, X: TorchOrNumpy, y: TorchOrNumpy):
        """
        Train the wrapped classifier.

        :param X: Feature matrix (tensor or NumPy array).
        :param y: Labels (tensor or NumPy array).
        """
        self._model.fit(self._to_numpy(X), self._to_numpy(y))
        self.training = False

    def predict(self, X: TorchOrNumpy) -> torch.Tensor:
        """
        Predict labels for ``X``.

        :param X: Feature matrix (tensor or NumPy array).
        :return: Float tensor with the classifier's predicted labels, moved to the GPU
                 when CUDA is available.
        :raises ValueError: If the detector has not been fitted yet.
        """
        if self.training:
            raise ValueError("Detector does not appear to be trained but trying to predict")
        # Only tensors are moved to the CPU; NumPy arrays have no ``.cpu()`` method.
        y = torch.from_numpy(self._model.predict(self._to_numpy(X))).float()
        return y.cuda() if torch.cuda.is_available() else y

In [None]:
class MalwareDataset(Dataset):
    """
    Dataset of feature vectors that all carry the same class label, i.e. every element is
    malware or every element is benign.
    """
    def __init__(self, x: Union[np.ndarray, torch.Tensor], y):
        """
        :param x: Feature matrix; a NumPy array is converted to a float tensor, any other
                  value is stored as given.
        :param y: The single label returned alongside every sample.
        """
        super().__init__()
        self.x = torch.from_numpy(x).float() if isinstance(x, np.ndarray) else x
        self.y = y

    def __getitem__(self, index):
        """Return the pair (features at ``index``, shared label)."""
        sample = self.x[index]
        return sample, self.y

    def __len__(self):
        """Number of samples, i.e. the size of the first dimension of ``x``."""
        num_samples = self.x.shape[0]
        return num_samples

    @property
    def num_features(self):
        r""" Size of the second dimension of the feature matrix """
        return self.x.shape[1]

In [None]:
class _DataGroup:
    r"""
    Holder for the train/validation/test parts of one dataset, stored either as datasets or,
    after :meth:`build_loader`, as PyTorch DataLoaders.  Intended only for internal use by MalGAN.
    """
    def __init__(self, train: MalwareDataset, valid: MalwareDataset, test: MalwareDataset):
        self.train, self.valid, self.test = train, valid, test
        # Flipped to True once build_loader() has replaced the datasets with loaders.
        self.is_loaders = False

    def build_loader(self, batch_size: int = 0):
        r"""
        Replaces the stored datasets with DataLoaders.

        Only the training loader shuffles.  The validation part is left untouched when it is
        falsy (for example an empty split).

        :param batch_size: Batch size handed to every DataLoader
        """
        shared = {"batch_size": batch_size, "pin_memory": True}
        self.train = DataLoader(self.train, shuffle=True, **shared)
        if self.valid:
            self.valid = DataLoader(self.valid, **shared)
        self.test = DataLoader(self.test, **shared)
        self.is_loaders = True

In [None]:
# Fraction of a dataset held out for validation by split_train_valid_test
# (applied to non-benign datasets only; benign data gets no validation split).
VALIDATION_SPLIT = 0.2

In [None]:
def split_train_valid_test(dataset: Dataset, is_benign: bool, test_split: float = 0.3,
                           generator: Union[torch.Generator, None] = None):
  """
  Helper function to randomly partition a dataset into train, validation, and test subsets.

  :param dataset: Dataset to split.
  :param is_benign: When True no validation subset is carved out (its length is 0);
                    otherwise VALIDATION_SPLIT of the samples go to validation.
  :param test_split: Fraction of the samples placed in the test subset.
  :param generator: Optional ``torch.Generator`` for a reproducible split; when omitted
                    PyTorch's default random generator is used.
  :return: A _DataGroup holding the train, validation, and test subsets.
  :raises ValueError: If the requested fractions leave a negative number of training samples.
  """
  total = len(dataset)
  valid_len = 0 if is_benign else int(VALIDATION_SPLIT * total)
  test_len = int(test_split * total)
  train_len = total - valid_len - test_len
  if train_len < 0 or test_len < 0:
    raise ValueError("Split fractions do not fit the dataset: "
                     f"train={train_len}, valid={valid_len}, test={test_len}")

  # Order must be train, validation, test
  lengths = [train_len, valid_len, test_len]
  # Only forward the generator when one was supplied so the default behavior is unchanged.
  split_kwargs = {} if generator is None else {"generator": generator}
  return _DataGroup(*torch.utils.data.random_split(dataset, lengths, **split_kwargs))

In [None]:
def load_dataset(file_path: Union[str, Path], y: int) -> MalwareDataset:
    """
    Load a feature matrix from disk and wrap it in a MalwareDataset.

    The loader is chosen from the file extension:
    ``.npy`` (NumPy array), ``.npz`` (NumPy archive holding exactly one array),
    ``.pt``/``.pth`` (``torch.load``) and ``.pk`` (``pickle``).

    SECURITY: ``pickle.load`` and ``torch.load`` can execute arbitrary code while
    unpickling, so only load files that come from a trusted source.

    :param file_path: Path of the file to load.
    :param y: Label assigned to every sample in the dataset.
    :return: MalwareDataset built from the loaded data and ``y``.
    :raises ValueError: If the extension is not supported, or a ``.npz`` archive does not
                        contain exactly one array.
    """
    file_ext = Path(file_path).suffix
    if file_ext == ".npy":
        data = np.load(file_path)
    elif file_ext == ".npz":
        # np.load returns an archive object for .npz files, not an array; pull out the
        # single stored array and close the archive afterwards.
        with np.load(file_path) as archive:
            array_names = archive.files
            if len(array_names) != 1:
                raise ValueError(
                    f"Expected exactly one array in {file_path}, found {len(array_names)}"
                )
            data = archive[array_names[0]]
    elif file_ext in {".pt", ".pth"}:
        data = torch.load(str(file_path))
    elif file_ext == ".pk":
        with open(str(file_path), "rb") as f_in:
            data = pickle.load(f_in)
    else:
        raise ValueError("Unknown file extension.  Cannot determine how to import")
    return MalwareDataset(x=data, y=y)

In [None]:
# Locations of the pickled (.pk) malware and benign feature sets under the Google Drive mount point.
malware_features = Path("/content/drive/MyDrive/Feature_Vector/malware_feature_set.pk")
benign_features = Path("/content/drive/MyDrive/Feature_Vector/benign_feature_set.pk")

In [None]:
# Load both feature sets; the second argument is the label attached to every sample
# (1 for the malware file, 0 for the benign file).
malware = load_dataset(str(malware_features), 1)
benign = load_dataset(str(benign_features), 0)

In [None]:
# Randomly split each dataset into train/validation/test subsets.
# With is_benign=True the validation subset has length 0.
mal_data = split_train_valid_test(malware, is_benign=False)
ben_data = split_train_valid_test(benign, is_benign=True)
mal_data, ben_data

(<__main__._DataGroup at 0x7cf8a85b0f40>,
 <__main__._DataGroup at 0x7cf8a85b0520>)

In [None]:
# Single-model black-box detector using the RandomForest learner.
bb = BlackBoxDetector(BlackBoxDetector.Type.RandomForest)

In [None]:
def extract_x(ds: Subset) -> torch.Tensor:
  """
  Return the feature rows a Subset selects from its underlying dataset.

  The rows are read from the wrapped dataset's ``x`` attribute using the subset's indices.
  When CUDA is available the result is moved to the CPU before it is returned.
  """
  # noinspection PyUnresolvedReferences
  selected_rows = ds.dataset.x[ds.indices]
  if torch.cuda.is_available():
    return selected_rows.cpu()
  return selected_rows

In [None]:
# Pull the feature rows of the malware and benign training subsets.
mal_x = extract_x(mal_data.train)
ben_x = extract_x(ben_data.train)

In [None]:
# Stack the malware rows first, then the benign rows, into one training matrix.
merge_data = torch.cat((mal_x,ben_x))
merge_data

tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 1., 1.,  ..., 0., 0., 0.],
        [0., 1., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 1., 0.,  ..., 0., 0., 0.],
        [0., 1., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])

In [None]:
# Labels in the same order as merge_data: ones for the malware rows, then zeros for the benign rows.
merged_y = torch.cat((
    torch.ones(len(mal_data.train), dtype=torch.long),
    torch.zeros(len(ben_data.train), dtype=torch.long),
))
merged_y

tensor([1, 1, 1,  ..., 0, 0, 0])

In [None]:
# Train the single-model detector on the merged training data and labels.
bb.fit(merge_data,merged_y)

In [None]:
from sklearn.ensemble import VotingClassifier

class EnsembleBlackBoxDetector:
    """
    Black-box detector backed by a soft-voting ensemble of five scikit-learn classifiers
    (decision tree, random forest, SVC, logistic regression and MLP).

    Unlike a label predictor, :meth:`predict` returns the ensemble's probability for the
    class at index 1 of the fitted model's class ordering.
    """
    def __init__(self):
        self.models = [
            ('DecisionTree', DecisionTreeClassifier()),
            ('RandomForest', RandomForestClassifier()),
            ('SVC', SVC(probability=True)),
            ('Logistic', LogisticRegression()),
            ('MLP', MLPClassifier())
        ]
        self.ensemble_model = VotingClassifier(estimators=self.models, voting='soft')
        # True until fit() has completed (or a fitted model has been loaded).
        self.training = True

    @staticmethod
    def _to_numpy(data):
        """Return ``data`` as a NumPy array when it is a tensor, otherwise unchanged."""
        if isinstance(data, torch.Tensor):
            # detach + cpu so CUDA tensors and tensors tracking gradients convert too.
            return data.detach().cpu().numpy()
        return data

    def fit(self, X: 'TorchOrNumpy', y: 'TorchOrNumpy'):
        """
        Train the voting ensemble.

        :param X: Feature matrix (tensor or NumPy array).
        :param y: Labels (tensor or NumPy array).
        """
        self.ensemble_model.fit(self._to_numpy(X), self._to_numpy(y))
        self.training = False

    def predict(self, X: 'TorchOrNumpy') -> torch.Tensor:
        """
        Score samples with the ensemble.

        :param X: Feature matrix (tensor or NumPy array).
        :return: Float tensor with one value per row: column 1 of ``predict_proba``.
                 The tensor is moved to the GPU when CUDA is available.
        :raises ValueError: If the detector has not been fitted yet.
        """
        if self.training:
            raise ValueError("Detector does not appear to be trained but trying to predict")
        # Only tensors are moved to the CPU; NumPy arrays have no ``.cpu()`` method.
        probabilities = self.ensemble_model.predict_proba(self._to_numpy(X))[:, 1]
        y = torch.from_numpy(probabilities).float()
        return y.cuda() if torch.cuda.is_available() else y

    def save_model(self, filename):
        """Pickle the underlying ensemble model (not this wrapper) to ``filename``."""
        with open(filename, 'wb') as file:
            pickle.dump(self.ensemble_model, file)

    @classmethod
    def load_model(cls, filename, learner_types=None):
        """
        Rebuild a ready-to-predict detector from a pickle file.

        SECURITY: ``pickle.load`` can execute arbitrary code; only load trusted files.

        :param filename: File written by :meth:`save_model`.  A file holding a pickled
                         detector instance is also accepted and returned as loaded.
        :param learner_types: Unused; kept (now optional) for backward compatibility.
        :return: Detector whose ``training`` flag is cleared.
        """
        with open(filename, 'rb') as file:
            loaded = pickle.load(file)
        if isinstance(loaded, cls):
            return loaded
        # __init__ takes no arguments, so the former ``cls(learner_types)`` always raised
        # TypeError.  Bypass __init__ to avoid building estimators that would be discarded.
        ensemble_detector = cls.__new__(cls)
        ensemble_detector.models = list(getattr(loaded, 'estimators', []))
        ensemble_detector.ensemble_model = loaded
        ensemble_detector.training = False
        return ensemble_detector


In [None]:
# Create the (still untrained) soft-voting ensemble detector.
EBD = EnsembleBlackBoxDetector()

In [None]:
# Train the ensemble on the same merged training data and labels as the single-model detector.
EBD.fit(merge_data,merged_y)

In [None]:
# Create one random binary feature vector as dummy data for prediction.
# The width comes from the training matrix instead of a hard-coded 6005, and the
# generator is seeded so re-running the notebook produces the same vector.
rng = np.random.default_rng(0)
X_test = rng.integers(0, 2, size=(1, merge_data.shape[1]))

# Convert numpy array to torch tensor
X_test_torch = torch.from_numpy(X_test).float()

In [None]:
# Display the dummy input tensor.
X_test_torch

tensor([[1., 1., 0.,  ..., 0., 1., 0.]])

In [None]:
# Score the dummy input. EBD.predict returns column 1 of predict_proba (a probability),
# not a hard label.
y_pred = EBD.predict(X_test_torch)

# Show the predicted probability
y_pred

[[1. 1. 0. ... 0. 1. 0.]]


tensor([0.8601])

In [None]:
# Convert the dummy input tensor back to a NumPy array and display it.
x = X_test_torch.numpy()
x

array([[1., 1., 0., ..., 0., 1., 0.]], dtype=float32)

In [None]:
!pip install lief



In [None]:
import lief
import logging
import re

# Set up logging: configure the root logger at DEBUG level and create the logger that
# feature_generation uses to report parse errors.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

def filter_imported_functions(func_string_with_library):
    """
    Decide whether an imported-function entry should be kept.

    Only the text before the first ":" of the entry (the function name) is inspected. The
    entry is kept when that name matches ``^[a-zA-Z]*$`` (letters only; an empty name also
    passes). This removes the noisy names that contain special characters.

    Note: an earlier version additionally required the name to start with a capital letter
    (Windows API functions follow the upper camel case convention); that restriction has
    been removed.

    :return: True if the entry should be kept, False otherwise.
    """
    function_name, _, _ = func_string_with_library.partition(":")
    return re.match("^[a-zA-Z]*$", function_name) is not None

def process_imported_functions_output(imports):
    """Return a new list with the entries of ``imports`` accepted by filter_imported_functions."""
    # The former remove_encoding_indicator mapping step remains disabled.
    return [entry for entry in imports if filter_imported_functions(entry)]

def feature_generation(file: str, feature_vector_mapping: dict):
    """
    Build the binary feature vector of one executable.

    A feature is set to 1 when the file imports a function (encoded as
    ``"<function>:<library lower-cased>"`` and kept by process_imported_functions_output)
    or contains a section whose name is a key of ``feature_vector_mapping``.

    Parsing is best-effort: any error is logged and the vector built so far (all zeros if
    parsing failed outright) is still returned.

    :param file: Path of the binary to parse with lief.
    :param feature_vector_mapping: Maps feature name -> index in the feature vector.
    :return: A list holding one feature vector (list of 0/1 of length
             ``len(feature_vector_mapping)``), i.e. a single-row batch.
    """
    feature_vector = [0] * len(feature_vector_mapping)

    try:
        binary = lief.parse(file)
        if binary is None:
            # lief signals an unparseable file by returning None; report that explicitly
            # instead of an opaque "'NoneType' object has no attribute 'imports'".
            raise ValueError("lief could not parse the file as a supported binary")

        imports = [e.name + ':' + lib.name.lower() for lib in binary.imports for e in lib.entries]
        imports = process_imported_functions_output(imports)

        sections = [section.name for section in binary.sections]

        # Imports and sections share one name -> index mapping, so mark them in one pass.
        for feature_name in (*imports, *sections):
            if feature_name in feature_vector_mapping:
                feature_vector[feature_vector_mapping[feature_name]] = 1

    except Exception as e:
        logger.error(f"Error parsing {file}: {str(e)}")
        # You can choose to handle the error differently

    return [feature_vector]

In [None]:
# Load the feature-name -> vector-index mapping consumed by feature_generation.
# NOTE(review): pickle.load can execute arbitrary code; only load files from a trusted source.

with open('/content/drive/MyDrive/Feature_Vector/feature_vector_mapping.pk', 'rb') as f:
    feature_vector_mapping = pickle.load(f)


In [None]:
# Number of entries in the mapping, which is also the length of each generated feature vector.
len(feature_vector_mapping)

6005

In [None]:
# One benign and one malware sample executable used to spot-check the ensemble.
benign_path = "/content/drive/MyDrive/Dataset/Benign/Benign test/ApacheMonitor.exe"
malware_path = "/content/drive/MyDrive/Dataset/Virus/Virus test/Locker/VirusShare_0e4c40c9c9921673242963ccd664ab91.exe"

In [None]:
# Build the feature vector of each sample (benign first, then malware) and convert both
# single-row batches to float tensors.
X_test_ben, X_test_mal = (
    feature_generation(sample_path, feature_vector_mapping)
    for sample_path in (benign_path, malware_path)
)
X_test_torch_ben, X_test_torch_mal = (
    torch.tensor(rows).float() for rows in (X_test_ben, X_test_mal)
)
X_test_torch_ben, X_test_torch_mal

(tensor([[0., 0., 1.,  ..., 0., 0., 0.]]),
 tensor([[1., 1., 1.,  ..., 0., 0., 0.]]))

In [None]:
# Score both samples; each result is the ensemble's probability for class index 1.
EBD.predict(X_test_torch_ben),EBD.predict(X_test_torch_mal)

[[0. 0. 1. ... 0. 0. 0.]]
[[1. 1. 1. ... 0. 0. 0.]]


(tensor([0.5217]), tensor([0.9996]))

In [None]:
# Persist the whole EnsembleBlackBoxDetector wrapper (not only its sklearn model) with pickle.
# NOTE(review): unpickling this file presumably requires the EnsembleBlackBoxDetector class
# to be defined in the loading session first - confirm before using it elsewhere.
with open("/content/drive/MyDrive/MalwareScore.pkl","wb") as ms:
  pickle.dump(EBD,ms)

In [None]:
# EBD.save_model("/content/drive/MyDrive/MalwareScore.pkl")

TypeError: EnsembleBlackBoxDetector.load_model() missing 1 required positional argument: 'learner_types'

In [None]:
# Reload the pickled detector. The context manager closes the file handle, which the
# previous bare open() call left open.
# NOTE(review): pickle.load can execute arbitrary code; only load files from a trusted source.
with open("/content/drive/MyDrive/MalwareScore.pkl", "rb") as model_file:
    load_model = pickle.load(model_file)

In [None]:
# Score the malware sample with the reloaded detector.
t = load_model.predict(X_test_torch_mal)

[[1. 1. 1. ... 0. 0. 0.]]


In [None]:
# Extract the single score as a plain Python number.
t.item()

0.9996148347854614