In [1]:
import os
import src.config as Config
from src.data_loader import DataLoader
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import umap
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import torch 
from torch import nn
from sklearn.utils import class_weight
from torch.utils.data import DataLoader as DataLoaderTorch
from torch.utils.data import TensorDataset
import torch.nn.functional as F

In [2]:
# seed 
seed = 42
# load data
data_loader = DataLoader(**Config.DATA_LOADER_CONFIG)
train_data, test_data = data_loader.get_train_data(), data_loader.get_test_data()

Loading norman data...
Dataset directory already exists: data/norman
Loading dataset: norman
Preprocessing norman data...
Splitting norman data into train and test sets...
Number of training samples: 44608
Number of test samples: 11152


In [3]:
# Adacos loss
class AdaCosLoss(nn.Module):
    def __init__(self, num_classes, emb_size, class_weights=None):
        super().__init__()
        self.num_classes = num_classes
        self.emb_size = emb_size
        self.w = nn.Parameter(
            data=torch.randn(size=(num_classes, emb_size)), requires_grad=True
        )
        # self.scale = torch.sqrt(torch.tensor(2.0)) * torch.log(torch.tensor(num_classes-1))
        self.class_weights = class_weights
        self.scale = torch.sqrt(torch.tensor(2.0)) * torch.log(
            torch.tensor(num_classes - 1)
        )

    def forward(self, embedding, y_true):

        # logits
        cosine_logits = self.logits(embedding)  # size (B, n_classes)

        # angle from cosine_logits
        angle = self.angle(cosine_logits)

        # onehot vector based on y_true
        onehot = self.onehot_true_label(y_true)  # size (B, n_classes)

        # new scale
        if self.training:
            with torch.no_grad():
                # B_avg
                batch_size = y_true.shape[0]
                B_avg = torch.where(
                    onehot < 1,
                    torch.exp(self.scale * cosine_logits),
                    torch.zeros_like(cosine_logits),
                )  # size (B, n_classes)
                B_avg = torch.sum(B_avg) / batch_size  # size (1,)

                # medium of the angles of true labels
                angle_median = torch.median(angle[onehot == 1])  # size (1,)

                # update scale
                self.scale = torch.log(B_avg) / torch.cos(
                    torch.min(
                        torch.pi / 4 * torch.ones_like(angle_median),
                        angle_median,
                    )
                )

                # if torch.min(
                #     torch.pi / 10 * torch.ones_like(angle_median), angle_median
                # ) == torch.pi / 10 * torch.ones_like(angle_median):
                #     print("Pi")

                # else:
                #     print("angle")

        # calculate new logits
        logits = self.scale * cosine_logits

        # apply cross entropy loss
        ce = nn.CrossEntropyLoss(weight=self.class_weights)
        loss = ce(logits, y_true)

        return loss

    def logits(self, embedding, y_true=None):
        # cos(phi) =  (x @ w.t) / (||w.t||.||x|| ) = normalize(x) @ normalize(w.t) / 1 beacause (||normalize(w.T)|| = ||normalize(x)|| )
        cosine_logits = F.linear(
            input=F.normalize(embedding), weight=F.normalize(self.w)
        )  # size (B, n_classes)
        return cosine_logits

    def angle(self, cosine_logits):
        # angle from given cosine logits
        eps = 1e-7
        angle = torch.acos(torch.clamp(cosine_logits, -1 + eps, 1 - eps))
        return angle

    def onehot_true_label(self, y_true):
        """
        y_true = [0,2,1]
        n_classes = 10
        onehot = [[1,0,0,0,0,0,0,0,0,0],
                  [0,0,1,0,0,0,0,0,0,0],
                  [0,1,0,0,0,0,0,0,0,0]]
        """
        batch_size = y_true.shape[0]
        onehot = torch.zeros(batch_size, self.num_classes)
        onehot.scatter_(1, y_true.unsqueeze(-1), 1)
        return onehot


In [4]:
# get X_train, X_test, y_train, y_test
X_train,y_train = train_data
print("X_train shape:", X_train.shape)
X_test, y_test = test_data
class_weights = class_weight.compute_class_weight(
    class_weight="balanced", classes=np.unique(y_train), y=y_train
)

X_train = torch.from_numpy(X_train).float()
X_test = torch.from_numpy(X_test).float()
y_train = torch.from_numpy(y_train).long()
y_test = torch.from_numpy(y_test).long()

class_weights = torch.from_numpy(class_weights).float()

X_train shape: (44608, 2056)


