<a href="https://colab.research.google.com/github/ala-sk98/OpenGait/blob/master/baseline_gaitpart.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import Packages

In [None]:
import numpy as np
import os
from google.colab import drive
drive.mount('/content/drive')
os.chdir('/content/drive/MyDrive')

Mounted at /content/drive


# Load Data

In [None]:
# Load the data from the .npy file:
data = np.load("CASIA-verification.npy", allow_pickle=True)

# Split the data back into separate arrays:
X_train_array, X_test_array, y_train_array, y_test_array = data


In [None]:
y_test_array[10]

array([[1.],
       [0.],
       [0.],
       [1.],
       [1.],
       [0.],
       [1.],
       [0.],
       [1.],
       [0.],
       [0.],
       [1.]])

In [None]:
# distance_train = np.load("train2.npy", allow_pickle=True)
# distance_test = np.load("test2.npy", allow_pickle=True)

In [None]:
X_train_array.shape

(96,)

In [None]:
X_train, X_test, y_train, y_test = X_train_array[0], X_test_array[0], y_train_array[0], y_test_array[0]
y_train = np.array(y_train[:],dtype='uint8')
y_train = np.eye(2)[y_train]

y_test = np.array(y_test[:],dtype='uint8')
y_test = np.eye(2)[y_test]

# Transpose the array to swap the 2nd and 4th dimensions
print(X_train.shape, X_test.shape)
X_train = np.transpose(X_train, axes=[0, 3, 1, 2])  # shape: (48, 100, 36, 20)
X_test  = np.transpose(X_test , axes=[0, 3, 1, 2])  # shape: (12, 100, 36, 20)

# Reshape the array to combine the 2nd and 3rd dimensions
# X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], -1))  # shape: (48, 100, 720)
# X_test  = np.reshape(X_test , (X_test.shape[0], X_test.shape[1], -1))  # shape: (12, 100, 720)

# X_train = np.reshape(X_train, (X_train.shape[0], 1, X_train.shape[1], -1))  # shape: (48, 100, 720)
# X_test  = np.reshape(X_test , (X_test.shape[0], 1, X_test.shape[1], -1))  # shape: (12, 100, 720)

print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)


(48, 36, 20, 100) (12, 36, 20, 100)
(48, 100, 36, 20) (12, 100, 36, 20) (48, 1, 2) (12, 1, 2)


In [None]:
import torch
from torch.utils.data import Dataset

class CasiaDataset(Dataset):
    def __init__(self, x_data, y_data):
        self.x_data = x_data
        self.y_data = y_data

    def __len__(self):
        return len(self.x_data)

    def __getitem__(self, idx):
        x_sample = self.x_data[idx]
        y_sample = self.y_data[idx]
        return x_sample, y_sample

In [None]:
train_dataset = CasiaDataset(X_train, y_train)
val_dataset = CasiaDataset(X_test, y_test)


In [None]:
ID_new = np.array([  4.,   5.,   6.,   7.,   8.,   9.,  10.,  11.,  12.,  13.,  14.,
        15.,  16.,  17.,  18.,  19.,  20.,  21.,  22.,  24.,  25.,  26.,
        27.,  28.,  29.,  30.,  31.,  32.,  33.,  34.,  35.])#,  37.,
        # 38.,  39.,  40.,  41.,  42.,  43.,  44.,  45.,  46.,  47.,  48.,
        # 49.,  51.,  52.,  53.,  54.,  55.,  56.,  57.,  58.,  59.,  60.,
        # 61.,  62.,  63.,  64.,  65.,  66.,  67.,  68.,  69.,  70.,  71.,
        # 72.,  73.,  74.,  75.,  76.,  77.,  78.,  79.,  80.,  82.,  83.,
        # 84.,  85.,  87.,  88.,  89.,  90.,  91.,  92.,  93.,  94.,  95.,
        # 96.,  97.,  98.,  99., 100., 101., 102., 103.])

# Define Model

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import copy
def clones(module, N):
    "Produce N identical layers."
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

class BasicConv1d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, **kwargs):
        super(BasicConv1d, self).__init__()
        self.conv = nn.Conv1d(in_channels, out_channels, kernel_size, bias=False, **kwargs)

    def forward(self, x):
        ret = self.conv(x)
        return ret

