In [1]:
import pandas as pd
import torch
import numpy as np
from transformers import AutoConfig, AutoModelForSequenceClassification
from transformers import CanineForSequenceClassification, CanineTokenizer, BertTokenizer
from torch.utils.data import Dataset
from datasets import load_metric
from sklearn.metrics import roc_auc_score
from scipy.special import softmax
from transformers import TrainingArguments
from transformers import Trainer
from sklearn.utils import resample
import torch.nn as nn

In [2]:
def compute_metrics(eval_pred):
    """Score one evaluation pass; passed to the trainer as ``compute_metrics``.

    Parameters
    ----------
    eval_pred : tuple
        ``(logits, labels)`` pair. Hard predictions are the arg-max of the
        logits over the last axis.

    Returns
    -------
    dict
        Macro-averaged precision, recall and F1, accuracy, Matthews
        correlation and macro one-vs-one ROC AUC, under the keys
        ``precision``, ``recall``, ``acc``, ``mcc``, ``f1`` and ``auc``.
    """
    macro_names = ("precision", "recall", "f1")
    plain_names = ("accuracy", "matthews_correlation")
    scorers = {name: load_metric(name) for name in macro_names + plain_names}
    #auc = load_metric("auc")

    logits, labels = eval_pred
    hard_labels = np.argmax(logits, axis=-1)

    scores = {}
    for name in macro_names:
        result = scorers[name].compute(predictions=hard_labels, average="macro", references=labels)
        scores[name] = result[name]
    for name in plain_names:
        result = scorers[name].compute(predictions=hard_labels, references=labels)
        scores[name] = result[name]

    # AUC is scored on class probabilities rather than on hard labels.
    class_probs = softmax(logits, axis=1)
    auc = roc_auc_score(labels, class_probs, multi_class='ovo', average='macro')

    return {
        "precision": scores["precision"],
        "recall": scores["recall"],
        "acc": scores["accuracy"],
        "mcc": scores["matthews_correlation"],
        "f1": scores["f1"],
        "auc": auc,
    }

