In [1]:
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split

class DataSplitter:
    def __init__(self, x):
        self.x = x

    # Takes a dataset and returns four values consisting of a training and test variable for each of X and Y:
    def shuffle_and_split(self):
        # Shuffle data to ensure randomization
        shuffled = self.x.sample(frac=1, random_state=42)

        # Define variables for models
        X = shuffled.drop("has_malware", axis=1)
        Y = shuffled["has_malware"]

        # Split the given data into separate training and test datasets
        (X_train, X_test, Y_train, Y_test) = train_test_split(
            X, Y, test_size=0.2, random_state=42
        )

        return X_train, X_test, Y_train, Y_test

# Usage
# Load vectorized data
cv = pd.read_csv('/Users/aymannadeem/code/malware-detection/datasets/extend/cv_df.csv',index_col=[0])
tf = pd.read_csv('/Users/aymannadeem/code/malware-detection/datasets/extend/tfidf_df.csv',index_col=[0])

# Create an instance of DataScale class and pass in each vectorized dataframe
split_cv = DataSplitter(cv)
split_tf = DataSplitter(tf)

# Shuffle and split
x_train_cv, x_test_cv, y_train_cv, y_test_cv = split_cv.shuffle_and_split()
x_train_tf, x_test_tf, y_train_tf, y_test_tf = split_tf.shuffle_and_split()

In [2]:
import numpy as np
import pandas as pd

from joblib import Parallel, delayed
import multiprocessing


from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import RobustScaler
from sklearn.preprocessing import QuantileTransformer
from sklearn.preprocessing import PowerTransformer


class DataScaler:
    def __init__(self, x_train, x_test, y_train, y_test):
        self.x_train = x_train
        self.x_test = x_test
        self.y_train = y_train
        self.y_test = y_test

    # Creates scaler instances for MinMaxScaler, StandScaler, RobustScaler, QuantileTransformer, and PowerTransformer
    def create_scalers(self):
        # Create a dictionary of scaler instances
        scalers = {
            "MinMaxScaler": MinMaxScaler(),
            "StandardScaler": StandardScaler(),
            "RobustScaler": RobustScaler(),
            "QuantileTransformer": QuantileTransformer(
                n_quantiles=1000, output_distribution="uniform"
            ),
            # "PowerTransformer": PowerTransformer(
            #     method="yeo-johnson", standardize=True
            # ),
        }
        return scalers

    # Takes a scaler instance, a training set, and a test set as input,
    # fits the scaler on the training set, and transforms both the training and test sets using the fitted scaler:
    def scale_data(self, scaler):
        # Fit the scaler on the training set
        scaler.fit(self.x_train)

        # Transform the training set
        X_train_scaled = scaler.transform(self.x_train)

        # Transform the test set
        X_test_scaled = scaler.transform(self.x_test)

        # Create DataFrames using the scaled datasets
        X_train_scaled_df = pd.DataFrame(X_train_scaled, columns=self.x_train.columns)
        X_test_scaled_df = pd.DataFrame(X_test_scaled, columns=self.x_test.columns)

        # print(X_train_scaled_df)
        # print(X_test_scaled_df)

        return X_train_scaled_df, X_test_scaled_df

    def scale_datasets(self):
        # Create scaler instances
        scalers = self.create_scalers()

        # Initialize empty lists to store scaled training and test variable pairs
        scaled_train_pairs = []
        scaled_test_pairs = []
        scaler_names = []

        # Define the number of workers for parallel processing
        num_workers = multiprocessing.cpu_count()

        # Parallelize the scaling process using joblib
        results = Parallel(n_jobs=num_workers)(
            delayed(self.scale_data)(scaler_instance)
            for scaler_name, scaler_instance in scalers.items()
        )

        # Process the results
        for scaler_name, (X_train_scaled, X_test_scaled) in zip(scalers.keys(), results):
            # Append scaled variable pairs to the respective lists
            scaled_train_pairs.append((X_train_scaled, self.y_train))
            scaled_test_pairs.append((X_test_scaled, self.y_test))

            # Append corresponding scaler names to a list
            scaler_names.append(scaler_name)

            print(f"Training set for {scaler_name}: {scaled_train_pairs}")
            print(f"Test set for {scaler_name}: {scaled_test_pairs}")

        # Return the lists of scaled variable pairs and the scaler names used
        return scaler_names, scaled_train_pairs, scaled_test_pairs


# Usage

# Create an instance of DataScale class and pass in each vectorized dataframe
datascaler_cv = DataScaler(x_train_cv, x_test_cv, y_train_cv, y_test_cv)
datascaler_tf = DataScaler(x_train_tf, x_test_tf, y_train_tf, y_test_tf)

# Scale
scaler_names_cv, train_scaled_cv, test_scaled_cv = datascaler_cv.scale_datasets()
scaler_names_tf, train_scaled_tf, test_scaled_tf = datascaler_tf.scale_datasets()

