In [1]:
import torch
from torch.utils.data import Dataset
import os, json
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader

In [2]:
class HumanAnimalSeqDataset(Dataset):
    """Session-level dataset of thermal + radar/mic frame sequences.

    Every sub-directory of ``root_dir`` is treated as one session, and every
    non-blank line of every ``*.jsonl`` file inside it as one frame of that
    session. A session gets label 1 when its directory name contains
    "animal" (case-insensitive) and label 0 otherwise.

    Directory and file listings are sorted so that the session order (and
    therefore the meaning of an index) and the frame order inside a session
    are identical every time the dataset is built from the same directory.

    Args:
        root_dir: Directory containing one sub-directory per session.
        augment: When True, ``__getitem__`` adds random noise and small random
            spatial shifts before the final normalisation.

    Each item is ``(thermal_seq, radar_seq, label)``:
        thermal_seq: float32 tensor of shape ``(seq_len, 3, 8, 8)``.
        radar_seq: float32 tensor of shape ``(seq_len, 12)`` when every radar
            and mic field is a scalar (10 radar values + 2 mic values).
        label: scalar int64 tensor (0 or 1).
    """

    def __init__(self, root_dir, augment=False):
        self.sessions = []
        self.labels = []
        self.augment = augment

        # os.listdir returns entries in arbitrary order; sort for determinism.
        for subdir in sorted(os.listdir(root_dir)):
            full_path = os.path.join(root_dir, subdir)
            if not os.path.isdir(full_path):
                continue
            label = 1 if "animal" in subdir.lower() else 0
            seq = []
            for fname in sorted(os.listdir(full_path)):
                if not fname.endswith(".jsonl"):
                    continue
                with open(os.path.join(full_path, fname), "r", encoding="utf-8") as jf:
                    for line in jf:
                        # Tolerate blank / trailing empty lines in the JSONL file.
                        if not line.strip():
                            continue
                        seq.append(self._parse_frame(json.loads(line)))
            # Sessions without any frame are dropped entirely.
            if seq:
                self.sessions.append(seq)
                self.labels.append(label)

    @staticmethod
    def _zscore(values):
        """Standardise an array with its own mean/std (epsilon avoids division by zero)."""
        return (values - np.mean(values)) / (np.std(values) + 1e-6)

    @staticmethod
    def _radar_features(reading):
        """Return the five per-radar features in the fixed order used by the model."""
        return [
            reading["numTargets"],
            reading["range"],
            reading["speed"],
            reading["energy"],
            float(reading["valid"]),
        ]

    @classmethod
    def _parse_frame(cls, data):
        """Convert one decoded JSONL record into ``(thermal, radar_mic)`` arrays."""
        # Thermal: three 8x8 grids stacked as channels -> (3, 8, 8).
        thermal = np.stack([
            np.array(data["thermal"][side]).reshape(8, 8)
            for side in ("left", "center", "right")
        ])
        thermal = cls._zscore(thermal)

        # Radar (R1 then R2) followed by the two microphone values.
        radar = np.array(
            cls._radar_features(data["mmWave"]["R1"])
            + cls._radar_features(data["mmWave"]["R2"])
        )
        mic = np.array([data["mic"]["left"], data["mic"]["right"]])
        radar_mic = cls._zscore(np.concatenate([radar, mic]))
        return thermal, radar_mic

    def __len__(self):
        return len(self.sessions)

    def __getitem__(self, idx):
        seq = self.sessions[idx]
        # torch.tensor copies, so the in-place augmentation below never
        # touches the cached numpy frames.
        thermal_seq = torch.tensor(np.array([s[0] for s in seq]), dtype=torch.float32)
        radar_seq   = torch.tensor(np.array([s[1] for s in seq]), dtype=torch.float32)

        # --- Augmentation for training ---
        if self.augment:
            # Thermal noise
            thermal_seq += torch.randn_like(thermal_seq) * 0.02
            # Thermal random shift: roll each frame by -1, 0 or +1 pixel along
            # one of its two spatial axes (dims 1 and 2 of a (3, 8, 8) frame).
            for i in range(thermal_seq.shape[0]):
                axis = int(np.random.choice([1, 2]))
                shift = int(np.random.randint(-1, 2))
                thermal_seq[i] = torch.roll(thermal_seq[i], shifts=shift, dims=axis)
            # Radar/Mic noise
            radar_seq += torch.randn_like(radar_seq) * 0.02

        # Normalize each whole sequence to zero mean / unit std.
        thermal_seq = (thermal_seq - thermal_seq.mean()) / (thermal_seq.std() + 1e-6)
        radar_seq   = (radar_seq - radar_seq.mean()) / (radar_seq.std() + 1e-6)

        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return thermal_seq, radar_seq, label

