In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load in

from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import confusion_matrix
from torch.optim import lr_scheduler
from tqdm import tqdm
import torch.nn as nn
!pip install seaborn --upgrade
import seaborn as sns
import time
import torch
import numpy as np  # linear algebra
import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# Any results you write to the current directory are saved as output.

Let's load the CSV file into a pandas DataFrame.

In [None]:
# data = pd.read_csv('/kaggle/input/data-csv/data.csv', dtype=str)
# data = pd.read_csv('/Users/fernando/PyNetSim/tutorials/surrogate/data/data.csv', dtype=str)
# Location of the simulation CSV. The default is the author's local copy; set
# the PYNETSIM_DATA_CSV environment variable to point at another file instead
# of editing this cell.
DATA_CSV_PATH = os.environ.get(
    'PYNETSIM_DATA_CSV',
    '/Users/ffjla/PyNetSim/tutorials/surrogate/cluster_heads/data/data.csv')
# Every column is read as text; later cells parse the numbers and lists stored
# in each cell themselves.
data = pd.read_csv(DATA_CSV_PATH, dtype=str)

Let's check the shape of the DataFrame.


In [None]:
# Show the (rows, columns) tuple of the loaded DataFrame.
print(data.shape)

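Let's print a summary of the DataFrame (columns, non-null counts and dtypes). Note that the cell below does not take a 10% subset of the data to speed up training; the full dataset is kept.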

In [None]:
# Print the data set info (column names, non-null counts, dtypes).
# DataFrame.info() writes the report itself and returns None, so it is called
# directly: wrapping it in print() would add a stray "None" line to the output.
data.info()

In [None]:
# Dataframe to hold mean and std of each column.
# One row per statistic; later cells look a row up through its 'name' value.
data_stats = pd.DataFrame(columns=['name', 'mean', 'std'])

In [None]:
# Mean and standard deviation of the three components stored in the 'name'
# column (called alpha, beta and gamma throughout this notebook).
# NOTE(review): eval() executes whatever text the CSV cell contains; if the
# cells only hold literals, ast.literal_eval would be a safer parser.
name_values = data['name'].apply(lambda x: eval(x))
alpha_values = name_values.apply(lambda x: x[0])
beta_values = name_values.apply(lambda x: x[1])
gamma_values = name_values.apply(lambda x: x[2])
name_stats_dict = {
    'alpha_mean': alpha_values.mean(),
    'alpha_std': alpha_values.std(),
    'beta_mean': beta_values.mean(),
    'beta_std': beta_values.std(),
    'gamma_mean': gamma_values.mean(),
    'gamma_std': gamma_values.std()
}
# Build the statistics table in one step. Creating it from records (instead of
# concatenating one-row frames onto an empty frame) keeps 'mean' and 'std' as
# float columns and gives the same table every time the cell is re-run.
data_stats = pd.DataFrame(
    [
        {'name': 'alpha', 'mean': name_stats_dict['alpha_mean'],
         'std': name_stats_dict['alpha_std']},
        {'name': 'beta', 'mean': name_stats_dict['beta_mean'],
         'std': name_stats_dict['beta_std']},
        {'name': 'gamma', 'mean': name_stats_dict['gamma_mean'],
         'std': name_stats_dict['gamma_std']},
    ],
    columns=['name', 'mean', 'std'])
print(data_stats)

In [None]:
def compute_stats(name, data):
    """Summarise a column of scalar values that are stored as text.

    Args:
        name: Label stored under the 'name' key of the result.
        data: pandas Series of strings; every entry is passed to eval().

    Returns:
        dict with the keys 'name', 'mean' and 'std', where mean and standard
        deviation are Series.mean() / Series.std() of the evaluated entries.
    """
    parsed = data.apply(lambda raw: eval(raw))
    return {'name': name, 'mean': parsed.mean(), 'std': parsed.std()}

In [None]:
def compute_array_stats(name, data):
    """Summarise a column whose entries are sequences stored as text.

    Each entry is evaluated with eval(); np.mean and np.std are taken per
    entry, and the reported figures are the averages of those per-entry means
    and per-entry standard deviations over the whole column.

    Args:
        name: Label stored under the 'name' key of the result.
        data: pandas Series of strings, each evaluating to a sequence of
            numbers.

    Returns:
        dict with the keys 'name', 'mean' and 'std'.
    """
    parsed = data.apply(lambda raw: eval(raw))
    per_entry_mean = parsed.apply(lambda entry: np.mean(entry))
    per_entry_std = parsed.apply(lambda entry: np.std(entry))
    return {
        'name': name,
        'mean': per_entry_mean.mean(),
        'std': per_entry_std.mean(),
    }

In [None]:
# Statistics of the remaining feature columns, added to data_stats.
# 'remaining_energy' holds one scalar per row (compute_stats); the other
# columns hold one list per row (compute_array_stats). Each tuple below is
# (name used in data_stats, column of `data`).
feature_stats = pd.DataFrame(
    [compute_stats('re', data['remaining_energy'])]
    + [
        compute_array_stats(stat_name, data[column])
        for stat_name, column in [
            ('chs', 'cluster_heads'),
            ('el', 'energy_levels'),
            ('energy_dissipated_ch_to_sink',
             'energy_dissipated_ch_to_sink'),
            ('energy_dissipated_non_ch_to_ch',
             'energy_dissipated_non_ch_to_ch'),
            ('energy_dissipated_ch_rx_from_non_ch',
             'energy_dissipated_ch_rx_from_non_ch'),
        ]
    ])
# Drop rows left over from a previous run of this cell before appending, so
# re-running the cell does not pile up duplicate statistics.
data_stats = pd.concat(
    [data_stats[~data_stats['name'].isin(feature_stats['name'])],
     feature_stats]).reset_index(drop=True)
print(data_stats.to_string())

Proportion of the data that will be used for training and testing.

In [None]:
def standardize_inputs(x, mean, std):
    """Standardize values with the z-score transform ``(x - mean) / std``.

    Args:
        x: A number, a (nested) list of numbers or a NumPy array.
        mean: Value subtracted from every element.
        std: Value every element is divided by afterwards.

    Returns:
        A NumPy scalar for scalar input, otherwise a float ndarray with the
        same shape as ``x``.
    """
    # Plain lists do not support arithmetic with built-in floats, so convert
    # first; the result then no longer depends on whether mean/std arrive as
    # NumPy scalars or as Python floats.
    values = np.asarray(x, dtype=float)
    standardized_x = (values - mean) / std
    return standardized_x

In [None]:
def split_sequence(sequence, n_steps):
    """Build model inputs and targets from the simulation dataframe.

    Row ``i`` yields a sample when row ``i + n_steps`` exists and its 'name'
    cell carries the same first three components (alpha, beta, gamma) as row
    ``i``; rows whose successor differs are skipped.

    Each input vector is the concatenation, in this order, of: standardized
    alpha, beta and gamma; the standardized 'remaining_energy'; the
    'potential_cluster_heads' entries as ints; the standardized
    'energy_levels'; the standardized 'energy_dissipated_ch_to_sink'; the
    standardized 'energy_dissipated_ch_rx_from_non_ch'; and
    ``(int(alive_nodes * 0.05) + 1) / 5``. The target is the
    'cluster_heads_index' cell of row ``i + n_steps - 1`` converted to ints.

    Means and standard deviations are read from the module-level
    ``data_stats`` frame (first row matching each statistic name).

    NOTE(review): cells are parsed with eval(), which executes arbitrary
    expressions; ast.literal_eval would be a safer parser if the cells only
    hold literals.

    Args:
        sequence: DataFrame whose cells are strings. Rows are looked up by the
            labels ``0 .. len(sequence) - 1``.
        n_steps: Window length; only values <= 1 are supported.

    Returns:
        Tuple ``(x_data, y_data)`` of NumPy arrays with one row per sample.

    Raises:
        ValueError: If ``n_steps`` is greater than 1.
    """
    if n_steps > 1:
        # Raising a bare string (as this function used to) is itself a
        # TypeError in Python 3, which hid the real message.
        raise ValueError(f"n_steps: {n_steps} not supported")

    def _lookup_stats(stat_name):
        # (mean, std) of the first data_stats row registered under stat_name.
        row = data_stats.loc[data_stats['name'] == stat_name]
        return row['mean'].values[0], row['std'].values[0]

    def _standardized_floats(cell, stats):
        # Parse a list-valued cell into a float array and standardize it.
        values = np.array([float(v) for v in eval(cell)])
        return standardize_inputs(values, stats[0], stats[1])

    # The statistics do not change while iterating, so they are looked up once
    # instead of filtering data_stats again for every row.
    alpha_stats = _lookup_stats('alpha')
    beta_stats = _lookup_stats('beta')
    gamma_stats = _lookup_stats('gamma')
    re_stats = _lookup_stats('re')
    el_stats = _lookup_stats('el')
    ch_to_sink_stats = _lookup_stats('energy_dissipated_ch_to_sink')
    ch_rx_stats = _lookup_stats('energy_dissipated_ch_rx_from_non_ch')

    x_data = []
    y_data = []
    num_samples = len(sequence)

    for i in tqdm(range(num_samples), desc="Processing sequence"):
        end_ix = i + n_steps
        if end_ix > num_samples - 1:
            break

        # Skip the row when the row n_steps ahead has a different
        # (alpha, beta, gamma) triple.
        name = eval(sequence['name'][i])
        last_name = eval(sequence['name'][end_ix])
        if (name[0] != last_name[0] or name[1] != last_name[1]
                or name[2] != last_name[2]):
            continue

        alpha = standardize_inputs(name[0], alpha_stats[0], alpha_stats[1])
        beta = standardize_inputs(name[1], beta_stats[0], beta_stats[1])
        gamma = standardize_inputs(name[2], gamma_stats[0], gamma_stats[1])

        remaining_energy = standardize_inputs(
            np.array([float(eval(sequence['remaining_energy'][i]))]),
            re_stats[0], re_stats[1])
        pchs = [int(x) for x in eval(sequence['potential_cluster_heads'][i])]
        energy_levels = _standardized_floats(
            sequence['energy_levels'][i], el_stats)
        energy_dissipated_ch_to_sink = _standardized_floats(
            sequence['energy_dissipated_ch_to_sink'][i], ch_to_sink_stats)
        energy_dissipated_ch_rx_from_non_ch = _standardized_floats(
            sequence['energy_dissipated_ch_rx_from_non_ch'][i], ch_rx_stats)
        # 5% of the alive nodes, truncated and incremented by one, then
        # divided by 5.
        num_cluster_heads = (
            int(eval(sequence['alive_nodes'][i]) * 0.05) + 1) / 5

        # 'energy_dissipated_non_ch_to_ch' and 'cluster_heads' used to be
        # parsed and standardized here but were never added to the input
        # vector, so they are no longer read.
        seq_x = [alpha, beta, gamma]
        seq_x.extend(remaining_energy)
        seq_x.extend(pchs)
        seq_x.extend(energy_levels)
        seq_x.extend(energy_dissipated_ch_to_sink)
        seq_x.extend(energy_dissipated_ch_rx_from_non_ch)
        seq_x.append(num_cluster_heads)

        seq_y = [int(x)
                 for x in eval(sequence['cluster_heads_index'][end_ix - 1])]

        x_data.append(seq_x)
        y_data.append(seq_y)

    return np.array(x_data), np.array(y_data)


# Build the input/target arrays from the whole dataframe. split_sequence only
# supports n_steps <= 1, i.e. one row per sample.
n_steps = 1
x_data, y_data = split_sequence(data, n_steps)

In [None]:
# Print every sample whose first column (the standardized alpha) equals
# 0.8485311004611152. For reference, the matching standardized beta and gamma
# are 0.6836234887293634 and -0.4393387688162076; only alpha is checked here.
# np.isclose with a tight absolute tolerance is used instead of ==, so the
# lookup still works if the statistics differ in the last floating-point bit.
needed_data = np.where(
    np.isclose(x_data[:, 0], 0.8485311004611152, rtol=0.0, atol=1e-9))
# print the entire row
print(list(x_data[needed_data]))

In [None]:
# Convert the inputs and targets to NumPy arrays and print their shapes
# (np.array on an existing ndarray creates a copy).
np_x = np.array(x_data)
print(f"np_x.shape: {np_x.shape}")
np_y = np.array(y_data)
print(np_y.shape)
# Disabled experiment: collapse the targets with argmax along axis 2.
# np_y = np.argmax(np_y, axis=2)
# print(np_y.shape)
# print(np_y[0])

In [None]:
# Split the data into train and test.
# 10% of the samples are held out; the fixed random_state makes the shuffled
# split reproducible between runs.
x_train, x_test, y_train, y_test = train_test_split(
    np_x, np_y, test_size=0.1, random_state=42, shuffle=True)

Create the dataset class.

In [None]:
class ClusterHeadDataset(Dataset):
    """In-memory dataset that pairs feature rows with target rows.

    Both arrays are converted to float32 tensors once, at construction time.

    Attributes:
        X: float32 tensor built from ``x``.
        y: float32 tensor built from ``y``.
        len: Number of samples (``x.shape[0]``).
    """

    def __init__(self, x, y):
        features, targets = (
            torch.from_numpy(array.astype(np.float32)) for array in (x, y))
        self.X = features
        self.y = targets
        self.len = x.shape[0]

    def __len__(self):
        """Return the number of samples."""
        return self.len

    def __getitem__(self, idx):
        """Return the ``(features, target)`` pair stored at ``idx``."""
        return self.X[idx], self.y[idx]

    def collate_fn(self, batch):
        """Stack a list of ``(features, target)`` pairs into two batch tensors."""
        stacked_features = torch.stack([sample[0] for sample in batch])
        stacked_targets = torch.stack([sample[1] for sample in batch])
        return stacked_features, stacked_targets

Create the network architecture.

In [None]:
class ForecastCCH(nn.Module):
    """Fully connected network with one sigmoid output per output unit.

    Layer order: BatchNorm1d(input) -> Linear -> BatchNorm1d -> ReLU ->
    Dropout(0.3) -> Linear -> BatchNorm1d -> ReLU -> Dropout(0.4) -> Linear ->
    Sigmoid.

    Compatibility note: this class used to compute ``batch_norm1(x)`` and then
    feed the raw ``x`` into ``fc1``, so the input normalization had no effect
    on the output. The parameter names are unchanged, so older checkpoints
    still load, but they were trained without input normalization and should
    be retrained.

    Args:
        input_size: Number of input features.
        h1: Width of the first hidden layer.
        h2: Width of the second hidden layer.
        output_size: Number of output units.
    """

    def __init__(self, input_size=10, h1=100, h2=100, output_size=101):
        super().__init__()
        self.batch_norm1 = nn.BatchNorm1d(input_size)
        self.fc1 = nn.Linear(input_size, h1)
        self.batch_norm2 = nn.BatchNorm1d(h1)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(0.3)

        self.fc2 = nn.Linear(h1, h2)
        self.batch_norm3 = nn.BatchNorm1d(h2)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(0.4)

        self.fc3 = nn.Linear(h2, output_size)

        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map a ``(batch, input_size)`` tensor to ``(batch, output_size)`` values in (0, 1)."""
        out = self.batch_norm1(x)
        # Feed the normalized input (not the raw x) into the first layer.
        out = self.fc1(out)
        out = self.batch_norm2(out)
        out = self.relu1(out)
        out = self.dropout1(out)

        out = self.fc2(out)
        out = self.batch_norm3(out)
        out = self.relu2(out)
        out = self.dropout2(out)

        out = self.fc3(out)
        out = self.sigmoid(out)

        return out

In [None]:
# Use the first GPU when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = ForecastCCH(
    input_size=401, h1=2000, h2=2000, output_size=101).to(device)
# If there is a model saved, load it
if os.path.isfile('ch_model.pt'):
    # map_location moves the stored tensors onto the current device, so a
    # checkpoint written on a GPU machine also loads on a CPU-only machine.
    model.load_state_dict(torch.load('ch_model.pt', map_location=device))
    print("Model loaded")
else:
    print("No model found")

In [None]:

# Adam optimizer with a small learning rate and weight decay.
optimizer = torch.optim.Adam(model.parameters(), lr=5e-6, weight_decay=1e-5)
# Binary cross-entropy, applied to the sigmoid outputs of the model.
criterion = nn.BCELoss()
# Multiplies the learning rate by 0.99 on every rl_scheduler.step() call.
rl_scheduler = lr_scheduler.ExponentialLR(optimizer, gamma=0.99)

Create the dataset objects.

In [None]:
# The training split is called train_dataset so that it cannot clash with the
# train() function defined further down; `valid` is reused by later cells.
train_dataset = ClusterHeadDataset(x_train, y_train)
valid = ClusterHeadDataset(x_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
# Validation needs no shuffling; a fixed order keeps the reported metrics
# reproducible from one evaluation to the next.
valid_loader = DataLoader(valid, batch_size=16, shuffle=False)

In [None]:
def get_accuracy(y_true, y_prob, print_info=False):
    """Fraction of positions where the thresholded prediction matches the label.

    Args:
        y_true: 1-D tensor of labels.
        y_prob: 1-D tensor of predicted probabilities, same size as ``y_true``.
        print_info: When True, print the indices where the labels equal 1 and
            where the prediction exceeds 0.5, then pause for one second.

    Returns:
        Number of matching positions divided by the number of positions.
    """
    assert y_true.ndim == 1 and y_true.size() == y_prob.size()
    y_pred = y_prob > 0.5
    if print_info:
        # Copy to host memory first: NumPy cannot read tensors that live on a
        # GPU.
        print(f"y_true: {np.where(y_true.detach().cpu().numpy() == 1)}")
        print(f"y_prob: {np.where(y_pred.cpu().numpy() == 1)}")
        # delay 1 second
        time.sleep(1)
    return (y_true == y_pred).sum().item() / y_true.size(0)

In [None]:
def get_accuracy_top_five(y_true, y_prob, print_info=False):
    """Share of the true indices that appear among the five highest scores.

    Args:
        y_true: 1-D tensor of labels; positions equal to 1 are the true ones.
        y_prob: 1-D tensor of scores, same size as ``y_true``.
        print_info: When True, print the true and the top-five indices for
            samples where fewer than five indices match.

    Returns:
        ``correct / total``: the number of true indices found in the top five
        divided by the number of true indices.

    Raises:
        ZeroDivisionError: If ``y_true`` contains no position equal to 1.
    """
    assert y_true.ndim == 1 and y_true.size() == y_prob.size()
    scores = y_prob.detach().cpu().numpy()
    # Indices of the five largest scores, in ascending index order.
    top_five = np.sort(scores.argsort()[::-1][:5])
    true_indices = np.where(y_true.detach().cpu().numpy() == 1)[0]
    # The counts are computed unconditionally: they used to be defined only
    # inside the print_info branch, so the default call failed on return.
    correct = np.intersect1d(true_indices, top_five).size
    total = true_indices.size
    if print_info and correct < 5:
        print(f"y_true: {true_indices}")
        print(f"y_prob: {top_five}")
    return correct / total

In [None]:
def test_predicted(print_info=False):
    """Evaluate the global model on the global ``valid_loader`` and print a summary.

    For every batch the loss and the mean per-sample accuracy (in percent,
    from get_accuracy) are collected. The summary shows the average loss, the
    average/min/max batch accuracy and how many batches reached the minimum
    and the maximum.

    Args:
        print_info: Forwarded to get_accuracy for every sample.
    """
    model.eval()
    batch_accuracies = []
    losses = []
    # No gradients are computed here, so nothing has to be reset on the
    # optimizer.
    with torch.no_grad():
        for inputs, labels in valid_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            preds = model(inputs.float())
            losses.append(criterion(preds, labels).item())
            sample_accuracies = [
                get_accuracy(label, pred, print_info) * 100
                for pred, label in zip(preds, labels)]
            batch_accuracies.append(np.mean(sample_accuracies))

    # An array is required for the element-wise comparisons below: comparing
    # a Python list with a float gives a single False, so the counts would
    # always be 0.
    avg_accuracy = np.asarray(batch_accuracies)
    min_accuracy = np.min(avg_accuracy)
    max_accuracy = np.max(avg_accuracy)
    print(
        f"Average loss: {np.mean(losses)}")
    print(
        f"Average accuracy: {np.mean(avg_accuracy)}%")
    print(
        f"Min Accuracy: {min_accuracy}%")
    # number of batches with the lowest accuracy
    min_accuracy_count = np.count_nonzero(avg_accuracy == min_accuracy)
    print(
        f"Min Accuracy count: {min_accuracy_count}")
    print(
        f"Max Accuracy: {max_accuracy}%")
    # number of batches with the highest accuracy
    max_accuracy_count = np.count_nonzero(avg_accuracy == max_accuracy)
    print(
        f"Max Accuracy count: {max_accuracy_count}")

In [None]:
def test(epoch, epochs, best_loss):
    """Compute the validation loss and checkpoint the model when it improves.

    Uses the global ``model``, ``valid_loader``, ``criterion`` and ``device``.

    Args:
        epoch: Current epoch number (only used in the printed message).
        epochs: Total number of epochs (only used in the printed message).
        best_loss: Best validation loss seen so far.

    Returns:
        The updated best validation loss as a float. When the current loss is
        lower than ``best_loss`` the model weights are saved to 'ch_model.pt'.
    """
    running_loss = 0.0

    model.eval()

    with torch.no_grad():
        for inputs, labels in valid_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            # The predictions are used as returned by the model: squeezing
            # them would drop the batch dimension of a one-sample batch and
            # make their shape differ from the labels.
            preds = model(inputs.float())
            running_loss += criterion(preds, labels).item()

    valid_loss = running_loss / len(valid_loader)

    if valid_loss < best_loss:
        print(
            f"Epoch [{epoch}/{epochs}] Validation Loss Improved: {best_loss:.4f} -> {valid_loss:.4f}")
        best_loss = valid_loss
        torch.save(model.state_dict(), 'ch_model.pt')

    return best_loss

In [None]:


def train(epochs=1000):
    """Train the global model and evaluate it every fifth epoch.

    Uses the global ``model``, ``train_loader``, ``optimizer``, ``criterion``,
    ``rl_scheduler`` and ``device``. After each epoch the learning-rate
    scheduler is stepped; on epochs divisible by 5 the validation loss is
    checked by test() (which checkpoints on improvement) and test_predicted()
    prints the accuracy summary.

    Args:
        epochs: Number of passes over the training data.

    Returns:
        List with the mean training loss of every epoch, as floats.
    """
    train_losses = []
    best_loss = float('inf')
    for epoch in range(epochs):
        print(
            f'epochs {epoch}/{epochs}, LR: {optimizer.param_groups[0]["lr"]}')
        model.train()
        training_loss = 0.0
        # Wrap the data loader with tqdm to add a progress bar
        for inputs, labels in tqdm(train_loader, desc="Training"):
            inputs = inputs.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            preds = model(inputs.float())
            loss = criterion(preds, labels)
            loss.backward()
            optimizer.step()
            # .item() yields a plain float: summing the loss tensors would
            # keep every batch's autograd graph alive until the epoch ends,
            # and the result could not be converted to NumPy on a GPU.
            training_loss += loss.item()

        train_loss = training_loss / len(train_loader)
        train_losses.append(train_loss)
        print(f'train_loss {train_loss}')
        rl_scheduler.step()

        if epoch % 5 == 0:
            best_loss = test(epoch, epochs, best_loss)
            test_predicted()

    return train_losses

In [None]:

# Run the training loop defined in the previous cell.
train()

In [None]:
# Rebuild the validation loader with one sample per batch, so that every
# accuracy collected by test_predicted() refers to a single sample.
# test_predicted() reads the global valid_loader, hence the reassignment.
valid_loader = DataLoader(valid, batch_size=1, shuffle=True)
test_predicted(False)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
def custom_annot(value):
    """Return the annotation text for a cell: '' for zero, otherwise the value as text."""
    if value != 0:
        return f'{value}'
    return ''
# Another approach to do the confusion matrix:
# collect, for every validation batch, the indices of the true positives and
# the indices predicted as positive, padded/aligned to the same length.
# NOTE(review): model.eval() is not called in this cell; it presumably relies
# on an earlier test_predicted() call having left the model in eval mode.
y_true = []
y_pred = []
with torch.no_grad():
    for idx, (inputs, labels) in enumerate(valid_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)
        labels = labels.cpu().numpy()
        # Column indices of the entries equal to 1. Taking element [1] of the
        # np.where result requires 2-D labels; with more than one sample per
        # batch the columns of all samples would be merged.
        # NOTE(review): confirm valid_loader uses batch_size=1 here.
        labels = np.where(labels == 1)[1]
        # labels to int
        labels = labels.astype(int)
        preds = model(inputs.float())
        preds = preds.squeeze()
        # From here on `preds` is a boolean array, not probabilities.
        preds = preds > 0.5
        preds = preds.cpu().numpy()
        predicted = np.where(preds == 1)[0]
        if len(predicted) > len(labels):
            # More positives predicted than there are labels: keep len(labels)
            # of them.
            # NOTE(review): argsort runs on the boolean array, so the order
            # among the True entries does not reflect the probabilities.
            indices = preds.argsort()[::-1]
            predicted_index = indices[:len(labels)]
            predicted_index.sort()
            labels_not_in_pred = np.setdiff1d(labels, predicted_index)
            pred_not_in_labels = np.setdiff1d(predicted_index, labels)
            labels_not_in_pred_pos = np.where(
                np.isin(labels, labels_not_in_pred))[0]
            pred_not_in_labels_pos = np.where(
                np.isin(predicted_index, pred_not_in_labels))[0]
            # NOTE(review): this assignment modifies `labels` in place and the
            # next lines bind `predicted` to that same array, so the values
            # stored in y_true and y_pred for this sample are identical.
            # Confirm whether a copy of `labels` was intended.
            labels[labels_not_in_pred_pos] = predicted_index[pred_not_in_labels_pos]
            predicted_index = labels
            predicted = predicted_index
        elif len(predicted) < len(labels):
            # Fewer positives predicted than labels: pad the prediction with
            # zeros up to the number of labels.
            predicted = np.append(
                predicted, np.zeros(len(labels)-len(predicted)))
            predicted_index = predicted
            predicted_index.sort()
            labels_not_in_pred = np.setdiff1d(labels, predicted_index)
            pred_not_in_labels = np.setdiff1d(predicted_index, labels)
            labels_not_in_pred_pos = np.where(
                np.isin(labels, labels_not_in_pred))[0]
            pred_not_in_labels_pos = np.where(
                np.isin(predicted_index, pred_not_in_labels))[0]
            # NOTE(review): same in-place modification and aliasing of
            # `labels` as in the branch above.
            labels[labels_not_in_pred_pos] = predicted_index[pred_not_in_labels_pos]
            predicted_index = labels
            predicted = predicted_index
        if len(labels) < 5:
            # Pad both arrays with zeros up to five entries and sort them.
            labels = np.append(labels, np.zeros(5-len(labels)))
            predicted = np.append(predicted, np.zeros(5-len(predicted)))
            labels.sort()
            predicted.sort()
        # predicted as int
        predicted = predicted.astype(int)
        y_true.append(labels)
        y_pred.append(predicted)
# Stack the per-sample index arrays collected above.
y_true = np.array(y_true)
y_pred = np.array(y_pred)
print(y_true.shape)
print(y_pred.shape)
# let's print the first 10 elements
print(y_true[:10])
print(y_pred[:10])
#  Find unique classes in the dataset
unique_classes = np.unique(np.concatenate((y_true, y_pred)))
# convert to int
unique_classes = unique_classes.astype(int)
print(unique_classes)
# Initialize an overall confusion matrix
overall_cm = np.zeros(
    (len(unique_classes), len(unique_classes)), dtype=np.int64)
print(overall_cm)
# Loop through each pair of true and predicted values and accumulate the
# per-sample confusion matrices.
for true_label, pred_label in zip(y_true, y_pred):
    cm = confusion_matrix(true_label, pred_label, labels=unique_classes)
    overall_cm += cm
# Normalize every row / every column by its sum.
# NOTE(review): a class with a zero row or column sum produces a division by
# zero here — confirm this cannot happen for the classes in unique_classes.
row_percentages = overall_cm / overall_cm.sum(axis=1, keepdims=True)
col_percentages = overall_cm / overall_cm.sum(axis=0, keepdims=True)

# Plot the confusion matrix using seaborn
fig, ax = plt.subplots(figsize=(40, 40))

# Use a custom annotation function
# (annot_values is computed but not passed to the heatmap below, which is
# drawn with annot=False.)
annot_values = np.vectorize(custom_annot)(overall_cm)
# Identity matrix as boolean array; its negation is used as the heatmap mask,
# which hides every off-diagonal cell.
mask = np.eye(len(unique_classes), dtype=bool)


# Plot the heatmap
sns.heatmap(overall_cm, annot=False, fmt='', cmap='Blues',
            xticklabels=unique_classes, yticklabels=unique_classes, ax=ax,
            annot_kws={"size": 27}, mask=~mask,
            cbar=False)

# Add one percentage per row to the right of the matrix; the text shown is
# the largest entry of the normalized row.
for i, row in enumerate(row_percentages):
    ax.text(len(unique_classes) + 1.5, i + 0.5,
            f'{row.max() * 100:.1f}%', ha='center', va='center', fontsize=27)

# Add one vertical percentage per column (drawn at y = -2); the text shown is
# the largest entry of the normalized column.
for i, col in enumerate(col_percentages.T):
    ax.text(i + 0.5, -2, f'{col.max() * 100:.1f}%',
            ha='center', va='center', rotation=90, fontsize=27)

# set ticks of the y axis to be horizontal
ax.set_yticklabels(ax.get_yticklabels(), rotation=0)

# Increase the gap between the ticks and the annotations

# Adjust the layout to increase the gap
# plt.subplots_adjust(right=1, bottom=0.05)

# plt.title('Confusion Matrix with Row and Column Percentages')
plt.xlabel('Predicted')
plt.ylabel('True')
# set the font size of the x and y labels
ax.xaxis.label.set_size(40)
ax.yaxis.label.set_size(40)
# Change the orientation of the x ticks
plt.xticks(rotation=90)
# Set the font size of the tick labels
ax.tick_params(labelsize=27)
# Fit the figure contents into the canvas
plt.tight_layout()
# save the confusion matrix
plt.savefig('confusion_matrix.pdf')
plt.close()

In [None]:
# Load a sample given alpha, beta, gamma values
alpha_values = 54.82876630831832
beta_values = 14.53707859358856
gamma_values = 35.31010127750784
# Filter out the rows whose 'name' text contains all three values.
# regex=False makes this a literal substring match; as a regular expression
# the "." in each number would match any character.
filtered_data = data[
    data['name'].str.contains(str(alpha_values), regex=False)
    & data['name'].str.contains(str(beta_values), regex=False)
    & data['name'].str.contains(str(gamma_values), regex=False)]
# split_sequence looks rows up by the labels 0..n-1, so the index is rebuilt.
filtered_data = filtered_data.copy().reset_index(drop=True)
x_data, y_data = split_sequence(filtered_data, 1)
print(f"x_data shape: {x_data.shape}")
print(f"y_data shape: {y_data.shape}")



In [None]:
# Evaluate the model on the filtered sample only, one row per batch and in the
# original row order. test_predicted() reads the global valid_loader, which is
# why it is reassigned here; print_info=True makes get_accuracy print the true
# and predicted indices of every sample.
data_to_predict = ClusterHeadDataset(x_data, y_data)
valid_loader = DataLoader(data_to_predict, batch_size=1, shuffle=False)
test_predicted(True)