In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
from typing import List, Tuple, Dict
import torch
from torch.utils.data import Dataset, DataLoader
import re
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import os
from torchsummary import summary
from torchinfo import summary
from tqdm import tqdm
from sklearn.metrics import confusion_matrix
import wandb
import torch.nn.functional as F
import hashlib
from typing import Dict, Tuple
import random

# Pick the compute device once for the whole notebook: CUDA when available, otherwise CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Device: ", device)

In [None]:
# Central hyperparameter dictionary for the experiment.
config = {
    'batch_size': 16,            # Mini-batch size
    'lr': 0.001,                # Learning rate handed to the optimizer
    'epochs': 10,                # Number of training epochs
    'input_dim': 22,             # Nominal feature count; the model's input_dim is derived from the dataset's feature matrix
    'num_classes': 7,            # Number of object classes (one per CATEGORIES entry)
    'hidden_dim': 128,           # Hidden dimension (NOTE(review): not read by any code visible in this notebook)
    'num_blocks': 2,             # Number of blocks (NOTE(review): not read by any code visible in this notebook)
    'checkpoint_dir': "/kaggle/working/",
    'device': 'cuda' if torch.cuda.is_available() else 'cpu',

    # Regularization and training parameters
    'weight_decay': 1e-4,        # Weight decay passed to the AdamW optimizer
    'dropout': 0.4,              # Dropout rate passed to the ensemble model
    'patience': 7,               # Early stopping patience (epochs without validation-loss improvement)
    'mixup_alpha': 0.2,          # Mixup augmentation strength (Beta distribution parameter)
    'gradient_clip': 1.0,        # Max gradient norm for gradient clipping
    'seq_len': 2000,             # Window length (rows per sample) for the windowed dataset
    'label_smoothing': 0.1       # Label smoothing for the loss function
}

print("Device: ", device)

In [None]:
# Configuration Dictionary
# config = {
#     'batch_size': 32,
#     'lr': 0.001,
#     'epochs': 10,
#     'input_dim': 9,
#     'num_classes': 7,
#     'hidden_dim': 512,
#     'num_blocks': 3,
#     'checkpoint_dir': "/content/drive/MyDrive/IDL/Checkpoint",
#     'device': 'cuda' if torch.cuda.is_available() else 'cpu'
# }

In [None]:
# Maps each object class name to its integer label.
# The names are matched as substrings of the data file names to decide a file's class.
CATEGORIES = {
    'Blueball': 0,
    'Box': 1,
    'Pencilcase': 2,
    'Pinkball': 3,
    'StuffedAnimal': 4,
    'Tennis': 5,
    'Waterbottle': 6,
}

In [None]:
# Path to the folder containing the dataset files (the *.txt recordings are
# read directly from this directory when the datasets are built).
folder_path = "/kaggle/input/idl-dataset/IDL_Data"

In [None]:
# Stats trackers: module-level counters mutated (via `global`) by the
# loading/filtering functions below. Re-run this cell to reset them before
# re-processing the data, otherwise the counts accumulate across runs.
total_count = 0   # rows successfully parsed from the raw files
kept_count = 0    # rows that survived the y-filter
valid_file_count = 0   # files that still had at least one row after filtering
skipped_due_to_missing_waypoints = 0   # NOTE(review): never updated by any live code in this notebook

In [None]:
# Expected (x_target, y_target) pairs, in the order they are visited.
# assign_sequential_waypoints() walks this list front to back; note that some
# coordinate pairs appear more than once at different positions in the route.
WAYPOINTS = [
    (30, -30), (30, 30), (15, -30), (15, 30),
    (0, -30), (0, 30), (-15, -30), (-15, 30),
    (-30, -30), (-30, 30), (-30, -30), (30, -30),
    (30, 30), (-30, 30)
]


In [None]:
# Step 1: Load and label dataset
def load_and_label_file(file_path, file_name):
    """Parse one comma-separated recording into a labelled DataFrame.

    The class is the first CATEGORIES key that occurs as a substring of
    ``file_name``; when none matches, an empty, column-less DataFrame is
    returned. Only lines with exactly 10 comma-separated fields are used, and
    a line whose numeric fields fail to convert is skipped silently.

    Side effect: increments the module-level ``total_count`` once per row that
    was parsed successfully.

    Returns:
        DataFrame with columns timestamp, microseconds, x, y, x_target,
        y_target, pwm1..pwm4, category (class name) and label (class id).
    """
    global total_count

    category = None
    for candidate in CATEGORIES:
        if candidate in file_name:
            category = candidate
            break
    if category is None:
        return pd.DataFrame()

    label = CATEGORIES[category]
    column_names = [
        "timestamp", "microseconds", "x", "y", "x_target", "y_target",
        "pwm1", "pwm2", "pwm3", "pwm4", "category", "label"
    ]

    rows = []
    with open(file_path, "r") as handle:
        for raw_line in handle:
            fields = raw_line.strip().split(',')
            if len(fields) != 10:
                continue
            try:
                # The timestamp stays a string; everything else is numeric.
                row = [
                    fields[0], int(fields[1]),
                    float(fields[2]), float(fields[3]),
                    float(fields[4]), float(fields[5]),
                    int(fields[6]), int(fields[7]), int(fields[8]), int(fields[9]),
                    category, label,
                ]
            except ValueError:
                # Malformed numeric field: drop the line without counting it.
                continue
            total_count += 1
            rows.append(row)

    return pd.DataFrame(rows, columns=column_names)

