This is the setup cell: it checks the environment (PyTorch version, CUDA availability) and defines the function that sets up the ResNet model.

In [None]:
import os
import time
import xml.etree.ElementTree as ET

import torch
import torch.optim as optim
import torchvision.models as models
from PIL import Image
from torch import nn
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# Report the runtime environment before any model or data work starts
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA version: {torch.version.cuda}")
    print(f"GPU device: {torch.cuda.get_device_name()}")

# Select the compute device (GPU when present, otherwise CPU); the CUDA cache is not cleared
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

def setup_model(num_classes=40):
    """
    Create an ImageNet-pretrained ResNet18 whose final layer has ``num_classes`` outputs.

    The network is built on the CPU; callers move it to the target device
    themselves (e.g. ``setup_model().to(device)``).

    Args:
        num_classes: Output size of the replacement ``fc`` layer. Defaults to
            40, the value this notebook originally hard-coded.

    Returns:
        torch.nn.Module: ResNet18 with pretrained backbone weights and a
        freshly initialised final linear layer.

    Raises:
        Exception: Any error raised while building the model is printed and
            then re-raised unchanged.
    """
    print("\nInitializing model...")
    try:
        # Force model to initialize on CPU
        with torch.device('cpu'):
            # `pretrained=True` is deprecated since torchvision 0.13; the
            # explicit weights enum selects the same IMAGENET1K_V1 checkpoint.
            model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
            # Swap the 1000-way ImageNet head for one sized to this task
            model.fc = nn.Linear(model.fc.in_features, num_classes)
            print("Model initialized successfully on CPU")
        return model
    except Exception as e:
        print(f"Error in model setup: {str(e)}")
        raise

This is the Dataset class (`Stanford40Dataset`), followed by a DataLoader usage example and a quick dataset inspection.

