In [7]:
## Libraries to install (uncomment the lines below if they are missing)
#!pip install sklearn-genetic-opt
#!pip install psutil
#!pip install nvidia-ml-py3

import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn import preprocessing
import sklearn.metrics as metrics
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import StandardScaler
from multiprocessing import Process, Queue
import time


# Importing the required libraries
import psutil
#import nvidia_smi

# Poll CPU, memory and GPU usage in a loop until the parent process signals completion through the queue
def check_ressources(q, msg):
    """Sample resource usage until ``q`` receives an item, then plot the samples.

    Intended to run in a separate process while a model is being trained:
    the loop keeps sampling while ``q`` is empty, and the parent stops it by
    putting any item on the queue.

    Args:
        q: Queue used as a stop signal. The item that ends the loop is
            consumed and printed.
        msg: Text appended to the plot title.

    Notes:
        GPU monitoring is optional. If ``nvidia_smi`` cannot be imported or
        NVML cannot be initialised, only CPU and memory are recorded.
        The GPU figure is memory used / memory total, in percent.
    """
    # The file-level import of nvidia_smi is commented out, so import it
    # here and degrade gracefully instead of crashing the monitor process.
    try:
        import nvidia_smi
        nvidia_smi.nvmlInit()
        gpu_count = nvidia_smi.nvmlDeviceGetCount()
    except Exception:  # deliberate best-effort: missing package, driver or GPU
        nvidia_smi = None
        gpu_count = 0

    elapsed_ms = []
    cpu_usage = []
    mem_usage = []
    gpus_usage = {gpu: [] for gpu in range(gpu_count)}
    started = time.perf_counter()
    try:
        while q.empty():
            # Obtaining all the essential details
            cpu_usage.append(psutil.cpu_percent())
            mem_usage.append(psutil.virtual_memory().percent)
            for gpu in range(gpu_count):
                handle = nvidia_smi.nvmlDeviceGetHandleByIndex(gpu)
                info = nvidia_smi.nvmlDeviceGetMemoryInfo(handle)
                gpus_usage[gpu].append((info.used / info.total) * 100)
            # Record the real elapsed time rather than assuming 50 ms per pass.
            elapsed_ms.append((time.perf_counter() - started) * 1000)
            time.sleep(0.05)
    finally:
        if nvidia_smi is not None:
            nvidia_smi.nvmlShutdown()

    plt.xlabel("Time in ms")
    plt.ylabel("Percentage of usage")
    plt.plot(elapsed_ms, cpu_usage, color="red", linestyle='dotted')
    plt.plot(elapsed_ms, mem_usage, color="blue", linestyle='dotted')
    legend = ["CPU", "Memory"]
    for gpu, usage in gpus_usage.items():
        legend.append(f'GPU Usage ID {gpu}')
        plt.plot(elapsed_ms, usage, linestyle='dotted')
    plt.legend(legend, loc='center left', bbox_to_anchor=(1, 0.5))
    plt.title(f"Ressource Plot: for {msg}")
    print(q.get())
    plt.show()

def auprc_multiclass(y_test, y_pred, classes):
    """Return the area under the precision-recall curve over all classes.

    Both label vectors are binarized against ``classes`` and flattened, so a
    single precision-recall curve is computed over every (sample, class)
    indicator at once.

    Args:
        y_test: True labels.
        y_pred: Predicted labels.
        classes: Class values handed to ``label_binarize``.

    Returns:
        The value of ``metrics.auc(recall, precision)``.
    """
    truth_flat, predicted_flat = (
        preprocessing.label_binarize(labels, classes=classes).ravel()
        for labels in (y_test, y_pred)
    )
    curve = metrics.precision_recall_curve(truth_flat, predicted_flat)
    precision_values, recall_values = curve[0], curve[1]
    return metrics.auc(recall_values, precision_values)

