In [5]:
import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.utils.data import random_split
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
# Load the training sequences, their labels, and the unlabeled test sequences
# from .npy files in the current working directory.
data_train = np.load('mmhpe_train_data.npy')
labels_train = np.load('mmhpe_train_labels.npy')
data_test = np.load('mmhpe_test_data.npy')

# Report the container type and shape of each array, one pair of lines per array.
print(
    f"Train Data type: {type(data_train)}",
    f"Train Data shape: {data_train.shape}",
    sep="\n",
)
print(
    f"Test Data type: {type(data_test)}",
    f"Test Data shape: {data_test.shape}",
    sep="\n",
)
print(
    f"Train  labels type: {type(labels_train)}",
    f"Train labels shape: {labels_train.shape}",
    sep="\n",
)

import matplotlib.pyplot as plt

def plot_sample_frames(sequences, title_prefix, sample_index=0, num_frames=6, tick_step=5):
    """Show the first few frames of one sequence side by side in a single row.

    Args:
        sequences: Array indexed as ``sequences[sample, frame, row, col]``.
            The notebook assumes shape (N, 60, 32, 32) - TODO confirm against
            the printed shapes above.
        title_prefix: Text placed before the 1-based frame number in each title.
        sample_index: Which sequence to draw frames from.
        num_frames: How many leading frames to show (capped at the number of
            frames actually available).
        tick_step: Spacing of the axis ticks, in pixels.
    """
    frames = sequences[sample_index]
    # Never ask for more panels than there are frames.
    num_frames = min(num_frames, len(frames))
    # Derive tick ranges from the frame itself instead of hard-coding 32.
    height, width = frames.shape[-2:]
    x_ticks = np.arange(0, width, tick_step)
    y_ticks = np.arange(0, height, tick_step)

    plt.figure(figsize=(15, 3))
    for i in range(num_frames):
        plt.subplot(1, num_frames, i + 1)
        plt.imshow(frames[i], cmap='gray')  # frames are treated as grayscale images
        plt.title(f'{title_prefix} {i + 1}')
        plt.axis('on')
        plt.xticks(x_ticks)
        plt.yticks(y_ticks)

    plt.tight_layout()
    plt.show()


num_frames_to_plot = 6

# First training sequence.
plot_sample_frames(data_train, 'Frame', num_frames=num_frames_to_plot)

# First test sequence, drawn the same way so the two sets can be compared visually.
plot_sample_frames(data_test, 'Test Frame', num_frames=num_frames_to_plot)

# Tally how many training sequences carry each distinct label value.
unique_labels, counts = np.unique(labels_train, return_counts=True)
print("\nUnique labels and their counts:")
for position in range(len(unique_labels)):
    label, count = unique_labels[position], counts[position]
    print(f"Label {label}: {count} occurrences")

# Random-guess submission: one uniformly drawn label per test sequence,
# written in the Id/Label CSV layout.
out_csv = "submission_team_Loss_But_Vibing.csv"
seed = 234
id_prefix = "test_"

# One prediction is needed for every sequence in the already loaded test array.
n = data_test.shape[0]

# Seeded generator so the file is reproducible; `high` is exclusive, so the
# drawn labels fall in 1..5.
rng = np.random.default_rng(seed)
pred = rng.integers(low=1, high=6, size=n)

# Ids are the prefix followed by the zero-padded, 0-based row number.
ids = list(map(lambda i: f"{id_prefix}{i:04d}", range(n)))

df = pd.DataFrame({"Id": ids, "Label": pred})
df.to_csv(out_csv, index=False)

print(f"Wrote {out_csv} with {n} random labels.")
print(df.head())

# Model Design and Temporal Aggregation Strategy
# 
# We design a 2D single-channel convolutional neural network (CNN) that classifies
# each 32×32 frame independently into one of five classes.
#
# For each input sample consisting of 60 frames, the CNN produces a set of
# pre-softmax scores (logits) for every frame. These logits are then aggregated
# across the temporal dimension to produce a single prediction for the full
# 60-frame sequence.
#
# As a baseline, we average the logits across all 60 frames and apply a softmax
# to obtain the final class prediction. We also explore alternative fusion
# strategies, such as weighted averaging and max pooling over time.
#
# We experiment with different CNN architectures, learning rates, and
# learning-rate schedules. Batch normalization and dropout are evaluated to
# improve generalization.