In [None]:
# Step 2: Assign sequential waypoint numbers
def assign_sequential_waypoints(df, tol=1.0, waypoints=None):
    """Tag every row with the index of the route waypoint it belongs to.

    The route is walked strictly in order: the current waypoint index only
    advances (by one) when a row's target no longer matches the current
    waypoint but does match the next one. Rows matching neither keep the
    current index, and the index never moves backwards or skips ahead.

    Args:
        df: DataFrame with ``x_target`` and ``y_target`` columns.
        tol: absolute tolerance handed to ``np.isclose`` for both coordinates.
        waypoints: optional sequence of ``(x, y)`` pairs; defaults to the
            module-level ``WAYPOINTS`` route.

    Returns:
        A re-indexed copy of ``df`` (``reset_index(drop=True)``) with an
        added integer ``waypoint_number`` column.

    Raises:
        ValueError: if the waypoint sequence is empty.
    """
    if waypoints is None:
        waypoints = WAYPOINTS
    if len(waypoints) == 0:
        raise ValueError("waypoints must contain at least one (x, y) pair")

    df = df.reset_index(drop=True)
    n_rows = len(df)

    # Pull the target columns out once; per-row df.loc scalar lookups are
    # far slower than array access.
    x_t = df["x_target"].to_numpy(dtype=float)
    y_t = df["y_target"].to_numpy(dtype=float)

    # hits[k, i] is True when row i's target matches waypoint k within tol.
    hits = np.zeros((len(waypoints), n_rows), dtype=bool)
    for k, (wx, wy) in enumerate(waypoints):
        hits[k] = np.isclose(x_t, wx, atol=tol) & np.isclose(y_t, wy, atol=tol)

    last_index = len(waypoints) - 1
    wp_index = 0
    assigned_wp = np.empty(n_rows, dtype=np.int64)
    for i in range(n_rows):
        # Advance only on a clean hand-over from the current to the next waypoint.
        if not hits[wp_index, i] and wp_index < last_index and hits[wp_index + 1, i]:
            wp_index += 1
        assigned_wp[i] = wp_index

    df["waypoint_number"] = assigned_wp
    return df

In [None]:
# Step 3: Keep only rows whose y value is above a threshold (default: y > 15)
def filter_by_y(df, min_y=15):
    """Return the rows of ``df`` whose ``y`` is strictly greater than ``min_y``.

    Args:
        df: DataFrame with a ``y`` column.
        min_y: exclusive lower bound on ``y``; defaults to 15, the threshold
            this notebook has always applied.

    Returns:
        The surviving rows with a fresh 0..n-1 index.

    Side effect: adds the number of surviving rows to the module-level
    ``kept_count``.
    """
    global kept_count
    filtered = df[df["y"] > min_y].reset_index(drop=True)
    kept_count += len(filtered)
    return filtered

In [None]:
def process_file(file_path, file_name):
    """Load one recording, tag its waypoints and apply the y-filter.

    Prints the number of distinct waypoints before and after filtering.
    Returns an empty, column-less DataFrame when the file yields no rows
    (unknown class or nothing parseable); otherwise returns the filtered
    frame, which may itself have zero rows.

    Side effect: increments the module-level ``valid_file_count`` when at
    least one row survives the filter.
    """
    global valid_file_count

    # Step 1: parse the file and attach class labels
    labelled = load_and_label_file(file_path, file_name)
    if labelled.empty:
        return pd.DataFrame()

    # Step 2: tag every row with its position along the waypoint route
    with_waypoints = assign_sequential_waypoints(labelled)
    n_before = with_waypoints["waypoint_number"].nunique()
    print(f"\n📌 {file_name} → {n_before} waypoints BEFORE filtering")

    # Step 3: apply the y-threshold filter (see filter_by_y)
    kept = filter_by_y(with_waypoints)
    n_after = kept["waypoint_number"].nunique()
    print(f"📌 {file_name} → {n_after} waypoints AFTER filtering")

    # A file only counts as valid when some of its rows survived.
    if kept.empty:
        return kept
    valid_file_count += 1
    return kept

In [None]:
# # Step 5: Process all .txt files
# all_data = pd.DataFrame()

# for file_name in os.listdir(folder_path):
#     if not file_name.endswith(".txt") or file_name.startswith("."):
#         continue
#     file_path = os.path.join(folder_path, file_name)
#     df = process_file(file_path, file_name)
#     if not df.empty:
#         all_data = pd.concat([all_data, df], ignore_index=True)

# # Step 6: Summary
# print("\n📄 Summary:")
# print(f"Total files scanned: {len([f for f in os.listdir(folder_path) if f.endswith('.txt')])}")
# print(f"✅ Files with 14 valid waypoints: {valid_file_count}")
# print(f"⚠️ Skipped due to missing waypoints: {skipped_due_to_missing_waypoints}")
# print(f"📊 Data points before filtering: {total_count}")
# print(f"✅ Data points after filtering: {kept_count}")
# print(f"🚫 Dropped data points: {total_count - kept_count}")

In [None]:
# print(all_data.head(1350))

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm
import pandas as pd
import numpy as np

In [None]:
# class ObjectSensorDataset(Dataset):
#     def __init__(self, df):
#         features = df[["x", "y", "x_target", "y_target", "pwm1", "pwm2", "pwm3", "pwm4", "microseconds"]].values
#         labels = df["label"].values

#         self.X = torch.tensor(features, dtype=torch.float32)
#         self.y = torch.tensor(labels, dtype=torch.long)

#     def __len__(self):
#         return len(self.y)

#     def __getitem__(self, idx):
#         return self.X[idx], self.y[idx]

In [None]:
# # Normalize features before split
# features_to_scale = ["x", "y", "x_target", "y_target", "pwm1", "pwm2", "pwm3", "pwm4", "microseconds"]
# scaler = StandardScaler()
# all_data[features_to_scale] = scaler.fit_transform(all_data[features_to_scale])

# # 🧪 Split into train/val/test with stratified sampling
# train_df, temp_df = train_test_split(
#     all_data, test_size=0.3, stratify=all_data["label"], random_state=42
# )
# val_df, test_df = train_test_split(
#     temp_df, test_size=0.5, stratify=temp_df["label"], random_state=42
# )

# # 📦 Create datasets
# train_dataset = ObjectSensorDataset(train_df)
# val_dataset = ObjectSensorDataset(val_df)
# test_dataset = ObjectSensorDataset(test_df)

In [None]:
from collections import defaultdict, Counter

