# Preprocessing

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import seaborn as sns
import time

import torch
from torch.utils.data import DataLoader
import torch.utils.data as data_utils


In [2]:
import torch



# Compute device: first CUDA GPU when one is available, CPU otherwise
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# Folder where models are saved / loaded from
modelFolder = './models/'

# Set to True to retrain the whole model
retrain = True

# Set to True to downsample the dataset
ds = True

# Fractions of the dataset used for the train / validation / test split
trainSize, valSize, testSize = 0.75, 0.05, 0.20

# Number of seconds covered by one window. Default: 16
window_size = 16

# Model hyper-parameters
batch_size, learning_rate = 4, 1e-3

# Seed for reproducibility
seed = 42

# Columns dropped from the dataset before the feature matrix is built
classes_to_drop = ['stabf', 'stab']



In [3]:
import numpy as np
import os
import pandas as pd
import random

from imblearn.under_sampling import RandomUnderSampler
from sklearn import preprocessing
from sklearn.metrics import f1_score
from torch.utils.data import Dataset

import torch
import torch.nn as nn



def setSeed(seed=seed):
    """
    Seed every random number generator used here, for reproducibility.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices),
    exports ``PYTHONHASHSEED`` and configures cuDNN for deterministic runs.

    Args:
        seed: Integer seed; defaults to the module-level ``seed``.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Benchmark mode lets cuDNN auto-tune and pick different kernels from run
    # to run, which defeats the deterministic setting above, so it stays off.
    torch.backends.cudnn.benchmark = False

setSeed()

def min_max_norm(self,col):
    """Rescale column ``col`` of ``self._data`` in place to the [0, 1] range."""
    column = self._data[col]
    lowest = column.min()
    span = column.max() - lowest
    self._data[col] = (column - lowest) / span


def std_scaler(self,col):
    """Standardise column ``col`` of ``self._data`` in place (subtract mean, divide by std)."""
    column = self._data[col]
    centred = column - column.mean()
    self._data[col] = centred / column.std()


def f1(test_loader, model):
    """
    Return the macro F1 score averaged over the batches of ``test_loader``.

    The score is computed per batch from the arg-max of the model output and
    the per-batch scores are then averaged with equal weight.
    """
    score_sum = 0
    with torch.no_grad():
        for data, labels in test_loader:
            # Index of the largest output along dim 1, kept as a column
            pred = model(data).data.max(1, keepdim=True)[1]
            score_sum += f1_score(labels, pred, average='macro')
    return score_sum / len(test_loader)


class CustomDataset(Dataset):
    """
    Windowed dataset built from a CSV file that contains a ``stabf`` label column.

    Rows are grouped by label, every group is trimmed (from its start) to a
    multiple of ``window_size`` rows and then cut into windows of
    ``window_size`` consecutive rows that advance by half a window.  Each
    window is labelled with the label of its first row.

    Args:
        file_path: CSV file to load.
        classes_to_drop: Columns removed from the feature matrix once the
            labels have been copied out of ``stabf``.
        window_size: Number of rows per window.
        normalize: Whether the feature columns are normalised.
        normalize_method: ``'min_max'`` or ``'mean_std'``; any other value
            leaves the data unchanged.
        auth: When True, normalisation is skipped.
        target: Currently unused.
    """

    def __init__(self, file_path='/content/new_dataset.csv', classes_to_drop=classes_to_drop, window_size=window_size, normalize=True, normalize_method='mean_std', auth=False, target=None):

        self._window_size = window_size
        self._data = pd.read_csv(file_path)

        # Group the rows by label; ignore_index renumbers them 0..n-1
        self._data = self._data.sort_values(by=['stabf'], inplace=False, ignore_index=True)

        # Trim the head of every class so its size is divisible by the window size
        for class_uniq in list(self._data['stabf'].unique()):
            class_rows = self._data.index[self._data['stabf'] == class_uniq]
            to_drop = len(class_rows) % window_size
            # Drop by index *label*.  Slicing ``self._data.index`` by position
            # is wrong here: once rows of an earlier class are gone, labels
            # and positions no longer coincide and other rows would be removed.
            self._data.drop(class_rows[:to_drop], inplace=True)

        # Renumber the rows after dropping values
        self._data = self._data.reset_index(drop=True)

        # First row of each class in the trimmed frame
        index_starting_class = [
            self._data.index[self._data['stabf'] == class_uniq][0]
            for class_uniq in self._data['stabf'].unique()
        ]

        # Half-window stride; never 0 so a window size of 1 still works
        step = max(1, self._window_size // 2)

        # Row indices of every full window; windows never cross a class boundary
        sequences = []
        for i, start in enumerate(index_starting_class):
            if i != len(index_starting_class) - 1:
                stop = index_starting_class[i + 1]
            else:
                stop = len(self._data)
            ranges = np.arange(start, stop)
            for j in range(0, len(ranges), step):
                window = ranges[j:j + self._window_size]
                # Keep complete windows only (compare with the configured
                # size rather than a hard-coded 16)
                if len(window) == self._window_size:
                    sequences.append(window)
        self._sequences = sequences

        # Keep the labels, then remove the non-feature columns
        self._labels = self._data['stabf']
        self._data.drop(classes_to_drop, inplace=True, axis=1)

        # Normalise every remaining column with the requested method
        if normalize and not auth:
            for col in self._data.columns:
                if normalize_method == 'min_max':
                    min_max_norm(self, col)
                elif normalize_method == "mean_std":
                    std_scaler(self, col)

        # Array holding the windows: (n_windows, window_size, n_features)
        values = self._data.to_numpy()
        label_values = self._labels.to_numpy()
        X = np.empty((len(sequences), self._window_size, len(self._data.columns)))
        y = []

        for n_row, sequence in enumerate(sequences):
            X[n_row, :, :] = values[sequence]
            # A window is labelled with the label of its first row
            y.append(label_values[sequence[0]])

        assert len(y) == len(X)
        self._X = X

        # Encode the window labels as consecutive integers.  The mapping is
        # printed from the encoder fitted here, not from a global ``encoder``
        # that may be undefined or fitted on different data.
        encoder = preprocessing.LabelEncoder()
        targets = encoder.fit_transform(y)
        self._label_encoder = encoder
        for code, label in enumerate(encoder.classes_):
            print(f'Code: {code} -> Label: {label}')
        self._y = torch.as_tensor(targets)


    def __len__(self):
        """Number of windows."""
        return len(self._X)


    def __getitem__(self, index):
        """Return ``(window as FloatTensor, encoded label)`` for window ``index``."""
        return torch.FloatTensor(self._X[index, :, :]), self._y[index]


def evaluate(model, dataloader, criterion):
    """
    Evaluate a multi-class model on ``dataloader``.

    Predictions are the arg-max of the model output along dim 1.

    Returns:
        Tuple ``(loss, accuracy, macro_f1)``; loss and accuracy are divided
        by ``len(dataloader.dataset)``.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    seen_targets, seen_predictions = [], []

    for batch_inputs, batch_labels in dataloader:
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)

        # Forward pass without gradient tracking
        with torch.no_grad():
            batch_outputs = model(batch_inputs)
            batch_loss = criterion(batch_outputs, batch_labels)

        batch_preds = torch.max(batch_outputs, 1)[1]
        # Weight the batch loss by the batch size
        loss_sum += batch_loss.item() * batch_inputs.size(0)
        n_correct += torch.sum(batch_preds == batch_labels.data)

        # Collect true labels and predictions for the F1 score
        seen_targets.extend(batch_labels.data.cpu().numpy().tolist())
        seen_predictions.extend(batch_preds.cpu().numpy().tolist())

    n_samples = len(dataloader.dataset)
    return (
        loss_sum / n_samples,
        n_correct.double() / n_samples,
        f1_score(seen_targets, seen_predictions, average='macro'),
    )


