In [1]:
import numpy as np
import rootutils

root = rootutils.setup_root(search_from=".", indicator=".git")

DATA_DIR_TRAIN = root / "data" / "DL" / "TRAIN"
DATA_DIR_TEST = root / "data" / "DL" / "TEST"

In [2]:
dl_data_path_train = DATA_DIR_TRAIN / "dl_data_train.npz"
dl_data_path_test = DATA_DIR_TEST / "dl_data_test.npz"
dl_data_train = np.load(dl_data_path_train, allow_pickle=True)
dl_data_test = np.load(dl_data_path_test, allow_pickle=True)

In [3]:
print(dl_data_train)
print("Keys:", dl_data_train.files)
for key in dl_data_train.files:
    print(f"{key}: shape={dl_data_train[key].shape}, dtype={dl_data_train[key].dtype}")

print(dl_data_test)
print("Keys:", dl_data_test.files)
for key in dl_data_test.files:
    print(f"{key}: shape={dl_data_test[key].shape}, dtype={dl_data_test[key].dtype}")

NpzFile '/home/arian/cdl1/CDL1-MChallenge/data/DL/TRAIN/dl_data_train.npz' with keys: X, y
Keys: ['X', 'y']
X: shape=(3699, 250, 20), dtype=float32
y: shape=(3699,), dtype=object
NpzFile '/home/arian/cdl1/CDL1-MChallenge/data/DL/TEST/dl_data_test.npz' with keys: X, y
Keys: ['X', 'y']
X: shape=(1253, 250, 20), dtype=float32
y: shape=(1253,), dtype=object


CNN
LSTM
MLP

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import numpy as np

# Prepare data
X = dl_data_train['X']  # shape: (N, T, C) or (N, C, T)
y = dl_data_train['y']  # shape: (N,)

# If y is object dtype or string, encode to integer labels
if y.dtype.kind in {'U', 'S', 'O'} or not np.issubdtype(y.dtype, np.integer):
    from sklearn.preprocessing import LabelEncoder
    le = LabelEncoder()
    y = le.fit_transform(y)
    print("Label mapping:", dict(zip(le.classes_, le.transform(le.classes_))))
else:
    le = None  # No label encoding needed

if X.shape[1] < X.shape[2]:  # (N, T, C) -> (N, C, T)
    X = np.transpose(X, (0, 2, 1))

X = X.astype(np.float32)
y = y.astype(np.int64)
num_classes = len(np.unique(y))

# Convert to torch tensors
X_tensor = torch.from_numpy(X)
y_tensor = torch.from_numpy(y)

# Split into train, val, test (e.g., 80/10/10)
N = X.shape[0]
n_train = int(0.8 * N)
n_val = int(0.1 * N)
n_test = N - n_train - n_val
dataset = TensorDataset(X_tensor, y_tensor)
train_set, val_set, test_set = random_split(dataset, [n_train, n_val, n_test], generator=torch.Generator().manual_seed(42))

batch_size = 64
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size)
test_loader = DataLoader(test_set, batch_size=batch_size)

Label mapping: {'climbing': np.int64(0), 'joggen': np.int64(1), 'sitting': np.int64(2), 'walking': np.int64(3)}


In [6]:
class CNN(nn.Module):
    def __init__(self, in_channels, num_classes):
        super().__init__()
        self.conv = nn.Conv1d(in_channels, 2, kernel_size=1)
        self.pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(2, num_classes)

    def forward(self, x):
        x = self.conv(x)
        x = self.pool(x).squeeze(-1)
        x = self.fc(x)
        return x

In [7]:
class CNNBaseline(nn.Module):
    def __init__(self, in_channels, num_classes):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels, 32, kernel_size=5, padding=2)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool1d(2)

        self.conv2 = nn.Conv1d(32, 64, kernel_size=5, padding=2)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.AdaptiveAvgPool1d(1)

        self.fc = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.pool2(self.relu2(self.conv2(x)))
        x = x.squeeze(-1)  # (B, 64)
        x = self.fc(x)
        return x