def build_datasets(data_dir: str):
    """Split the recordings by file into train/val/test and load each split.

    Files are grouped by the first CATEGORIES name found in the file name and
    each class is split independently (about 65% / 20% / 15% of its files), so
    rows from one recording never end up in two splits. Because the cut
    points are ``int(0.65 * n)`` and ``int(0.85 * n)``, a class with very few
    files can leave its train and/or validation share empty.

    Args:
        data_dir: directory holding the ``*.txt`` recordings.

    Returns:
        ``(train_df, val_df, test_df)``; a split with no usable rows is an
        empty, column-less DataFrame.

    Side effect: reseeds Python's global ``random`` module with 42.
    """
    data_dir = Path(data_dir)
    # glob() yields files in filesystem order, which differs between machines;
    # sorting makes the seeded shuffle below reproducible. pathlib's "*.txt"
    # also matches dotfiles, so hidden files are skipped explicitly.
    file_paths = sorted(
        p for p in data_dir.glob("*.txt") if not p.name.startswith(".")
    )
    random.seed(42)

    # 1. Group files by object class
    class_to_files = defaultdict(list)
    for file_path in file_paths:
        for class_name in CATEGORIES:
            if class_name in file_path.name:
                class_to_files[class_name].append(file_path)
                break

    # 2. Stratified split (each class in train/val/test)
    train_files, val_files, test_files = [], [], []
    for class_name, files in class_to_files.items():
        random.shuffle(files)
        n = len(files)
        train_split = int(0.65 * n)
        val_split = int(0.85 * n)
        train_files += files[:train_split]
        val_files += files[train_split:val_split]
        test_files += files[val_split:]

    print("🔍 Per-class file counts:")
    for cls in CATEGORIES:
        print(f"  {cls:<15} → {len(class_to_files[cls])} total files")

    print("\n✅ Final split file counts:")
    print(f"Train: {len(train_files)}")
    print(f"Val:   {len(val_files)}")
    print(f"Test:  {len(test_files)}")

    # 3. Process each split
    def process_file_list(file_list):
        """Load every file in the list and concatenate the non-empty results."""
        dfs = []
        for fp in file_list:
            df = process_file(fp, fp.name)
            if not df.empty:
                dfs.append(df)
        return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()

    print("Processing train files")
    train_df = process_file_list(train_files)
    print("Processing validation files")
    val_df = process_file_list(val_files)
    print("Processing test files")
    test_df = process_file_list(test_files)

    return train_df, val_df, test_df


# 📦 Run everything: split the recordings in folder_path by file and load each
# split into its own DataFrame (this also prints per-file waypoint counts).
train_df, val_df, test_df = build_datasets(folder_path)

In [None]:
def extract_enhanced_features(df):
    """Add engineered motion, error and control features to the sensor data.

    Difference features (``x_delta``, ``y_delta``, ``x_error_delta``,
    ``y_error_delta``) are computed inside *contiguous runs* of rows sharing
    the same ``(category, waypoint_number)``. Grouping by the key values
    alone would pair the first row of one recording with the last row of an
    earlier recording of the same class and waypoint, producing a spurious
    jump at every such boundary. The first row of each run gets a delta of 0.

    Args:
        df: DataFrame with columns category, waypoint_number, x, y, x_target,
            y_target and pwm1..pwm4.

    Returns:
        A copy of ``df`` with the extra feature columns; ``df`` is not modified.
    """
    # Create a copy to avoid modifying the original
    enhanced_df = df.copy()

    # A new run starts wherever category or waypoint differs from the previous row.
    keys = enhanced_df[['category', 'waypoint_number']]
    starts_run = (keys != keys.shift()).any(axis=1).to_numpy()
    run_ids = np.cumsum(starts_run)

    def _run_diff(column):
        """Row-to-row difference of a column, restarted at every run boundary."""
        return enhanced_df[column].groupby(run_ids).diff().fillna(0)

    # Compute deltas (changes between consecutive measurements)
    enhanced_df['x_delta'] = _run_diff('x')
    enhanced_df['y_delta'] = _run_diff('y')

    # Distance to target (error signal that drives the controller)
    enhanced_df['x_error'] = enhanced_df['x_target'] - enhanced_df['x']
    enhanced_df['y_error'] = enhanced_df['y_target'] - enhanced_df['y']

    # Magnitude of error and movement
    enhanced_df['error_magnitude'] = np.sqrt(enhanced_df['x_error']**2 + enhanced_df['y_error']**2)
    enhanced_df['movement_magnitude'] = np.sqrt(enhanced_df['x_delta']**2 + enhanced_df['y_delta']**2)

    # Control effort features (sum and differences of PWM signals)
    enhanced_df['total_pwm'] = enhanced_df['pwm1'] + enhanced_df['pwm2'] + enhanced_df['pwm3'] + enhanced_df['pwm4']
    enhanced_df['pwm_x_diff'] = enhanced_df['pwm1'] - enhanced_df['pwm3']  # Assuming these control x-axis
    enhanced_df['pwm_y_diff'] = enhanced_df['pwm2'] - enhanced_df['pwm4']  # Assuming these control y-axis

    # Interaction features (product of error and control)
    enhanced_df['x_control_response'] = enhanced_df['x_error'] * enhanced_df['pwm_x_diff']
    enhanced_df['y_control_response'] = enhanced_df['y_error'] * enhanced_df['pwm_y_diff']

    # Time derivatives of error (how quickly error is changing)
    enhanced_df['x_error_delta'] = _run_diff('x_error')
    enhanced_df['y_error_delta'] = _run_diff('y_error')

    return enhanced_df

# Add the engineered features to each split independently; the original
# train_df / val_df / test_df frames are left untouched (the function copies).
train_df_enhanced = extract_enhanced_features(train_df)
val_df_enhanced = extract_enhanced_features(val_df)
test_df_enhanced = extract_enhanced_features(test_df)