def evaluateBinary(model, dataloader, criterion, threshold=0.5):
    """
    Evaluate a binary model that emits one probability per sample.

    Args:
        model: Model whose output holds one value per sample, e.g. shape (N, 1).
        dataloader: Yields ``(inputs, labels)`` batches.
        criterion: Loss called as ``criterion(probabilities, labels.float())``.
        threshold: Probability above which a sample is predicted as class 1.

    Returns:
        Tuple ``(loss, accuracy, macro_f1)``; loss and accuracy are divided
        by ``len(dataloader.dataset)``.
    """
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    y_true = []
    y_pred = []

    for inputs, labels in dataloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Forward pass
        with torch.no_grad():
            # Flatten to (N,); unlike squeeze() this keeps a batch of one 1-D
            probabilities = model(inputs).reshape(-1)
            loss = criterion(probabilities, labels.float())

        # Threshold the probability.  An arg-max over the single output
        # column would always return index 0, i.e. predict class 0 everywhere.
        preds = (probabilities > threshold).long()
        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)

        # Collect predictions and true labels
        y_true += labels.data.cpu().numpy().tolist()
        y_pred += preds.cpu().numpy().tolist()

    # Calculate accuracy and loss
    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = running_corrects.double() / len(dataloader.dataset)
    epoch_f1 = f1_score(y_true, y_pred, average='macro')

    return epoch_loss, epoch_acc, epoch_f1



In [None]:
# Load the raw dataset from its CSV file and display it
dataset_path = '/content/smart_grid_stability_augmented.csv'
df = pd.read_csv(dataset_path)
df

In [None]:
from sklearn.preprocessing import LabelEncoder

# Replace the categorical 'stabf' column with integer codes
encoder = LabelEncoder()
df['stabf'] = encoder.fit_transform(df['stabf'])

# Retrieve the mapping of numerical codes to original class labels
class_labels = encoder.classes_

# Display the mapping
for code, label in enumerate(class_labels):
    print(f'Code: {code} -> Label: {label}')
df

In [None]:
# Save the label-encoded frame without the index column.
# NOTE(review): the path is relative, while CustomDataset defaults to
# '/content/new_dataset.csv'; they match only when the working directory is
# /content - confirm.
df.to_csv('new_dataset.csv', index=False)
df

In [8]:
import torch
import numpy as np
# Re-seed NumPy and PyTorch (CPU and CUDA) before the dataset is split
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
# cuDNN benchmark mode may select different kernels from run to run, which
# contradicts the deterministic setting above, so it is disabled
torch.backends.cudnn.benchmark = False


In [None]:
a = CustomDataset()

# Split sizes; the test split takes whatever the rounding leaves over
train_size = int(trainSize * len(a))
val_size = int(valSize * len(a))
test_size = len(a) - train_size - val_size

train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
    a, [train_size, val_size, test_size])

# The three loaders share the same settings: fixed order, incomplete last
# batch dropped
train_loader, validation_loader, test_loader = (
    torch.utils.data.DataLoader(dataset=split,
                                batch_size=batch_size,
                                shuffle=False,
                                drop_last=True)
    for split in (train_dataset, val_dataset, test_dataset)
)

# LSTM model

In [None]:
import torch.optim as optim

class RNNBinaryClassification(torch.nn.Module):
    """
    Bidirectional LSTM binary classifier.

    The LSTM output at the last time step goes through dropout, a linear
    layer with a single output and a sigmoid.  ``batch_size`` and
    ``window_size`` are accepted by the constructor but not used.
    """

    def __init__(self, batch_size, window_size, num_features, dropout_rate=0.5):
        super().__init__()
        hidden_units = 220
        self.rnn1 = torch.nn.LSTM(
            input_size=num_features,
            hidden_size=hidden_units,
            batch_first=True,
            bidirectional=True,
        )
        self.dropout = torch.nn.Dropout(p=dropout_rate)
        # Both directions are concatenated, hence twice the hidden size
        self.fc = torch.nn.Linear(2 * hidden_units, 1)
        self.sigmoid = torch.nn.Sigmoid()

    def forward(self, x):
        """Return one value in (0, 1) per sequence, shape (batch, 1)."""
        sequence_out, _ = self.rnn1(x)
        last_step = sequence_out[:, -1, :]
        return self.sigmoid(self.fc(self.dropout(last_step)))
# Initialize the model, criterion, and optimizer
model = RNNBinaryClassification(batch_size, window_size, 12).to(device)
criterion = nn.BCELoss()

optimizer = optim.Adam(model.parameters(), lr=learning_rate)

retrain = False

# Checkpoint path and training length
model_path = './models/rnn_auth.pt'
num_epochs = 10

if not os.path.exists(model_path) or retrain:
    # Training loop
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs.squeeze(), labels.float())
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        average_loss = total_loss / len(train_loader)

        print(f'[💪 EPOCH {epoch+1}/{num_epochs}] Loss: {average_loss:.3f}')

    # Persist the weights so the existence check above is meaningful next run
    os.makedirs(os.path.dirname(model_path), exist_ok=True)
    torch.save(model.state_dict(), model_path)
else:
    # A checkpoint exists and no retraining was requested: load it, otherwise
    # the following cells would evaluate a randomly initialised model.
    # NOTE(review): assumes the file holds a state_dict of this architecture.
    model.load_state_dict(torch.load(model_path, map_location=device))

In [None]:
# Calculate accuracy and F1 on the test split
correct_predictions = 0
total_samples = 0

y_true = []
y_pred = []

# Switch dropout off for evaluation and skip gradient tracking
model.eval()
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        # Flatten (N, 1) -> (N,) so predictions line up with the labels
        predictions = (outputs > 0.5).float().view(-1)

        correct_predictions += (predictions == labels.float()).sum().item()
        total_samples += labels.size(0)

        y_true.extend(labels.cpu().numpy())
        y_pred.extend(predictions.cpu().numpy())

acc = correct_predictions/total_samples
f1 = f1_score(y_true, y_pred, average='binary')

print('[👑 TEST GRU AUTH]\n')
print(f'[🎯 ACCURACY] {acc:.3f}')
print(f'[⚖️ F1 SCORE] {f1:.3f}')

# GAN-GRID against LSTM

In [12]:
class Generator(nn.Module):
    """
    Fully connected generator acting on the last dimension of its input.

    Layer widths: num_features -> 128 -> 256 -> 512 ->
    batch_size * window_size -> num_features, with LeakyReLU(0.2) after every
    layer except the last one.
    """

    def __init__(self, batch_size, window_size, num_features,):
        super().__init__()
        self.batch_size = batch_size
        self.num_features = num_features
        self.window_size = window_size

        bottleneck = batch_size * window_size
        self.layer1 = nn.Linear(num_features, 128)
        self.layer2 = nn.Linear(128, 256)
        self.layer3 = nn.Linear(256, 512)
        self.layer4 = nn.Linear(512, bottleneck)
        self.layer5 = nn.Linear(bottleneck, num_features)

        self.leaky_relu = nn.LeakyReLU(0.2)

    def forward(self, x):
        """Map ``x`` (last dimension ``num_features``) to an output of the same shape."""
        hidden = x
        for layer in (self.layer1, self.layer2, self.layer3, self.layer4):
            hidden = self.leaky_relu(layer(hidden))
        # No activation on the output layer
        return self.layer5(hidden)

In [13]:
def _surrogate_scores(surrogate, fake_input, ml):
    """Score a generated batch with the surrogate (torch module, or ``predict_proba`` estimator when ``ml``)."""
    if not ml:
        return surrogate(fake_input)

    scores = []
    # One window at a time: the estimator scores every row of the window and
    # the class probabilities are averaged over the window
    for group in fake_input:
        flat_group = group.view(-1, group.size(-1)).detach().cpu().numpy()
        probabilities = surrogate.predict_proba(flat_group)
        scores.append(np.mean(probabilities, axis=0))
    # NOTE(review): this tensor is a fresh leaf, so no gradient can flow from
    # it back into the generator in ``ml`` mode.
    return torch.tensor(np.array(scores), requires_grad=True)