In [3]:
class HybridLSTMClassifier(nn.Module):
    """Per-frame CNN followed by an LSTM over the sequence, for 2-class output.

    Each thermal frame (3 channels, 8x8) goes through two 3x3 convolutions and
    a 2x2 max-pool, giving 16 * 4 * 4 = 256 features. Those are concatenated
    with the 12 radar/mic features of the same time step and fed to a
    single-layer LSTM; the final hidden state is passed through dropout and a
    linear layer.

    forward() returns log-probabilities of shape ``(batch, 2)``, so it is meant
    to be paired with ``nn.NLLLoss``.
    """

    def __init__(self):
        super().__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 8, 3, padding=1),
            nn.ReLU(),
            nn.Conv2d(8, 16, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )
        # 16 channels over a 4x4 map (8x8 input halved by the pooling layer).
        self.flat_size = 16 * 4 * 4
        self.lstm = nn.LSTM(input_size=self.flat_size + 12, hidden_size=32, batch_first=True)
        self.dropout = nn.Dropout(0.4)
        self.fc = nn.Linear(32, 2)

    def forward(self, thermal_seq, radar_seq):
        """Classify a batch of sequences.

        Args:
            thermal_seq: tensor of shape ``(batch, seq_len, 3, 8, 8)``.
            radar_seq: tensor of shape ``(batch, seq_len, 12)``.

        Returns:
            Log-softmax scores of shape ``(batch, 2)``.
        """
        batch_size, seq_len, C, H, W = thermal_seq.shape
        # Fold time into the batch axis so the CNN sees individual frames.
        # reshape (unlike view) also accepts non-contiguous inputs.
        frames = thermal_seq.reshape(batch_size * seq_len, C, H, W)
        x = self.cnn(frames).reshape(batch_size, seq_len, -1)  # (batch, seq_len, features)
        # Combine with radar
        x = torch.cat([x, radar_seq], dim=2)
        _, (h_n, _) = self.lstm(x)
        # h_n is (num_layers, batch, hidden); take the last layer's state.
        last_hidden = self.dropout(h_n[-1])
        return F.log_softmax(self.fc(last_hidden), dim=1)


In [4]:
# --- Create separate datasets for train/val ---
# The directory is parsed twice (not three times): once without augmentation
# (reused for validation) and once with augmentation for training.
DATASET_DIR = '../tools/dataset'
SPLIT_SEED = 42  # fixed so the same sessions land in validation on every run

full_dataset = HumanAnimalSeqDataset(DATASET_DIR)
train_source = HumanAnimalSeqDataset(DATASET_DIR, augment=True)

# The split indices are computed once and applied to both instances, so both
# must enumerate the sessions identically.
assert train_source.labels == full_dataset.labels, "dataset instances disagree on session order"

# Shuffle indices for splitting (80% train / 20% validation)
num_samples = len(full_dataset)
indices = torch.randperm(num_samples, generator=torch.Generator().manual_seed(SPLIT_SEED)).tolist()
split_idx = int(0.8 * num_samples)
train_indices = indices[:split_idx]
val_indices   = indices[split_idx:]
if not train_indices or not val_indices:
    raise ValueError(
        f"Need at least 2 sessions in {DATASET_DIR!r} to build a train/val split, found {num_samples}"
    )

# Subset datasets with augmentation applied to train only
train_ds = torch.utils.data.Subset(train_source, train_indices)
val_ds   = torch.utils.data.Subset(full_dataset, val_indices)

# --- DataLoaders ---
# batch_size=1: the default collate function cannot stack sequences of
# different lengths, and nothing here pads them.
train_loader = DataLoader(train_ds, batch_size=1, shuffle=True)
val_loader   = DataLoader(val_ds, batch_size=1, shuffle=False)

In [5]:
# Choose the compute device: Apple MPS if present, otherwise CUDA, otherwise CPU.
if torch.backends.mps.is_available():
    device = torch.device('mps')
elif torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Model on the chosen device, Adam optimiser, and NLL loss
# (the model's forward() already returns log-probabilities).
model = HybridLSTMClassifier().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.NLLLoss()