class FocalConv2d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, halving, **kwargs):
        super(FocalConv2d, self).__init__()
        self.halving = halving
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, bias=False, **kwargs).to(torch.float64)  # Specify the data type

    def forward(self, x):
        h = x.size(2)
        split_size = int(h // 2**self.halving)
        z = x.split(split_size, 2)
        z = torch.cat([self.conv(_) for _ in z], 2)
        return F.leaky_relu(z, inplace=True)

class TemporalFeatureAggregator(nn.Module):
    def __init__(self, in_channels, squeeze=4, part_num=16):
        super(TemporalFeatureAggregator, self).__init__()
        hidden_dim = int(in_channels // squeeze)
        self.part_num = part_num

        # MTB1
        conv3x1 = nn.Sequential(
                BasicConv1d(in_channels, hidden_dim, 3, padding=1),
                nn.LeakyReLU(inplace=True),
                BasicConv1d(hidden_dim, in_channels, 1))
        self.conv1d3x1 = clones(conv3x1, part_num)
        self.avg_pool3x1 = nn.AvgPool1d(3, stride=1, padding=1)
        self.max_pool3x1 = nn.MaxPool1d(3, stride=1, padding=1)

        # MTB1
        conv3x3 = nn.Sequential(
                BasicConv1d(in_channels, hidden_dim, 3, padding=1),
                nn.LeakyReLU(inplace=True),
                BasicConv1d(hidden_dim, in_channels, 3, padding=1))
        self.conv1d3x3 = clones(conv3x3, part_num)
        self.avg_pool3x3 = nn.AvgPool1d(5, stride=1, padding=2)
        self.max_pool3x3 = nn.MaxPool1d(5, stride=1, padding=2)

    def forward(self, x):
        """
          Input: x, [p, n, c, s]
        """
        p, n, c, s = x.size()
        feature = x.split(1, 0)
        x = x.view(-1, c, s)

        # MTB1: ConvNet1d & Sigmoid
        logits3x1 = torch.cat([conv(_.squeeze(0)).unsqueeze(0)
            for conv, _ in zip(self.conv1d3x1, feature)], 0)
        scores3x1 = torch.sigmoid(logits3x1)
        # MTB1: Template Function
        feature3x1 = self.avg_pool3x1(x) + self.max_pool3x1(x)
        feature3x1 = feature3x1.view(p, n, c, s)
        feature3x1 = feature3x1 * scores3x1

        # MTB2: ConvNet1d & Sigmoid
        logits3x3 = torch.cat([conv(_.squeeze(0)).unsqueeze(0)
            for conv, _ in zip(self.conv1d3x3, feature)], 0)
        scores3x3 = torch.sigmoid(logits3x3)
        # MTB2: Template Function
        feature3x3 = self.avg_pool3x3(x) + self.max_pool3x3(x)
        feature3x3 = feature3x3.view(p, n, c, s)
        feature3x3 = feature3x3 * scores3x3

        # Temporal Pooling
        ret = (feature3x1 + feature3x3).max(-1)[0]
        return ret

In [None]:
class GaitPart(nn.Module):
    def __init__(self, in_channels, num_classes, squeeze=4, part_num=16, halving=1):
        super(GaitPart, self).__init__()

        self.focal_conv = FocalConv2d(in_channels, 64, kernel_size=(3, 3), halving=halving)
        self.temporal_aggregator = TemporalFeatureAggregator(64, squeeze=squeeze, part_num=part_num)

        # You might need to adjust the output channels and kernel size based on your problem
        self.conv_final = nn.Conv1d(64, num_classes, kernel_size=3, padding=1)

    def forward(self, x):
        x = x.to(torch.float64)
        x = self.focal_conv(x)
        x = self.temporal_aggregator(x)
        x = self.conv_final(x)
        return x

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


# Instantiate the model
model = GaitPart(in_channels=100, num_classes=2).to(torch.float64)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)  # Adjust lr as needed
batch_size = 8
# Create data loaders for training and validation
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Training loop
num_epochs = 10  # Adjust as needed
for epoch in range(num_epochs):
    model.train()
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        inputs = inputs.to(device).to(torch.float64)  # Convert to the correct data type
        labels = labels.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    # Validation loop
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            outputs = model(inputs)
            val_loss += criterion(outputs, labels).item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    print(f"Epoch [{epoch+1}/{num_epochs}] - "
          f"Validation Loss: {val_loss/len(val_loader):.4f}, "
          f"Validation Accuracy: {(correct/total)*100:.2f}%")


RuntimeError: ignored