def train_gan(generator, surrogate, label, train_loader, num_epochs=100, lr=0.001, device=torch.device('cpu'), ml=False, num_episodes=150):
    """
    Train ``generator`` against a fixed surrogate classifier.

    Each episode perturbs a random latent batch for a fixed number of steps
    with a reward-driven noise update, then takes one optimiser step on the
    generator using the loss between the surrogate output on the generated
    batch and the random targets drawn in the last step.

    Args:
        generator: Module mapping a latent batch to a generated batch.  Its
            ``batch_size``, ``window_size`` and ``num_features`` attributes
            give the latent shape (fallback: 4, 16, 12).
        surrogate: Torch module, or an estimator with ``predict_proba`` when
            ``ml`` is True.
        label: Only used in the progress message.
        train_loader: Unused.
        num_epochs: Unused.
        lr: Learning rate of the generator's Adam optimiser.
        device: Device for the latent batch and, unless ``ml``, both models.
        ml: Whether the surrogate is a non-torch estimator.
        num_episodes: Number of episodes, i.e. generator updates.

    Returns:
        Tuple ``(generator, losses)`` with one loss value per episode.
    """
    losses = []

    if not ml:
        generator = generator.to(device)
        surrogate = surrogate.to(device)

    # Define the loss function and optimizer.
    # NOTE(review): BCEWithLogitsLoss applies a sigmoid itself; if the
    # surrogate already returns probabilities they are squashed twice - confirm.
    binary_cross_entropy_loss = nn.BCEWithLogitsLoss()
    generator_optimizer = torch.optim.Adam(generator.parameters(), lr=lr)

    # Define the reinforcement learning parameters
    max_episode_length = 10
    alpha = 0.1
    gamma = 0.9

    # Latent shape follows the generator instead of a hard-coded (4, 16, 12)
    latent_shape = (
        getattr(generator, 'batch_size', 4),
        getattr(generator, 'window_size', 16),
        getattr(generator, 'num_features', 12),
    )

    for episode in range(num_episodes):
        # Initialize the latent input and the episode reward
        latent_input = torch.randn(*latent_shape).to(device)
        episode_reward = 0

        # Nothing is back-propagated from the exploration steps, so they run
        # without building autograd graphs
        with torch.no_grad():
            for step in range(max_episode_length):
                # Generate a sample and score it with the surrogate
                fake_input = generator(latent_input)
                surrogate_output = _surrogate_scores(surrogate, fake_input, ml)

                predictions = (surrogate_output > 0.5).float()
                targets = torch.randint_like(predictions, 0, 2)
                reward = (predictions == targets).float().mean().item()
                episode_reward += reward

                # Update the latent input with reward-scaled noise
                td_error = reward - episode_reward
                latent_input += alpha * td_error * gamma**step * torch.randn_like(latent_input)

        # Update the generator using the final latent input of the episode
        generator_optimizer.zero_grad()
        fake_input = generator(latent_input)
        surrogate_output = _surrogate_scores(surrogate, fake_input, ml)

        target_labels = targets.view(-1, 1).float()

        generator_loss = binary_cross_entropy_loss(surrogate_output, target_labels)
        generator_loss.backward()
        generator_optimizer.step()

        losses.append(generator_loss.item())

        if episode % 10 == 0:
            print(f'[⏭️ EP {episode}/{num_episodes} | D{label}] LOSS: {round(generator_loss.item(), 3)}')

    print()

    return generator, losses

Restart the process if the generator did not converge on the first try.

In [None]:
lr = 3e-3

generators = []
losses = []

# One batch is enough to read the (batch, window, features) shape
inputs, classes = next(iter(train_loader))

# For each target label (a single one here)
for d in range(1):
    print(f'[🤖 GENERATORS] Label {d}')

    batch_size, window_size, num_features = inputs.shape
    generator = Generator(batch_size, window_size, num_features)
    surrogate_model = model
    # The generator loss is back-propagated through the surrogate; keep it in
    # training mode, which cuDNN requires for RNN backward passes
    surrogate_model.train()

    # Pass the device explicitly: train_gan defaults to CPU and would move
    # the shared model off the GPU, breaking the cells that run afterwards
    generator, loss = train_gan(generator, surrogate_model, train_loader=train_loader, num_epochs=20, lr=lr, label=0, ml=False, num_episodes=100, device=device)
    print()

    generators.append(generator)
    losses.append(loss)

In [None]:
results = []

threshold = 0.5

# Evaluation only: dropout off, model on the same device as the data
model.to(device)
model.eval()

for i in range(1):
    predicted_labels = []
    generator = generators[i].to(device)

    with torch.no_grad():
        # The test batches only set how many noise batches are drawn and
        # their shape
        for batch in test_loader:
            input_batch, true_labels = batch[0].to(device), batch[1].to(device)
            # Generate data from noise shaped like the real batch
            generated_data = generator(torch.randn(*input_batch.shape).to(device))

            final_result = generated_data

            # Get the surrogate outputs for each sample in the generated data
            surrogate_outputs = model(final_result)

            # Apply the threshold for binary classification
            predicted_labels_batch = (surrogate_outputs > threshold).float()

            # Flatten to a plain list (also correct for a batch of one)
            predicted_labels.extend(predicted_labels_batch.view(-1).tolist())

    # Share of generated samples the model assigns to label i
    asr = predicted_labels.count(i) / len(predicted_labels)
    results.append(asr)
    print(f'[👑 DRIVER {i}] ASR: {round(asr, 3)}')

#Random noise attack

In [None]:
from sklearn.metrics import accuracy_score
class WhiteBoxAttack:
    """
    Random Gaussian-noise attack on a torch model.

    For every batch, ``num_samples`` noise draws scaled by ``epsilon`` are
    tried in turn; a sample keeps the perturbed version whenever the model is
    not more accurate on it than on the current version.
    """

    def __init__(self, model, epsilon=0.5, num_samples=50):
        self.model, self.epsilon, self.num_samples = model, epsilon, num_samples

    def _rounded_predictions(self, batch):
        """Model output for ``batch`` rounded to 0/1, as a squeezed numpy array."""
        return torch.round(self.model(batch)).squeeze().cpu().numpy()

    def attack(self, data_loader):
        """Yield one perturbed batch per batch of ``data_loader``."""
        self.model.eval()  # Evaluation mode

        for clean_batch, batch_labels in data_loader:
            adv_batch = clean_batch.clone().detach().to(device)

            for _ in range(self.num_samples):
                candidate = adv_batch + torch.randn_like(adv_batch) * self.epsilon

                with torch.no_grad():
                    candidate_pred = self._rounded_predictions(candidate)
                    current_pred = self._rounded_predictions(adv_batch)

                for idx in range(len(adv_batch)):
                    truth = [batch_labels[idx].item()]
                    candidate_acc = accuracy_score(truth, [candidate_pred[idx]])
                    current_acc = accuracy_score(truth, [current_pred[idx]])
                    # Accept the candidate unless it makes the model more accurate
                    if not candidate_acc > current_acc:
                        adv_batch[idx] = candidate[idx]

            yield adv_batch
attack = WhiteBoxAttack(model, epsilon=0.5, num_samples=50)

# attack() is a generator yielding one adversarial batch per test batch
X_test_adv_loader = attack.attack(test_loader)