Training set for MinMaxScaler: [(          Star      Fork     Watch  open_issues  closed_issues  \
0     0.000034  0.000011  0.000034          0.0            0.0   
1     0.000000  0.000011  0.000000          0.0            0.0   
2     0.000000  0.000000  0.000000          0.0            0.0   
3     0.000010  0.000057  0.000010          0.0            0.0   
4     0.000010  0.000011  0.000010          0.0            0.0   
...        ...       ...       ...          ...            ...   
6267  0.000029  0.000034  0.000029          0.0            0.0   
6268  0.000019  0.000000  0.000019          0.0            0.0   
6269  0.000000  0.000000  0.000000          0.0            0.0   
6270  0.000053  0.000057  0.000053          0.0            0.0   
6271  0.000019  0.000000  0.000019          0.0            0.0   

      issue_comments  open_pulls  closed_pulls  comments  contributors  ...  \
0                0.0         0.0           0.0       0.0      0.002347  ...   
1               

In [3]:
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import chi2
from sklearn.feature_selection import f_classif
from sklearn.feature_selection import mutual_info_classif
from sklearn.feature_selection import VarianceThreshold
import numpy as np


class FeatureSelector:
    def __init__(self, scaler_names, scaled_train_pairs, scaled_test_pairs):
        self.scaler_names = scaler_names
        self.scaled_train_pairs = scaled_train_pairs
        self.scaled_test_pairs = scaled_test_pairs

    def select_features(self, method):
        scaler_names = []
        train_scaled = []
        test_scaled = []

        for scaler_name, scaled_train_pair, scaled_test_pair in zip(
            self.scaler_names, self.scaled_train_pairs, self.scaled_test_pairs
        ):
            X_train_scaled, Y_train = scaled_train_pair

            # Convert X_train_scaled to a DataFrame
            X_train_scaled = pd.DataFrame(X_train_scaled)

            # Apply absolute value transformation to handle negative inputs
            X_train_scaled = np.abs(X_train_scaled)

            # Apply VarianceThreshold to remove constant features
            constant_filter = VarianceThreshold()
            X_train_filtered = constant_filter.fit_transform(X_train_scaled)

            # Get the indices of selected features
            selected_feature_indices = constant_filter.get_support(indices=True)

            # Retrieve the selected feature names
            selected_feature_names = X_train_scaled.columns[selected_feature_indices].tolist()
            # print(f"Selected features for scaler {scaler_name}:")
            # print(selected_feature_names)

            # Perform feature selection
            if method == "SelectKBest":
                selector = SelectKBest(score_func=f_classif, k=10)
            elif method == "Chi-Squared":
                selector = SelectKBest(score_func=chi2, k=10)
            elif method == "Mutual Information":
                selector = SelectKBest(score_func=mutual_info_classif, k=10)
            else:
                raise ValueError(f"Invalid feature selection method: {method}")

            X_train_selected = selector.fit_transform(X_train_filtered, Y_train)

            # Apply the same feature selection to the test set
            X_test_scaled, Y_test = scaled_test_pair
            X_test_scaled = np.abs(X_test_scaled)
            X_test_scaled = pd.DataFrame(X_test_scaled)
            X_test_filtered = constant_filter.transform(X_test_scaled)
            X_test_selected = selector.transform(X_test_filtered)

            # Store the selected features, scaler names, and scaled data
            scaler_names.append(scaler_name)
            train_scaled.append((X_train_selected, Y_train))
            test_scaled.append((X_test_selected, Y_test))

        return scaler_names, train_scaled, test_scaled


# Usage

# Create instances of FeatureSelector
feature_selector_cv = FeatureSelector(scaler_names_cv, train_scaled_cv, test_scaled_cv)
feature_selector_tf = FeatureSelector(scaler_names_tf, train_scaled_tf, test_scaled_tf)

# Perform feature selection using different methods
(
    scaler_names_cv_kbest,
    train_scaled_cv_kbest,
    test_scaled_cv_kbest,
) = feature_selector_cv.select_features("SelectKBest")
(
    scaler_names_cv_chi,
    train_scaled_cv_chi,
    test_scaled_cv_chi,
) = feature_selector_cv.select_features("Chi-Squared")
(
    scaler_names_cv_mi,
    train_scaled_cv_mi,
    test_scaled_cv_mi,
) = feature_selector_cv.select_features("Mutual Information")

(
    scaler_names_tf_kbest,
    train_scaled_tf_kbest,
    test_scaled_tf_kbest,
) = feature_selector_tf.select_features("SelectKBest")
(
    scaler_names_tf_chi,
    train_scaled_tf_chi,
    test_scaled_tf_chi,
) = feature_selector_tf.select_features("Chi-Squared")
(
    scaler_names_tf_mi,
    train_scaled_tf_mi,
    test_scaled_tf_mi,
) = feature_selector_tf.select_features("Mutual Information")

test_scaled_cv_kbest

[(array([[2.01746549e-04, 0.00000000e+00, 8.14606742e-01, ...,
          1.81818182e-01, 3.57142857e-01, 3.00000000e-01],
         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, ...,
          9.09090909e-02, 3.57142857e-01, 9.00000000e-01],
         [1.15283742e-04, 0.00000000e+00, 9.55056180e-01, ...,
          1.81818182e-01, 2.85714286e-01, 1.00000000e+00],
         ...,
         [5.90829178e-04, 0.00000000e+00, 0.00000000e+00, ...,
          1.00000000e+00, 2.85714286e-01, 1.00000000e+00],
         [1.15283742e-04, 0.00000000e+00, 7.64044944e-01, ...,
          4.54545455e-01, 2.14285714e-01, 9.00000000e-01],
         [4.61134968e-04, 0.00000000e+00, 1.17977528e-01, ...,
          2.72727273e-01, 3.57142857e-01, 1.00000000e+00]]),
  1225    0
  1482    0
  1999    0
  5599    1
  4426    1
         ..
  2514    0
  880     0
  6658    1
  721     0
  2356    0
  Name: has_malware, Length: 1568, dtype: int64),
 (array([[0.10995776, 0.22134264, 0.8478233 , ..., 0.59806714, 0.15778

In [31]:
train_scaled_tf_kbest

[(array([[2.16157016e-04, 0.00000000e+00, 8.42696629e-01, ...,
          9.09090909e-01, 7.14285714e-01, 1.00000000e+00],
         [4.46724501e-04, 0.00000000e+00, 8.31460674e-01, ...,
          6.36363636e-01, 7.85714286e-01, 7.00000000e-01],
         [7.20523388e-05, 0.00000000e+00, 7.64044944e-01, ...,
          1.81818182e-01, 2.85714286e-01, 6.00000000e-01],
         ...,
         [0.00000000e+00, 0.00000000e+00, 8.87640449e-01, ...,
          9.09090909e-02, 2.85714286e-01, 2.00000000e-01],
         [1.87336081e-04, 0.00000000e+00, 0.00000000e+00, ...,
          8.18181818e-01, 1.42857143e-01, 1.00000000e+00],
         [6.91702453e-04, 0.00000000e+00, 0.00000000e+00, ...,
          9.09090909e-01, 2.14285714e-01, 1.00000000e+00]]),
  4109    1
  4573    1
  377     0
  5240    1
  4042    1
         ..
  7759    1
  4882    1
  3355    0
  5864    1
  119     0
  Name: has_malware, Length: 6272, dtype: int64),
 (array([[0.10976437, 0.22134264, 0.92003727, ..., 0.31571282, 1.83898

In [17]:
print(type(test_scaled_tf_mi))
print(type(test_scaled_cv))

<class 'list'>
<class 'list'>


In [10]:
import random
from joblib import Parallel, delayed

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import shap
from sklearn.metrics import precision_recall_curve

from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import Perceptron
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier

from sklearn.model_selection import cross_val_score
from sklearn.model_selection import learning_curve
from sklearn.metrics import (
    precision_score,
    recall_score,
    f1_score,
    accuracy_score,
    confusion_matrix,
    roc_auc_score,
)


class ModelEvaluator:
    def __init__(self, vectorizer_name, feature_selection_method, scaler_names, train_datasets, test_datasets):
        self.vectorizer_name = vectorizer_name
        self.feature_selection_method = feature_selection_method # Feature selection
        self.scaler_names = scaler_names
        self.train_datasets = train_datasets
        self.test_datasets = test_datasets
        self.confusion_matrices = []
        self.metrics_dataframes = []

    # Define the classifiers
    def create_classifiers(self):
        # Create a dictionary of classifier instances
        classifiers = {
            "Logistic Regression": LogisticRegression(max_iter=1000),
            "Gaussian Naive Bayes": GaussianNB(),
            "Perceptron": Perceptron(),
            "Decision Tree": DecisionTreeClassifier(),
            "Random Forest": RandomForestClassifier(random_state=1),
        }
        return classifiers

    # Takes a classifier object and datasets as arguments and performs the necessary steps to fit the classifier to each dataset.
    def evaluate_classifiers(self):
        vectorizer_name = self.vectorizer_name
        feature_selection_method = self.feature_selection_method
        classifiers = self.create_classifiers()
        results = {}

        combined_metrics_df = pd.DataFrame(
            columns=[
                "Training Accuracy",
                "Accuracy",
                "Training Error",
                "Test Error",
                "Training Precision",
                "Precision",
                "Training Recall",
                "Recall",
                "Training Specificity",
                "Specificity",
                "Training F1-Score",
                "F1-Score",
                "Training ROC-AUC",
                "ROC-AUC",
                "classifier_name",
                "scaler_name",
                "vectorizer_name",
                "feature_selection_method",
            ]
        )

        def evaluate_classifier(
            classifier_name, classifier, train_dataset, test_dataset, scaler_name
        ):
            X_train, y_train = train_dataset
            X_test, y_test = test_dataset

            # Fit the classifier
            classifier.fit(X_train, y_train)

            # Plot learning curve
            self.plot_learning_curve(
                classifier,
                X_train,
                y_train,
                classifier_name,
                scaler_name,
                vectorizer_name,
                feature_selection_method,
            )

            # Make predictions
            y_train_pred = classifier.predict(X_train)
            y_pred = classifier.predict(X_test)

            # Calculate evaluation metrics
            train_confusion_mat = confusion_matrix(y_train, y_train_pred)
            confusion_mat = confusion_matrix(y_test, y_pred)
            train_accuracy = accuracy_score(y_train, y_train_pred)
            accuracy = accuracy_score(y_test, y_pred)
            train_error = 1 - train_accuracy
            test_error = 1 - accuracy
            train_precision = precision_score(y_train, y_train_pred)
            precision = precision_score(y_test, y_pred)
            train_recall = recall_score(y_train, y_train_pred)
            recall = recall_score(y_test, y_pred)
            train_f1 = f1_score(y_train, y_train_pred)
            f1 = f1_score(y_test, y_pred)
            train_roc_auc = roc_auc_score(y_train, y_train_pred)
            roc_auc = roc_auc_score(y_test, y_pred)
            # Extract true negatives (TN), false positives (FP), false negatives (FN), and true positives (TP) from the confusion matrix
            train_tn, train_fp, train_fn, train_tp = train_confusion_mat.ravel()
            tn, fp, fn, tp = confusion_mat.ravel()
            # Calculate specificity
            train_specificity = train_tn / (train_tn + train_fp)
            specificity = tn / (tn + fp)

            if feature_selection_method=="none":
                self.compute_shap_values(
                    classifier,
                    X_train,
                    y_train,
                    classifier_name,
                    scaler_name,
                    vectorizer_name,
                    feature_selection_method,
                )
                
            # Call the plot_precision_recall_curve function to plot the curve
            self.plot_precision_recall_curve(
                classifier,
                X_test,
                y_test,
                classifier_name,
                scaler_name,
                vectorizer_name,
                feature_selection_method,
            )

            # Return the results
            return (
                classifier_name,
                scaler_name,
                vectorizer_name,
                feature_selection_method,
                train_accuracy,
                accuracy,
                train_error,
                test_error,
                train_precision,
                precision,
                train_recall,
                recall,
                train_specificity,
                specificity,
                train_f1,
                f1,
                train_roc_auc,
                roc_auc,
                confusion_mat,
            )

        for classifier_name, classifier in classifiers.items():
            results[classifier_name] = {}

            evaluated_results = Parallel(n_jobs=-1)(
                delayed(evaluate_classifier)(
                    classifier_name,
                    classifier,
                    self.train_datasets[i],
                    self.test_datasets[i],
                    self.scaler_names[i],
                )
                for i in range(len(self.train_datasets))
            )

            for result in evaluated_results:
                (
                    classifier_name,
                    scaler_name,
                    vectorizer_name,
                    feature_selection_method,
                    train_accuracy,
                    accuracy,
                    train_error,
                    test_error,
                    train_precision,
                    precision,
                    train_recall,
                    recall,
                    train_specificity,
                    specificity,
                    train_f1,
                    f1,
                    train_roc_auc,
                    roc_auc,
                    confusion_mat,
                ) = result

                # Store the results
                results[classifier_name][scaler_name] = {
                    "Training Accuracy": train_accuracy,
                    "Accuracy": accuracy,
                    "Training Error": train_error,
                    "Test Error": test_error,
                    "Training Precision": train_precision,
                    "Precision": precision,
                    "Training Recall": train_recall,
                    "Recall": recall,
                    "Training Specificity": train_specificity,
                    "Specificity": specificity,
                    "Training F1-Score": train_f1,
                    "F1-Score": f1,
                    "Training ROC-AUC": train_roc_auc,
                    "ROC-AUC": roc_auc,
                    "Confusion Matrix": confusion_mat,
                }

                self.store_confusion_mat(
                    vectorizer_name, confusion_mat, classifier_name, scaler_name, feature_selection_method,
                )

                metrics_df = pd.DataFrame(
                    {
                        "Metric": [
                            "Training Accuracy",
                            "Accuracy",
                            "Training Error",
                            "Test Error",
                            "Training Precision",
                            "Precision",
                            "Training Recall",
                            "Recall",
                            "Training Specificity",
                            "Specificity",
                            "Training F1-Score",
                            "F1-Score",
                            "Training ROC-AUC",
                            "ROC-AUC",
                        ],
                        "Score": [
                            train_accuracy,
                            accuracy,
                            train_error,
                            test_error,
                            train_precision,
                            precision,
                            train_recall,
                            recall,
                            train_specificity,
                            specificity,
                            train_f1,
                            f1,
                            train_roc_auc,
                            roc_auc,
                        ],
                    }
                )
                metrics_df = metrics_df.sort_values(by="Score", ascending=False)
                metrics_df = metrics_df.set_index("Score")
                metrics = self.format_metrics(
                    metrics_df, vectorizer_name, classifier_name, scaler_name, feature_selection_method
                )
                self.metrics_dataframes.append(metrics)

                combined_metrics_df = pd.concat(
                    [
                        combined_metrics_df,
                        pd.DataFrame(
                            {
                                "Training Accuracy": train_accuracy,
                                "Accuracy": accuracy,
                                "Training Error": train_error,
                                "Test Error": test_error,
                                "Training Precision": train_precision,
                                "Precision": precision,
                                "Training Recall": train_recall,
                                "Recall": recall,
                                "Training Specificity": train_specificity,
                                "Specificity": specificity,
                                "Training F1-Score": train_f1,
                                "F1-Score": f1,
                                "Training ROC-AUC": train_roc_auc,
                                "ROC-AUC": roc_auc,
                                "classifier_name": classifier_name,
                                "scaler_name": scaler_name,
                                "vectorizer_name": vectorizer_name,
                                "feature_selection_method": feature_selection_method,
                            },
                            index=[0],
                        ),
                    ],
                    ignore_index=True,
                )

        self.display_results()

        # Save the DataFrames as CSV tables
        combined_metrics_df.to_csv(
            f"/Users/aymannadeem/code/malware-detection/results/combined_metrics_{vectorizer_name}_{feature_selection_method}.csv",
            index=False,
        )

        return results, combined_metrics_df

    def store_confusion_mat(
        self, vectorizer_name, confusion_mat, classifier_name, scaler_name, feature_selection_method
    ):
        self.confusion_matrices.append(
            (vectorizer_name, confusion_mat, classifier_name, scaler_name, feature_selection_method)
        )

    def format_metrics(self, metrics_df, vectorizer_name, classifier_name, scaler_name, feature_selection_method):
        metrics_df_styled = metrics_df.style.set_table_attributes(
            "style='display:inline'"
        ).set_caption(f"{classifier_name} - {scaler_name} - {vectorizer_name} - {feature_selection_method}")

        # Add CSS styling to align the "Metric" and "Score" columns
        metrics_df_styled = metrics_df_styled.set_table_styles(
            [
                {
                    "selector": "th.col_heading",
                    "props": [("padding", "5px 15px 5px 15px")],
                }
            ]
        )

        return metrics_df_styled

    # Generates a learning curve plot for a given classifier and dataset.
    # Uses the learning_curve function to calculate the training and cross-validation
    # scores for different training set sizes.
    # The learning curve plot provides insights into how the classifier's performance varies with the number of training examples. It helps to identify if the model is overfitting (high training accuracy but low cross-validation accuracy) or underfitting (low training accuracy and low cross-validation accuracy). It also provides an estimate of the model's generalization performance as the training set size increases.
    def plot_learning_curve(
        self, classifier, X, y, classifier_name, scaler_name, vectorizer_name, feature_selection_method
    ):
        # The learning_curve function performs cross-validation by splitting the dataset
        # into multiple train-test splits and returns the training and test scores for each split.
        train_sizes, train_scores, test_scores = learning_curve(
            classifier,
            X,
            y,
            train_sizes=np.linspace(0.1, 1.0, 10),
            cv=5,
            scoring="accuracy",
        )

        # Compute mean and standard deviation of training
        # and test scores across different train-test splits.
        train_scores_mean = np.mean(train_scores, axis=1)
        train_scores_std = np.std(train_scores, axis=1)
        test_scores_mean = np.mean(test_scores, axis=1)
        test_scores_std = np.std(test_scores, axis=1)

        plt.figure(figsize=(10, 6))
        plt.title(
            f"Learning Curve - {classifier_name} - {scaler_name} - {vectorizer_name} - {feature_selection_method}"
        )
        plt.xlabel("Training Examples")
        plt.ylabel("Accuracy")
        plt.grid()

        # Create two shaded regions to represent variances
        # in the training and test scores, respectively.
        plt.fill_between(
            train_sizes,
            train_scores_mean - train_scores_std,
            train_scores_mean + train_scores_std,
            alpha=0.1,
            color="r",
        )
        plt.fill_between(
            train_sizes,
            test_scores_mean - test_scores_std,
            test_scores_mean + test_scores_std,
            alpha=0.1,
            color="g",
        )
        plt.plot(
            train_sizes,
            train_scores_mean,
            "o-",
            color="r",
            label="Training Accuracy",
        )
        plt.plot(
            train_sizes,
            test_scores_mean,
            "o-",
            color="g",
            label="Cross-Validation Accuracy",
        )
        plt.legend(loc="best")
        plt.tight_layout()

        # Save the figure as a JPEG image
        output_dir = "/Users/aymannadeem/code/malware-detection/results/fit"
        os.makedirs(output_dir, exist_ok=True)
        output_filename = (
            f"learning_curve_{vectorizer_name}_{classifier_name}_{scaler_name}_{feature_selection_method}.jpeg"
        )
        output_path = os.path.join(output_dir, output_filename)
        plt.savefig(output_path, bbox_inches="tight")
        plt.close()

        print(
            f"Learning curve for {classifier_name} - {scaler_name} - {vectorizer_name} - {feature_selection_method} saved as a JPEG image."
        )


    def compute_shap_values(self, classifier, X_train, y_train, classifier_name, scaler_name, vectorizer_name, feature_selection_method):
        try:
            # Fit the classifier
            classifier.fit(X_train, y_train)

            # Create a callable function based on the classifier name
            predict_function = (
                lambda input_data: classifier.predict_proba(input_data)[:, 1]
                if classifier_name != "Perceptron"
                else classifier.predict(input_data)
            )

            # Determine the SHAP explainer based on the classifier name
            explainer = shap.Explainer(predict_function, X_train, algorithm="auto", n_jobs=2)
                
            # Disable the additivity check
            explainer.check_additivity = False
            
            # Calculate SHAP values for the sampled data
            shap_values = explainer.shap_values(X_train)

            # Create a DataFrame to store the SHAP values for the sampled data
            shap_df = pd.DataFrame(data=shap_values, columns=X_train.columns)

            # Sort the DataFrame by the absolute sum of SHAP values across instances
            shap_df['importance'] = shap_df.abs().sum(axis=0)
            shap_df = shap_df.sort_values(by='importance', ascending=False).drop('importance', axis=1)

            # Save the SHAP values to a CSV file
            output_dir = "/Users/aymannadeem/code/malware-detection/results/shap"  # Specify the directory where you want to save the CSV file
            os.makedirs(output_dir, exist_ok=True)
            output_filename = f"shap_values_{vectorizer_name}_{classifier_name}_{scaler_name}_{feature_selection_method}.csv"
            output_path = os.path.join(output_dir, output_filename)
            shap_df.to_csv(output_path, index=False)

            # Plot SHAP summary plot
            title = f"SHAP Summary Plot - {classifier_name} - {scaler_name} - {vectorizer_name} - {feature_selection_method}"
            shap.summary_plot(
                shap_values,
                X_train,
                feature_names=X_train.columns.tolist(),
                class_names=["No Malware", "Malware"],
                show=False,
            )
            plt.title(title)

            # Save the figure as a JPEG image
            output_dir = "/Users/aymannadeem/code/malware-detection/results/shap"  # Specify the directory where you want to save the JPEG image
            os.makedirs(output_dir, exist_ok=True)
            output_filename = f"shap_summary_{vectorizer_name}_{classifier_name}_{scaler_name}_{feature_selection_method}.png"
            output_path = os.path.join(output_dir, output_filename)
            plt.savefig(output_path, bbox_inches="tight")
            plt.close()

            print(f"SHAP summary plot and SHAP values for {classifier_name} - {scaler_name} - {vectorizer_name} - {feature_selection_method} saved as PNG image and CSV file.")
        
        except Exception as e:
            print(f"Error occurred while computing SHAP values: {str(e)}")

    # Function to plot precision-recall curve
    def plot_precision_recall_curve(
        self,
        classifier,
        X,
        y,
        classifier_name,
        scaler_name,
        vectorizer_name,
        feature_selection_method,
    ):
        # Fit the classifier
        classifier.fit(X, y)

        # Predict probabilities
        if classifier_name != "Perceptron":
            y_probs = classifier.predict_proba(X)[:, 1]
        else:
            y_probs = classifier.predict(X)

        # Calculate precision-recall values at different probability thresholds
        precision, recall, thresholds = precision_recall_curve(y, y_probs)

        # Plot the precision-recall curve
        plt.figure(figsize=(8, 6))
        plt.plot(recall, precision, lw=2)
        plt.xlabel("Recall")
        plt.ylabel("Precision")
        plt.title(
            f"Precision-Recall Curve - {classifier_name} - {scaler_name} - {vectorizer_name} - {feature_selection_method}"
        )
        plt.grid(True)
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.0])
        plt.tight_layout()

        # Save the figure as a JPEG image
        output_dir = "/Users/aymannadeem/code/malware-detection/results/precision_recall"
        os.makedirs(output_dir, exist_ok=True)
        output_filename = f"precision_recall_{vectorizer_name}_{classifier_name}_{scaler_name}_{feature_selection_method}.jpeg"
        output_path = os.path.join(output_dir, output_filename)
        plt.savefig(output_path, bbox_inches="tight")
        plt.close()

        print(
            f"Precision-Recall curve for {classifier_name} - {scaler_name} - {vectorizer_name} - {feature_selection_method} saved as a JPEG image."
        )
    
    def display_results(self):
        # Save results to jpeg
        output_dir = "/Users/aymannadeem/code/malware-detection/results"  # Specify the directory where you want to save the JPEG images
        os.makedirs(
            output_dir, exist_ok=True
        )  # Create the output directory if it doesn't exist

        for i, (
            vectorizer_name,
            confusion_mat,
            classifier_name,
            scaler_name,
            feature_selection_method,
        ) in enumerate(self.confusion_matrices):
            fig, axes = plt.subplots(1, 2, figsize=(15, 5))

            # Plot the confusion matrix
            ax = axes[0]
            ax.matshow(confusion_mat, cmap=plt.cm.Oranges, alpha=0.3)
            for j in range(confusion_mat.shape[0]):
                for k in range(confusion_mat.shape[1]):
                    ax.text(
                        x=k,
                        y=j,
                        s=confusion_mat[j, k],
                        va="center",
                        ha="center",
                        size="xx-large",
                    )

            ax.set_xlabel("Predictions", fontsize=18)
            ax.set_ylabel("Actuals", fontsize=18)
            ax.set_title(
                f"Confusion Matrix for {classifier_name} - {scaler_name} - {vectorizer_name} - {feature_selection_method}",
                # f"Confusion Matrix for {classifier_name} - {scaler_name} - {vectorizer_name}",
                fontsize=18,
            )

            # Plot the metrics table
            ax = axes[1]
            ax.axis("off")
            metrics_html = self.metrics_dataframes[
                i
            ].to_html()  # Render the styled DataFrame as HTML
            metrics_df = pd.read_html(metrics_html)[
                0
            ]  # Convert the HTML table back to a DataFrame
            metrics_table = (
                metrics_df.values.tolist()
            )  # Convert the DataFrame to a list of lists

            ax.table(
                cellText=metrics_table,
                colLabels=metrics_df.columns,
                cellLoc="center",
                loc="center",
            )

            # Save the figure as a JPEG image
            output_filename = f"{vectorizer_name}_{classifier_name}_{scaler_name}_{feature_selection_method}.jpeg"
            output_path = os.path.join(output_dir, output_filename)
            plt.savefig(output_path, bbox_inches="tight")
            plt.close()

        print("Results saved as JPEG images.")


# Usage


# # Create an instance of DataScale class and pass in each vectorized dataframe
# evaluator_cv = ModelEvaluator("CV", scaler_names_cv, train_scaled_cv, test_scaled_cv)
# evaluator_tf = ModelEvaluator("TF-IDF", scaler_names_tf, train_scaled_tf, test_scaled_tf)
                
# Evaluate ML results

# results_cv = evaluator_cv.evaluate_classifiers()
# results_tf = evaluator_tf.evaluate_classifiers()

# All features -- no selection
#     evaluator_cv = ModelEvaluator(
#         "CV", "none", scaler_names_cv, train_scaled_cv, test_scaled_cv
#     )
#     evaluator_tf = ModelEvaluator(
#         "TF-IDF", "none", scaler_names_tf, train_scaled_tf, test_scaled_tf
#     )

#     # Model results -- no selection
#     results_cv, metrics_df_cv = evaluator_cv.evaluate_classifiers()
#     results_tf, metrics_df_tf = evaluator_tf.evaluate_classifiers()

# Features selected via K-best
evaluator_cv_kbest = ModelEvaluator(
    "CV",
    "k-best",
    scaler_names_cv_kbest,
    train_scaled_cv_kbest,
    test_scaled_cv_kbest,
)
evaluator_tf_kbest = ModelEvaluator(
    "TF-IDF",
    "k-best",
    scaler_names_tf_kbest,
    train_scaled_tf_kbest,
    test_scaled_tf_kbest,
)

# Model results via K-best
results_cv_kbest, metrics_df_cv_kbest = evaluator_cv_kbest.evaluate_classifiers()
results_tf_kbest, metrics_df_tf_kbest = evaluator_tf_kbest.evaluate_classifiers()

# Features selected via Chi-squared
evaluator_cv_chi = ModelEvaluator(
    "CV",
    "chi-squared",
    scaler_names_cv_chi,
    train_scaled_cv_chi,
    test_scaled_cv_chi,
)
evaluator_tf_chi = ModelEvaluator(
    "TF-IDF",
    "chi-squared",
    scaler_names_tf_chi,
    train_scaled_tf_chi,
    test_scaled_tf_chi,
)

# Model results via chi-squared
results_cv_chi, metrics_df_cv_chi = evaluator_cv_chi.evaluate_classifiers()
results_tf_chi, metrics_df_tf_chi = evaluator_tf_chi.evaluate_classifiers()

# Features selected via Mutual information
evaluator_cv_mi = ModelEvaluator(
    "CV",
    "mutual information",
    scaler_names_cv_mi,
    train_scaled_cv_mi,
    test_scaled_cv_mi,
)
evaluator_tf_mi = ModelEvaluator(
    "TF-IDF",
    "mutual information",
    scaler_names_tf_mi,
    train_scaled_tf_mi,
    test_scaled_tf_mi,
)

# Model results via Mutual information
results_cv_mi, metrics_df_cv_mi = evaluator_cv_mi.evaluate_classifiers()
results_tf_mi, metrics_df_tf_mi = evaluator_tf_mi.evaluate_classifiers()

Results saved as JPEG images.
Results saved as JPEG images.
Results saved as JPEG images.
Results saved as JPEG images.
Results saved as JPEG images.
Results saved as JPEG images.
Learning curve for Gaussian Naive Bayes - QuantileTransformer - CV - k-best saved as a JPEG image.
Precision-Recall curve for Gaussian Naive Bayes - QuantileTransformer - CV - k-best saved as a JPEG image.
Learning curve for Decision Tree - MinMaxScaler - CV - k-best saved as a JPEG image.
Precision-Recall curve for Decision Tree - MinMaxScaler - CV - k-best saved as a JPEG image.
Learning curve for Logistic Regression - MinMaxScaler - TF-IDF - k-best saved as a JPEG image.
Precision-Recall curve for Logistic Regression - MinMaxScaler - TF-IDF - k-best saved as a JPEG image.
Learning curve for Random Forest - MinMaxScaler - TF-IDF - k-best saved as a JPEG image.
Precision-Recall curve for Random Forest - MinMaxScaler - TF-IDF - k-best saved as a JPEG image.
Learning curve for Logistic Regression - QuantileTra

In [36]:
import os
import pandas as pd

# Get all files in the output directory
output_dir = "/Users/aymannadeem/code/malware-detection/results/shap"
all_files = os.listdir(output_dir)

# Filter out non-CSV files
csv_files = [file for file in all_files if file.endswith('.csv')]

# Load all CSV files into a dictionary of DataFrames
dfs = {file[:-4]: pd.read_csv(os.path.join(output_dir, file)) for file in csv_files}

# Compute the absolute values before concatenation
abs_dfs = {name: df.abs() for name, df in dfs.items()}

# Concatenate all dataframes, adding a 'source' column to know where each row came from
concat_df = pd.concat([df.assign(source=name) for name, df in abs_dfs.items()])

# Exclude 'source' from the features for the importance computation
features = concat_df.columns.drop('source')

# Compute the total importance of each feature
total_importance = concat_df[features].sum()

# Sort by the total importance
sorted_importance = total_importance.sort_values(ascending=False)

# Get the top contributing features
n = 20  # change this to the number of top features desired
top_features = sorted_importance.index[:n]
print(top_features)

Index(['Creation_year', 'Last Update_year', 'author_account_created_at_year',
       'labels', 'Author Account Activity Duration (Days)', 'author_email',
       'Fork', 'Python', 'Age of account at time of repo creation (Days)',
       'Repo Activity Duration (Days)', 'author_email_repo_count',
       'author_account_last_update_year', 'JavaScript', 'Watch',
       'repos_created_by_author', 'CSS', 'commits', 'C', 'Star',
       'author_following'],
      dtype='object')


In [42]:
# Update train_scaled_tf and test_scaled_tf to include only top_features
for i in range(len(train_scaled_tf)):
    train_scaled_tf[i] = (train_scaled_tf[i][0][top_features], train_scaled_tf[i][1])
    
for i in range(len(test_scaled_tf)):
    test_scaled_tf[i] = (test_scaled_tf[i][0][top_features], test_scaled_tf[i][1])

# No need to update scaler_names_tf, it just contains the names of the scalers used.

# Now you can use these updated inputs in your ModelEvaluator
evaluator_shap = ModelEvaluator(
    "TF-IDF",
    "shap",
    scaler_names_tf,
    train_scaled_tf,
    test_scaled_tf,
)

results_shap, metrics_shap = evaluator_shap.evaluate_classifiers()

Results saved as JPEG images.
Learning curve for Logistic Regression - QuantileTransformer - TF-IDF - shap saved as a JPEG image.
Learning curve for Gaussian Naive Bayes - StandardScaler - TF-IDF - shap saved as a JPEG image.
Learning curve for Perceptron - StandardScaler - TF-IDF - shap saved as a JPEG image.
Learning curve for Perceptron - MinMaxScaler - TF-IDF - shap saved as a JPEG image.
Learning curve for Perceptron - RobustScaler - TF-IDF - shap saved as a JPEG image.
Learning curve for Decision Tree - StandardScaler - TF-IDF - shap saved as a JPEG image.
Learning curve for Gaussian Naive Bayes - MinMaxScaler - TF-IDF - shap saved as a JPEG image.
Learning curve for Perceptron - QuantileTransformer - TF-IDF - shap saved as a JPEG image.
Learning curve for Decision Tree - RobustScaler - TF-IDF - shap saved as a JPEG image.
Learning curve for Logistic Regression - MinMaxScaler - TF-IDF - shap saved as a JPEG image.
Learning curve for Gaussian Naive Bayes - RobustScaler - TF-IDF - 