In [8]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
model = CNNBaseline(in_channels=X.shape[1], num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

Using device: cuda


In [9]:
import wandb
import os

# Ensure the log directory exists
os.makedirs("../data/log", exist_ok=True)

# Initialize wandb with custom log directory
wandb.init(
    project="cdl1",
    name="simple-cnn-baseline",
    config={
        "epochs": 20,
        "batch_size": batch_size,
        "optimizer": "Adam",
        "lr": 1e-3,
        "model": "SimpleCNN",
        "in_channels": X.shape[1],
        "num_classes": num_classes,
    },
    dir="../data/log"
)

[34m[1mwandb[0m: Currently logged in as: [33marian-iseni[0m ([33marianarian[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


In [10]:
# Training loop with wandb logging
def train_model(model, train_loader, val_loader, epochs=20):
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad()
            out = model(xb)
            loss = criterion(out, yb)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * xb.size(0)
        train_loss /= len(train_loader.dataset)

        # Validation
        model.eval()
        val_loss = 0
        all_preds, all_labels = [], []
        with torch.no_grad():
            for xb, yb in val_loader:
                xb, yb = xb.to(device), yb.to(device)
                out = model(xb)
                loss = criterion(out, yb)
                val_loss += loss.item() * xb.size(0)
                preds = out.argmax(dim=1).cpu().numpy()
                all_preds.append(preds)
                all_labels.append(yb.cpu().numpy())
        val_loss /= len(val_loader.dataset)
        val_acc = accuracy_score(np.concatenate(all_labels), np.concatenate(all_preds))
        print(f"Epoch {epoch+1}/{epochs} - Train loss: {train_loss:.4f} - Val loss: {val_loss:.4f} - Val acc: {val_acc:.4f}")

        # Log to wandb
        wandb.log({
            "epoch": epoch + 1,
            "train_loss": train_loss,
            "val_loss": val_loss,
            "val_acc": val_acc,
        })

In [11]:
train_model(model, train_loader, val_loader, epochs=10)

Epoch 1/10 - Train loss: 0.6262 - Val loss: 0.1857 - Val acc: 0.9079
Epoch 2/10 - Train loss: 0.2260 - Val loss: 0.1455 - Val acc: 0.9458
Epoch 3/10 - Train loss: 0.1715 - Val loss: 0.1209 - Val acc: 0.9539
Epoch 4/10 - Train loss: 0.1798 - Val loss: 0.2257 - Val acc: 0.9079
Epoch 5/10 - Train loss: 0.1913 - Val loss: 0.1085 - Val acc: 0.9539
Epoch 6/10 - Train loss: 0.1261 - Val loss: 0.1163 - Val acc: 0.9485
Epoch 7/10 - Train loss: 0.1287 - Val loss: 0.1318 - Val acc: 0.9458
Epoch 8/10 - Train loss: 0.1063 - Val loss: 0.1354 - Val acc: 0.9214
Epoch 9/10 - Train loss: 0.1032 - Val loss: 0.1254 - Val acc: 0.9702
Epoch 10/10 - Train loss: 0.0811 - Val loss: 0.1550 - Val acc: 0.9133


In [12]:
# Evaluation on test set
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
    for xb, yb in test_loader:
        xb, yb = xb.to(device), yb.to(device)
        out = model(xb)
        preds = out.argmax(dim=1).cpu().numpy()
        all_preds.append(preds)
        all_labels.append(yb.cpu().numpy())
y_true = np.concatenate(all_labels)
y_pred = np.concatenate(all_preds)

acc = accuracy_score(y_true, y_pred)
prec, rec, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='weighted')
print(f"Test Accuracy: {acc:.4f}")
print(f"Test Precision: {prec:.4f}")
print(f"Test Recall: {rec:.4f}")
print(f"Test F1-score: {f1:.4f}")

# Log test metrics to wandb
wandb.log({
    "test_accuracy": acc,
    "test_precision": prec,
    "test_recall": rec,
    "test_f1": f1,
})

# If you want to map predictions back to string labels:
if le is not None:
    y_true_labels = le.inverse_transform(y_true)
    y_pred_labels = le.inverse_transform(y_pred)
    print("Example true labels:", y_true_labels[:10])
    print("Example predicted labels:", y_pred_labels[:10])

wandb.finish()

Test Accuracy: 0.8949
Test Precision: 0.9056
Test Recall: 0.8949
Test F1-score: 0.8886
Example true labels: ['sitting' 'walking' 'joggen' 'sitting' 'climbing' 'joggen' 'walking'
 'climbing' 'sitting' 'walking']
Example predicted labels: ['sitting' 'climbing' 'joggen' 'sitting' 'climbing' 'joggen' 'walking'
 'climbing' 'sitting' 'walking']


0,1
epoch,▁▂▃▃▄▅▆▆▇█
test_accuracy,▁
test_f1,▁
test_precision,▁
test_recall,▁
train_loss,█▃▂▂▂▂▂▁▁▁
val_acc,▁▅▆▁▆▆▅▃█▂
val_loss,▆▃▂█▁▁▂▃▂▄

0,1
epoch,10.0
test_accuracy,0.89488
test_f1,0.88859
test_precision,0.90563
test_recall,0.89488
train_loss,0.08112
val_acc,0.91328
val_loss,0.15498


In [None]:
# UMAP and pynndescent are currently incompatible with numpy>=2.0 due to use of np.infty.
# As a workaround, use PCA for visualization instead of UMAP.

import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

# Helper to get features from the penultimate layer
def get_penultimate_features(model, xb):
    """
    Try to extract features from the penultimate layer of the model.
    This function handles common model architectures.
    """
    # If model has a feature extractor, use it
    if hasattr(model, 'features'):
        feats = model.features(xb)
    elif hasattr(model, 'extract_features'):
        feats = model.extract_features(xb)
    else:
        # Try to find the last nn.Linear layer before the classifier
        # This works for models with a 'classifier' or 'fc' attribute that is nn.Sequential or nn.ModuleList
        if hasattr(model, 'classifier') and isinstance(model.classifier, torch.nn.Sequential):
            modules = list(model.classifier.children())
            if len(modules) > 1:
                penultimate = torch.nn.Sequential(*modules[:-1])
                feats = penultimate(xb)
            else:
                feats = xb
        elif hasattr(model, 'fc') and isinstance(model.fc, torch.nn.Sequential):
            modules = list(model.fc.children())
            if len(modules) > 1:
                penultimate = torch.nn.Sequential(*modules[:-1])
                feats = penultimate(xb)
            else:
                feats = xb
        elif hasattr(model, 'fc') and isinstance(model.fc, torch.nn.Linear):
            # Instead of calling model.fc(xb), call the model up to the penultimate layer
            # Try to find the layer before 'fc'
            # This is a best-effort fallback: run the model up to the last layer
            # If model has a 'forward_features' method, use it
            if hasattr(model, 'forward_features'):
                feats = model.forward_features(xb)
            else:
                # As a last resort, remove the last layer from model._modules if possible
                # This is fragile and may not work for all models
                # So just use the input
                feats = xb
        else:
            feats = xb  # fallback to input
    return feats

# Get the features from the train set
model.eval()
train_features = []
train_labels = []
with torch.no_grad():
    for xb, yb in train_loader:
        xb = xb.to(device)
        feats = get_penultimate_features(model, xb)
        feats = feats.view(feats.size(0), -1).cpu().numpy()
        train_features.append(feats)
        train_labels.append(yb.cpu().numpy())
train_features = np.concatenate(train_features)
train_labels = np.concatenate(train_labels)

# PCA projection (2D)
pca = PCA(n_components=2, random_state=42)
embedding = pca.fit_transform(train_features)

# Plot
plt.figure(figsize=(8,6))
scatter = plt.scatter(embedding[:,0], embedding[:,1], c=train_labels, cmap='tab10', alpha=0.7)
plt.colorbar(scatter, label='Class')
plt.title('PCA projection of train set features')
plt.xlabel('PCA 1')
plt.ylabel('PCA 2')
plt.show()