model.eval()
with torch.no_grad():
    y_true = []
    y_pred_orig = []
    y_pred_adv = []
    for inputs, labels in test_loader:
        X_test_tensor = inputs.to(device)
        y_true.extend(labels.cpu().numpy())
        X_test_adv = next(X_test_adv_loader)  # Adversarial version of this batch
        # Use the batch that was moved to the device; the raw `inputs` stay on
        # the CPU and would not match a model living on the GPU.
        # view(-1) keeps a 1-D result even for a batch of one.
        y_pred_orig.extend(torch.round(model(X_test_tensor)).view(-1).cpu().numpy())
        y_pred_adv.extend(torch.round(model(X_test_adv.to(device))).view(-1).cpu().numpy())

accuracy_orig = accuracy_score(y_true, y_pred_orig)
accuracy_adv = accuracy_score(y_true, y_pred_adv)

print("Accuracy on original test examples:", accuracy_orig)
print("Accuracy on adversarial test examples:", accuracy_adv)

# FGSM,BIM,PGD

In [None]:
pip install adversarial-robustness-toolbox


In [None]:
from art.attacks.evasion import FastGradientMethod, ProjectedGradientDescentPyTorch, ProjectedGradientDescentNumpy, CarliniLInfMethod, CarliniWagnerASR,UniversalPerturbation
from art.estimators.classification import PyTorchClassifier
from art.attacks.evasion.iterative_method import BasicIterativeMethod
from sklearn.metrics import accuracy_score, f1_score

# Wrap the trained model so the ART attacks can query it.
# NOTE(review): input_shape is (12,) while the model is fed (16, 12) windows,
# and nb_classes is 2 while the model has a single sigmoid output - confirm
# against the ART PyTorchClassifier documentation.
# NOTE(review): device_type is fixed to "cpu" while other cells move data to
# `device`; verify the model and the data end up on the same device.
classifier = PyTorchClassifier(
    model=model,
    loss=criterion,
    input_shape=(12,),
    nb_classes=2,
    device_type="cpu"
    # device_type="gpu" if torch.cuda.is_available() else "cpu"
)

# Define the FGSM, BIM and PGD attacks, all with eps=0.5
fgsm_attack = FastGradientMethod(estimator=classifier, eps=0.5)
bim_attack = BasicIterativeMethod(estimator=classifier, eps=0.5)
pgd_attack = ProjectedGradientDescentNumpy(estimator=classifier, eps=0.5)
def evaluate_clean_data(model, test_loader):
    """
    Print accuracy and binary F1 of ``model`` on unperturbed test data.

    Args:
        model: Binary model whose output is thresholded at 0.5.
        test_loader: Yields ``(inputs, labels)`` batches.
    """
    model.eval()

    y_true = []
    y_pred = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            # Flatten (N, 1) -> (N,): one scalar prediction per label
            predictions = (outputs > 0.5).float().view(-1)

            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predictions.cpu().numpy())

    acc = accuracy_score(y_true,y_pred)
    f1 = f1_score(y_true, y_pred, average='binary')

    print('[👑 EVALUATION ON CLEAN DATA]\n')
    print(f'[🎯 ACCURACY] {acc:.3f}')
    print(f'[⚖️ F1 SCORE] {f1:.3f}')

def evaluate_attack(model, test_loader, attack):
    """
    Print accuracy and binary F1 of ``model`` on adversarial test data.

    Args:
        model: Binary model whose output is thresholded at 0.5.
        test_loader: Yields ``(inputs, labels)`` batches.
        attack: Object whose ``generate(x=ndarray)`` returns the adversarial
            version of a batch.
    """
    model.eval()
    y_true = []
    y_pred = []

    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        # The attack works on numpy arrays
        adv_inputs = attack.generate(x=inputs.cpu().numpy())
        adv_inputs = torch.tensor(adv_inputs).to(device)
        # Inference only: no autograd graph is needed for the prediction
        with torch.no_grad():
            outputs = model(adv_inputs)
        # Flatten (N, 1) -> (N,): one scalar prediction per label
        predictions = (outputs > 0.5).float().view(-1)

        y_true.extend(labels.cpu().numpy())
        y_pred.extend(predictions.cpu().numpy())

    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average='binary')

    print('[👑 EVALUATION UNDER ATTACK]\n')
    print(f'[🎯 ACCURACY] {acc:.3f}')
    print(f'[⚖️ F1 SCORE] {f1:.3f}')

# Evaluate on clean data
evaluate_clean_data(model, test_loader)




In [None]:
# Evaluate under the FGSM attack (fgsm_attack, created with eps=0.5)
evaluate_attack(model, test_loader, fgsm_attack)

In [None]:
# Evaluate under the BIM attack (bim_attack, created with eps=0.5)
evaluate_attack(model, test_loader, bim_attack)

In [None]:
# Evaluate under the PGD attack (pgd_attack, created with eps=0.5)
evaluate_attack(model, test_loader, pgd_attack)

In [None]:
import matplotlib.pyplot as plt

# Hard-coded results, one value per epsilon, for the four attacks
asr_values_set1 = [0.999,0.995, 0.981, 0.866, 0.729, 0.563, 0.457, 0.404, 0.387, 0.383, 0.383]
asr_values_set2 = [0.999,0.995, 0.972, 0.856, 0.714,0.537, 0.454,  0.397, 0.384, 0.383, 0.382]
asr_values_set3 = [0.999,0.995, 0.971, 0.859, 0.714, 0.537, 0.442, 0.397, 0.383, 0.383, 0.382]
asr_values_set4 = [0.999,0.996, 0.987, 0.94, 0.884, 0.826, 0.755, 0.67, 0.634, 0.568, 0.497]


# Epsilon values on the x axis
feature_numbers = [0,0.05,0.1,0.15,0.20,0.25,0.30,0.35,0.40,0.45,0.50]

# One curve per attack: (values, marker, colour, legend label)
plt.ylim(0, 1.1)
for series, marker, color, label in (
    (asr_values_set1, 'o', 'b', 'fgsm'),
    (asr_values_set2, 's', 'r', 'bim'),
    (asr_values_set3, '^', 'g', 'pgd'),
    (asr_values_set4, '^', 'y', 'random_noise'),
):
    plt.plot(feature_numbers, series, marker=marker, linestyle='-', color=color, label=label)
plt.legend()

# Axis labels, title and grid, then save and show
plt.xlabel('epsilon')
plt.ylabel('Accuracy')
plt.title('Accuracy vs epsilon')
plt.grid(True)
plt.tight_layout()
plt.savefig('/content/model_WB_LSTM.pdf', format='pdf')
plt.show()

# SHAP for LSTM

In [None]:
!pip install shap

In [None]:
import pandas as pd
import shap
import torchvision.transforms as transforms

# Assuming you have train_loader and test_loader defined

# Collect every batch of a DataLoader into two numpy arrays
def loader_to_numpy(loader):
    """Return ``(data, labels)`` with all batches of ``loader`` concatenated along dim 0."""
    batches = list(loader)
    stacked_data = torch.cat([batch_data for batch_data, _ in batches])
    stacked_labels = torch.cat([batch_labels for _, batch_labels in batches])
    return stacked_data.numpy(), stacked_labels.numpy()

# Convert train_loader and test_loader to numpy arrays
X_train_array, y_train_array = loader_to_numpy(train_loader)
X_test_array, y_test_array = loader_to_numpy(test_loader)

# Create a SHAP explainer with the training windows as background data.
# NOTE(review): the tensors built here live on the CPU; confirm the model is
# on the CPU as well when a GPU is in use.
explainer = shap.GradientExplainer(model, torch.from_numpy(X_train_array))

# Calculate the SHAP values for the test windows
shap_values = explainer.shap_values(torch.from_numpy(X_test_array))

In [None]:


# Convert the SHAP values to a numpy array
shap_values_array = np.array(shap_values)