In [None]:
class Stanford40Dataset(Dataset):
    """
    Stanford40 dataset yielding ``(image, class_index, bounding_box)`` samples.

    Files read relative to ``root_dir``:
        ImageSplits/actions.txt           - first line is skipped, then one action per line
        ImageSplits/<action>_<split>.txt  - one image file name per line
        JPEGImages/<image_id>.jpg
        XMLAnnotations/<image_id>.xml     - must contain ``object/bndbox``

    Images without an XML annotation are dropped at construction time and
    reported in a printed summary.
    """

    def __init__(self, root_dir, split='train', transform=None):
        """
        Index every image of the requested split that has an XML annotation.

        Args:
            root_dir: Dataset root directory.
            split: Suffix of the split files to read (``<action>_<split>.txt``).
            transform: Optional callable applied to the PIL image in
                ``__getitem__``.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.split = split

        # Load action classes (first line is a header). Blank lines are
        # skipped; previously they raised IndexError on `line.split()[0]`.
        with open(os.path.join(root_dir, 'ImageSplits/actions.txt'), 'r') as f:
            self.classes = [line.split()[0] for line in f.readlines()[1:] if line.strip()]
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}

        # Build list of all images and their corresponding actions
        self.images = []
        missing_xmls = []  # Keep track of missing XML files

        for action in self.classes:
            split_file = os.path.join(root_dir, f'ImageSplits/{action}_{split}.txt')
            with open(split_file, 'r') as f:
                # Skip blank lines so a trailing newline is not reported as
                # an image with a missing annotation.
                image_ids = [os.path.splitext(line.strip())[0] for line in f if line.strip()]

            # Check XML existence for each image before adding it
            for img_id in image_ids:
                xml_path = os.path.join(root_dir, 'XMLAnnotations', f'{img_id}.xml')
                if os.path.exists(xml_path):
                    self.images.append((action, img_id))
                else:
                    missing_xmls.append((action, img_id))

        # Print summary of missing XML files
        if missing_xmls:
            print(f"Warning: Found {len(missing_xmls)} images without XML annotations")
            print("First few missing XMLs:")
            for action, img_id in missing_xmls[:5]:
                print(f"  - Missing XML for {action}: {img_id}")
            print(f"Total images retained: {len(self.images)}")

    def __len__(self):
        """Return the number of images retained for this split."""
        return len(self.images)

    @staticmethod
    def _load_box(xml_path):
        """Parse ``[xmin, ymin, xmax, ymax]`` from an annotation file, raising ValueError on bad data."""
        try:
            root = ET.parse(xml_path).getroot()
        except ET.ParseError as e:
            # Chain the original error so the XML location info is kept
            raise ValueError(f"XML parsing error in {xml_path}: {str(e)}") from e

        bbox = root.find('object/bndbox')
        if bbox is None:
            raise ValueError(f"No bounding box found in {xml_path}")

        box = []
        for tag in ('xmin', 'ymin', 'xmax', 'ymax'):
            node = bbox.find(tag)
            # A missing/empty tag used to surface as an opaque AttributeError
            if node is None or node.text is None:
                raise ValueError(f"Missing <{tag}> in bounding box of {xml_path}")
            box.append(float(node.text))

        # Validate box coordinates
        if not (0 <= box[0] <= box[2] and 0 <= box[1] <= box[3]):
            raise ValueError(f"Invalid box coordinates in {xml_path}: {box}")
        return box

    @staticmethod
    def _output_size(image):
        """Return ``(width, height)`` of a transformed image; falls back to 224x224 if it cannot be determined."""
        if torch.is_tensor(image) and image.dim() >= 2:
            # Tensor images are laid out (..., H, W)
            return float(image.shape[-1]), float(image.shape[-2])
        size = getattr(image, 'size', None)
        if isinstance(size, tuple) and len(size) == 2:
            # PIL images expose size as (width, height)
            return float(size[0]), float(size[1])
        return 224.0, 224.0

    def __getitem__(self, idx):
        """
        Load one sample.

        Returns:
            tuple: ``(image, class_index, box)`` where ``box`` is a tensor
            ``[xmin, ymin, xmax, ymax]``. When a transform is set, the box is
            rescaled from the original image size to the transformed size;
            otherwise it stays in original pixel coordinates.

        Raises:
            FileNotFoundError: The JPEG file does not exist.
            ValueError: The annotation is unparsable, incomplete or invalid.
        """
        action, img_id = self.images[idx]

        # Load image
        img_path = os.path.join(self.root_dir, 'JPEGImages', f'{img_id}.jpg')
        if not os.path.exists(img_path):
            raise FileNotFoundError(f"Image file not found: {img_path}")

        # The context manager releases the file handle right after decoding
        with Image.open(img_path) as img_file:
            image = img_file.convert('RGB')

        # Load bounding box (we know XML exists because we checked in __init__)
        xml_path = os.path.join(self.root_dir, 'XMLAnnotations', f'{img_id}.xml')
        box = self._load_box(xml_path)

        # Original image dimensions for scaling
        orig_width, orig_height = image.size

        # Apply transforms
        if self.transform:
            image = self.transform(image)

            # Scale the box to the size the transform actually produced
            # instead of assuming a fixed 224x224 output.
            new_width, new_height = self._output_size(image)
            scale_x = new_width / orig_width
            scale_y = new_height / orig_height
            box = [
                box[0] * scale_x,
                box[1] * scale_y,
                box[2] * scale_x,
                box[3] * scale_y
            ]

        return image, self.class_to_idx[action], torch.tensor(box)

# Usage example: resize to 224x224, convert to a tensor, normalise per channel,
# then wrap the training split in a shuffled loader
transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

dataset = Stanford40Dataset(
    root_dir='/content/drive/MyDrive/Stanford40',
    split='train',
    transform=transform,
)
dataloader = DataLoader(dataset=dataset, batch_size=32, shuffle=True)

def inspect_dataset(dataset, max_classes=None):
    """
    Print the dataset size and the number of images per class.

    Args:
        dataset: Object supporting ``len()`` and exposing ``images``, an
            iterable of ``(action, image_id)`` pairs.
        max_classes: If given, only the first ``max_classes`` classes (in
            order of first appearance) are listed; by default every class is
            printed.

    Returns:
        None. All results are written to stdout.
    """
    from collections import Counter

    print("\n1. Number of Images:")
    print(f"Total images: {len(dataset)}")

    # Check class distribution (Counter keeps first-seen order)
    print("\n2. Checking Class Distribution:")
    class_counts = Counter(action for action, _ in dataset.images)
    per_class = list(class_counts.items())

    # The header now states what is actually listed; it used to claim
    # "first 5 classes" while printing all of them.
    if max_classes is None:
        shown = per_class
        print("Images per class:")
    else:
        shown = per_class[:max_classes]
        print(f"Images per class (first {len(shown)} of {len(per_class)} classes):")

    for class_name, count in shown:
        print(f"{class_name}: {count} images")

# Run the inspection on the training-split dataset created above
inspect_dataset(dataset)

This is the test for checking whether the bounding-box scaling and descaling work.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import random
import torchvision.transforms as T

def show_images_with_boxes(dataset, num_images=5):
    """
    Display random images from the dataset with their bounding boxes and class labels.

    Each sample is expected to be ``(image, class_idx, bbox)`` where the image
    is a normalised tensor in channel-first layout and ``bbox`` is a tensor
    ``[xmin, ymin, xmax, ymax]`` in the image's pixel coordinates.

    Args:
        dataset: The Stanford40Dataset instance (must expose ``class_to_idx``)
        num_images: Number of random images to display; capped at
            ``len(dataset)``.
    """
    # random.sample raises ValueError when asked for more items than exist
    num_images = min(num_images, len(dataset))
    if num_images <= 0:
        print("No images to display")
        return

    # squeeze=False always yields a 2-D array of axes, so a single image
    # (num_images == 1) no longer breaks the zip() below.
    fig, axes = plt.subplots(1, num_images, figsize=(20, 4), squeeze=False)
    axes = axes.ravel()

    # Get random indices
    random_indices = random.sample(range(len(dataset)), num_images)

    # Undo Normalize(mean, std): first multiply by std, then add the mean
    denormalize = T.Compose([
        T.Normalize(mean=[0, 0, 0], std=[1/0.229, 1/0.224, 1/0.225]),
        T.Normalize(mean=[-0.485, -0.456, -0.406], std=[1, 1, 1]),
    ])

    # Build the reverse lookup once instead of scanning the dict per image
    idx_to_class = {index: name for name, index in dataset.class_to_idx.items()}

    for idx, ax in zip(random_indices, axes):
        # Get image and its data
        image, class_idx, bbox = dataset[idx]

        # Denormalize and convert to HWC numpy for display
        image = denormalize(image)
        img_np = image.permute(1, 2, 0).numpy()
        img_np = np.clip(img_np, 0, 1)  # Clip values to valid range

        ax.imshow(img_np)

        # Draw the bounding box
        x_min, y_min, x_max, y_max = bbox.numpy()
        rect = plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min,
                             fill=False, color='red', linewidth=2)
        ax.add_patch(rect)

        # Add class label
        ax.set_title(f"Class: {idx_to_class[class_idx]}")

        # Remove axes ticks
        ax.set_xticks([])
        ax.set_yticks([])

    plt.tight_layout()
    plt.show()

# Visual check: draw the default number of random samples (5) with their scaled boxes
show_images_with_boxes(dataset)

The next two cells were actually used for training and for starting the process.

In [None]:
def train_epoch(model, dataloader, criterion, optimizer, device):
    """
    Train the model for one epoch with real-time monitoring.

    Args:
        model: Network to train; put into training mode here.
        dataloader: Sized iterable of ``(inputs, labels, boxes)`` batches; the
            boxes are ignored.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepping the model's parameters.
        device: Device (or device string) that inputs and labels are moved to.

    Returns:
        tuple: ``(mean_batch_loss, accuracy_percent)``. The loss is the mean
        of the per-batch losses (not weighted by batch size). ``(0.0, 0.0)``
        is returned when the loader yields no samples.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    num_batches = len(dataloader)
    # Only query CUDA memory statistics when actually training on a GPU
    on_gpu = torch.device(device).type == 'cuda'
    batch_start_time = time.time()

    for batch_idx, (inputs, labels, _) in enumerate(dataloader):
        inputs, labels = inputs.to(device), labels.to(device)

        # Forward pass and optimization
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Batch statistics
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

        # Real-time monitoring (every 10 batches)
        if batch_idx % 10 == 0:
            # Time elapsed since the previous progress line
            batch_time = time.time() - batch_start_time
            current_loss = running_loss / (batch_idx + 1)
            current_acc = 100. * correct / total
            gpu_mem = (f' | GPU Mem: {torch.cuda.memory_allocated()/1024**2:.0f}MB'
                       if on_gpu else '')

            print(f'Batch {batch_idx}/{num_batches} | '
                  f'Time: {batch_time:.2f}s | '
                  f'Loss: {current_loss:.4f} | '
                  f'Acc: {current_acc:.2f}%'
                  f'{gpu_mem}')

            batch_start_time = time.time()

    # An empty loader would otherwise raise ZeroDivisionError
    if num_batches == 0 or total == 0:
        return 0.0, 0.0

    return running_loss / num_batches, 100. * correct / total