In [3]:
class MalwareDataset(Dataset):
    """Map-style dataset pairing tokenizer encodings with class labels.

    Parameters
    ----------
    encodings : mapping
        Maps each feature name to an indexable container holding one entry
        per sample (anything exposing ``.items()``).
    labels : sequence
        One label per sample; its length defines the dataset length.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    @staticmethod
    def _as_tensor_copy(value):
        """Return ``value`` as an independent tensor.

        ``torch.tensor(existing_tensor)`` emits a UserWarning on every call,
        so tensors are copied with ``clone().detach()`` and only non-tensor
        values go through ``torch.tensor``.
        """
        if isinstance(value, torch.Tensor):
            return value.clone().detach()
        return torch.tensor(value)

    def __getitem__(self, idx):
        """Return a dict with every encoding entry for ``idx`` plus ``'labels'``."""
        item = {key: self._as_tensor_copy(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = self._as_tensor_copy(self.labels[idx])
        return item

    def __len__(self):
        """Number of samples, taken from the label sequence."""
        return len(self.labels)

In [4]:
class RandomTransformerClassifier:
    """Bagging ensemble of fine-tuned transformer sequence classifiers.

    Every estimator is fine-tuned on a stratified bootstrap resample of the
    training frame. The ensemble prediction is the mean of the estimators'
    soft-maxed outputs.

    Parameters
    ----------
    trainingset, validationset, testset : pandas.DataFrame
        Frames read through their ``api`` column (text handed to the
        tokenizer) and ``class`` column (labels).
    num_classes : int
        Number of output labels of the classification head.
    epochs : int
        Training epochs per estimator.
    batch_size : int
        Per-device batch size for both training and evaluation.
    model_name : str
        Checkpoint name; names whose part before the first ``-`` is
        ``google/canine`` use ``CanineTokenizer``, all others ``BertTokenizer``.
    max_sequence_length : int
        Assigned to the tokenizer's ``model_max_length``.
    """

    def __init__(self, trainingset, validationset, testset, num_classes, epochs=10, batch_size=8, model_name='google/canine-s', max_sequence_length=2048):
        self.trainingset = trainingset
        self.validationset = validationset
        self.testset = testset
        self.num_classes = num_classes
        self.model_name = model_name
        if 'google/canine' == model_name.split('-')[0]:
            self.tokenizer = CanineTokenizer.from_pretrained(self.model_name)
        else:
            self.tokenizer = BertTokenizer.from_pretrained(self.model_name)

        self.tokenizer.model_max_length = max_sequence_length
        self.base_model = self._load_model()
        # True while base_model has not been handed to a trainer yet.
        self._base_model_is_fresh = True
        # Defined up front so get_preds()/get_metrics() work (or fail clearly) before fit().
        self.validation_preds = []
        self.test_preds = []
        self.training_args = TrainingArguments(
            output_dir='./results',          # output directory
            #do_train=True,
            do_eval=True,
            evaluation_strategy='epoch',
            save_strategy='epoch',
            load_best_model_at_end=True,
            metric_for_best_model='eval_auc',
            greater_is_better=True,
            fp16=True,
            num_train_epochs=epochs,              # total number of training epochs
            per_device_train_batch_size=batch_size,  # batch size per device during training
            per_device_eval_batch_size=batch_size,   # batch size for evaluation
            lr_scheduler_type='cosine',
            warmup_steps=500,                # number of warmup steps for learning rate scheduler
            weight_decay=0.01,               # strength of weight decay
            dataloader_num_workers=16,
            #logging_dir='./logs',            # directory for storing logs
            #logging_steps=10,
        )

    def _load_model(self):
        """Load an untrained-head copy of the configured checkpoint."""
        return AutoModelForSequenceClassification.from_pretrained(self.model_name, num_labels=self.num_classes)

    def _tokenize(self, texts):
        """Tokenize a column of strings into fixed-length PyTorch tensors."""
        return self.tokenizer(self.get_list_strs(texts), padding="max_length", truncation=True, return_tensors="pt")

    def fit(self, n_estimators=2):
        """Fine-tune ``n_estimators`` models and store their predictions.

        Each estimator starts from a freshly loaded checkpoint and is trained
        on its own stratified bootstrap resample. Its validation and test
        predictions are appended to ``validation_preds`` / ``test_preds``.
        After the call ``base_model`` refers to the last trained estimator.
        """
        self.validation_preds = []
        self.test_preds = []

        # The evaluation sets are identical for every estimator: tokenize them once.
        valset = MalwareDataset(self._tokenize(self.validationset.api), self.validationset['class'].values)
        testset = MalwareDataset(self._tokenize(self.testset.api), self.testset['class'].values)

        for i in range(n_estimators):
            #bagging_trainset = self.trainingset.sample(frac=1, replace=True)
            bagging_trainset = resample(self.trainingset, replace=True, stratify=self.trainingset['class'])
            train_encodings = self._tokenize(bagging_trainset.api)
            trainset = MalwareDataset(train_encodings, bagging_trainset['class'].values)

            # The trainer updates the model it is given in place. Reusing one
            # model would make estimator k continue from estimator k-1's
            # weights, so every estimator gets its own fresh copy.
            if not self._base_model_is_fresh:
                self.base_model = self._load_model()
            self._base_model_is_fresh = False

            trainer = WeightedLossTrainer(
                model=self.base_model, args=self.training_args, train_dataset=trainset, eval_dataset=valset,
                compute_metrics=compute_metrics
            )

            trainer.train()

            self.validation_preds.append(trainer.predict(valset))
            self.test_preds.append(trainer.predict(testset))

            # Drop per-estimator objects before the next round.
            del bagging_trainset
            del train_encodings
            del trainset
            del trainer
            torch.cuda.empty_cache()
            print(f'{i + 1}. estimator is done....')

    def get_preds(self):
        """Return ``(validation_preds, test_preds)``, one entry per fitted estimator."""
        return self.validation_preds, self.test_preds

    @staticmethod
    def _average_probabilities(prediction_outputs):
        """Mean of the soft-maxed ``.predictions`` of every estimator output."""
        return np.mean(
            [softmax(np.asarray(out.predictions, dtype=np.float64), axis=1) for out in prediction_outputs],
            axis=0,
        )

    def _score_ensemble(self, prediction_outputs, references, metrics, prefix):
        """Score the averaged ensemble prediction against ``references``."""
        probabilities = self._average_probabilities(prediction_outputs)
        hard_preds = np.argmax(probabilities, axis=1)

        scores = {}
        for name in ("precision", "recall", "f1"):
            scores[name] = metrics[name].compute(predictions=hard_preds, average="macro", references=references)[name]
        for name in ("accuracy", "matthews_correlation"):
            scores[name] = metrics[name].compute(predictions=hard_preds, references=references)[name]

        # The averaged values already are probabilities; a second softmax
        # would distort the scores that the AUC ranks samples by.
        auc = roc_auc_score(references, probabilities, multi_class='ovo', average='macro')

        return {
            f"{prefix}_precision": scores["precision"],
            f"{prefix}_recall": scores["recall"],
            f"{prefix}_acc": scores["accuracy"],
            f"{prefix}_mcc": scores["matthews_correlation"],
            f"{prefix}_f1": scores["f1"],
            f"{prefix}_auc": auc,
        }

    def get_metrics(self):
        """Return ``(validation_metrics, test_metrics)`` for the ensemble.

        Both are dicts with the keys ``<prefix>_precision``, ``_recall``,
        ``_acc``, ``_mcc``, ``_f1`` and ``_auc`` where the prefix is ``val``
        or ``test``.

        Raises
        ------
        RuntimeError
            If no predictions are stored yet, i.e. ``fit`` has not been run.
        """
        #np.argmax((softmax(val[0].predictions, axis=1) + softmax(val[1].predictions, axis=1))/2, axis=1)
        if not self.validation_preds or not self.test_preds:
            raise RuntimeError("No predictions available: call fit() before get_metrics().")

        metric_names = ("precision", "recall", "f1", "accuracy", "matthews_correlation")
        metrics = {name: load_metric(name) for name in metric_names}

        val_scores = self._score_ensemble(self.validation_preds, self.validationset["class"].values, metrics, "val")
        test_scores = self._score_ensemble(self.test_preds, self.testset["class"].values, metrics, "test")
        return val_scores, test_scores

    def get_list_strs(self, df):
        """Return the entries of ``df.values`` as a plain Python list."""
        return list(df.values)
    

In [5]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score
def get_conf_mat(preds, truth):
    """Confusion matrix and macro F1 of the averaged ensemble prediction.

    Parameters
    ----------
    preds : sequence
        Per-estimator outputs; each exposes a ``.predictions`` array that is
        soft-maxed along axis 1 before averaging.
    truth : array-like
        Reference labels.

    Returns
    -------
    tuple
        ``(confusion_matrix, macro_f1)``.
    """
    prob_total = np.zeros_like(preds[0].predictions)
    for member in preds:
        prob_total += softmax(member.predictions, axis=1)

    mean_probs = prob_total / len(preds)
    hard_labels = np.argmax(mean_probs, axis=1)

    macro_f1 = f1_score(truth, hard_labels, average="macro")
    matrix = confusion_matrix(truth, hard_labels)
    return matrix, macro_f1

In [7]:
# Load the raw dataset; the path is relative to the notebook's working directory.
malware_calls_df = pd.read_csv("../Datasets/Oliveria.csv")

In [8]:
# Name the two columns: "api" is the text later handed to the tokenizer,
# "class" holds the label.
malware_calls_df.columns = ["api", "class"]

In [9]:
# Preview the first rows of the loaded frame.
malware_calls_df.head()

In [11]:
# Number of samples per class.
malware_calls_df["class"].value_counts()

In [12]:
# Class name -> integer label id. This is the single source of truth for the
# label encoding used throughout the notebook.
CAT2IDX = {
    'Trojan': 0,
    'Adware': 1,
    'Downloader': 2,
    'Ransomware': 3,
    'Agent': 4,
    'Riskware': 5,
    'Backdoor': 6,
    'Dropper': 7,
    'Virus': 8
}

# Integer label id -> class name, derived from CAT2IDX so the two lookups
# cannot drift apart when a class is added or renumbered.
IDX2CAT = {idx: cat for cat, idx in CAT2IDX.items()}

In [13]:
# Replace class names by their integer ids. The column is overwritten in
# place, so the mapping is applied only while the column is not yet
# integer-typed: re-running this cell is then a no-op instead of a KeyError
# raised by looking up ids that were already encoded.
if not pd.api.types.is_integer_dtype(malware_calls_df['class']):
    malware_calls_df['class'] = malware_calls_df['class'].map(lambda x: CAT2IDX[x])

In [14]:
from sklearn.model_selection import train_test_split
# Hold out 20% of the data as the test set, stratified by class; the fixed
# random_state makes the split repeatable.
X_train, X_test = train_test_split(
    malware_calls_df,
    test_size=0.2,
    random_state=75,
    stratify=malware_calls_df['class'],
)

In [16]:
# Preview the training portion after the train/test split.
X_train.head()

In [17]:
# Carve the validation set out of the training portion: 20% of it, stratified
# by class.
# NOTE(review): X_train is overwritten by its own split, so re-running this
# cell without first re-running the train/test split shrinks X_train again.
X_train, X_val = train_test_split(
    X_train,
    test_size=0.2,
    random_state=75,
    stratify=X_train['class'],
)

In [19]:
# Preview the training portion after the validation split.
X_train.head()

In [22]:
# Per-class loss weights. WeightedLossTrainer.compute_loss reads the
# module-level name `class_weights`, so it has to be defined before training;
# with this cell commented out a fresh kernel fails with a NameError.
# Weight = 1 - relative class frequency, ordered by label id via sort_index(),
# so rarer classes contribute more to the loss.
class_weights = (1 - (malware_calls_df['class'].value_counts().sort_index() / len(malware_calls_df))).values
# Fall back to the CPU when no GPU is visible instead of failing on .to("cuda").
class_weights = torch.from_numpy(class_weights).float().to("cuda" if torch.cuda.is_available() else "cpu")
class_weights

In [23]:
from transformers import Trainer
class WeightedLossTrainer(Trainer):
    """``Trainer`` that replaces the loss by class-weighted cross-entropy.

    The weights are read from the module-level ``class_weights`` tensor, which
    must be defined before the first loss computation.
    """

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the weighted cross-entropy loss for one batch.

        Parameters
        ----------
        model : the model being trained; called as ``model(**inputs)``.
        inputs : dict
            Batch holding the model inputs and a ``"labels"`` entry.
        return_outputs : bool
            When true, return ``(loss, outputs)`` instead of the loss alone.
        num_items_in_batch : optional
            Accepted for compatibility with ``Trainer`` versions that pass it
            to ``compute_loss``; not used by this loss.
        """
        outputs = model(**inputs)
        logits = outputs.get("logits")
        labels = inputs.get("labels")
        # The weight tensor must live on the same device as the logits.
        loss_func = nn.CrossEntropyLoss(weight=class_weights.to(logits.device))
        loss = loss_func(logits, labels)
        return (loss, outputs) if return_outputs else loss