In [5]:
# turn data in Dataloader
batch_size = 1024
train_data = TensorDataset(X_train, y_train)
test_data = TensorDataset(X_test, y_test)
train_dataloader = DataLoaderTorch(dataset=train_data, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoaderTorch(dataset=test_data, batch_size=batch_size, shuffle=True)

In [6]:
def accuracy_function(y_pred_label, y_true):
    check = torch.eq(y_pred_label, y_true)
    accuracy = sum(check).item() / len(y_true) * 100
    return accuracy


In [7]:
# creat a model
class MultiClassClassification(nn.Module):
    def __init__(self, input, hidden, output):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(in_features=input, out_features=hidden),
            nn.ReLU(),
            nn.Linear(in_features=hidden, out_features=hidden),
            nn.ReLU(),
            nn.Linear(in_features=hidden, out_features=hidden),
            nn.ReLU(),
            nn.Linear(in_features=hidden, out_features=hidden),
        )

    def forward(self, x):
        x = self.layers(x)
        return x

num_classes = 106
model = MultiClassClassification(input=2056, hidden=256, output=num_classes) 


# loss and optimizer:
loss = AdaCosLoss(num_classes=num_classes, emb_size=256)
lr = 0.01
optimizer = torch.optim.Adam(params=model.parameters(), lr=lr)

In [8]:
# training testing process:
epochs = 30
for epoch in range(epochs):
    loss_train = 0
    accuracy_train = 0
    print("epoch:", epoch)

    for batch_train, (X_train, y_train) in enumerate(train_dataloader):

        # training mode:
        model.train()
        loss.train()

        # forward pass:
        embedding_train = model(X_train)
        y_pred_train_logit = loss.logits(embedding=embedding_train, y_true=y_train)
        y_pred_train_probability = torch.softmax(y_pred_train_logit, dim=1)
        y_pred_train_label = y_pred_train_probability.argmax(dim=1)

        # calculate the loss and accuracy
        loss_train_this_batch = loss(embedding_train, y_train)
        loss_train = loss_train + loss_train_this_batch
        accuracy_train_this_batch = accuracy_function(y_pred_train_label, y_train)
        accuracy_train = accuracy_train + accuracy_train_this_batch

        # gradient zero grad
        optimizer.zero_grad()

        # backpropagation
        loss_train_this_batch.backward()

        # updates parameters
        optimizer.step()

        # # Check gradients
        # for name, param in model.named_parameters():
        #     if param.grad is not None:
        #         print(f"Gradient for {name}: {param.grad.norm()}")

    loss_train = loss_train.item() / len(train_dataloader)
    accuracy_train = accuracy_train / len(train_dataloader)

    loss_test = 0
    accuracy_test = 0
    model.eval()
    loss.eval()
    with torch.inference_mode():

        for batch_test, (X_test, y_test) in enumerate(test_dataloader):

            # forward pass:
            embedding_test = model(X_test)
            y_pred_train_logit = loss.logits(embedding=embedding_test, y_true=y_test)
            y_pred_test_probability = torch.softmax(y_pred_train_logit, dim=1)
            y_pred_test_label = y_pred_test_probability.argmax(dim=1)

            # calculate the loss and accuracy:
            loss_test_this_batch = loss(embedding_test, y_test)
            loss_test = loss_test + loss_test_this_batch
            accuracy_test_this_batch = accuracy_function(y_pred_test_label, y_test)
            accuracy_test = accuracy_test + accuracy_test_this_batch

        loss_test = loss_test / len(test_dataloader)
        accuracy_test = accuracy_test / len(test_dataloader)

    print("loss train = {}, accuracy train = {}".format(loss_train, accuracy_train))
    print("loss test = {}, accuracy test = {}".format(loss_test, accuracy_test))
    print()

epoch: 0
loss train = 4.356998443603516, accuracy train = 12.765842013888888
loss test = 4.204836368560791, accuracy test = 13.673899770733652

epoch: 1
loss train = 4.031681060791016, accuracy train = 15.272599037247476
loss test = 3.8974432945251465, accuracy test = 16.626482755183414

epoch: 2
loss train = 3.639152180064808, accuracy train = 21.44787720959596
loss test = 3.4915807247161865, accuracy test = 24.974145235247207

epoch: 3
loss train = 3.1534094376997515, accuracy train = 32.213492345328284
loss test = 3.141291856765747, accuracy test = 33.36946770334928

epoch: 4
loss train = 2.728024222634055, accuracy train = 41.97813091856061
loss test = 2.9041264057159424, accuracy test = 38.46472537878788

epoch: 5
loss train = 2.337852131236683, accuracy train = 50.841915246212125
loss test = 2.8053510189056396, accuracy test = 41.80185905103668

epoch: 6
loss train = 2.0129226337779653, accuracy train = 58.687953756313135
loss test = 2.780196189880371, accuracy test = 43.36295728

In [9]:
# seed 
seed = 42
# load data
data_loader = DataLoader(**Config.DATA_LOADER_CONFIG)
train_data, test_data = data_loader.get_train_data(), data_loader.get_test_data()
# get X_train, X_test, y_train, y_test
X_train,y_train = train_data
print("X_train shape:", X_train.shape)
X_test, y_test = test_data

X_train = torch.from_numpy(X_train).float()
X_test = torch.from_numpy(X_test).float()
y_train = torch.from_numpy(y_train).long()
y_test = torch.from_numpy(y_test).long()