# Calculate the mean across all models
# shap_values_mean = np.mean(shap_values_array, axis=0)
# Flatten the window dimension so every row holds the 12 feature values of
# one time step
shap_values_mean_reshaped = shap_values_array.reshape(-1, 12)  # last dimension = 12 features
X_test_array_reshaped = X_test_array.reshape(-1, 12)

# Plot the summary plot.
# NOTE(review): df.columns may still include the 'stab'/'stabf' columns at this
# point; presumably only the first 12 names are used - verify.
shap.summary_plot(shap_values_mean_reshaped, X_test_array_reshaped, feature_names=df.columns,show=False)

plt.savefig('/content/SHAP.jpg', dpi=300)

# Display the plot (optional)
plt.show()

# Cumulative distribution

In [None]:
# Rows whose 'stabf' code is 0.  The explicit copy makes new_df independent
# of df, so later in-place edits neither touch df nor trigger pandas'
# SettingWithCopyWarning.
new_df = df[df['stabf'] == 0].copy()

# Display the new DataFrame
new_df

In [None]:
# Remove the label columns in place so only features remain
classes_to_drop = ['stabf', 'stab']
new_df.drop(columns=classes_to_drop, inplace=True)
new_df

In [None]:
# Feed 1358 noise windows of shape (16, 12) through the generator and lay the
# result out as one row per time step
latent_noise = torch.randn(1358, 16, 12).to(device)
generated_data = generator(latent_noise)
generated_data_numpy = generated_data.detach().cpu().numpy()
generated_data_reshaped = generated_data_numpy.reshape(-1, 12)
df_generated_data = pd.DataFrame(
    generated_data_reshaped,
    columns=new_df.columns,
)
df_generated_data


In [None]:
# Summary statistics of the real data (new_df)
new_df.describe()


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Empirical cumulative distribution function of a sample
def calculate_cdf(data):
    """Return ``(sorted values, cumulative probabilities k/n for k = 1..n)``."""
    ordered = np.sort(data)
    n_points = len(ordered)
    cumulative = np.arange(1, n_points + 1) / n_points
    return ordered, cumulative
# Features whose CDF is drawn: tau1..tau4 followed by g1..g4
columns_to_plot = [f'{prefix}{k}' for prefix in ('tau', 'g') for k in range(1, 5)]

# One CDF curve per feature of the real data
plt.figure(figsize=(5, 5))
for column in columns_to_plot:
    sorted_data, cdf = calculate_cdf(new_df[column])
    plt.plot(sorted_data, cdf, label=column)

# Decorate, save and show the figure
plt.xlabel('Value')
plt.ylabel('Cumulative Probability')
plt.title('CDF Plot')
plt.legend()
plt.grid(True)
plt.savefig('/content/CDF_realdata.jpg', dpi=300)
plt.show()


In [None]:
# Summary statistics of the generated data
df_generated_data.describe()

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Empirical cumulative distribution function of a sample
def calculate_cdf(data):
    """Return ``(sorted values, cumulative probabilities k/n for k = 1..n)``."""
    ordered = np.sort(data)
    n_points = len(ordered)
    cumulative = np.arange(1, n_points + 1) / n_points
    return ordered, cumulative
# Features whose CDF is drawn: tau1..tau4 followed by g1..g4
columns_to_plot = [f'{prefix}{k}' for prefix in ('tau', 'g') for k in range(1, 5)]

# One CDF curve per feature of the generated data
plt.figure(figsize=(5, 5))
for column in columns_to_plot:
    sorted_data, cdf = calculate_cdf(df_generated_data[column])
    plt.plot(sorted_data, cdf, label=column)

# Decorate, save and show the figure
plt.xlabel('Value')
plt.ylabel('Cumulative Probability')
plt.title('CDF Plot')
plt.legend()
plt.grid(True)
plt.savefig('/content/CDF_DL.jpg', dpi=300)
plt.show()


# XGBoost Model

In [32]:
import torch
import numpy as np
# Re-seed NumPy and PyTorch (CPU and CUDA) before the XGBoost experiments
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
# cuDNN benchmark mode may select different kernels from run to run, which
# contradicts the deterministic setting above, so it is disabled
torch.backends.cudnn.benchmark = False


In [None]:
# Per-target containers for the splits and their loaders
train_datasets, validation_datasets, test_datasets = [], [], []
train_loaders, validation_loaders, test_loaders = [], [], []


for i in range(1):
    print(f'[📚 LOADERS] {i}')
    target_label = 0
    a = CustomDataset(auth=True, target=target_label)

    # Split sizes; the test split takes whatever the rounding leaves over
    train_size = int(trainSize * len(a))
    val_size = int(valSize * len(a))
    test_size = len(a) - train_size - val_size

    train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
        a, [train_size, val_size, test_size])

    train_datasets.append(train_dataset)
    validation_datasets.append(val_dataset)
    test_datasets.append(test_dataset)

    # Every loader keeps the order fixed and drops the incomplete last batch
    loader_options = dict(batch_size=batch_size, shuffle=False, drop_last=True)

    train_loader = torch.utils.data.DataLoader(dataset=train_dataset, **loader_options)
    train_loaders.append(train_loader)

    validation_loader = torch.utils.data.DataLoader(dataset=val_dataset, **loader_options)
    validation_loaders.append(validation_loader)

    test_loader = torch.utils.data.DataLoader(dataset=test_dataset, **loader_options)
    test_loaders.append(test_loader)

In [None]:
# Remove the label columns from df in place; the remaining column names are
# used as feature names further down
classes_to_drop = ['stabf', 'stab']
df.drop(columns=classes_to_drop, inplace=True)
df

In [None]:
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split


def _windows_to_frames(dataset, feature_names):
    """Flatten ``(window, label)`` pairs into row-wise feature and label DataFrames.

    Every row of a window becomes one sample and inherits the window's label.
    """
    windows = []
    labels = []
    for window, label in dataset:
        window = np.asarray(window)
        windows.append(window)
        labels.extend([int(label)] * len(window))
    X = pd.DataFrame(np.concatenate(windows, axis=0), columns=feature_names)
    y = pd.DataFrame(np.array(labels), columns=['stabf'])
    return X, y


for d in range(1):
    feature_names = df.columns.tolist()
    X_train, y_train = _windows_to_frames(train_datasets[d], feature_names)

    # NOTE: this rebinds the global `model` (previously the LSTM), and `f1`
    # below rebinds the helper function of the same name.
    model = XGBClassifier(objective='binary:logistic', random_state=42)

    # Training
    model.fit(X_train, y_train)

    X_test, y_test = _windows_to_frames(test_datasets[d], feature_names)

    # Testing
    y_pred = model.predict(X_test)

    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')

    print(f'[💪 TRAINING {d}] ACC: {accuracy:.3f}')
    print(f'[⚖️ F1 SCORE] {f1:.3f}')

In [36]:
import numpy as np

# Plain NumPy views of the train/test frames, for code that expects arrays
# rather than DataFrames.
X_train_array, X_test_array = X_train.values, X_test.values
y_train_array, y_test_array = y_train.values, y_test.values


# Random noise Attack

In [None]:
import numpy as np
from sklearn.metrics import accuracy_score
from xgboost import XGBClassifier