def plot_confusion_matrix(y_test, y_pred):
    """Draw the confusion matrix of ``y_pred`` against ``y_test`` as a heatmap.

    Args:
        y_test: True labels.
        y_pred: Predicted labels.
    """
    # Use one shared, explicit label list for the matrix and both axes.
    # Taking np.unique of each vector separately mislabels the ticks as soon
    # as a class is missing from either the predictions or the ground truth.
    labels = np.unique(
        np.concatenate([np.asarray(y_test).ravel(), np.asarray(y_pred).ravel()])
    )
    plt.figure(figsize=(12, 10))
    sns.set(font_scale=1.2)
    sns.heatmap(
        confusion_matrix(y_test, y_pred, labels=labels),
        xticklabels=labels,
        yticklabels=labels,
        annot=True,
        annot_kws={"size": 12},
        fmt='g',
    )
    # Rows of a scikit-learn confusion matrix are true labels, columns predictions.
    plt.xlabel("Predicted label")
    plt.ylabel("True label")
    plt.title("Confusion matrix")
    plt.show()

class Malware_Detection:
    def __init__(self, dataset="MalMen2020", verbose=False, optimization=True, lines=-1):
        if dataset == "MalMen2020":
            dfs = [pd.read_csv("Obfuscated-MalMem2022.csv")]
        elif dataset == "ADFA-LD":
            self.type_g = ['3-gramme', '5-gramme', '6-gramme']
            dfs = [pd.read_csv("/kaggle/input/adfald/train_3g.csv"),
                   pd.read_csv("/kaggle/input/adfald/train_5g.csv"),
                   pd.read_csv("/kaggle/input/adfald/train_6g.csv")]
        else:
            raise ValueError( "Error: Unknow Dataset Please Choose either MalMen2020 or ADFA-LD")
        self.verbose = verbose
        self.dataset = dataset
        self.opt = optimization
        self.df = None
        print(f"Preprocessing the Dataset {dataset} in progress ...")
        for df in dfs:
            self.preprocessing(df)
        print('Processing Done !')
        if lines != -1:
            print(f"Only {lines} rows will be used from the Dataset {dataset}....")
            self.dataset_crop(lines)
        self.output_evaluation = pd.DataFrame()
    def dataset_crop(self, lines):
        if self.dataset == "MalMen2020":
            if lines >= len(self.df):
                raise ValueError(f'Number of rows selected is {lines} >= number of rows in the dataset')
            benign_dataset = self.df[self.df['Class'] == 'Benign']
            total_rows = len(benign_dataset)
            frac = ((lines/2 + 1)/total_rows)
            benign_dataset = benign_dataset.sample(frac=frac)
            self.df = self.df[self.df['Class'] != 'Benign']
            malware_sub = self.df['Category_Subfamily'].unique()
            nb_subs = len(malware_sub)
            list_dataset = [benign_dataset]
            for mal in malware_sub:
                malware_dataset = self.df[self.df['Category_Subfamily'] == mal]
                total_rows = len(malware_dataset)
                frac = ((lines/2 + 1)/nb_subs)/total_rows
                list_dataset.append(malware_dataset.sample(frac=frac))
            self.df = pd.concat(list_dataset)
        else:
            for i,df in enumerate(self.df):
                if lines >= len(df):
                    raise ValueError(f'Number of rows selected is {lines} >= number of rows in the dataset')
                benign_dataset = df[df['Label'] == 'Normal']
                total_rows = len(benign_dataset)
                frac = ((lines/2 + 1)/total_rows)
                benign_dataset = benign_dataset.sample(frac=frac)
                df = df[df['Label'] != 'Normal']
                malware_sub = df['Label'].unique()
                nb_subs = len(malware_sub)
                list_dataset = [benign_dataset]
                for mal in malware_sub:
                    malware_dataset = df[df['Label'] == mal]
                    total_rows = len(malware_dataset)
                    frac = ((lines/2 + 1)/nb_subs)/total_rows
                    list_dataset.append(malware_dataset.sample(frac=frac))
                self.df[i] = pd.concat(list_dataset)
            
            
            
        

    def init_model(self,  model_name_binary='CART',model_name_family='CART', model_name_subfamily='CART'):
        """Train and evaluate the whole classification pipeline.

        MalMen2020 runs three stages: Malware/Benign, malware family, and one
        sub-family classifier per family. ADFA-LD runs two stages per loaded
        n-gram frame: Attack/Normal and attack family.

        The fitted models are stored on ``self.model_b`` (binary),
        ``self.model_f`` (family) and ``self.model_sf`` (dict mapping a family
        name to its sub-family model, MalMen2020 only). The accumulated
        evaluation table is displayed at the end.

        Args:
            model_name_binary: Model name passed to ``fit_model`` for the binary stage.
            model_name_family: Model name passed to ``fit_model`` for the family stage.
            model_name_subfamily: Model name passed to ``fit_model`` for the
                sub-family stage (MalMen2020 only).
        """
        if self.dataset == "MalMen2020":
            print(f"Pipeline : Training for Malware/Benign Classification Model Used: {model_name_binary} <====> Malware Family Classification Model Used: {model_name_family} <===> Malware Sub Family Classification Model Used: {model_name_subfamily}")
            label_cols = ['Class', 'Category_Subfamily', 'Category_Family']

            # Training for Malware/Benign Classification
            self.model_b = self._train_and_evaluate(
                self.df.drop(label_cols, axis=1), self.df['Class'], model_name_binary,
                monitor_msg=f"Training for Malware/Benign Classification Model Used: {model_name_binary}",
                eval_msg=f"Evaluation of Malware/Benign Classification for {model_name_binary} Algorithm",
                title="Malware/Benign Classification")

            # Training for Malware Family Classification
            malware = self.df[self.df['Category_Family'] != 'Benign']
            self.model_f = self._train_and_evaluate(
                malware.drop(label_cols, axis=1), malware['Category_Family'], model_name_family,
                monitor_msg=f"Malware Family Classification Model Used: {model_name_family}",
                eval_msg=f"Evaluation of Malware Family Classification for {model_name_family} Algorithm",
                title="Malware Family Classification")

            # Training for Malware Sub Family Classification: one model per
            # family, kept separately so the family model is not overwritten.
            self.model_sf = {}
            for family in malware['Category_Family'].unique():
                members = malware[malware['Category_Family'] == family]
                self.model_sf[family] = self._train_and_evaluate(
                    members.drop(label_cols, axis=1), members['Category_Subfamily'], model_name_subfamily,
                    monitor_msg=f"Malware Sub Family Classification Model Used: {model_name_subfamily}",
                    eval_msg=f"Evaluation of Malware Sub Family Classification for {model_name_subfamily} Algorithm",
                    title=f"Malware Sub Family Classification {family}")
        else:
            for i, df in enumerate(self.df):
                print(f"Dataset Used : {self.type_g[i]}\n\n")
                print(f"Pipeline : Training for Attack/Normal Classification Model Used: {model_name_binary} <====> Attacks Family Classification Model Used: {model_name_family}")

                # Training for Attack/Normal Classification: every label other
                # than 'Normal' is collapsed into 'Attack' (Normal=0, Attack=1).
                binary_labels = df['Label'].where(df['Label'] == 'Normal', 'Attack')
                self.model_b = self._train_and_evaluate(
                    df.drop(['Label', 'ID'], axis=1), binary_labels, model_name_binary,
                    monitor_msg=f"Training for Attack/Normal Classification Model Used: {model_name_binary}",
                    eval_msg=f"Evaluation of Attack/Normal Classification for {model_name_binary} Algorithm",
                    title="Attack/Normal Classification",
                    class_names=['Normal', 'Attack'])

                # Training for Attacks Family Classification
                attacks = df[df['Label'] != 'Normal']
                self.model_f = self._train_and_evaluate(
                    attacks.drop(['ID', 'Label'], axis=1), attacks['Label'], model_name_family,
                    monitor_msg=f"Attacks Family Classification Model Used: {model_name_family}",
                    eval_msg=f"Evaluation of Attacks Family Classification Classification for {model_name_family} Algorithm",
                    title="Attacks Family Classification")

        # Show the summary for both datasets, not only ADFA-LD.
        display(self.output_evaluation)

    def _train_and_evaluate(self, X, y, model_name, monitor_msg, eval_msg, title, class_names=None):
        """Run one stage: encode labels, split 80/20, fit under monitoring, evaluate."""
        from sklearn.model_selection import train_test_split

        if class_names is None:
            class_names = y.unique()
        # Fixed-width string array sized to the longest name, so decoding by
        # index can never truncate a class name.
        names = np.asarray(list(class_names), dtype=str)
        codes = y.map({name: code for code, name in enumerate(class_names)})
        X_train, X_test, y_train, y_test = train_test_split(X, codes, test_size=0.2, random_state=1) # 80% training and 20% test

        # Ressource Check: monitor in a side process while the model trains.
        q = Queue()
        p = Process(target=check_ressources, args=(q, monitor_msg))
        p.start()
        try:
            model = self.fit_model(model_name, X_train, y_train)
        finally:
            # Always release the monitor, even when training fails.
            q.put('Done')
            p.join()

        y_pred = names[np.asarray(model.predict(X_test)).astype(int)]
        y_true = names[y_test.to_numpy()]
        print(eval_msg)
        self.evaluate_model(y_true, y_pred, names, model_name, title)
        return model
                    
                 
            
        
    def evaluate_model(self, y_test, y_pred, cat, model_name, title):
        """Score one set of predictions, log it, print it and plot the confusion matrix.

        A row with the scores is appended to ``self.output_evaluation``.

        Args:
            y_test: True labels.
            y_pred: Predicted labels.
            cat: Class values used to binarize labels for the AUPRC.
            model_name: Stored in the ``Model`` column.
            title: Stored in the ``Evaluation`` column.
        """
        # Compute every metric once and reuse it for the table and the printout.
        precision = metrics.precision_score(y_test, y_pred, average='micro')
        recall = metrics.recall_score(y_test, y_pred, average='micro')
        accuracy = metrics.accuracy_score(y_test, y_pred)
        balanced_accuracy = metrics.balanced_accuracy_score(y_test, y_pred)
        mcc = metrics.matthews_corrcoef(y_test, y_pred)
        auprc = auprc_multiclass(y_test, y_pred, cat)
        # self.delta is written by fit_model (seconds); report NaN if absent.
        elapsed_ms = getattr(self, 'delta', float('nan')) * 1000

        new_row = pd.DataFrame([{
            'Model': model_name,
            'Precision': precision,
            'Recall': recall,
            'Balanced Accuracy': balanced_accuracy,
            'Matthews Correlation Coefficient': mcc,
            'AUPRC': auprc,
            'Time Taken': elapsed_ms,
            'Evaluation': title}])
        # DataFrame.append no longer exists in pandas 2.x; use concat instead.
        if self.output_evaluation.empty:
            self.output_evaluation = new_row
        else:
            self.output_evaluation = pd.concat([self.output_evaluation, new_row], ignore_index=True)

        print(f"Precision:{precision}\n")
        print(f"Recall:{recall}\n")
        print(f"Accuracy:{accuracy}\n")
        print(f"Balanced Accuracy:{balanced_accuracy}\n")
        print(f"Matthews Correlation Coefficient:{mcc}\n")
        print(f"AUPRC:{auprc}\n")
        print(f"Time Taken: {elapsed_ms} ms\n")
        plot_confusion_matrix(y_test, y_pred)
        
    def cross_validation_ga(self, model, X_train,y_train, param_grid):
        """Run a genetic-algorithm hyper-parameter search and return the fitted search.

        Args:
            model: Estimator whose parameters are searched.
            X_train: Training features.
            y_train: Training labels.
            param_grid: Search space handed to ``GASearchCV``.

        Returns:
            The ``GASearchCV`` object after ``fit``.
        """
        from sklearn.model_selection import StratifiedKFold
        from sklearn_genetic import ExponentialAdapter, GASearchCV

        search_settings = dict(
            estimator=model,
            # 3-fold stratified CV with shuffling (no fixed seed)
            cv=StratifiedKFold(n_splits=3, shuffle=True),
            scoring='accuracy',
            population_size=20,
            generations=20,
            # mutation adapter goes from 0.8 to 0.2, crossover from 0.2 to 0.8
            mutation_probability=ExponentialAdapter(initial_value=0.8, end_value=0.2, adaptive_rate=0.1),
            crossover_probability=ExponentialAdapter(initial_value=0.2, end_value=0.8, adaptive_rate=0.1),
            param_grid=param_grid,
            n_jobs=-1,
        )
        search = GASearchCV(**search_settings)
        search.fit(X_train, y_train)
        return search
    
    def fit_model(self, model_name, X_train, y_train):
        start = time.time()
        if model_name == 'CART':
            from sklearn.tree import DecisionTreeClassifier
            model = DecisionTreeClassifier(splitter='best', max_depth=8, min_samples_leaf=100)
            if self.opt:
                from sklearn_genetic.space import Categorical, Integer
                param_grid = {
                'criterion':Categorical(['gini','entropy']),
                'splitter':Categorical(['best','random']),                                        
                'min_samples_leaf':Integer(100,300),
                'max_depth':Integer(3,8),
                }
                model = self.cross_validation_ga(model, X_train, y_train, param_grid)
                with open(f"{model_name}-best.txt", "a") as f:
                    f.write(f"{model.best_params_}\n\n\n")
                    f.close()
                return model
        elif model_name == 'XGBOOST':
            import xgboost as xgb
            model = xgb.XGBClassifier(objective= 'binary:logistic', n_estimators= 268, max_depth= 8, learning_rate=0.08455776011574645, tree_method='gpu_hist')
            if self.opt:
                from sklearn_genetic.space import Categorical, Integer, Continuous
                param_grid = {
                'objective':Categorical(['binary:logistic','multi:softmax']),
                'n_estimators':Integer(100,300),
                'max_depth':Integer(3,8),
                'learning_rate':Continuous(0.001, 0.1),
                }
                model = self.cross_validation_ga(model, X_train, y_train, param_grid)
                with open(f"{model_name}-best.txt", "a") as f:
                    f.write(f"{model.best_params_}\n\n\n")
                    f.close()
                return model
        elif model_name == 'SVM':
            from sklearn import svm
            from sklearn.pipeline import make_pipeline
            model = make_pipeline(StandardScaler(), svm.SVC(random_state=0,kernel='linear', decision_function_shape='ovo'))
            if self.opt:
                model = svm.SVC(random_state=0,kernel='linear', decision_function_shape='ovo')
                from sklearn_genetic.space import Categorical, Integer, Continuous
                param_grid = {
                'kernal':Categorical(['linear', 'poly', 'rbf', 'sigmoid', 'precomputed']),
                'gamma':Categorical(['auto','scale']),
                'decision_function_shape':Categorical(['ovr','ovo'])
                }
                model = self.cross_validation_ga(model, StandardScaler(X_train), y_train, param_grid)
                with open(f"{model_name}-best.txt", "a") as f:
                    f.write(f"{model.best_params_}\n\n\n")
                    f.close()
                return make_pipeline(StandardScaler(), model)
        elif model_name == 'KNN':
            from sklearn.neighbors import KNeighborsClassifier
            model = KNeighborsClassifier()
            if self.opt:
                from sklearn_genetic.space import Categorical, Integer, Continuous
                param_grid = {
                'weights':Categorical(['uniform','distance']),
                'n_neighbors':Integer(3,9),
                'leaf_size':Integer(20,50),
                'algorithm':Categorical(['auto','ball_tree', 'kd_tree', 'brute']),
                }
                model = self.cross_validation_ga(model, X_train, y_train, param_grid)
                with open(f"{model_name}-best.txt", "a") as f:
                    f.write(f"{model.best_params_}\n\n\n")
                    f.close()
                return model
        elif model_name == 'RandomForest':
            from sklearn.ensemble import RandomForestClassifier
            model = RandomForestClassifier()
            if self.opt:
                from sklearn_genetic.space import Categorical, Integer, Continuous
                param_grid = {
                'criterion':Categorical(['gini','entropy']),
                'min_samples_split':Integer(1,10),
                'max_depth':Integer(3,8),
                }
                model = self.cross_validation_ga(model, X_train, y_train, param_grid)
                with open(f"{model_name}-best.txt", "a") as f:
                    f.write(f"{model.best_params_}\n\n\n")
                    f.close()
                return model
        elif model_name == 'MLP':
            from sklearn.neural_network import MLPClassifier
            model = MLPClassifier(random_state=1, max_iter=300)
            if self.opt:
                from sklearn_genetic.space import Categorical, Integer, Continuous
                param_grid = {
                'activation':Categorical(['identité', 'logistique', 'tanh', 'relu']),
                'solveur':Categorical(['lbfgs', 'sgd', 'adam']),
                'max_iter':Integer(100,500),
                }
                model = self.cross_validation_ga(model, X_train, y_train, param_grid)
                with open(f"{model_name}-best.txt", "a") as f:
                    f.write(f"{model.best_params_}\n\n\n")
                    f.close()
                return model
        else:
            raise ValueError( "Model not Implemented yet. Please Choose between : CART, RandomForest, KNN, XGBOOST, MLP and SVM")
        model.fit(X_train, y_train)
        self.delta = time.time() - start
        return model
    def is_malware(self, model, malware_features):
        """Placeholder for single-sample inference; not implemented yet.

        The intended return value is a ``(is_malware, probability)`` pair.
        The previous body returned two undefined names and therefore failed
        with ``NameError``; it now fails explicitly.

        Args:
            model: Fitted classifier (currently unused).
            malware_features: Feature vector to classify (currently unused).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("is_malware is not implemented yet")
    def pca(self, X, y_bin, y_f, y_sf=None):
        """Plot the PCA explained-variance curve and 3-D projections of ``X``.

        ``X`` is standardized, the number of components needed to reach 80%
        cumulative explained variance is reported, and the first three
        principal components are drawn once per supplied colouring.

        Args:
            X: Feature table.
            y_bin: Per-row colours for the first scatter plot.
            y_f: Per-row colours for the second scatter plot.
            y_sf: Optional per-row colours for a third scatter plot.

        Raises:
            ValueError: If fewer than three principal components are available.
        """
        from sklearn.decomposition import PCA
        from sklearn.preprocessing import StandardScaler

        # Scale data before applying PCA
        scaled_X = StandardScaler().fit_transform(X)
        cumulative = PCA().fit(scaled_X).explained_variance_ratio_.cumsum()

        # Let's find how many components we need to preserve 80% of the
        # explained variance. The index is 0-based, the count is index + 1.
        reached = np.flatnonzero(cumulative >= 0.8)
        nb_cmp = int(reached[0]) + 1 if reached.size else len(cumulative)
        print(f'Number of components : {nb_cmp} , Value of Cumulative Explained Variance: {cumulative[nb_cmp - 1]}')

        # Visualization
        plt.figure(figsize=(15, 8))
        plt.plot(range(1, len(cumulative) + 1), cumulative, linestyle='--', marker='o')
        plt.title('Explained Variance by Components')
        plt.xlabel('Number of components')
        plt.ylabel('Cumulative Explained Variance')

        # The scatter plots index three components, so keep at least three.
        if len(cumulative) < 3:
            raise ValueError("At least 3 principal components are required for the 3D projection")
        projected = PCA(n_components=max(nb_cmp, 3)).fit_transform(scaled_X)

        colourings = [y_bin, y_f]
        if y_sf is not None:
            colourings.append(y_sf)
        for colours in colourings:
            plt.figure(figsize=(10, 10))
            ax = plt.axes(projection="3d")
            ax.scatter3D(projected[:, 0], projected[:, 1], projected[:, 2], c=colours.tolist(), cmap='plasma')
            plt.xlabel('First Component')
            plt.ylabel('Second Component')
            ax.set_zlabel('Third Component')
    def tsne(self, X, y_bin, y_f, y_sf=None):
        """Embed ``X`` in three dimensions with t-SNE and draw one scatter per colouring.

        Args:
            X: Feature table.
            y_bin: Per-row colours for the first scatter plot.
            y_f: Per-row colours for the second scatter plot.
            y_sf: Optional per-row colours for a third scatter plot.
        """
        from sklearn.manifold import TSNE

        verbosity = 1 if self.verbose else 0
        embedding = TSNE(n_components=3, verbose=verbosity, random_state=123).fit_transform(X)

        colourings = [y_bin, y_f]
        if y_sf is not None:
            colourings.append(y_sf)

        # One 10x10 3-D figure per colouring, all sharing the same embedding.
        for colours in colourings:
            plt.figure(figsize=(10,10))
            axes_3d = plt.axes(projection ="3d")
            axes_3d.scatter3D(embedding[:,0], embedding[:,1], embedding[:,2], c=colours.tolist(), cmap='plasma')
            plt.xlabel('First Component')
            plt.ylabel('Second Component')
            axes_3d.set_zlabel('Third Component')
    def umap(self, X, y_bin, y_f, y_sf=None):
        """Embed ``X`` in three dimensions with UMAP and draw one scatter per colouring.

        Args:
            X: Feature table.
            y_bin: Per-row colours for the first scatter plot.
            y_f: Per-row colours for the second scatter plot.
            y_sf: Optional per-row colours for a third scatter plot.
        """
        from umap import UMAP

        # Follow the instance's verbose flag, as tsne does, instead of always
        # logging. Falls back to verbose output if the flag was never set.
        reducer = UMAP(n_components=3, verbose=bool(getattr(self, 'verbose', True)), random_state=123)
        embedding = reducer.fit_transform(X)

        colourings = [y_bin, y_f]
        if y_sf is not None:
            colourings.append(y_sf)
        for colours in colourings:
            plt.figure(figsize=(10, 10))
            ax = plt.axes(projection="3d")
            ax.scatter3D(embedding[:, 0], embedding[:, 1], embedding[:, 2], c=colours.tolist(), cmap='plasma')
            plt.xlabel('First Component')
            plt.ylabel('Second Component')
            ax.set_zlabel('Third Component')
        
    def visualization(self, method='PCA'):
        print(f"Visualization of the dataset {self.dataset} in progress ... Method: {method}")
        if self.dataset == 'MalMen2020':
            plot_cols = ['Category_Family', 'Category_Subfamily']
            plot_names = ['Category Family', 'Category Subfamily'] 
            for c,n in zip(plot_cols, plot_names):
                fig = plt.figure(figsize=(30, 8))
                ax = sns.countplot(data=self.df, x=c)
                ax.set_title(f'Repartition of {n} in {self.dataset} Dataset', fontsize=20)
                _ = plt.xticks(rotation = 90, fontsize=18)
                _ = plt.xlabel(n,fontsize=14)
                _ = plt.ylabel("Count", fontsize=14)
                
            X = self.df.copy(deep=True)
            y_bin = X['Class']
            y_f = X['Category_Family']
            y_sf = X['Category_Subfamily']
            X = X.drop(['Class', 'Category_Subfamily', 'Category_Family'], axis=1)
            y_bin = y_bin.replace(['Benign', 'Malware'], ['green', 'red'])
            y_f = y_f.replace(['Benign', 'Ransomware' ,'Spyware', 'Trojan'], ['green', 'yellow', 'orange', 'red'])
            y_sf = y_sf.replace(y_sf.unique(), [x for x in range(len(y_sf.unique()))])
            if method == 'PCA':
                self.pca(X, y_bin, y_f, y_sf)
            elif method == 'TSNE':
                self.tsne(X, y_bin, y_f, y_sf)
            elif method == 'UMAP':
                self.umap(X, y_bin, y_f, y_sf)
            else:
                raise ValueError( "Only PCA, TSNE and UMAP are implemented make sure to choose only one of them.")
        else:
            for i, df in enumerate(self.df):
                fig = plt.figure(figsize=(30, 8))
                ax = sns.countplot(data=df, x='Label')
                ax.set_title(f'Repartition of Normal and Attack Labels in {self.dataset} Dataset: {self.type_g[i]}', fontsize=20)
                _ = plt.xticks(rotation = 90, fontsize=18)
                _ = plt.xlabel('Labels',fontsize=14)
                _ = plt.ylabel("Count", fontsize=14)
                
                fig = plt.figure(figsize=(30, 8))
                ax = sns.countplot(data=df[df['Label'] != "Normal"], x='Label')
                ax.set_title(f'Repartition of Attack Labels in {self.dataset} Dataset: {self.type_g[i]}', fontsize=20)
                _ = plt.xticks(rotation = 90, fontsize=18)
                _ = plt.xlabel('Attack Labels',fontsize=14)
                _ = plt.ylabel("Count", fontsize=14)

                X = df.copy(deep=True)
                y_bin = X['Label']
                y_bin = y_bin.map({x:"green" if x == "Normal" else "red" for x in y_bin.unique()})
                y_f = X['Label']
                X = X.drop(['Label'], axis=1)
                y_f = y_f.replace(y_f.unique(), [x for x in range(len(y_f.unique()))])
                if method == 'PCA':
                    self.pca(X, y_bin, y_f)
                elif method == 'TSNE':
                    self.tsne(X, y_bin, y_f)
                elif method == 'UMAP':
                    self.umap(X, y_bin, y_f)
                else:
                    raise ValueError( "Only PCA, TSNE and UMAP are implemented make sure to choose only one of them.")
        plt.show()
            
        
        
    def preprocessing(self, df):
        if self.verbose:
            # Exploring the Dataset:
            print(df.head())
            col = list(df.columns)
            print(f"Column number: {len(col)}\n")
            print(f"Column names: {col}\n")
            print(f"Column types: \n{df.dtypes}\n")
            print(f"Number of rows of the data set: {len(df)}\n")
            cat_col = df.select_dtypes('object')
            print(f"Categorial columns: \n{cat_col.columns}\n\n")
            for c in cat_col:
                print(f"Existing values of {c}: {cat_col[c].unique()}")

            num_col = df.select_dtypes(include=np.number)
            print(f"Numerical columns: \n{num_col.columns}\n\n")
            for c in num_col:
                print(f"Existing values of {c}: {num_col[c].unique()}")
                print(f"Min Value of {c}: {np.min(num_col[c])}")
                print(f"Max Value of {c}: {np.max(num_col[c])}")
                print(f"Std deviation Value of {c}: {np.std(num_col[c])}")
            if self.dataset == "MalMen2020":
                lookup_query = df.query('Class == "Malware" & Category != "Benign"').index
                print(f"Checking if the positive class (1) match malware categories : {len(lookup_query) > 0}")

                lookup_query = df.query('Category != "Benign" & Class == "Malware"').index
                print(f"Checking if the malware categories match Class : {len(lookup_query) > 0}")

                lookup_query = df.query('Class == "Benign" & Category == "Benign"').index
                print(f"Checking if the begnin class match the ‘Benign’ malware category : {len(lookup_query) > 0}")
                print(f"Number of malware class : {len(df[df['Class'] == 'Malware'])}, ratio : {(len(df[df['Class'] == 'Malware'])/len(df))*100:.2f}%")
                print(f"Number of benign class : {len(df[df['Class'] == 'Benign'])}, ratio : {(len(df[df['Class'] == 'Benign'])/len(df))*100:.2f}%")
            else:
                print(f"Number of malware class : {len(df[df['Label'] != 'Normal'])}, ratio : {(len(df[df['Label'] != 'Normal'])/len(df))*100:.2f}%")
                print(f"Number of benign class : {len(df[df['Label'] == 'Normal'])}, ratio : {(len(df[df['Label'] == 'Normal'])/len(df))*100:.2f}%")
                
        
        # Data Cleaning
        column_to_remove = []
        for c in df.columns:
            if len(df[c].unique()) == 1:
                column_to_remove.append(c)
        df = df.drop(column_to_remove, axis=1)
        if self.dataset == "MalMen2020":
            cat = df['Category'].tolist()
            cat_f = []
            cat_sf = []
            for x in cat:
                f_sf = x.split('-')
                if len(f_sf) >= 3:
                    cat_f.append(f_sf[0])
                    cat_sf.append(f_sf[0] + "-" + f_sf[1])
                elif len(f_sf) >= 2:
                    cat_f.append(f_sf[0])
                    cat_sf.append(f_sf[0])
                elif len(f_sf) >= 1:
                    cat_f.append(x)
                    cat_sf.append(x)
                else:
                    raise ValueError( f"Error : There is no Family in {x}")
            df['Category_Family'] = cat_f
            df['Category_Subfamily'] = cat_sf
            df = df.drop(['Category'], axis=1)
            if self.verbose:
                print(f"Number of occurrences for each malware class: \n{df['Category_Family'].value_counts()}")
                print(f"Number of occurrences for each malware class: \n{df['Category_Subfamily'].value_counts()}")
            self.df = df
        else:
            df = df.drop(['Unnamed: 0'], axis=1)
            if self.df is None:
                self.df = [df]
            else:
                self.df.append(df)
            


In [8]:
# Build the detector on the default dataset ("MalMen2020"): keep every row
# (lines=-1), skip the genetic hyper-parameter search (optimization=False)
# and suppress the exploration printout (verbose=False).
# NOTE: the default CSV 'Obfuscated-MalMem2022.csv' is read from the current
# working directory; the FileNotFoundError below means it was not there.
md = Malware_Detection(optimization=False, lines=-1, verbose=False)
# Train and evaluate every stage with the default model ('CART').
md.init_model()

FileNotFoundError: [Errno 2] No such file or directory: 'Obfuscated-MalMem2022.csv'