Loading norman data...
Dataset directory already exists: data/norman
Loading dataset: norman
Preprocessing norman data...
Splitting norman data into train and test sets...
Number of training samples: 44608
Number of test samples: 11152
X_train shape: (44608, 2056)


In [10]:
umap_reducer = umap.UMAP(n_components=3)
with torch.no_grad():
    embedding_train = model(X_train)
    y_pred_train_logit = loss.logits(embedding=embedding_train, y_true=y_train)
    y_pred_train_probability = torch.softmax(y_pred_train_logit, dim=1)
    y_pred_train_label = y_pred_train_probability.argmax(dim=1)
    
embedding_train_umap = umap_reducer.fit_transform(embedding_train)

In [11]:
def l2_normalization( embedding):
    """
    normalize the embedding (array) to have len 1 each row
    """
    # Compute L2 norm for each row: l2 = sqrt (row)
    norms = np.linalg.norm(embedding, axis=1, keepdims=True)

    # Avoid division by zero (add a small epsilon)
    epsilon = 1e-10
    norms = np.maximum(norms, epsilon)

    # Normalize each row to have L2 norm = 1
    normalized_array = embedding / norms

    return normalized_array

In [18]:
embedding_train_umap = l2_normalization(embedding_train_umap)

In [19]:
# visualize umap 3d
def visualize(embedding,y_true, y_pred):
    # print(y_true.shape)#
    # print(y_pred.shape)
    fig =  go.Figure()
    scatter = go.Scatter3d(
        x = embedding[:,0],
        y = embedding[:,1],
        z =  embedding[:,2],
        mode = "markers",
        marker = dict(size = 3,
                    color= y_pred,
                    colorscale="Viridis",
                    colorbar = dict(title= "Labels")),
        name = "Label_true",
        customdata =np.stack([y_true, y_pred], axis=-1),
        hovertemplate=(
                    "X: %{x:.2f}<br>"
                    "Y: %{y:.2f}<br>"
                    "Z: %{z:.2f}<br>"
                    "True Label: %{customdata[0]}<br>"  # Display y_true
                    "Pred Label: %{customdata[1]}<br>"
                ),

    )
    fig.add_trace(scatter)
    fig.update_layout(
        title="Visualize raw data with UMAP",
        template="plotly",)
    fig.show()
 


In [20]:
visualize(embedding=embedding_train_umap,y_true=y_train,y_pred=y_pred_train_label)

In [30]:
from sklearn.metrics import accuracy_score

with torch.no_grad():
    embedding_test = model(X_test)
    y_pred_test_logits = loss.logits(embedding=embedding_test, y_true=y_test)
    y_pred_test_probability = torch.softmax(y_pred_test_logits, dim=1)
    y_pred_test_label = y_pred_test_probability.argmax(dim=1)

total_acc = accuracy_score(y_true=y_test,y_pred=y_pred_test_label)
print("total_acc:", total_acc)

umap_reducer = umap.UMAP(n_components=3)
embedding_test_umap = umap_reducer.fit_transform(np.array(embedding_test))
embedding_test_umap = l2_normalization(embedding_test_umap)

total_acc: 0.43642395982783355


In [31]:

y_pred_test_label =np.array(y_pred_test_label) 
y_test = np.array(y_test)
for i in range (106):
    index = y_test == i
    y_pred = y_pred_test_label[index]
    y_true = y_test[index]
    acc = accuracy_score(y_true=y_true,y_pred=y_pred)
    print("label {}, acc {}".format(i,acc))

label 0, acc 0.6770833333333334
label 1, acc 0.0
label 2, acc 0.09876543209876543
label 3, acc 0.3770491803278688
label 4, acc 0.38461538461538464
label 5, acc 0.03225806451612903
label 6, acc 0.1978021978021978
label 7, acc 0.6075949367088608
label 8, acc 0.42
label 9, acc 0.0
label 10, acc 0.16981132075471697
label 11, acc 0.2962962962962963
label 12, acc 0.5818181818181818
label 13, acc 0.24096385542168675
label 14, acc 0.30303030303030304
label 15, acc 0.8017241379310345
label 16, acc 0.6190476190476191
label 17, acc 0.8125
label 18, acc 0.21794871794871795
label 19, acc 0.0
label 20, acc 0.05263157894736842
label 21, acc 0.7155963302752294
label 22, acc 0.6141732283464567
label 23, acc 0.4666666666666667
label 24, acc 0.7450980392156863
label 25, acc 0.7473684210526316
label 26, acc 0.07042253521126761
label 27, acc 0.6115702479338843
label 28, acc 0.8484848484848485
label 29, acc 0.0
label 30, acc 0.445859872611465
label 31, acc 0.558252427184466
label 32, acc 0.8153846153846154


In [32]:
visualize(embedding=embedding_test_umap,y_true=y_test,y_pred=y_pred_test_label)