In [None]:
class EnhancedWindowedDataset(torch.utils.data.Dataset):
    """Sliding-window dataset over the rows of a feature DataFrame.

    Sample ``i`` is the block of ``seq_len`` consecutive rows starting at row
    ``i`` (stride 1), labelled with the ``label`` of the block's last row.
    Windows are cut from consecutive rows of ``df`` as given, so they can
    span whatever boundaries (recordings, classes) the frame contains.

    Args:
        df: DataFrame with a ``label`` column and any subset of the known
            feature columns; missing feature columns are simply left out.
        seq_len: number of rows per window.
        augment: when True, each sample is augmented with probability 0.5.
        noise_scale: standard deviation of the additive Gaussian noise.
        time_warp_scale: amplitude, in samples, of the sinusoidal index
            offset used for time warping.
        feature_means: optional per-feature means to standardize with. Pass
            the training set's ``feature_means`` so that validation/test data
            is scaled identically; when omitted they are computed from ``df``.
        feature_stds: optional per-feature standard deviations, used as given;
            when omitted they are computed from ``df`` (plus 1e-6).
    """

    def __init__(self, df, seq_len=2000, augment=False, noise_scale=0.02, time_warp_scale=0.1,
                 feature_means=None, feature_stds=None):
        self.seq_len = seq_len
        self.df = df.reset_index(drop=True)  # Use the enhanced dataframe
        self.augment = augment
        self.noise_scale = noise_scale
        self.time_warp_scale = time_warp_scale

        # Select all features including the new engineered features
        feature_columns = [
            # Original features
            "x", "y", "x_target", "y_target", "pwm1", "pwm2", "pwm3", "pwm4", "waypoint_number",
            # New engineered features
            "x_delta", "y_delta", "x_error", "y_error", "error_magnitude", "movement_magnitude",
            "total_pwm", "pwm_x_diff", "pwm_y_diff", "x_control_response", "y_control_response",
            "x_error_delta", "y_error_delta"
        ]

        # Filter to only include columns that exist in the dataframe
        self.feature_columns = [col for col in feature_columns if col in self.df.columns]
        self.features = self.df[self.feature_columns].values.astype(np.float32)

        # Standardize features, with caller-supplied statistics when given.
        if feature_means is None:
            feature_means = np.mean(self.features, axis=0)
        if feature_stds is None:
            feature_stds = np.std(self.features, axis=0) + 1e-6  # Avoid division by zero
        self.feature_means = np.asarray(feature_means, dtype=np.float32)
        self.feature_stds = np.asarray(feature_stds, dtype=np.float32)
        self.features = (self.features - self.feature_means) / self.feature_stds

        # Label per row
        self.labels = self.df["label"].values.astype(np.int64)

    def __len__(self):
        # A frame shorter than one window holds no samples; never report a
        # negative length.
        return max(0, len(self.df) - self.seq_len + 1)

    def __getitem__(self, idx):
        n_windows = len(self)
        if idx < 0:
            idx += n_windows
        if not 0 <= idx < n_windows:
            raise IndexError(f"window index {idx} out of range for {n_windows} windows")

        x = self.features[idx:idx + self.seq_len]  # (seq_len, input_dim)
        y = self.labels[idx + self.seq_len - 1]

        # Apply augmentation if enabled
        if self.augment and np.random.random() > 0.5:
            # Get dimensions once at the start
            seq_len, feat_dim = x.shape

            # Add random noise. This also turns x into a fresh array, so the
            # in-place masking below never touches self.features.
            noise = np.random.normal(0, self.noise_scale, x.shape)
            x = x + noise

            # Time warping: offset each time index by a sinusoid and truncate
            # to int. With an amplitude below 1 the offset moves an index by
            # at most one sample.
            if np.random.random() > 0.7:
                time_indices = np.arange(seq_len)
                warp = np.sin(np.linspace(0, 3*np.pi, seq_len)) * self.time_warp_scale
                warped_indices = np.clip(time_indices + warp, 0, seq_len-1).astype(int)
                x = x[warped_indices, :]

            # Random feature masking (occasionally zero out ~10% of the features)
            if np.random.random() > 0.8:
                mask_idx = np.random.choice(feat_dim, size=int(feat_dim * 0.1), replace=False)
                x[:, mask_idx] = 0

        x_tensor = torch.tensor(x, dtype=torch.float32)
        y_tensor = torch.tensor(y, dtype=torch.long)
        return x_tensor, y_tensor

In [None]:
seq_len = config['seq_len']  # Rows per window; also used for the model summary below

# Create datasets; only the training set is augmented.
# NOTE(review): each dataset standardizes its features with statistics computed
# from its own split, so train/val/test are not scaled identically.
train_dataset = EnhancedWindowedDataset(train_df_enhanced, seq_len=seq_len, augment=True)
val_dataset = EnhancedWindowedDataset(val_df_enhanced, seq_len=seq_len)
test_dataset = EnhancedWindowedDataset(test_df_enhanced, seq_len=seq_len)

# Create data loaders. The batch size comes from the shared config rather than
# a second hard-coded copy of the same number.
batch_size = config['batch_size']
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


# class CNNBlock(nn.Module):
#     def __init__(self, in_channels, out_channels, kernel_size=3, dropout=0.2):
#         super(CNNBlock, self).__init__()
#         self.conv = nn.Conv1d(in_channels, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
#         self.bn = nn.BatchNorm1d(out_channels)
#         self.relu = nn.ReLU()
#         self.dropout = nn.Dropout(dropout)

#     def forward(self, x):
#         return self.dropout(self.relu(self.bn(self.conv(x))))