def train_model(model, train_loader, criterion, optimizer, device, num_epochs=10,
                save_dir='model_checkpoints', save_every=5):
    """
    Simplified training pipeline without validation.

    Args:
        model: Network to train.
        train_loader: Loader passed to ``train_epoch`` every epoch.
        criterion: Loss function.
        optimizer: Optimizer; its first param group's learning rate is logged.
        device: Device used for training.
        num_epochs: Number of epochs to run.
        save_dir: Directory for checkpoints (created if missing).
        save_every: Checkpoint interval in epochs. The final epoch is always
            checkpointed as well, so a run whose length is not a multiple of
            the interval still leaves its last weights on disk. Pass 0 or
            ``None`` to save only the final epoch.

    Returns:
        list[dict]: One entry per epoch with keys ``epoch`` (1-based),
        ``train_loss``, ``train_acc`` and ``lr``.

    Note:
        Checkpoint files are named with the 1-based epoch number, while the
        ``'epoch'`` value stored inside the checkpoint is 0-based; this is
        kept as-is for compatibility with existing checkpoints.
    """
    os.makedirs(save_dir, exist_ok=True)
    training_history = []

    for epoch in range(num_epochs):
        epoch_start_time = time.time()

        # Training phase
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)

        # Save training history
        training_history.append({
            'epoch': epoch + 1,
            'train_loss': train_loss,
            'train_acc': train_acc,
            'lr': optimizer.param_groups[0]['lr']
        })

        # Save periodically, and always after the last epoch
        is_periodic = bool(save_every) and (epoch + 1) % save_every == 0
        is_last = (epoch + 1) == num_epochs
        if is_periodic or is_last:
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'train_loss': train_loss,
                'training_history': training_history
            }, os.path.join(save_dir, f'model_epoch_{epoch+1}.pth'))

        # Epoch summary
        epoch_time = time.time() - epoch_start_time
        print(f'\nEpoch {epoch+1} Summary:')
        print(f'Time: {epoch_time:.2f}s')
        print(f'Loss: {train_loss:.4f}')
        print(f'Accuracy: {train_acc:.2f}%')
        print(f'Learning Rate: {optimizer.param_groups[0]["lr"]}')
        print('-' * 50)

    return training_history