class WhiteBoxAttack:
    """Random-noise evasion attack against a fitted classifier.

    The attack only queries ``model.predict``: for every sample that the model
    currently classifies correctly it draws Gaussian noise with standard
    deviation ``epsilon`` (up to ``num_samples`` times) and keeps the first
    perturbed version that the model misclassifies.

    Args:
        model: Object exposing ``predict(X)`` that returns one label per row.
        epsilon: Standard deviation of the Gaussian noise.
        num_samples: Maximum number of noise draws tried per sample.
    """

    def __init__(self, model, epsilon=0.05, num_samples=50):
        self.model = model
        self.epsilon = epsilon
        self.num_samples = num_samples

    def attack(self, X, y):
        """Return a perturbed copy of ``X``.

        Args:
            X: Array-like of shape (n_samples, n_features). It is not modified.
            y: True labels, either flat or as an (n_samples, 1) column.

        Returns:
            Array with the shape of ``X``. Rows for which a misclassifying
            perturbation was found hold that perturbation; all other rows
            (never fooled, or already misclassified) are returned unchanged.
        """
        X_adv = np.array(X, copy=True)
        # Integer inputs would silently truncate the noise when it is written
        # back, so they are promoted to float first.
        if not np.issubdtype(X_adv.dtype, np.floating):
            X_adv = X_adv.astype(float)
        labels = np.asarray(y).reshape(-1)

        if len(X_adv) == 0:
            return X_adv

        # Only samples the model still gets right need to be attacked.
        baseline = np.asarray(self.model.predict(X_adv)).reshape(-1)
        pending = np.flatnonzero(baseline == labels)

        for _ in range(self.num_samples):
            if pending.size == 0:
                break
            # One batched query per trial instead of one query per sample.
            noise = np.random.normal(loc=0.0, scale=self.epsilon, size=X_adv[pending].shape)
            candidates = X_adv[pending] + noise
            predicted = np.asarray(self.model.predict(candidates)).reshape(-1)
            fooled = predicted != labels[pending]
            X_adv[pending[fooled]] = candidates[fooled]
            pending = pending[~fooled]

        return X_adv

# Example usage: fit a fresh XGBoost model on the flattened arrays, perturb
# the test set with random noise, then compare accuracy before and after.
xgb_model = XGBClassifier()
xgb_model.fit(X_train_array, y_train_array)

attack = WhiteBoxAttack(model=xgb_model, epsilon=0.05, num_samples=50)
X_test_adv = attack.attack(X_test_array, y_test_array)

# Score the clean and the perturbed inputs with the same model.
y_pred_orig, y_pred_adv = (
    xgb_model.predict(features) for features in (X_test_array, X_test_adv)
)
accuracy_orig, accuracy_adv = (
    accuracy_score(y_test_array, predicted) for predicted in (y_pred_orig, y_pred_adv)
)

for caption, score in (
    ("Accuracy on original test examples:", accuracy_orig),
    ("Accuracy on adversarial test examples:", accuracy_adv),
):
    print(caption, score)


In [None]:
import matplotlib.pyplot as plt

# Hard-coded scores for the random-noise curve, one value per epsilon below.
# Despite the `asr_` prefix they are drawn on an axis labelled 'Accuracy'.
# NOTE(review): presumably copied from earlier runs of the attack cell — confirm.
asr_values_set1 = [0.999,0.797, 0.616, 0.464, 0.335, 0.238, 0.162, 0.11, 0.07, 0.05, 0.03]
# Disabled series, kept for reference (labelled fgsm / bim / pgd in the disabled plot calls below):
# asr_values_set2 = [0.999,0.995, 0.972, 0.856, 0.714,0.537, 0.454,  0.397, 0.384, 0.383, 0.382]
# asr_values_set3 = [0.999,0.995, 0.971, 0.859, 0.714, 0.537, 0.442, 0.397, 0.383, 0.383, 0.382]
# asr_values_set4 = [0.999,0.996, 0.987, 0.94, 0.884, 0.826, 0.755, 0.67, 0.634, 0.568, 0.497]


# x-axis values: the epsilon grid from 0 to 0.5 in steps of 0.05
# (the variable name says "feature numbers", but the axis is labelled 'epsilon').
feature_numbers = [0,0.05,0.1,0.15,0.20,0.25,0.30,0.35,0.40,0.45,0.50]

# Fix the y-range slightly above 1 so the top of the curve is not clipped.
plt.ylim(0, 1.1)
# plt.plot(feature_numbers, asr_values_set1, marker='o', linestyle='-', color='b', label='fgsm')
# plt.plot(feature_numbers, asr_values_set2, marker='s', linestyle='-', color='r', label='bim')
# plt.plot(feature_numbers, asr_values_set3, marker='^', linestyle='-', color='g', label='pgd')
plt.plot(feature_numbers, asr_values_set1, marker='^', linestyle='-', color='y', label='random_noise') # only the random-noise series is drawn
# Legend built from the `label=` of the plotted series
plt.legend()

plt.xlabel('epsilon')
plt.ylabel('Accuracy')
plt.title('Accuracy vs epsilon')
plt.grid(True)
plt.tight_layout()
# The figure is written to a hard-coded path under /content before being shown.
plt.savefig('/content/model_WB_XGBoost.pdf', format='pdf')
plt.show()

# SHAP values

In [None]:
import shap
import matplotlib.pyplot as plt
import matplotlib
# Tree explainer for whichever estimator is currently bound to `model`.
explainer = shap.TreeExplainer(model)

# Shapley values of every row of the flattened test frame.
shap_values = explainer.shap_values(X_test)

# The summary plot is asked not to display itself (show=False); the figure is
# saved and shown explicitly afterwards.
shap.summary_plot(
    shap_values,
    features=X_test,
    feature_names=df.columns,
    show=False,
)

plt.savefig('/content/SHAP.jpg', dpi=300)

plt.show()

# GAN-GRID against XGBoost

In [39]:
def _surrogate_scores(surrogate, fake_input, ml):
    """Return one row of class scores per generated window.

    With ``ml=False`` the surrogate is called directly on the batch. With
    ``ml=True`` the surrogate is expected to expose ``predict_proba``; every
    window of the batch is flattened to 2-D, scored row by row, and the row
    probabilities are averaged into a single score vector for that window.
    """
    if not ml:
        return surrogate(fake_input)

    window_scores = []
    for group in fake_input:
        # predict_proba needs a CPU NumPy array of shape (rows, features).
        flat_group = group.view(-1, group.size(-1)).detach().cpu().numpy()
        probabilities = surrogate.predict_proba(flat_group)
        window_scores.append(np.mean(probabilities, axis=0))

    # Stack first: building a tensor straight from a list of ndarrays goes
    # through a slow element-wise path.
    return torch.tensor(np.array(window_scores), requires_grad=True)