import torch.nn as nn

class FrameCNN(nn.Module):
    """
    Per-frame classifier: maps one single-channel 32×32 frame to class logits.
    Adapted from the demo09b_cnn_classifier example.

    Args:
        num_classes: Number of output logits produced for each frame.
    """

    def __init__(self, num_classes=5):
        super().__init__()

        # Two conv -> batch-norm -> ReLU -> 2×2 max-pool stages. The padded 3×3
        # convolutions keep the spatial size; each pool halves it.
        feature_layers = [
            nn.Conv2d(1, 16, kernel_size=3, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(2),          # 16 channels, 16 × 16
            nn.Conv2d(16, 16, kernel_size=3, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(2),          # 16 channels, 8 × 8
        ]
        self.conv_block = nn.Sequential(*feature_layers)

        # Flatten the 16×8×8 feature map and classify through one hidden layer.
        head_layers = [
            nn.Flatten(),
            nn.Linear(16 * 8 * 8, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(512, num_classes),
        ]
        self.fc_block = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return raw (pre-softmax) logits of shape (batch, num_classes)."""
        features = self.conv_block(x)
        return self.fc_block(features)

class SequenceCNN(nn.Module):
    """
    Applies a frame-level network independently to each frame,
    then aggregates the per-frame logits across time.

    Args:
        frame_cnn: Module mapping (N, 1, H, W) frames to (N, num_classes) logits.
        aggregation: "mean" or "max"; how logits are fused over the time axis.

    Input:  (B, T, H, W)
    Output: (B, num_classes)

    Raises:
        ValueError: from ``forward`` if ``aggregation`` is not "mean" or "max".
    """

    def __init__(self, frame_cnn, aggregation="mean"):
        super().__init__()
        self.frame_cnn = frame_cnn
        self.aggregation = aggregation

    def forward(self, x):
        """
        x shape: (B, T, H, W), e.g. (B, 60, 32, 32)
        """
        B, T, H, W = x.shape

        # Treat each frame as an independent single-channel image.
        # reshape (rather than view) also accepts non-contiguous input, e.g. a
        # temporally cropped batch x[:, :30], by copying only when necessary.
        x = x.reshape(B * T, 1, H, W)

        logits = self.frame_cnn(x)          # (B*T, num_classes)
        logits = logits.reshape(B, T, -1)   # (B, T, num_classes)

        # Temporal fusion (pre-softmax)
        if self.aggregation == "mean":
            logits = logits.mean(dim=1)
        elif self.aggregation == "max":
            logits = logits.max(dim=1).values
        else:
            raise ValueError("Unknown aggregation method")

        return logits


def _select_device():
    """Pick the compute device, as in the demo09b_cnn_classifier example.

    Preference order: CUDA, then MPS (Apple silicon), then CPU.
    """
    if torch.cuda.is_available():
        return torch.device("cuda")
    mps_backend = torch.backends.mps
    if mps_backend.is_available() and mps_backend.is_built():
        return torch.device("mps")
    return torch.device("cpu")


device = _select_device()

device
# Frame-level CNN wrapped so that whole sequences are classified by the mean
# of the per-frame logits.
frame_cnn = FrameCNN().to(device)
model = SequenceCNN(frame_cnn, aggregation="mean").to(device)

# Cross-entropy over the aggregated logits, optimized with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=5e-4)

# Dataset wrapper around the in-memory NumPy arrays
class MHPESequenceDataset(Dataset):
    """Serves sequences, and optionally their labels, as tensors.

    Args:
        data: Array of sequences; converted once to a float32 tensor.
        labels: Optional array of labels. Each label is shifted down by one
            before being stored as int64 (the notebook adds 1 back when it
            writes predictions, i.e. labels 1..5 become indices 0..4).
            When omitted, items are bare sequences without a label.
    """

    def __init__(self, data, labels=None):
        self.data = torch.tensor(data, dtype=torch.float32)
        self.labels = (
            None if labels is None
            else torch.tensor(labels - 1, dtype=torch.long)
        )

    def __len__(self):
        # Number of sequences = size of the leading axis.
        return self.data.size(0)

    def __getitem__(self, idx):
        sample = self.data[idx]
        if self.labels is not None:
            return sample, self.labels[idx]
        return sample

import torch.nn as nn
# Number of output classes shared by the frame-level networks below.
num_classes = 5


class BasicFrameNet(nn.Module):
    """Plain frame classifier: two conv/ReLU/pool stages and two linear layers.

    Contains no batch normalization and no dropout. Expects frames shaped
    (B, 1, 32, 32) and returns logits shaped (B, num_classes).
    """

    def __init__(self):
        super().__init__()
        self.relu = nn.ReLU()
        self.conv1 = nn.Conv2d(1, 16, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(16, 16, 3, padding=1)
        # Two 2×2 pools take 32×32 down to 8×8 with 16 channels.
        self.fc1 = nn.Linear(16 * 8 * 8, 512)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        # Both conv stages share the same ReLU and pooling modules.
        for conv in (self.conv1, self.conv2):
            x = self.pool(self.relu(conv(x)))
        flat = x.view(x.size(0), -1)
        hidden = self.relu(self.fc1(flat))
        return self.fc2(hidden)   # logits

# Data loaders

# Wrap the labeled training arrays so they can be split and batched.
full_dataset = MHPESequenceDataset(data_train, labels_train)

# 80 % for training, the remainder held out for validation.
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size

# Seed the split explicitly: without a generator, random_split draws from the
# global RNG, so the held-out set (and every reported accuracy) would change
# from run to run.
split_generator = torch.Generator().manual_seed(234)
train_ds, val_ds = random_split(
    full_dataset, [train_size, val_size], generator=split_generator
)

train_dl = DataLoader(train_ds, batch_size=16, shuffle=True)
# NOTE: despite its name, test_dl iterates the held-out *validation* split of
# the labeled data and yields (inputs, labels) pairs.
test_dl  = DataLoader(val_ds, batch_size=16, shuffle=False)

# Baseline experiment: the plain frame network wrapped for sequence input
# (mean aggregation is SequenceCNN's default).
frame_net = BasicFrameNet()
model = SequenceCNN(frame_net).to(device)

import torch.optim as optim

epochs = 20
lrate = 0.0005

criterion = nn.CrossEntropyLoss()
opt = optim.Adam(model.parameters(), lr=lrate)

# One-cycle schedule sized in optimizer steps (epochs * batches per epoch), so
# scheduler.step() must be called once per batch. anneal_strategy='linear'
# matches the schedulers of the BN and BN+Dropout experiments, so that the
# three runs differ only in their architecture.
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    opt,
    max_lr=10 * lrate,
    steps_per_epoch=len(train_dl),
    epochs=epochs,
    anneal_strategy='linear',
)

print('No batch normalization nor dropout nor data augmentation.')

# Per-epoch accuracy histories (in percent) for the baseline run.
basic_tr_accuracy = []
basic_ts_accuracy = []

# Train the baseline model; after every epoch record training accuracy and
# accuracy on the held-out split served by test_dl.
for epoch in range(epochs):
    # TRAIN
    correct = 0
    total = 0
    model.train()

    for train_iter, data in enumerate(train_dl):
        x_batch, y_batch = data[0].to(device), data[1].to(device)

        out = model(x_batch)
        loss = criterion(out, y_batch)

        opt.zero_grad()
        loss.backward()
        opt.step()
        # OneCycleLR was sized with steps_per_epoch=len(train_dl): advance it
        # once per batch, right after the optimizer step.
        scheduler.step()

        # Accuracy bookkeeping needs no gradients, so work on a detached copy.
        predicted = out.detach().argmax(dim=1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()

    basic_tr_accuracy.append(100 * correct / total)

    # EVALUATE on the held-out split
    correct = 0
    total = 0
    model.eval()

    with torch.no_grad():
        for images, labels in test_dl:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            predicted = outputs.argmax(dim=1)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    basic_ts_accuracy.append(100 * correct / total)

    # Only the three positional fields are used by the template; the stray
    # epochs=/anneal_strategy= keyword arguments previously passed to format()
    # were scheduler options and had no effect here.
    print(
        'Epoch: {0:2d}   Train Accuracy: {1:.3f}%   Test Accuracy: {2:.3f}%'
        .format(epoch + 1,
                basic_tr_accuracy[epoch],
                basic_ts_accuracy[epoch])
    )

print('Done!')

# Keep the trained baseline in inference mode. (Re-instantiating BasicFrameNet
# here would throw the trained weights away.)
model.eval()

class FrameCNN_BN(nn.Module):
    """Frame classifier with batch normalization after each convolution.

    Expects frames shaped (B, 1, 32, 32) and returns logits shaped
    (B, num_classes), where num_classes is the notebook-level constant.
    """

    def __init__(self):
        super().__init__()

        # Stage 1: 1 -> 16 channels.
        self.conv1 = nn.Conv2d(1, 16, 3, padding=1)
        self.bn1   = nn.BatchNorm2d(16)

        # Stage 2: 16 -> 32 channels.
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.bn2   = nn.BatchNorm2d(32)

        # Parameter-free ops shared by both stages.
        self.pool = nn.MaxPool2d(2, 2)
        self.relu = nn.ReLU()

        # Two 2×2 pools take 32×32 down to 8×8 with 32 channels.
        self.fc1 = nn.Linear(32 * 8 * 8, 256)
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        # Each stage: conv -> batch-norm -> ReLU -> pool.
        for conv, norm in ((self.conv1, self.bn1), (self.conv2, self.bn2)):
            x = self.pool(self.relu(norm(conv(x))))

        flat = x.view(x.size(0), -1)
        hidden = self.relu(self.fc1(flat))
        return self.fc2(hidden)   # logits

class SequenceCNN(nn.Module):
    """
    Sequence-level CNN wrapper
    - Applies a frame-level CNN to each frame independently
    - Aggregates logits across time (mean by default)

    NOTE: this redefines the SequenceCNN class declared earlier in the
    notebook; from this point on the name refers to this definition.

    Args:
        frame_cnn: Module mapping (N, 1, H, W) frames to (N, num_classes) logits.
        aggregation: "mean" or "max".

    Input:  (B, T, H, W)  e.g. (B, 60, 32, 32)
    Output: (B, num_classes)

    Raises:
        ValueError: from ``forward`` if ``aggregation`` is not "mean" or "max".
    """
    def __init__(self, frame_cnn, aggregation="mean"):
        super().__init__()
        self.frame_cnn = frame_cnn
        self.aggregation = aggregation

    def forward(self, x):
        # x: (B, T, H, W)
        B, T, H, W = x.shape
        # Treat each frame as a grayscale image. reshape (rather than view)
        # also accepts non-contiguous input, e.g. a temporally cropped batch
        # x[:, :30], copying only when necessary.
        x = x.reshape(B * T, 1, H, W)   # (B*T, 1, H, W)
        # Frame-level logits
        logits = self.frame_cnn(x)  # (B*T, num_classes)
        # Restore time dimension
        logits = logits.reshape(B, T, -1)  # (B, T, num_classes)

        # Aggregate across time
        if self.aggregation == "mean":
            logits = logits.mean(dim=1)
        elif self.aggregation == "max":
            logits = logits.max(dim=1).values
        else:
            raise ValueError("Unsupported aggregation type")

        return logits

# Experiment 2: same pipeline, but frames go through the batch-normalized CNN.
frame_cnn = FrameCNN_BN().to(device)
model = SequenceCNN(frame_cnn, aggregation="mean").to(device)

# Cross-entropy on the aggregated sequence logits.
criterion = nn.CrossEntropyLoss()

# Adam starting from the base learning rate.
opt = optim.Adam(model.parameters(), lr=lrate)

# One-cycle learning-rate schedule peaking at ten times the base rate, with
# linear annealing.
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    opt,
    max_lr=10 * lrate,
    epochs=epochs,
    steps_per_epoch=len(train_dl),
    anneal_strategy='linear',
)
print(model)

print('Using batch normalization but no dropout or data augmentation.')

# Per-epoch accuracy histories (in percent) for this run.
batch_tr_accuracy, batch_ts_accuracy = [], []

# Train the batch-norm model; after every epoch record training accuracy and
# accuracy on the held-out split served by test_dl.
for epoch in range(epochs):
    # TRAIN
    correct = 0
    total = 0
    model.train()

    for train_iter, data in enumerate(train_dl):
        x_batch, y_batch = data[0].to(device), data[1].to(device)

        out = model(x_batch)
        loss = criterion(out, y_batch)

        opt.zero_grad()
        loss.backward()
        opt.step()
        # OneCycleLR was built with steps_per_epoch=len(train_dl), i.e. it
        # expects one step per batch. Stepping once per epoch would take only
        # `epochs` of the epochs*len(train_dl) scheduled steps, leaving the
        # learning rate stuck at the very start of the warm-up ramp.
        scheduler.step()

        # Accuracy bookkeeping needs no gradients, so work on a detached copy.
        predicted = out.detach().argmax(dim=1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()

    batch_tr_accuracy.append(100 * correct / total)

    # EVALUATE on the held-out split
    correct = 0
    total = 0
    model.eval()
    with torch.no_grad():
        for data in test_dl:
            images, labels = data[0].to(device), data[1].to(device)
            outputs = model(images)
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    batch_ts_accuracy.append(100 * correct / total)

    # Report this epoch's accuracies.
    print('Epoch: {0:2d}   Train Accuracy: {1:.3f}%   Test Accuracy: {2:.3f}%'.format(epoch+1, batch_tr_accuracy[epoch], batch_ts_accuracy[epoch]))


print('Done!')

class FrameCNN_BN_Dropout(nn.Module):
    """Frame classifier with batch normalization and dropout.

    Layers are applied in the order they are declared:
    conv1 -> ReLU -> pool -> bn1 -> conv2 -> ReLU -> pool -> bn2 -> flatten
    -> dropout1 -> fc1 -> bn3 -> ReLU -> dropout2 -> fc2.

    Args:
        n_classes: Number of output logits. Defaults to the notebook-level
            ``num_classes`` constant when omitted.

    Expects frames shaped (B, 1, 32, 32) and returns logits (B, n_classes).
    """

    def __init__(self, n_classes=None):
        super(FrameCNN_BN_Dropout,self).__init__()
        if n_classes is None:
            # Fall back to the notebook-wide setting, as before.
            n_classes = num_classes
        self.relu = nn.ReLU()
        self.conv1 = nn.Conv2d(1, 16, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.bn1   = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.bn2   = nn.BatchNorm2d(32)
        self.dropout1 = nn.Dropout(p=0.5)
        # Two 2×2 pools take 32×32 down to 8×8 with 32 channels.
        self.fc1 = nn.Linear(32 * 8 * 8, 512)
        self.bn3 = nn.BatchNorm1d(512)
        self.dropout2 = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(512, n_classes)

    def forward(self,x):
        x = self.bn1(self.pool(self.relu(self.conv1(x))))
        # The second stage mirrors the first. Previously bn2 was constructed
        # but never applied, so the second conv block had no normalization.
        x = self.bn2(self.pool(self.relu(self.conv2(x))))
        x = x.view(x.size(0), -1)
        # dropout1 regularizes the flattened conv features before fc1.
        x = self.dropout1(x)
        x = self.relu(self.bn3(self.fc1(x)))    # BN1d AFTER fc
        # dropout2 regularizes the hidden layer before the classifier;
        # previously it was declared but unused.
        x = self.dropout2(x)

        x = self.fc2(x)
        return x

# Experiment 3: frame network with both batch normalization and dropout.
frame_cnn_dropout = FrameCNN_BN_Dropout().to(device)
model = SequenceCNN(frame_cnn_dropout, aggregation="mean").to(device)

# Cross-entropy on the aggregated sequence logits.
criterion = nn.CrossEntropyLoss()

# Adam starting from the base learning rate.
opt = optim.Adam(model.parameters(), lr=lrate)

# One-cycle learning-rate schedule peaking at ten times the base rate, with
# linear annealing.
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    opt,
    max_lr=10 * lrate,
    epochs=epochs,
    steps_per_epoch=len(train_dl),
    anneal_strategy='linear',
)
print(model)

print('Using batch normalization and dropout but no data augmentation.')

# Per-epoch accuracy histories (in percent) for this run.
drop_tr_accuracy, drop_ts_accuracy = [], []

# Train the BN+Dropout model; after every epoch record training accuracy and
# accuracy on the held-out split served by test_dl.
for epoch in range(epochs):

    correct = 0 # correctly classified training sequences this epoch
    total = 0 # training sequences seen this epoch
    model.train() # put model in training mode
    # iterate over training set
    for train_iter, data in enumerate(train_dl):
        x_batch,y_batch = data[0].to(device),data[1].to(device)
        out = model(x_batch)
        # Compute Loss
        loss = criterion(out,y_batch)
        # Zero gradients
        opt.zero_grad()
        # Compute gradients using back propagation
        loss.backward()
        # Take an optimization 'step'
        opt.step()
        # Take a scheduler step. OneCycleLR was built with
        # steps_per_epoch=len(train_dl), i.e. it expects one step per batch.
        # Stepping once per epoch would take only `epochs` of the
        # epochs*len(train_dl) scheduled steps, leaving the learning rate
        # stuck at the very start of the warm-up ramp.
        scheduler.step()

        # Compute Accuracy (no gradients needed, so use a detached copy)
        predicted = out.detach().argmax(dim=1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()

    drop_tr_accuracy.append( 100*correct/total )

    correct = 0
    total = 0
    model.eval() # put model in evaluation mode
    with torch.no_grad():
        for data in test_dl:
            images, labels = data[0].to(device),data[1].to(device)
            outputs = model(images)
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    drop_ts_accuracy.append( 100*correct/total )

    # Report this epoch's accuracies.
    print('Epoch: {0:2d}   Train Accuracy: {1:.3f}%   Test Accuracy: {2:.3f}%'.format(epoch+1, drop_tr_accuracy[epoch], drop_ts_accuracy[epoch]))


print('Done!')

# Predictions of the current model on the batches served by test_dl.
# NOTE: test_dl is built from the held-out validation split of the labeled
# training data and yields (inputs, labels) pairs, so this file is NOT a
# prediction for the unlabeled Kaggle test set.

model.eval()

all_preds = []

with torch.no_grad():
    for batch in test_dl:
        # Use the batch produced by THIS iteration. The previous code read a
        # stale `data` variable left over from the training loop, so every
        # iteration re-predicted the same leftover batch.
        # Accept both (inputs, labels) pairs and bare input tensors.
        x = batch[0] if isinstance(batch, (list, tuple)) else batch
        x = x.to(device)
        outputs = model(x)
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())

# Convert 0-based class indices back to labels 1–5
all_preds = np.array(all_preds) + 1

print("Number of predictions:", len(all_preds))


submission = pd.DataFrame({
    "Id": np.arange(len(all_preds)),
    "Label": all_preds
})

submission.to_csv("model_neuralnet.csv", index=False)
print("Saved model_neuralnet.csv")

# Kaggle submission: run the current model over the unlabeled test sequences.
test_dataset = MHPESequenceDataset(data_test, labels=None)
# Keep the original order so row i of the CSV corresponds to test sequence i.
test_dl_kaggle = DataLoader(test_dataset, batch_size=16, shuffle=False)

model.eval()
all_preds = []

with torch.no_grad():
    for x in test_dl_kaggle:   # without labels, each batch is a bare tensor
        outputs = model(x.to(device))
        preds = outputs.argmax(dim=1)
        all_preds.extend(preds.cpu().numpy())

# Shift 0-based class indices back to labels 1–5
all_preds = np.array(all_preds) + 1

print("Number of predictions:", len(all_preds))

# Ids follow the "test_" + zero-padded row number convention.
ids = [f"test_{i:04d}" for i in range(len(all_preds))]

submission = pd.DataFrame({"Id": ids, "Label": all_preds})
submission.to_csv("submission.csv", index=False)
print("Saved submission.csv")

# Sanity-check the written file: first rows, last rows, shape, and label range.
df = pd.read_csv("submission.csv")
for summary in (df.head(), df.tail(), df.shape):
    print(summary)
label_column = df["Label"]
print(label_column.min(), label_column.max())

# Team name: Loss But Vibing
# Teammate: Micheal Pansari(.02)

# Compare the training-accuracy curves of the three experiments.
epochsn = np.arange(1, epochs + 1)   # 1-based epoch numbers for the x axis
for accuracy_history in (basic_tr_accuracy, batch_tr_accuracy, drop_tr_accuracy):
    plt.plot(epochsn, accuracy_history)
plt.xlim((1, epochs))

plt.grid()
plt.xlabel('epochs')
# The histories store 100 * correct / total, i.e. percentages.
plt.ylabel('accuracy (%)')
plt.title('Training Accuracy')
# Legend entries follow the order in which the curves were plotted.
plt.legend(['Baseline', 'BN', 'BN+Dropout']);

FileNotFoundError: [Errno 2] No such file or directory: 'mmhpe_train_data.npy'