In [27]:
# Build the ensemble wrapper around the cased BERT checkpoint: 9 classes,
# 20 epochs per estimator, batches of 8, inputs capped at 512 tokens.
classifier = RandomTransformerClassifier(
    X_train,
    X_val,
    X_test,
    num_classes=9,
    epochs=20,
    batch_size=8,
    max_sequence_length=512,
    model_name="bert-base-cased",
)

In [26]:
# Fine-tune two bagged estimators; this is the long-running cell.
classifier.fit(n_estimators=2)

In [30]:
# get_metrics() returns a (validation, test) pair of metric dicts.
val, test = classifier.get_metrics()

In [32]:
# Display the ensemble's validation metrics.
val

In [33]:
# Display the ensemble's test metrics.
test

In [None]:
# get_preds() returns (validation_preds, test_preds); index 1 selects the
# per-estimator test-set predictions.
cm, f1 = get_conf_mat(classifier.get_preds()[1], X_test["class"])

In [None]:
import numpy as np


def plot_confusion_matrix(cm,
                          target_names,
                          title='Confusion matrix',
                          f1 = None,
                          cmap=None,
                          normalize=True):
    """
    given a sklearn confusion matrix (cm), make a nice plot

    Arguments
    ---------
    cm:           confusion matrix from sklearn.metrics.confusion_matrix

    target_names: given classification classes such as [0, 1, 2]
                  the class names, for example: ['high', 'medium', 'low']
                  pass None to keep the default numeric ticks

    title:        the text to display at the top of the matrix

    f1:           optional F1 score appended to the x-axis caption;
                  omitted from the caption when None

    cmap:         the gradient of the values displayed from matplotlib.pyplot.cm
                  see http://matplotlib.org/examples/color/colormaps_reference.html
                  plt.get_cmap('jet') or plt.cm.Blues
                  defaults to 'Blues' when None

    normalize:    If False, plot the raw numbers
                  If True, plot the proportions (each row divided by its sum;
                  a row without samples is shown as zeros)

    Usage
    -----
    plot_confusion_matrix(cm           = cm,                  # confusion matrix created by
                                                              # sklearn.metrics.confusion_matrix
                          normalize    = True,                # show proportions
                          target_names = y_labels_vals,       # list of names of the classes
                          title        = best_estimator_name) # title of graph

    Citation
    --------
    http://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html

    """
    import matplotlib.pyplot as plt
    import numpy as np
    import itertools

    cm = np.asarray(cm)

    # Accuracy always comes from the raw counts, before any normalization.
    accuracy = np.trace(cm) / float(np.sum(cm))
    misclass = 1 - accuracy

    if cmap is None:
        cmap = plt.get_cmap('Blues')

    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # Divide empty rows by 1 so they show zeros instead of NaN.
        safe_totals = np.where(row_totals == 0, 1, row_totals)
        shown = cm.astype('float') / safe_totals
    else:
        shown = cm

    # Draw the same matrix that is annotated below, so cell colours and the
    # white/black text threshold refer to the same values.
    plt.figure(figsize=(8, 6))
    plt.imshow(shown, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    if target_names is not None:
        tick_marks = np.arange(len(target_names))
        plt.xticks(tick_marks, target_names, rotation=45)
        plt.yticks(tick_marks, target_names)

    thresh = shown.max() / 1.5 if normalize else shown.max() / 2
    cell_format = "{:0.4f}" if normalize else "{:,}"
    for i, j in itertools.product(range(shown.shape[0]), range(shown.shape[1])):
        plt.text(j, i, cell_format.format(shown[i, j]),
                 horizontalalignment="center",
                 color="white" if shown[i, j] > thresh else "black")

    # f1 defaults to None, which cannot be formatted with {:0.4f}; only
    # append it to the caption when it was supplied.
    caption = 'accuracy={:0.4f}; misclass={:0.4f}'.format(accuracy, misclass)
    if f1 is not None:
        caption += '; f1 = {:0.4f}'.format(f1)

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label\n' + caption)
    plt.show()

In [None]:
# Display names for the confusion-matrix axes; position i names label id i.
target_names = [
    "Trojan",
    "Adware",
    "Downloader",
    "Ransomware",
    "Agent",
    "Riskware",
    "Backdoor",
    "Dropper",
    "Virus",
]

In [None]:
# Plot the test-set confusion matrix with raw counts (normalize=False) and
# the ensemble's macro F1 in the caption.
plot_confusion_matrix(
    cm,
    target_names,
    title='Confusion matrix',
    f1=f1,
    cmap=None,
    normalize=False,
)