def train_gan(generator, surrogate, label, train_loader, num_epochs=100, lr=0.001, device=torch.device('cpu'), ml=False, num_episodes=150, latent_shape=(4, 16, 12)):
    """Train ``generator`` so that ``surrogate`` assigns its output to ``label``.

    Every episode starts from a random latent tensor, perturbs it for a fixed
    number of steps using a reward computed against randomly drawn targets,
    and then takes one Adam step on the cross-entropy between the surrogate
    scores of the final sample and ``label``.

    NOTE: with ``ml=True`` the scores are rebuilt from NumPy output, so the
    loss has no autograd path back to the generator; ``backward()`` only
    fills the gradient of the rebuilt score tensor and the optimizer step
    leaves the generator weights unchanged.

    Args:
        generator: ``nn.Module`` mapping a latent tensor to a generated batch.
        surrogate: Callable model (``ml=False``) or an estimator exposing
            ``predict_proba`` (``ml=True``).
        label: Class index the generated samples should be assigned to.
        train_loader: Unused; kept for interface compatibility.
        num_epochs: Unused; kept for interface compatibility.
        lr: Learning rate of the Adam optimizer.
        device: Device for the generator and the latent input.
        ml: Select the ``predict_proba`` code path for the surrogate.
        num_episodes: Number of episodes, i.e. optimizer steps.
        latent_shape: Shape of the random latent input drawn per episode.

    Returns:
        Tuple ``(generator, losses)`` with one loss value per episode.
    """
    losses = []

    # The latent input lives on `device`, so the generator has to as well,
    # regardless of which surrogate path is used.
    generator = generator.to(device)

    # Define the loss function and optimizer
    cross_entropy_loss = nn.CrossEntropyLoss()
    generator_optimizer = torch.optim.Adam(generator.parameters(), lr=lr)

    # Define the reinforcement learning parameters
    max_episode_length = 10
    alpha = 0.1
    gamma = 0.9

    for episode in range(num_episodes):
        # Initialize the latent input and the episode reward
        latent_input = torch.randn(*latent_shape).to(device)
        episode_reward = 0

        for step in range(max_episode_length):
            # Generate a sample with the current latent input and score it
            fake_input = generator(latent_input)
            surrogate_output = _surrogate_scores(surrogate, fake_input, ml)

            # Reward: share of predictions that match randomly drawn targets
            predictions = surrogate_output.argmax(dim=1)
            targets = torch.randint_like(
                predictions, 0, surrogate_output.shape[1])
            reward = (predictions == targets).float().mean().item()
            episode_reward += reward

            # Random walk of the latent input, scaled by the reward signal
            td_error = reward - episode_reward
            latent_input += alpha * td_error * \
                gamma**step * torch.randn_like(latent_input)

        # Update the generator using the final latent input of the episode
        generator_optimizer.zero_grad()
        fake_input = generator(latent_input)
        surrogate_output = _surrogate_scores(surrogate, fake_input, ml)

        target_labels = torch.full_like(surrogate_output.argmax(dim=1), label)
        generator_loss = cross_entropy_loss(surrogate_output, target_labels)

        generator_loss.backward()
        generator_optimizer.step()

        losses.append(generator_loss.item())

        if episode % 10 == 0:
            print(f'[⏭️ EP {episode}/{num_episodes} | D{label}] LOSS: {round(generator_loss.item(), 3)}')

    print()

    return generator, losses

In [None]:
lr = 3e-3

generators, losses = [], []

# One batch is peeked at only to read the tensor dimensions for the generator.
inputs, classes = next(iter(train_loader))

# Single pass: one generator is trained against the current `model`.
for d in range(1):
    print(f'[🤖 GENERATORS] Label {d}')

    batch_size, window_size, num_features = inputs.shape
    surrogate_model = model
    generator = Generator(batch_size, window_size, num_features)

    generator, loss = train_gan(
        generator,
        surrogate_model,
        label=0,
        train_loader=train_loader,
        num_epochs=20,
        lr=lr,
        ml=True,
        num_episodes=150,
    )
    print()

    generators.append(generator)
    losses.append(loss)

In [None]:
results = []

for d in range(1):
    predicted_labels = []
    generator = generators[d].to(device)
    surrogate = model

    # One generated batch is scored per test batch. The test data itself is
    # never fed to the generator; the loader only fixes the number of draws.
    for _ in test_loaders[d]:
        # Pure inference: no autograd graph is needed for the generated batch.
        with torch.no_grad():
            generated_data = generator(torch.randn(4, 16, 12).to(device))

        # Score every generated window with the surrogate classifier.
        surrogate_outputs = []
        for group in generated_data:
            # predict_proba needs a CPU NumPy array of shape (rows, features).
            flat_group = group.view(-1, group.size(-1)).cpu().detach().numpy()

            probabilities = surrogate.predict_proba(flat_group)

            # Average the row probabilities into one score vector per window.
            surrogate_outputs.append(np.mean(probabilities, axis=0))

        # Stack first: a tensor built from a list of ndarrays takes a slow path.
        surrogate_outputs = torch.tensor(np.array(surrogate_outputs))

        # Predicted class of every generated window
        predicted_labels_batch = torch.argmax(surrogate_outputs, dim=1)
        predicted_labels.extend(predicted_labels_batch.tolist())

    # Attack success rate: share of generated windows assigned to class 0.
    asr = predicted_labels.count(0)/len(predicted_labels)
    results.append(asr)
    print(f'[👑 DRIVER {d}] ASR: {round(asr, 3)}')

print(f'\n[🏆 ASR] {round(np.mean(results), 3)}')

In [None]:
# Draw a large batch of latent noise, push it through the generator and lay
# the result out as one row per generated 12-value record.
latent_batch = torch.randn(1358, 16, 12).to(device)
generated_data = generator(latent_batch)
generated_data_numpy = generated_data.detach().cpu().numpy()
generated_data_reshaped = generated_data_numpy.reshape(-1, 12)
df_generated_data = pd.DataFrame(
    generated_data_reshaped,
    columns=new_df.columns,
)
df_generated_data


In [None]:
# Per-column summary statistics of the generated samples.
df_generated_data.describe()

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Function to calculate CDF
def calculate_cdf(data):
    """Return the empirical CDF of ``data``.

    Returns:
        Tuple ``(values, probabilities)``: the values in ascending order and,
        for the k-th smallest value (1-based), the cumulative share ``k / n``.
    """
    ordered = np.sort(data)
    count = len(ordered)
    cumulative = np.arange(1, count + 1) / count
    return ordered, cumulative
columns_to_plot = ['tau1', 'tau2', 'tau3', 'tau4', 'g1', 'g2', 'g3', 'g4']

# One empirical CDF curve per selected column of the generated samples.
plt.figure(figsize=(5, 5))
for column in columns_to_plot:
    plt.plot(*calculate_cdf(df_generated_data[column]), label=column)

plt.title('CDF Plot')
plt.xlabel('Value')
plt.ylabel('Cumulative Probability')
plt.grid(True)
plt.legend()
plt.savefig('/content/cdf_ML.jpg', dpi=300)
plt.show()


# Other classifiers

In [None]:
from lightgbm import LGBMClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
for d in range(1):
    # Flatten every (window, label) sample of the training split into rows:
    # the rows of each window are stacked, and the window label is repeated
    # once per row so that X_train and y_train stay aligned.
    X_train = []
    y_train = []
    for data in train_datasets[d]:
        X_train.append(np.asarray(data[0]))
        y_train.extend([data[1]] * len(data[0]))

    X_train = pd.DataFrame(np.concatenate(X_train, axis=0), columns=df.columns.tolist())
    y_train = pd.DataFrame(np.array(y_train), columns=['stabf'])

    # NOTE: this rebinds the notebook-level `model`.
    model = LGBMClassifier(objective='binary', random_state=42)

    # Training (labels are handed over as a flat 1-D array, not an (n, 1) column)
    model.fit(X_train, y_train.values.ravel())

    # Same flattening for the held-out test split.
    X_test = []
    y_test = []
    for data in test_datasets[d]:
        X_test.append(np.asarray(data[0]))
        y_test.extend([data[1]] * len(data[0]))

    X_test = pd.DataFrame(np.concatenate(X_test, axis=0), columns=df.columns.tolist())
    y_test = pd.DataFrame(np.array(y_test), columns=['stabf'])

    # Testing
    y_pred = model.predict(X_test)

    accuracy = accuracy_score(y_test, y_pred)
    # Kept under its own name so the notebook's `f1` helper function is not
    # overwritten by a float.
    f1_weighted = f1_score(y_test, y_pred, average='weighted')

    print(f'[💪 TRAINING {d}] ACC: {accuracy:.3f}')
    print(f'[⚖️ F1 SCORE] {f1_weighted:.3f}')