# Number of full passes over the training loader.
epochs = 30

# Train for `epochs` passes over train_loader, printing the mean per-sample
# loss and the accuracy after every epoch.
for epoch in range(epochs):
    model.train()
    train_loss, correct, total = 0.0, 0, 0
    for thermal_seq, radar_seq, labels in train_loader:
        thermal_seq = thermal_seq.to(device)
        radar_seq   = radar_seq.to(device)
        labels      = labels.to(device)

        optimizer.zero_grad()
        outputs = model(thermal_seq, radar_seq)
        loss = criterion(outputs, labels)
        loss.backward()
        # NOTE: gradient clipping (torch.nn.utils.clip_grad_norm_), if wanted,
        # only has an effect when called here, between backward() and step().
        optimizer.step()

        # criterion returns the mean over the batch; weight it by the batch
        # size so that dividing by `total` gives a true per-sample average
        # for any batch size.
        train_loss += loss.item() * labels.size(0)
        preds = outputs.argmax(1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)
    print(f"Epoch {epoch+1}/{epochs}, Loss: {train_loss/total:.4f}, Acc: {100*correct/total:.2f}%")

Epoch 1/30, Loss: 0.6926, Acc: 45.45%
Epoch 2/30, Loss: 0.6774, Acc: 60.61%
Epoch 3/30, Loss: 0.6083, Acc: 60.61%
Epoch 4/30, Loss: 0.5460, Acc: 75.76%
Epoch 5/30, Loss: 0.4505, Acc: 84.85%
Epoch 6/30, Loss: 0.3603, Acc: 78.79%
Epoch 7/30, Loss: 0.1587, Acc: 100.00%
Epoch 8/30, Loss: 0.3080, Acc: 87.88%
Epoch 9/30, Loss: 0.2481, Acc: 93.94%
Epoch 10/30, Loss: 0.1365, Acc: 96.97%
Epoch 11/30, Loss: 0.1684, Acc: 90.91%
Epoch 12/30, Loss: 0.2135, Acc: 93.94%
Epoch 13/30, Loss: 0.0900, Acc: 100.00%
Epoch 14/30, Loss: 0.0432, Acc: 100.00%
Epoch 15/30, Loss: 0.0257, Acc: 100.00%
Epoch 16/30, Loss: 0.0253, Acc: 100.00%
Epoch 17/30, Loss: 0.0174, Acc: 100.00%
Epoch 18/30, Loss: 0.0141, Acc: 100.00%
Epoch 19/30, Loss: 0.0106, Acc: 100.00%
Epoch 20/30, Loss: 0.0162, Acc: 100.00%
Epoch 21/30, Loss: 0.0129, Acc: 100.00%
Epoch 22/30, Loss: 0.0059, Acc: 100.00%
Epoch 23/30, Loss: 0.0061, Acc: 100.00%
Epoch 24/30, Loss: 0.0072, Acc: 100.00%
Epoch 25/30, Loss: 0.0076, Acc: 100.00%
Epoch 26/30, Loss: 0

In [6]:
# Evaluate on the validation loader: eval() switches dropout off and
# no_grad() skips gradient bookkeeping.
model.eval()
correct, total, loss = 0, 0, 0.0
with torch.no_grad():
    for thermal_seq, radar_seq, labels in val_loader:
        thermal_seq = thermal_seq.to(device)
        radar_seq   = radar_seq.to(device)
        labels      = labels.to(device)
        outputs = model(thermal_seq, radar_seq)
        preds = outputs.argmax(1)
        correct += (preds == labels).sum().item()
        # criterion returns a batch mean; weight by batch size so loss/total
        # is a per-sample average for any batch size.
        loss += criterion(outputs, labels).item() * labels.size(0)
        total += labels.size(0)
print("Val Accuracy:", 100*correct/total)
print("Val Loss:", loss/total)


Val Accuracy: 88.88888888888889
Val Loss: 0.7078360293914253


In [7]:
# Rescales the gradients currently stored on the model's parameters so their
# combined norm is at most 1.0; the value displayed below is the norm that
# call returns.
# NOTE(review): in this notebook the call comes after the training and
# validation cells and no optimizer.step() follows it, so it does not change
# the weights trained above. To clip during training it has to be called
# inside the training loop, after loss.backward() and before optimizer.step().
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

tensor(0.0800, device='mps:0')