In [None]:
import time
from datetime import datetime
import torch.cuda as cuda

# Training configuration: single source of truth for the values used below,
# so the printed/saved configuration cannot drift from what is actually run
DATA_ROOT = '/content/drive/MyDrive/Stanford40'
CHECKPOINT_DIR = 'model_checkpoints'
BATCH_SIZE = 32
LEARNING_RATE = 0.001
NUM_EPOCHS = 10

# Set up data transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Print dataset information
print("Setting up dataset and model...")
train_dataset = Stanford40Dataset(DATA_ROOT, split='train', transform=transform)
print(f"Training samples: {len(train_dataset)}")

# Create dataloader
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Setup for training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"\nUsing device: {device}")

if device.type == 'cuda':
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Memory allocated: {torch.cuda.memory_allocated(0)/1024**2:.2f} MB")
    print(f"Memory cached: {torch.cuda.memory_reserved(0)/1024**2:.2f} MB")

model = setup_model().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Calculate total parameters
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print("\nModel parameters:")
print(f"Total parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")

# Print training configuration, derived from the objects actually in use
print("\nTraining configuration:")
print(f"Batch size: {BATCH_SIZE}")
print(f"Initial learning rate: {LEARNING_RATE}")
print(f"Optimizer: {type(optimizer).__name__}")
print(f"Loss function: {type(criterion).__name__}")

# Start training
print("\nStarting training...")
start_time = time.time()
start_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Start time: {start_datetime}")

training_history = train_model(model, train_loader, criterion, optimizer, device,
                               num_epochs=NUM_EPOCHS, save_dir=CHECKPOINT_DIR)