class CNNBlock(nn.Module):
    """Conv1d -> BatchNorm -> ReLU -> Dropout block with a skip connection.

    When the channel counts match, the input is added back unchanged;
    otherwise the input is first projected with a 1x1 convolution followed by
    batch norm. Padding is ``kernel_size // 2``, which keeps the sequence
    length unchanged for odd kernel sizes.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, dropout=0.2):
        super().__init__()
        same_padding = kernel_size // 2
        self.conv = nn.Conv1d(in_channels, out_channels,
                              kernel_size=kernel_size, padding=same_padding)
        self.bn = nn.BatchNorm1d(out_channels)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)

        # An identity shortcut is only possible when the shapes agree;
        # otherwise a learned projection brings the input to out_channels.
        self.use_residual = (in_channels == out_channels)
        if not self.use_residual:
            self.residual_conv = nn.Conv1d(in_channels, out_channels, kernel_size=1)
            self.residual_bn = nn.BatchNorm1d(out_channels)

    def forward(self, x):
        # Main path first, shortcut second.
        main_path = self.dropout(self.relu(self.bn(self.conv(x))))
        if self.use_residual:
            shortcut = x
        else:
            shortcut = self.residual_bn(self.residual_conv(x))
        return main_path + shortcut

class CNNModel(nn.Module):
    """1-D convolutional classifier over windows of sensor features.

    Three CNNBlocks (32, 64, 128 channels) are followed by global average
    pooling over time and a three-layer MLP head that outputs one score per
    class. The first two blocks use half of the given dropout rate.
    """

    def __init__(self, input_dim, num_classes, dropout=0.3):
        super().__init__()
        light_dropout = dropout / 2

        feature_stack = [
            CNNBlock(input_dim, 32, kernel_size=5, dropout=light_dropout),
            CNNBlock(32, 64, kernel_size=5, dropout=light_dropout),
            CNNBlock(64, 128, kernel_size=3, dropout=dropout),
            nn.AdaptiveAvgPool1d(output_size=1),
        ]
        self.conv_layers = nn.Sequential(*feature_stack)

        head = [
            nn.Flatten(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(32, num_classes),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        # Input arrives as [batch_size, seq_len, features]; Conv1d wants the
        # feature axis second: [batch_size, features, seq_len].
        channels_first = x.permute(0, 2, 1)
        pooled = self.conv_layers(channels_first)
        return self.classifier(pooled)

# class RNNModel(nn.Module):
#     def __init__(self, input_dim, num_classes, hidden_dim=128, num_layers=2, dropout=0.3):
#         super(RNNModel, self).__init__()

#         self.lstm = nn.LSTM(
#             input_size=input_dim,
#             hidden_size=hidden_dim,
#             num_layers=num_layers,
#             batch_first=True,
#             dropout=dropout if num_layers > 1 else 0,
#             bidirectional=True
#         )

#         self.attention = nn.Sequential(
#             nn.Linear(hidden_dim*2, 1),  # *2 for bidirectional
#             nn.Tanh()
#         )

#         self.classifier = nn.Sequential(
#             nn.Linear(hidden_dim*2, hidden_dim),
#             nn.ReLU(),
#             nn.Dropout(dropout),
#             nn.Linear(hidden_dim, num_classes)
#         )

#     def forward(self, x):
#         # x shape: [batch_size, seq_len, features]
#         lstm_out, _ = self.lstm(x)  # [batch_size, seq_len, hidden_dim*2]

#         # Attention mechanism
#         attention_weights = self.attention(lstm_out).squeeze(-1)  # [batch_size, seq_len]
#         attention_weights = F.softmax(attention_weights, dim=1).unsqueeze(1)  # [batch_size, 1, seq_len]

#         # Apply attention weights
#         context = torch.bmm(attention_weights, lstm_out).squeeze(1)  # [batch_size, hidden_dim*2]

#         # Classification
#         output = self.classifier(context)
#         return output

class RNNModel(nn.Module):
    """Bidirectional LSTM classifier with attention pooling over time.

    The LSTM outputs are scored per time step (Linear -> Tanh), the scores
    are softmax-normalized over the sequence, and the weighted sum of the
    outputs forms a context vector. Class scores are the sum of a two-layer
    MLP applied to the context and a direct linear shortcut from the context.
    """

    def __init__(self, input_dim, num_classes, hidden_dim=128, num_layers=2, dropout=0.3):
        super().__init__()
        bi_dim = hidden_dim * 2  # forward and backward states are concatenated

        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            # Inter-layer dropout only applies when the LSTM is stacked.
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )

        self.attention = nn.Sequential(
            nn.Linear(bi_dim, 1),
            nn.Tanh()
        )

        # Main classification path
        self.fc1 = nn.Linear(bi_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(hidden_dim, num_classes)

        # Linear shortcut from the context vector straight to the class scores
        self.shortcut = nn.Linear(bi_dim, num_classes)

    def forward(self, x):
        # x: [batch_size, seq_len, features]
        sequence_states, _ = self.lstm(x)  # [batch_size, seq_len, hidden_dim*2]

        # One score per time step, normalized across the sequence
        scores = self.attention(sequence_states).squeeze(-1)  # [batch_size, seq_len]
        weights = F.softmax(scores, dim=1).unsqueeze(1)  # [batch_size, 1, seq_len]

        # Attention-weighted sum of the LSTM outputs
        context = torch.bmm(weights, sequence_states).squeeze(1)  # [batch_size, hidden_dim*2]

        main_path = self.fc2(self.dropout(self.relu(self.fc1(context))))
        skip_path = self.shortcut(context)
        return main_path + skip_path

class EnsembleModel(nn.Module):
    """Stacked ensemble of the CNN and RNN classifiers.

    Both component models score the same input window; their class scores
    are concatenated and a linear meta-learner maps them to the final class
    scores.

    ``forward`` returns a dict with:
        ``"feats"``: concatenated component scores, ``[batch, 2 * num_classes]``.
        ``"out"``:   final *unnormalized logits*, ``[batch, num_classes]``.
        ``"probs"``: softmax of ``"out"``, for callers that need probabilities.

    ``"out"`` is deliberately not softmax-normalized: the notebook feeds it to
    ``nn.CrossEntropyLoss``, which applies log-softmax itself, so passing
    probabilities would normalize twice and flatten the loss and its
    gradients. Arg-max / top-k predictions are the same for logits and
    probabilities.
    """

    def __init__(self, input_dim, num_classes, dropout=0.3):
        super(EnsembleModel, self).__init__()

        # Component models
        self.cnn_model = CNNModel(input_dim, num_classes, dropout)
        self.rnn_model = RNNModel(input_dim, num_classes, dropout=dropout)

        # Meta-learner (combination weights)
        self.meta_learner = nn.Sequential(
            nn.Linear(num_classes*2, num_classes),
            nn.Dropout(dropout/2)
        )

    def forward(self, x):
        # Get predictions from individual models
        cnn_out = self.cnn_model(x)
        rnn_out = self.rnn_model(x)

        # Concatenate predictions
        combined = torch.cat((cnn_out, rnn_out), dim=1)

        # Meta-learner combines predictions into the final logits
        logits = self.meta_learner(combined)
        probs = F.softmax(logits, dim=1)

        return {"feats": combined, "out": logits, "probs": probs}


# The input width is taken from the training dataset's feature matrix rather
# than from config['input_dim'], so it always matches the columns actually present.
input_dim = train_dataset.features.shape[1]  # Number of features including engineered ones
num_classes = config['num_classes']  # Number of object classes
dropout = config['dropout']

# Build the CNN+RNN ensemble and move it to the selected device.
model = EnsembleModel(
    input_dim=input_dim,
    num_classes=num_classes,
    dropout=dropout
).to(device)

# Print model summary by tracing one all-zero batch of shape
# (batch_size, seq_len, input_dim) through the model.
from torchinfo import summary
summary(model, input_data=torch.zeros(batch_size, seq_len, input_dim).to(device))

In [None]:
class AverageMeter:
    """Tracks the latest value and the running weighted average of a metric.

    Attributes:
        val: the most recent value passed to ``update``.
        sum: weighted sum of all values seen since the last reset.
        count: total weight seen since the last reset.
        avg: ``sum / count``.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Clear all statistics back to zero."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` with weight ``n`` (e.g. a batch mean and its batch size)."""
        self.val = val
        self.sum = self.sum + val * n
        self.count = self.count + n
        self.avg = self.sum / self.count

