In [1]:
##### This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/matrix-lsa/LSA64_matrix.csv
/kaggle/input/matrix-lsamediapipe/padded_matrix_file (1).csv


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import numpy as np
import csv
import ast
import torch.nn.functional as F
import matplotlib.pyplot as plt
import plotly.graph_objs as go
from plotly.subplots import make_subplots
from sklearn.manifold import TSNE
from mpl_toolkits.mplot3d import Axes3D
from sklearn.preprocessing import LabelEncoder

In [3]:
# Your code for loading the data (unchanged)
input_file = "/kaggle/input/matrix-lsamediapipe/padded_matrix_file (1).csv"

values = []
matrix_labels = []
num_rows = 0

with open(input_file, "r") as f_input:
    reader = csv.reader(f_input)
    for row in reader:
        row_values = []
        for i in range(len(row) - 1):
            column_value = ast.literal_eval(row[i])
            row_values.append(column_value)
        values.append(torch.tensor(row_values))
        matrix_labels.append(ast.literal_eval(row[-1]))
        num_rows += 1

# matrix_labels = [label - 1 for label in matrix_labels]

In [4]:
def sample_triplets(data, labels):
    anchor_indices = torch.arange(len(data))
    positive_indices = torch.empty(len(data), dtype=torch.long)
    negative_indices = torch.empty(len(data), dtype=torch.long)

    for i, label in enumerate(labels):
        # Sample positive instances
        positive_candidates = (labels == label).nonzero(as_tuple=False).view(-1)
        positive_candidates = positive_candidates[positive_candidates != i]
        
        if len(positive_candidates) > 0:
            positive_indices[i] = positive_candidates[torch.randint(0, len(positive_candidates), (1,))]
        else:
            positive_indices[i] = i  # Set the positive index as itself if there are no other instances with the same label
        # Sample negative instances
        negative_candidates = (labels != label).nonzero(as_tuple=False).view(-1)
        negative_indices[i] = negative_candidates[torch.randint(0, len(negative_candidates), (1,))]
        
    anchor = data[anchor_indices]
    positive = data[positive_indices]
    negative = data[negative_indices]
    return anchor, positive, negative

In [5]:
# Triplet loss function
class TripletLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(TripletLoss, self).__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        pos_distance = F.pairwise_distance(anchor,positive)
        neg_distance = F.pairwise_distance(anchor, negative)
        loss = F.relu(pos_distance - neg_distance + self.margin)
        return loss.mean()

In [6]:
# Define the dataset class
class SkeletonDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

In [7]:
class TransformerEncoder(nn.Module):
    def __init__(self, n_features, d_model=64, nhead=16, num_layers=2):
        super(TransformerEncoder, self).__init__()
        self.embedding = nn.Linear(n_features, d_model)
        self.positional_encoding = self.generate_positional_encoding(d_model)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model, nhead), num_layers
        )

    def generate_positional_encoding(self, d_model, max_len=243):
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        return pe.unsqueeze(0)

    def forward(self, x):
        x = self.embedding(x)
        x = x + self.positional_encoding[:, : x.size(1)]
        x = self.transformer_encoder(x)
        x = x.mean(dim=1)
        return x

In [None]:
# Split the dataset into train, validation, and test sets
train_data, test_data, train_labels, test_labels = train_test_split(
    torch.stack(values), torch.tensor(matrix_labels), test_size=0.4, random_state=42, stratify=torch.tensor(matrix_labels)
)

validation_data, test_data, validation_labels, test_labels = train_test_split(
    test_data, test_labels, test_size=0.5, random_state=42, stratify=test_labels
)

# Create train, validation, and test datasets
train_dataset = SkeletonDataset(train_data, train_labels)
validation_dataset = SkeletonDataset(validation_data, validation_labels)
test_dataset = SkeletonDataset(test_data, test_labels)

# Create data loaders for train, validation, and test sets
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

model = TransformerEncoder(114)
criterion = TripletLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

train_losses = []
validation_losses = []