# Print training summary
end_time = time.time()
elapsed_time = end_time - start_time
hours = int(elapsed_time // 3600)
minutes = int((elapsed_time % 3600) // 60)
seconds = int(elapsed_time % 60)

print("\nTraining completed!")
print(f"Total training time: {hours}h {minutes}m {seconds}s")
print(f"Final training loss: {training_history[-1]['train_loss']:.4f}")
print(f"Final training accuracy: {training_history[-1]['train_acc']:.2f}%")

# If using GPU, print final memory usage
if device.type == 'cuda':
    print("\nFinal GPU memory usage:")
    print(f"Memory allocated: {torch.cuda.memory_allocated(0)/1024**2:.2f} MB")
    print(f"Memory cached: {torch.cuda.memory_reserved(0)/1024**2:.2f} MB")

# Save final training summary next to the checkpoints
os.makedirs(CHECKPOINT_DIR, exist_ok=True)  # do not rely on train_model having created it
summary_file = os.path.join(CHECKPOINT_DIR, 'training_summary.txt')
with open(summary_file, 'w') as f:
    f.write("Training Summary\n")
    f.write("===============\n")
    f.write(f"Start time: {start_datetime}\n")
    f.write(f"Training duration: {hours}h {minutes}m {seconds}s\n")
    f.write(f"Training samples: {len(train_dataset)}\n")
    f.write(f"Final training loss: {training_history[-1]['train_loss']:.4f}\n")
    f.write(f"Final training accuracy: {training_history[-1]['train_acc']:.2f}%\n")

Validation


In [None]:
import torch
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np
import time
from tqdm import tqdm

def validate_model(model, test_loader, criterion, device, classes):
    """
    Validates the model on test data and returns detailed metrics

    Args:
        model: The trained PyTorch model
        test_loader: DataLoader yielding ``(inputs, labels, boxes)`` batches;
            the boxes are ignored
        criterion: Loss function
        device: Device to run validation on
        classes: List of class names, indexed by class label

    Returns:
        dict: Dictionary with keys ``loss`` (mean of per-batch losses),
        ``accuracy`` (percent), ``class_accuracies`` (percent per class that
        occurs in the data), ``confusion_matrix`` (always
        ``len(classes) x len(classes)``), ``classification_report``
        (sklearn ``output_dict``), ``predictions`` and ``true_labels``
        (lists of int class indices).
    """
    model.eval()
    num_classes = len(classes)
    test_loss = 0
    correct = 0
    total = 0
    all_preds = []
    all_labels = []
    class_correct = [0] * num_classes
    class_total = [0] * num_classes

    with torch.no_grad():
        for inputs, labels, _ in tqdm(test_loader, desc="Validating"):
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Accumulate loss
            test_loss += loss.item()

            # Get predictions
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

            # Move the batch to the host once; indexing Python lists with
            # device tensors element by element is slow.
            batch_labels = labels.cpu().tolist()
            batch_preds = predicted.cpu().tolist()
            all_preds.extend(batch_preds)
            all_labels.extend(batch_labels)

            # Per-class accuracy
            for label, pred in zip(batch_labels, batch_preds):
                if label == pred:
                    class_correct[label] += 1
                class_total[label] += 1

    # Calculate metrics (guarded so an empty loader does not divide by zero)
    num_batches = len(test_loader)
    avg_loss = test_loss / num_batches if num_batches else 0.0
    accuracy = 100. * correct / total if total else 0.0

    # Per-class accuracies (classes with no samples are omitted)
    class_accuracies = {}
    for i in range(num_classes):
        if class_total[i] > 0:
            class_accuracies[classes[i]] = 100 * class_correct[i] / class_total[i]

    # Pin the label set so the matrix and report line up with `classes` even
    # if some class never appears in the labels or predictions.
    label_ids = list(range(num_classes))

    # Generate confusion matrix
    cm = confusion_matrix(all_labels, all_preds, labels=label_ids)

    # Generate classification report; zero_division=0 reports 0.0 instead of
    # warning for classes that were never predicted.
    report = classification_report(all_labels, all_preds,
                                   labels=label_ids,
                                   target_names=classes,
                                   output_dict=True,
                                   zero_division=0)

    return {
        'loss': avg_loss,
        'accuracy': accuracy,
        'class_accuracies': class_accuracies,
        'confusion_matrix': cm,
        'classification_report': report,
        'predictions': all_preds,
        'true_labels': all_labels
    }

def plot_validation_results(results, classes, save_dir='validation_results'):
    """
    Render the validation plots and write them as PNG files into ``save_dir``.

    Two figures are produced: ``confusion_matrix.png`` (annotated heat map of
    ``results['confusion_matrix']`` labelled with ``classes``) and
    ``class_accuracies.png`` (bar chart of ``results['class_accuracies']``).
    The directory is created if it does not exist.
    """
    os.makedirs(save_dir, exist_ok=True)

    def _finish(filename):
        # Shared tail of both plots: tidy layout, write to disk, free the figure
        plt.tight_layout()
        plt.savefig(f'{save_dir}/{filename}')
        plt.close()

    # Figure 1: confusion matrix heat map
    heatmap_options = dict(
        xticklabels=classes,
        yticklabels=classes,
        annot=True,
        fmt='d',
        cmap='Blues',
    )
    plt.figure(figsize=(20, 20))
    sns.heatmap(results['confusion_matrix'], **heatmap_options)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.xticks(rotation=45)
    plt.yticks(rotation=45)
    _finish('confusion_matrix.png')

    # Figure 2: one bar per class that has an accuracy entry
    per_class = results['class_accuracies']
    plt.figure(figsize=(15, 8))
    plt.bar(list(per_class.keys()), list(per_class.values()))
    plt.title('Per-Class Accuracies')
    plt.xlabel('Classes')
    plt.ylabel('Accuracy (%)')
    plt.xticks(rotation=45)
    _finish('class_accuracies.png')

def save_validation_report(results, save_dir='validation_results'):
    """
    Write a plain-text validation report to ``<save_dir>/validation_report.txt``.

    The report lists the overall accuracy and loss, the per-class accuracies,
    and precision/recall/F1/support for every dict-valued entry of
    ``results['classification_report']``; scalar entries are skipped.
    The directory is created if it does not exist.
    """
    os.makedirs(save_dir, exist_ok=True)

    with open(f'{save_dir}/validation_report.txt', 'w') as out:
        write = out.write

        # Header and headline numbers
        write("Model Validation Report\n")
        write("=====================\n\n")
        write(f"Overall Accuracy: {results['accuracy']:.2f}%\n")
        write(f"Average Loss: {results['loss']:.4f}\n\n")

        # One line per class
        write("Per-Class Performance:\n")
        write("--------------------\n")
        for name, acc in results['class_accuracies'].items():
            write(f"{name}: {acc:.2f}%\n")

        # Detailed metrics section
        write("\nDetailed Classification Report:\n")
        write("----------------------------\n")
        for section, stats in results['classification_report'].items():
            if not isinstance(stats, dict):
                # Scalar entries carry no precision/recall breakdown
                continue
            write(f"\n{section}:\n")
            write(f"  Precision: {stats['precision']:.3f}\n")
            write(f"  Recall: {stats['recall']:.3f}\n")
            write(f"  F1-score: {stats['f1-score']:.3f}\n")
            write(f"  Support: {stats['support']}\n")

# Main validation pipeline
def run_validation(model_path, test_dataset, device, batch_size=32, save_dir='validation_results'):
    """
    Runs the complete validation pipeline

    Loads a checkpoint into a fresh model, evaluates it on ``test_dataset``,
    writes the plots and the text report, and prints a short summary.

    Args:
        model_path: Path to a checkpoint file. Either a dict holding the
            weights under ``'model_state_dict'`` or a bare state dict.
        test_dataset: Dataset exposing ``classes`` and yielding
            ``(image, label, box)`` samples.
        device: Device to run validation on.
        batch_size: Batch size of the (unshuffled) test loader.
        save_dir: Directory that receives the plots and the report.

    Returns:
        dict: The metrics dictionary produced by ``validate_model``.
    """
    # Load the trained model
    model = setup_model().to(device)
    checkpoint = torch.load(model_path, map_location=device)
    # Accept both a wrapped checkpoint and a plain state_dict
    state_dict = checkpoint['model_state_dict'] if 'model_state_dict' in checkpoint else checkpoint
    model.load_state_dict(state_dict)

    # Create test dataloader (no shuffling: order does not matter for metrics)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Set up criterion
    criterion = nn.CrossEntropyLoss()

    # Run validation
    print("Starting validation...")
    results = validate_model(model, test_loader, criterion, device, test_dataset.classes)

    # Create visualizations and save results, all into the same directory
    print("Generating validation reports and visualizations...")
    plot_validation_results(results, test_dataset.classes, save_dir=save_dir)
    save_validation_report(results, save_dir=save_dir)

    print("\nValidation completed!")
    print(f"Overall Accuracy: {results['accuracy']:.2f}%")
    print(f"Average Loss: {results['loss']:.4f}")
    print(f"\nDetailed results have been saved to the {save_dir} directory.")

    return results

In [None]:
# Evaluate the epoch-10 checkpoint on the test split
test_dataset = Stanford40Dataset('/content/drive/MyDrive/Stanford40', split='test', transform=transform)
model_path = '/content/model_epoch_10.pth'
if not os.path.exists(model_path):
    # train_model() writes its checkpoints under save_dir ('model_checkpoints'
    # by default), so fall back to that location when the file is not at the
    # path above.
    model_path = os.path.join('model_checkpoints', 'model_epoch_10.pth')
validation_results = run_validation(model_path, test_dataset, device)

Validation completed!
Overall Accuracy: 51.54%
Average Loss: 2.1094