In [52]:
from keras.layers import Input, Dense, Activation
from keras.layers import Maximum, Concatenate
from keras.models import Model
from keras.optimizers import adam_v2
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import SGDClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.svm import SVC

from Ensemble_Classifiers import Ensemble_Classifier
from sklearn.model_selection import train_test_split
import numpy as np

global seed

seed = 0

In [53]:
class MalGAN():
    def __init__(self, blackbox, X, Y, threshold):
        self.apifeature_dims = 135
        self.z_dims = 100
        # self.generator_layers = [self.apifeature_dims+self.z_dims, 32, 64, 64, self.apifeature_dims]
        self.generator_layers = [self.apifeature_dims+self.z_dims, 64, 64, 64, 128, self.apifeature_dims]
        # self.generator_layers = [self.apifeature_dims+self.z_dims, 64, 128, self.apifeature_dims]
        
        # self.substitute_detector_layers = [self.apifeature_dims, 64, 64, 1]
        self.substitute_detector_layers = [self.apifeature_dims, 128, 128, 128, 1]
        # self.substitute_detector_layers = [self.apifeature_dims, 128, 1]
        
        self.blackbox = blackbox       
        optimizer = adam_v2.Adam(learning_rate=0.0002, beta_1=0.5)
        self.X = X
        self.Y = Y
        self.threshold = threshold

        # Build and Train blackbox_detector
        self.blackbox_detector = self.build_blackbox_detector()

        # Build and compile the substitute_detector
        self.substitute_detector = self.build_substitute_detector()
        self.substitute_detector.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

        # Build the generator
        self.generator = self.build_generator()

        # The generator takes malware and noise as input and generates adversarial malware examples
        example = Input(shape=(self.apifeature_dims,))
        noise = Input(shape=(self.z_dims,))
        input = [example, noise]
        malware_examples = self.generator(input)

        # The discriminator takes generated images as input and determines validity
        validity = self.substitute_detector(malware_examples)

        # The combined model  (stacked generator and substitute_detector)
        # Trains the generator to fool the discriminator
        self.combined = Model(input, validity)
        self.combined.compile(loss='binary_crossentropy', optimizer=optimizer)
        
        # For the combined model we will only train the generator
        self.substitute_detector.trainable = False
    
    def load_data(self):
        x_ran, x_ben,y_ran, y_ben = self.X[:self.threshold], self.X[self.threshold:], self.Y[:self.threshold], self.Y[self.threshold:]

        return (x_ran, y_ran), (x_ben, y_ben)
    
    def build_blackbox_detector(self):
        if self.blackbox in ['SVM']:
            blackbox_detector = SVC(kernel = 'linear')
        
        elif self.blackbox in ['GB']:
            blackbox_detector = GradientBoostingClassifier(random_state=seed)
        
        elif self.blackbox in ['SGD']:
            blackbox_detector = SGDClassifier(random_state=seed)  

        elif self.blackbox in ['DT']:
            blackbox_detector = DecisionTreeClassifier(random_state=seed)
        
        elif self.blackbox in ['Ensem']:
            blackbox_detector = Ensemble_Classifier()

        return blackbox_detector

    def build_generator(self):

        example = Input(shape=(self.apifeature_dims,))
        noise = Input(shape=(self.z_dims,))
        x = Concatenate(axis=1)([example, noise])
        for dim in self.generator_layers[1:]:
            x = Dense(dim)(x)
        x = Activation(activation='tanh')(x)
        x = Maximum()([example, x])
        generator = Model([example, noise], x, name='generator')
        generator.summary()
        return generator

    def build_substitute_detector(self):

        input = Input(shape=(self.substitute_detector_layers[0],))
        x = input
        for dim in self.substitute_detector_layers[1:]:
            x = Dense(dim)(x)
        x = Activation(activation='sigmoid')(x)
        substitute_detector = Model(input, x, name='substitute_detector')
        substitute_detector.summary()
        return substitute_detector


    
    def train(self, epochs, batch_size=32):

        # Load and Split the dataset
        (xmal, ymal), (xben, yben) = self.load_data()
        xtrain_mal, xtest_mal, ytrain_mal, ytest_mal = train_test_split(xmal, ymal, test_size=0.50)
        xtrain_ben, xtest_ben, ytrain_ben, ytest_ben = train_test_split(xben, yben, test_size=0.50)

        bl_xtrain_mal, bl_ytrain_mal, bl_xtrain_ben, bl_ytrain_ben = xtrain_mal, ytrain_mal, xtrain_ben, ytrain_ben

        
        self.blackbox_detector.fit(np.concatenate([xmal, xben]), np.concatenate([ymal, yben]))

        ytrain_ben_blackbox = self.blackbox_detector.predict(bl_xtrain_ben)
        
        Original_Train_TPR = self.blackbox_detector.score(bl_xtrain_mal, bl_ytrain_mal)
        
        Original_Test_TPR = self.blackbox_detector.score(xtest_mal, ytest_mal)
        Train_TPR, Test_TPR = [Original_Train_TPR], [Original_Test_TPR]


        for epoch in range(epochs):

            for step in range(xtrain_mal.shape[0] // batch_size):
                # ---------------------
                #  Train substitute_detector
                # ---------------------

                # Select a random batch of malware examples
                idx_mal = np.random.randint(0, xtrain_mal.shape[0], batch_size)
  
                xmal_batch = xtrain_mal[idx_mal]
                
                noise = np.random.normal(0, 1, (batch_size, self.z_dims))
                
                idx_ben = np.random.randint(0, xmal_batch.shape[0], batch_size)
                
                xben_batch = xtrain_ben[idx_ben]
                yben_batch = ytrain_ben_blackbox[idx_ben]

                # Generate a batch of new malware examples
                gen_examples = self.generator.predict([xmal_batch, noise])
                ymal_batch = self.blackbox_detector.predict(np.ones(gen_examples.shape)*(gen_examples > 0.5))

                # Train the substitute_detector

                d_loss_real = self.substitute_detector.train_on_batch(gen_examples, ymal_batch)
                d_loss_fake = self.substitute_detector.train_on_batch(xben_batch, yben_batch)
                d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

                # ---------------------
                #  Train Generator
                # ---------------------

                idx = np.random.randint(0, xtrain_mal.shape[0], batch_size)
                xmal_batch = xtrain_mal[idx]
                noise = np.random.uniform(0, 1, (batch_size, self.z_dims))

                # Train the generator
                g_loss = self.combined.train_on_batch([xmal_batch, noise], np.zeros((batch_size, 1)))

            # Compute Train TPR
            noise = np.random.uniform(0, 1, (xtrain_mal.shape[0], self.z_dims))
            gen_examples = self.generator.predict([xtrain_mal, noise])
            TPR = self.blackbox_detector.score(np.ones(gen_examples.shape) * (gen_examples > 0.5), ytrain_mal)
            Train_TPR.append(TPR)

            # Compute Test TPR
            noise = np.random.uniform(0, 1, (xtest_mal.shape[0], self.z_dims))
            gen_examples = self.generator.predict([xtest_mal, noise])
            TPR = self.blackbox_detector.score(np.ones(gen_examples.shape) * (gen_examples > 0.5), ytest_mal)
            Test_TPR.append(TPR)

            print("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100*d_loss[1], g_loss))
            
            if int(epoch) == int(epochs-1):
                return  d_loss[0], 100*d_loss[1], g_loss
        

In [54]:
# create the dict to save the D loss, acc and G loss for different classifiers
D_loss_dict, Acc_dict, G_loss_dict = {}, {}, {}
# get the data from Feature-Selector
X = np.loadtxt('../dataset/matrix/X_fs.csv')
Y = np.loadtxt('../dataset/matrix/Y_str.csv')


In [55]:
X.shape

(321, 135)

In [56]:
# load the classifier
for classifier in [ 'SVM', 'SGD', 'DT', 'GB', 'Ensem']: 
    print('[+] \nTraining the model with {} classifier\n'.format(classifier))
    malgan = MalGAN(blackbox=classifier, X=X, Y=Y, threshold = 179)
    d_loss, acc, g_loss = malgan.train(epochs=50, batch_size=32)

    D_loss_dict[classifier] = d_loss
    Acc_dict[classifier] = acc 
    G_loss_dict[classifier] = g_loss


print('=====================')
print(D_loss_dict)
print('=====================')
print(Acc_dict)
print('=====================')
print(G_loss_dict)

[+] 
Training the model with SVM classifier

Model: "substitute_detector"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_151 (InputLayer)      [(None, 135)]             0         
                                                                 
 dense_230 (Dense)           (None, 128)               17408     
                                                                 
 dense_231 (Dense)           (None, 128)               16512     
                                                                 
 dense_232 (Dense)           (None, 128)               16512     
                                                                 
 dense_233 (Dense)           (None, 1)                 129       
                                                                 
 activation_60 (Activation)  (None, 1)                 0         
                                                                 
To

In [57]:

matrix_dict = {}

for key, value in D_loss_dict.items():
    matrix_dict[key] = []


for key, value in D_loss_dict.items():
    matrix_dict[key].append(D_loss_dict[key])
    matrix_dict[key].append(Acc_dict[key])
    matrix_dict[key].append(G_loss_dict[key])

In [58]:
matrix_dict

{'SVM': [0.17202455550432205, 93.75, 0.0009889311622828245],
 'SGD': [0.11816747300326824, 96.875, 0.0015482198214158416],
 'DT': [0.36949513480067253, 82.8125, 0.0593997947871685],
 'GB': [0.2745802402496338, 89.0625, 0.006602793466299772],
 'Ensem': [0.16268927976489067, 95.3125, 0.004743809811770916]}

In [59]:
import pandas as pd

df = pd.DataFrame.from_dict(matrix_dict, orient='columns') 
df.index= list([ 'D_Loss', 'Acc', 'G_Loss'])
df

Unnamed: 0,SVM,SGD,DT,GB,Ensem
D_Loss,0.172025,0.118167,0.369495,0.27458,0.162689
Acc,93.75,96.875,82.8125,89.0625,95.3125
G_Loss,0.000989,0.001548,0.0594,0.006603,0.004744


In [60]:
import dataframe_image as dfi
dfi.export(df, '128_4_ran_matrix_0.5.png')