num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    epoch_train_loss = 0
    for batch_idx, (data, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        anchor, positive, negative = sample_triplets(data, labels)
        anchor_embeddings = model(anchor)
        positive_embeddings = model(positive)
        negative_embeddings = model(negative)
        loss = criterion(anchor_embeddings, positive_embeddings, negative_embeddings)
        loss.backward()
        optimizer.step()
        epoch_train_loss += loss.item()

    epoch_train_loss /= len(train_loader)
    train_losses.append(epoch_train_loss)

    # Calculate validation loss
    model.eval()
    epoch_validation_loss = 0
    with torch.no_grad():
        for batch_idx, (data, labels) in enumerate(validation_loader):
            anchor, positive, negative = sample_triplets(data, labels)
            anchor_embeddings = model(anchor)
            positive_embeddings = model(positive)
            negative_embeddings = model(negative)
            loss = criterion(anchor_embeddings, positive_embeddings, negative_embeddings)
            epoch_validation_loss += loss.item()

    epoch_validation_loss /= len(validation_loader)
    validation_losses.append(epoch_validation_loss)

    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {epoch_train_loss:.4f}, Validation Loss: {epoch_validation_loss:.4f}")

    # Save the model
    torch.save(model.state_dict(), f"model_epoch_{epoch + 1}.pt")

# Plot training and validation losses
plt.plot(train_losses, label="Training Loss")
plt.plot(validation_losses, label="Validation Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()

Epoch 1/100, Train Loss: 0.2887, Validation Loss: 0.2088
Epoch 2/100, Train Loss: 0.2000, Validation Loss: 0.1632
Epoch 3/100, Train Loss: 0.2056, Validation Loss: 0.1527
Epoch 4/100, Train Loss: 0.1825, Validation Loss: 0.1505
Epoch 5/100, Train Loss: 0.1714, Validation Loss: 0.1309
Epoch 6/100, Train Loss: 0.1732, Validation Loss: 0.1117
Epoch 7/100, Train Loss: 0.1321, Validation Loss: 0.1118
Epoch 8/100, Train Loss: 0.1476, Validation Loss: 0.1401
Epoch 9/100, Train Loss: 0.1310, Validation Loss: 0.1321


In [None]:
# Extract embeddings for the test set
model.eval()
test_embeddings = []
test_labels_list = []

with torch.no_grad():
    for data, labels in train_loader:
        embeddings = model(data)
        test_embeddings.append(embeddings)
        test_labels_list.append(labels)

test_embeddings = torch.cat(test_embeddings).cpu().numpy()
test_labels_list = torch.cat(test_labels_list).cpu().numpy()

# Reduce dimensionality of the embeddings using t-SNE
tsne = TSNE(n_components=3, random_state=42)
test_embeddings_3d = tsne.fit_transform(test_embeddings)

# Create a label encoder to convert class labels to integers
le = LabelEncoder()
test_labels_int = le.fit_transform(test_labels_list)

# Plot the 3D scatter plot with multiple subplots using plotly
n_classes = 64
n_subplots = 10
n_classes_per_subplot = n_classes // n_subplots

subplot_titles = [
    f"Embeddings for classes {i * n_classes_per_subplot}-{(i + 1) * n_classes_per_subplot - 1}"
    for i in range(n_subplots)
]
fig = make_subplots(rows=n_subplots // 2, cols=2, specs=[[{'type': 'scatter3d'}] * 2] * (n_subplots // 2), subplot_titles=subplot_titles)

for i in range(n_subplots):
    class_indices = np.arange(i * n_classes_per_subplot, (i + 1) * n_classes_per_subplot)

    for idx in class_indices:
        class_mask = test_labels_int == idx
        fig.add_trace(
            go.Scatter3d(
                x=test_embeddings_3d[class_mask, 0],
                y=test_embeddings_3d[class_mask, 1],
                z=test_embeddings_3d[class_mask, 2],
                mode="markers",
                name=str(le.inverse_transform([idx])[0]),
                marker=dict(size=3),
                showlegend=False,
            ),
            row=i // 2 + 1,
            col=i % 2 + 1,
        )

fig.update_layout(height=2000, width=1000, title_text="Embeddings Visualizations")
fig.show()