In [None]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
for d in range(1):
    # Flatten every (window, label) sample of the training split into rows:
    # the rows of each window are stacked, and the window label is repeated
    # once per row so that X_train and y_train stay aligned.
    X_train = []
    y_train = []
    for data in train_datasets[d]:
        X_train.append(np.asarray(data[0]))
        y_train.extend([data[1]] * len(data[0]))

    X_train = pd.DataFrame(np.concatenate(X_train, axis=0), columns=df.columns.tolist())
    y_train = pd.DataFrame(np.array(y_train), columns=['stabf'])

    # NOTE: this rebinds the notebook-level `model`.
    model = DecisionTreeClassifier(random_state=42)

    # Training (labels are handed over as a flat 1-D array, not an (n, 1) column)
    model.fit(X_train, y_train.values.ravel())

    # Same flattening for the held-out test split.
    X_test = []
    y_test = []
    for data in test_datasets[d]:
        X_test.append(np.asarray(data[0]))
        y_test.extend([data[1]] * len(data[0]))

    X_test = pd.DataFrame(np.concatenate(X_test, axis=0), columns=df.columns.tolist())
    y_test = pd.DataFrame(np.array(y_test), columns=['stabf'])

    # Testing
    y_pred = model.predict(X_test)

    accuracy = accuracy_score(y_test, y_pred)
    # Kept under its own name so the notebook's `f1` helper function is not
    # overwritten by a float.
    f1_weighted = f1_score(y_test, y_pred, average='weighted')

    print(f'[💪 TRAINING {d}] ACC: {accuracy:.3f}')
    print(f'[⚖️ F1 SCORE] {f1_weighted:.3f}')

In [None]:
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
for d in range(1):
    # Flatten every (window, label) sample of the training split into rows:
    # the rows of each window are stacked, and the window label is repeated
    # once per row so that X_train and y_train stay aligned.
    X_train = []
    y_train = []
    for data in train_datasets[d]:
        X_train.append(np.asarray(data[0]))
        y_train.extend([data[1]] * len(data[0]))

    X_train = pd.DataFrame(np.concatenate(X_train, axis=0), columns=df.columns.tolist())
    y_train = pd.DataFrame(np.array(y_train), columns=['stabf'])

    # NOTE: this rebinds the notebook-level `model`.
    model = ExtraTreesClassifier(n_estimators=100, random_state=42)

    # Training (labels are handed over as a flat 1-D array, not an (n, 1) column)
    model.fit(X_train, y_train.values.ravel())

    # Same flattening for the held-out test split.
    X_test = []
    y_test = []
    for data in test_datasets[d]:
        X_test.append(np.asarray(data[0]))
        y_test.extend([data[1]] * len(data[0]))

    X_test = pd.DataFrame(np.concatenate(X_test, axis=0), columns=df.columns.tolist())
    y_test = pd.DataFrame(np.array(y_test), columns=['stabf'])

    # Testing
    y_pred = model.predict(X_test)

    accuracy = accuracy_score(y_test, y_pred)
    # Kept under its own name so the notebook's `f1` helper function is not
    # overwritten by a float.
    f1_weighted = f1_score(y_test, y_pred, average='weighted')

    print(f'[💪 TRAINING {d}] ACC: {accuracy:.3f}')
    print(f'[⚖️ F1 SCORE] {f1_weighted:.3f}')

In [None]:
from sklearn.neighbors import KNeighborsClassifier

from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
for d in range(1):
    # Flatten every (window, label) sample of the training split into rows:
    # the rows of each window are stacked, and the window label is repeated
    # once per row so that X_train and y_train stay aligned.
    X_train = []
    y_train = []
    for data in train_datasets[d]:
        X_train.append(np.asarray(data[0]))
        y_train.extend([data[1]] * len(data[0]))

    X_train = pd.DataFrame(np.concatenate(X_train, axis=0), columns=df.columns.tolist())
    y_train = pd.DataFrame(np.array(y_train), columns=['stabf'])

    # NOTE: this rebinds the notebook-level `model`.
    model = KNeighborsClassifier(n_neighbors=5)

    # Training (labels are handed over as a flat 1-D array, not an (n, 1) column)
    model.fit(X_train, y_train.values.ravel())

    # Same flattening for the held-out test split.
    X_test = []
    y_test = []
    for data in test_datasets[d]:
        X_test.append(np.asarray(data[0]))
        y_test.extend([data[1]] * len(data[0]))

    X_test = pd.DataFrame(np.concatenate(X_test, axis=0), columns=df.columns.tolist())
    y_test = pd.DataFrame(np.array(y_test), columns=['stabf'])

    # Testing
    y_pred = model.predict(X_test)

    accuracy = accuracy_score(y_test, y_pred)
    # Kept under its own name so the notebook's `f1` helper function is not
    # overwritten by a float.
    f1_weighted = f1_score(y_test, y_pred, average='weighted')

    print(f'[💪 TRAINING {d}] ACC: {accuracy:.3f}')
    print(f'[⚖️ F1 SCORE] {f1_weighted:.3f}')

In [None]:
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
for d in range(1):
    # Flatten every (window, label) sample of the training split into rows:
    # the rows of each window are stacked, and the window label is repeated
    # once per row so that X_train and y_train stay aligned.
    X_train = []
    y_train = []
    for data in train_datasets[d]:
        X_train.append(np.asarray(data[0]))
        y_train.extend([data[1]] * len(data[0]))

    X_train = pd.DataFrame(np.concatenate(X_train, axis=0), columns=df.columns.tolist())
    y_train = pd.DataFrame(np.array(y_train), columns=['stabf'])

    # NOTE: this rebinds the notebook-level `model`.
    model = GradientBoostingClassifier(n_estimators=100, random_state=42)

    # Training (labels are handed over as a flat 1-D array, not an (n, 1) column)
    model.fit(X_train, y_train.values.ravel())

    # Same flattening for the held-out test split.
    X_test = []
    y_test = []
    for data in test_datasets[d]:
        X_test.append(np.asarray(data[0]))
        y_test.extend([data[1]] * len(data[0]))

    X_test = pd.DataFrame(np.concatenate(X_test, axis=0), columns=df.columns.tolist())
    y_test = pd.DataFrame(np.array(y_test), columns=['stabf'])

    # Testing
    y_pred = model.predict(X_test)

    accuracy = accuracy_score(y_test, y_pred)
    # Kept under its own name so the notebook's `f1` helper function is not
    # overwritten by a float.
    f1_weighted = f1_score(y_test, y_pred, average='weighted')

    print(f'[💪 TRAINING {d}] ACC: {accuracy:.3f}')
    print(f'[⚖️ F1 SCORE] {f1_weighted:.3f}')

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
for d in range(1):
    # Flatten every (window, label) sample of the training split into rows:
    # the rows of each window are stacked, and the window label is repeated
    # once per row so that X_train and y_train stay aligned.
    X_train = []
    y_train = []
    for data in train_datasets[d]:
        X_train.append(np.asarray(data[0]))
        y_train.extend([data[1]] * len(data[0]))

    X_train = pd.DataFrame(np.concatenate(X_train, axis=0), columns=df.columns.tolist())
    y_train = pd.DataFrame(np.array(y_train), columns=['stabf'])

    # NOTE: this rebinds the notebook-level `model`.
    model = RandomForestClassifier(n_estimators=100, random_state=42)

    # Training (labels are handed over as a flat 1-D array, not an (n, 1) column)
    model.fit(X_train, y_train.values.ravel())

    # Same flattening for the held-out test split.
    X_test = []
    y_test = []
    for data in test_datasets[d]:
        X_test.append(np.asarray(data[0]))
        y_test.extend([data[1]] * len(data[0]))

    X_test = pd.DataFrame(np.concatenate(X_test, axis=0), columns=df.columns.tolist())
    y_test = pd.DataFrame(np.array(y_test), columns=['stabf'])

    # Testing
    y_pred = model.predict(X_test)

    accuracy = accuracy_score(y_test, y_pred)
    # Kept under its own name so the notebook's `f1` helper function is not
    # overwritten by a float.
    f1_weighted = f1_score(y_test, y_pred, average='weighted')

    print(f'[💪 TRAINING {d}] ACC: {accuracy:.3f}')
    print(f'[⚖️ F1 SCORE] {f1_weighted:.3f}')