In [None]:
def accuracy(output, target, topk=(1,)):
    """Top-k accuracy, in percent, for a batch of class scores.

    Args:
        output: score tensor of shape ``[batch, num_classes]``.
        target: tensor of ``batch`` integer class indices.
        topk: the k values to evaluate.

    Returns:
        A list with one single-element tensor per entry of ``topk``, holding
        the percentage of samples whose target is among the k best scores.
    """
    with torch.no_grad():
        n_samples = target.size(0)
        largest_k = max(topk)

        # Indices of the best `largest_k` classes per sample, transposed to
        # [largest_k, batch] so that row r holds every sample's rank-r guess.
        ranked = output.topk(largest_k, 1, True, True)[1].t()
        hits = ranked.eq(target.view(1, -1).expand_as(ranked))

        results = []
        for k in topk:
            n_correct = hits[:k].reshape(-1).float().sum(0, keepdim=True)
            results.append(n_correct.mul_(100.0 / n_samples))
        return results

In [None]:
class EarlyStopping:
    """Stop training once validation loss has not improved for `patience` calls.

    Call the instance once per epoch with the validation loss and the model.
    Every call that counts as an improvement writes the model's state dict to
    ``path`` and resets the counter; every other call increments the counter,
    and ``early_stop`` becomes True when the counter reaches ``patience``.

    Args:
        patience: number of non-improving calls tolerated before stopping.
        verbose: print a message on every counter increment and checkpoint.
        delta: margin added to the best score; a call whose negated loss is
            below ``best_score + delta`` does not count as an improvement.
        path: file the best model state dict is saved to.
    """

    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt'):
        self.patience = patience
        self.verbose = verbose
        self.delta = delta
        self.path = path
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = float('inf')

    def __call__(self, val_loss, model):
        # Scores are negated losses, so a higher score is better.
        candidate = -val_loss
        first_call = self.best_score is None

        if not first_call and candidate < self.best_score + self.delta:
            # No improvement this time.
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
            return

        self.best_score = candidate
        self.save_checkpoint(val_loss, model)
        if not first_call:
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        """Write the model's state dict to ``path`` and remember ``val_loss``."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

In [None]:
# def train_model(model, train_loader, criterion, optimizer, device):
#     model.train()
#     loss_m = AverageMeter()
#     acc_m = AverageMeter()
#     batch_bar = tqdm(total=len(train_loader), dynamic_ncols=True, leave=False, position=0, desc='Train')

#     for i, data in enumerate(train_loader):
#         optimizer.zero_grad()
#         x, y = data
#         x, y = x.to(device), y.to(device)
#         outputs = model(x)
#         loss = criterion(outputs['out'], y)
#         loss.backward()
#         torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
#         optimizer.step()

#         acc = accuracy(outputs['out'], y)[0].item()
#         loss_m.update(loss.item())
#         acc_m.update(acc)

#         batch_bar.set_postfix(
#             loss="{:.04f}".format(float(loss_m.avg)),
#             acc="{:.04f}%".format(float(acc_m.avg)),
#             lr="{:.06f}".format(float(optimizer.param_groups[0]['lr']))
#         )
#         batch_bar.update()

#         del x, y, outputs, loss
#         torch.cuda.empty_cache()

#     batch_bar.close()
#     return loss_m.avg, acc_m.avg

In [None]:
def train_model_with_regularization(model, train_loader, criterion, optimizer, device,
                                   weight_decay=1e-4, gradient_clip=1.0, mixup_alpha=0.2):
    """Run one training epoch with mixup, an explicit L2 penalty and gradient clipping.

    Args:
        model: network whose forward pass returns a dict with an ``'out'`` entry.
        train_loader: iterable of ``(x, y)`` batches.
        criterion: loss applied to ``outputs['out']`` and the targets.
        optimizer: optimizer stepped once per batch.
        device: device the batches are moved to.
        weight_decay: coefficient of the explicit penalty, which is the sum of
            the (non-squared) L2 norms of all parameter tensors. It is added
            on top of whatever weight decay the optimizer applies; 0 disables it.
        gradient_clip: maximum total gradient norm.
        mixup_alpha: Beta-distribution parameter for mixup; 0 disables mixup.

    Returns:
        ``(mean_loss, mean_accuracy_percent)`` averaged per sample. The loss
        includes the L2 penalty. With mixup enabled the accuracy is measured
        against the unmixed targets, so it is only an approximate indicator.
    """
    model.train()
    loss_m = AverageMeter()
    acc_m = AverageMeter()
    batch_bar = tqdm(total=len(train_loader), dynamic_ncols=True, leave=False, position=0, desc='Train')

    for i, data in enumerate(train_loader):
        optimizer.zero_grad()
        x, y = data
        batch_size = x.size(0)

        # Move data to device first
        x, y = x.to(device), y.to(device)

        # Apply Mixup (data augmentation technique): blend each sample with a
        # randomly chosen partner from the same batch.
        if mixup_alpha > 0:
            lam = np.random.beta(mixup_alpha, mixup_alpha)
            index = torch.randperm(batch_size).to(device)
            x = lam * x + (1 - lam) * x[index]

        # Forward pass
        outputs = model(x)

        # Compute loss; under mixup it is the same blend of the two targets' losses
        if mixup_alpha > 0:
            loss = lam * criterion(outputs['out'], y) + (1 - lam) * criterion(outputs['out'], y[index])
        else:
            loss = criterion(outputs['out'], y)

        # Explicit L2 penalty (in addition to any weight decay in the optimizer)
        if weight_decay > 0:
            l2_reg = torch.tensor(0., device=device)
            for param in model.parameters():
                l2_reg = l2_reg + torch.norm(param, 2)
            loss = loss + weight_decay * l2_reg

        # Backward pass and optimization
        loss.backward()

        # Gradient clipping to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), gradient_clip)

        optimizer.step()

        # Weight each batch by its size so a smaller final batch does not
        # skew the epoch averages.
        acc = accuracy(outputs['out'], y)[0].item()
        loss_m.update(loss.item(), batch_size)
        acc_m.update(acc, batch_size)

        batch_bar.set_postfix(
            loss="{:.04f}".format(float(loss_m.avg)),
            acc="{:.04f}%".format(float(acc_m.avg)),
            lr="{:.06f}".format(float(optimizer.param_groups[0]['lr']))
        )
        batch_bar.update()

        # No per-step torch.cuda.empty_cache(): it does not give PyTorch more
        # usable memory and only slows every iteration down.
        del x, y, outputs, loss

    batch_bar.close()
    return loss_m.avg, acc_m.avg

In [None]:
@torch.no_grad()
def validate_model(model, val_loader, criterion, class_names, device):
    """Evaluate the model on a loader and print per-class accuracy.

    Args:
        model: network whose forward pass returns a dict with an ``'out'`` entry.
        val_loader: iterable of ``(x, y)`` batches.
        criterion: loss applied to ``outputs['out']`` and the targets.
        class_names: class names ordered by label id; when empty or None the
            per-class report is skipped.
        device: device the batches are moved to.

    Returns:
        ``(mean_loss, mean_accuracy_percent)`` averaged per sample.
    """
    model.eval()
    loss_m = AverageMeter()
    acc_m = AverageMeter()
    batch_bar = tqdm(total=len(val_loader), dynamic_ncols=True, position=0, leave=False, desc='Val')

    all_preds = []
    all_targets = []

    for i, data in enumerate(val_loader):
        x, y = data
        x, y = x.to(device), y.to(device)
        outputs = model(x)
        loss = criterion(outputs['out'], y)

        acc = accuracy(outputs['out'], y)[0].item()

        _, predicted = torch.max(outputs['out'], 1)
        all_preds.extend(predicted.cpu().numpy())
        all_targets.extend(y.cpu().numpy())

        # Weight each batch by its size so the averages agree with the
        # per-sample counts reported below.
        n_samples = y.size(0)
        loss_m.update(loss.item(), n_samples)
        acc_m.update(acc, n_samples)

        batch_bar.set_postfix(
            loss="{:.04f}".format(float(loss_m.avg)),
            acc="{:.04f}%".format(float(acc_m.avg))
        )
        batch_bar.update()

        # No per-step torch.cuda.empty_cache(): it only slows evaluation down.
        del x, y, outputs, loss

    batch_bar.close()

    if class_names:
        print("\nPer-class Validation Accuracy:")
        # Convert once instead of on every class iteration.
        targets_arr = np.array(all_targets)
        preds_arr = np.array(all_preds)
        for i, class_name in enumerate(class_names):
            class_mask = (targets_arr == i)
            class_total = np.sum(class_mask)
            # Classes absent from this loader are skipped rather than reported as 0%.
            if class_total > 0:
                class_correct = np.sum(preds_arr[class_mask] == i)
                acc_percent = 100 * class_correct / class_total
                print(f"  {class_name}: {acc_percent:.4f}% ({class_correct}/{class_total})")

    return loss_m.avg, acc_m.avg

In [None]:
def save_model(model, optimizer, scheduler, metrics, epoch, path):
    """Write a full training checkpoint to ``path`` with ``torch.save``.

    The checkpoint is a dict with the keys ``epoch``, ``model_state_dict``,
    ``optimizer_state_dict``, ``scheduler_state_dict`` (None when no scheduler
    is given) and ``metrics``.
    """
    scheduler_state = scheduler.state_dict() if scheduler else None
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler_state,
        'metrics': metrics
    }
    torch.save(checkpoint, path)

In [None]:
# Loss: cross-entropy with label smoothing taken from the shared config.
criterion = nn.CrossEntropyLoss(label_smoothing=config['label_smoothing'])

# Use AdamW optimizer with weight decay
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=config['lr'],
    weight_decay=config['weight_decay']
)

# Learning rate scheduler: halve the LR when validation loss stops improving
# for more than 3 consecutive epochs.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=3,
    min_lr=1e-6,
    verbose=True
)

# Early stopping initialization. The patience comes from config and must be
# larger than the scheduler's patience: with both at 3, training would stop
# before the scheduler ever got the chance to lower the learning rate.
early_stopping = EarlyStopping(
    patience=config['patience'],
    verbose=True,
    path=os.path.join(config['checkpoint_dir'], 'best_model.pth')
)

In [None]:
# # from sklearn.utils.class_weight import compute_class_weight
# # import numpy as np

# # class_weights = compute_class_weight(
# #     class_weight='balanced',
# #     classes=np.unique(train_df['label']),
# #     y=train_df['label']
# # )
# # class_weights = torch.tensor(class_weights, dtype=torch.float32).to(device)

# criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

# # # Define CrossEntropyLoss as the criterion
# # criterion = nn.CrossEntropyLoss(
# #     label_smoothing=0.1
# # )

# # Initialize optimizer with AdamW
# # optimizer = torch.optim.AdamW(
# #     model.parameters(),
# #     lr=config['lr'],
# #     weight_decay=1e-4
# # )

# optimizer = torch.optim.SGD(
#     model.parameters(),
#     lr=config['lr'],
#     momentum=0.9,
#     weight_decay=1e-4
# )

# # Learning rate scheduler
# scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
#     optimizer,
#     mode='min',
#     factor=0.5,
#     patience=3,
#     min_lr=1e-6,
#     verbose=True
# )

In [None]:
import wandb

# Initialize wandb.
# The API key (wandb.ai/settings) is read from the WANDB_API_KEY environment
# variable so that it never has to be pasted into the notebook. When the
# variable is unset, key=None is passed and wandb uses its own login flow.
wandb.login(key=os.environ.get("WANDB_API_KEY"))

In [None]:
# Start the wandb run and record the hyperparameters used for this experiment.
# input_dim, seq_len, batch_size and dropout are expected to have been defined
# by earlier cells (not shown in this section).
run = wandb.init(
    name="ensemble_model",
    project="object_classification",
    config={
        'architecture': 'CNN-RNN-Ensemble',
        'input_dim': input_dim,
        'seq_len': seq_len,
        'batch_size': batch_size,
        # Read optimizer settings back from the optimizer itself so the logged
        # values are the ones actually in effect.
        'learning_rate': optimizer.param_groups[0]['lr'],
        'weight_decay': optimizer.param_groups[0]['weight_decay'],
        'dropout': dropout,
        # Taken from the shared config rather than repeated as literals.
        'mixup_alpha': config['mixup_alpha'],
        'gradient_clip': config['gradient_clip']
    }
)


In [None]:
# Main training loop: train, validate, step the LR scheduler, update early
# stopping, track the best validation accuracy and log metrics once per epoch.
best_val_loss = float('inf')
best_val_acc = 0
epochs = config['epochs']
class_names = list(CATEGORIES.keys())

for epoch in range(epochs):
    print(f"\nEpoch {epoch + 1}/{epochs}")

    # Train
    # NOTE(review): weight_decay=1e-3 here differs from config['weight_decay']
    # (1e-4), which is what the AdamW optimizer uses -- confirm this is intended.
    train_loss, train_acc = train_model_with_regularization(
        model,
        train_loader,
        criterion,
        optimizer,
        device,
        weight_decay=1e-3,
        gradient_clip=config['gradient_clip'],
        mixup_alpha=config['mixup_alpha']
    )
    print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.2f}%")

    # Validate
    val_loss, val_acc = validate_model(model, val_loader, criterion, class_names, device)
    print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.2f}%")

    # Update learning rate
    scheduler.step(val_loss)

    # Update early-stopping state. Whether to stop is decided at the end of the
    # iteration so that this epoch is still tracked and logged.
    early_stopping(val_loss, model)

    # Track the best validation accuracy seen so far. Nothing is written to
    # disk here, so the message does not claim a save.
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        best_val_loss = val_loss
        print(f"New best validation accuracy: {best_val_acc:.2f}% (validation loss: {best_val_loss:.4f})")

    # Log metrics to wandb
    wandb.log({
        'epoch': epoch + 1,
        'train_loss': train_loss,
        'train_acc': train_acc,
        'val_loss': val_loss,
        'val_acc': val_acc,
        'learning_rate': optimizer.param_groups[0]['lr']
    })

    # Early stopping: checked after logging so the final epoch is not lost.
    if early_stopping.early_stop:
        print("Early stopping triggered")
        break

    print(f"End of Epoch {epoch+1}/{epochs}")

print(f"\nTraining complete! Best validation accuracy: {best_val_acc:.2f}%")

In [None]:
# # Training Loop
# best_val_loss = float('inf')
# best_val_acc = 0
# class_names = list(CATEGORIES.keys())

# for epoch in range(config['epochs']):
#     print(f"\nEpoch {epoch + 1}/{config['epochs']}")

#     train_loss, train_acc = train_model(model, train_loader, criterion, optimizer, config['device'])
#     print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.2f}%")

#     val_loss, val_acc = validate_model(model, val_loader, criterion, class_names, config['device'])
#     print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.2f}%")

#     scheduler.step(val_loss)
#     curr_lr = optimizer.param_groups[0]['lr']

#     if val_loss < best_val_loss:
#         best_val_loss = val_loss
#         best_val_acc = val_acc
#         best_model_path = os.path.join(config['checkpoint_dir'], 'best_model.pth')
#         torch.save({
#             'epoch': epoch,
#             'model_state_dict': model.state_dict(),
#             'optimizer_state_dict': optimizer.state_dict(),
#             'val_loss': val_loss,
#             'val_acc': val_acc,
#         }, best_model_path)
#         wandb.save(best_model_path)
#         print(f"Saved best model with validation loss: {best_val_loss:.4f} and accuracy: {best_val_acc:.2f}%")

#     last_model_path = os.path.join(config['checkpoint_dir'], f'model_epoch_{epoch+1}.pth')
#     torch.save(model.state_dict(), last_model_path)
#     wandb.save(last_model_path)
#     print(f"Saved model for epoch {epoch+1}")

#     wandb.log({
#         'epoch': epoch + 1,
#         'train_loss': train_loss,
#         'train_acc': train_acc,
#         'val_loss': val_loss,
#         'val_acc': val_acc,
#         'learning_rate': curr_lr
#     }, step=epoch)

#     print(f"End of Epoch {epoch+1}/{config['epochs']}")

# print(f"\nTraining complete! Best validation accuracy: {best_val_acc:.2f}%")

In [None]:
@torch.no_grad()
def test_model(model, test_loader, criterion, class_names, device, checkpoint_dir=None):
    model.eval()
    test_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_targets = []
    all_probs = []

    class_correct = {class_name: 0 for class_name in class_names}
    class_total = {class_name: 0 for class_name in class_names}

    for data in test_loader:
        inputs, targets = data
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        outputs_for_loss = outputs['out'] if isinstance(outputs, dict) and 'out' in outputs else outputs
        loss = criterion(outputs_for_loss, targets)
        test_loss += loss.item() * inputs.size(0)

        probs = torch.nn.functional.softmax(outputs_for_loss, dim=1)
        _, predicted = torch.max(outputs_for_loss, 1)

        total += targets.size(0)
        correct += (predicted == targets).sum().item()

        for i in range(targets.size(0)):
            label = targets[i].item()
            pred = predicted[i].item()
            class_name = class_names[label]
            class_total[class_name] += 1
            if pred == label:
                class_correct[class_name] += 1

        all_preds.extend(predicted.cpu().numpy())
        all_targets.extend(targets.cpu().numpy())
        all_probs.extend(probs.cpu().numpy())

    test_loss /= len(test_loader.dataset)
    test_acc = correct / total

    class_accuracy = {
        name: class_correct[name]/class_total[name] if class_total[name] > 0 else 0
        for name in class_names
    }

    print("\n" + "="*50)
    print("TEST RESULTS")
    print("="*50)
    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_acc:.4f} ({correct}/{total})")
    print("\nPer-Class Accuracy:")
    for class_name in class_names:
        print(f" {class_name}: {class_accuracy[class_name]:.4f} ({class_correct[class_name]}/{class_total[class_name]})")

    return {
        'test_loss': test_loss,
        'test_accuracy': test_acc,
        'class_accuracy': class_accuracy,
        'predictions': all_preds,
        'targets': all_targets,
        'probabilities': all_probs
    }


In [None]:
# Restore the weights stored at the path that was handed to EarlyStopping.
# The file is passed straight to load_state_dict, so it is assumed to hold a
# bare state_dict. map_location keeps the load working when the checkpoint was
# written on a GPU and the current runtime only has a CPU.
model.load_state_dict(
    torch.load(
        os.path.join(config['checkpoint_dir'], 'best_model.pth'),
        map_location=device
    )
)

# Test the model
test_results = test_model(model, test_loader, criterion, class_names, device)

# Log final test results to wandb
wandb.log({
    'test_loss': test_results['test_loss'],
    'test_accuracy': test_results['test_accuracy'],
    'confusion_matrix': wandb.plot.confusion_matrix(
        probs=None,
        y_true=test_results['targets'],
        preds=test_results['predictions'],
        class_names=class_names
    )
})

# Finish wandb run
wandb.finish()

# test_model returns accuracy as a fraction (correct / total), so scale it to
# a percentage before printing it with a '%' suffix.
print(f"Final Test Accuracy: {test_results['test_accuracy'] * 100:.2f}%")