In [1]:
%%writefile ddp_benchmark.py
import os
import time
import subprocess
import pandas as pd
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.multiprocessing as mp
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from torchvision import transforms
from sklearn.model_selection import train_test_split
import socket
import warnings

# --- WARNING SUPPRESSION ---
warnings.filterwarnings("ignore") 

# ==========================================
# 1. SETUP
# ==========================================

def find_free_port():
    """Ask the operating system for an unused TCP port and return its number.

    A throwaway IPv4 TCP socket is bound to port 0, which lets the OS choose
    an available port. The chosen port is read back and the socket is closed
    before returning.

    Returns:
        int: the port number the OS assigned to the temporary socket.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(('', 0))  # port 0 => the OS picks an available port
        _host, port = probe.getsockname()
        return port
    finally:
        probe.close()

def setup(rank, world_size, port):
    """Join the default NCCL process group for a single-machine run.

    Publishes the rendezvous endpoint through the MASTER_ADDR / MASTER_PORT
    environment variables (address is always 'localhost') and then
    initializes the process group.

    Args:
        rank: index of this process within the group.
        world_size: total number of processes in the group.
        port: rendezvous port; stored in the environment as a string.
    """
    os.environ.update({
        'MASTER_ADDR': 'localhost',
        'MASTER_PORT': str(port),
    })
    dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)

def cleanup():
    """Destroy the default process group if one is currently initialized.

    Calling this when no process group exists (for example after an earlier
    cleanup, or when initialization never happened) is a no-op instead of an
    error, so it is safe to call unconditionally during teardown.
    """
    # is_available() is checked first because is_initialized() only exists
    # when torch was built with distributed support.
    if dist.is_available() and dist.is_initialized():
        dist.destroy_process_group()

def get_gpu_utilization(device_id):
    """Return the GPU utilization reported by ``nvidia-smi`` for one device.

    Runs ``nvidia-smi`` for ``device_id`` asking only for ``utilization.gpu``
    in headerless, unit-less CSV form and parses the output as a float.

    Args:
        device_id: value passed to ``nvidia-smi --id=``.

    Returns:
        float: the parsed utilization value, or 0.0 if the command could not
        be run or its output could not be parsed (best-effort metric).
    """
    try:
        raw_output = subprocess.check_output(
            [
                'nvidia-smi',
                f'--id={device_id}',
                '--query-gpu=utilization.gpu',
                '--format=csv,noheader,nounits',
            ],
            encoding='utf-8',
        )
        reading = float(raw_output.strip())
    except Exception:
        # Any failure (tool missing, bad id, unparsable output) falls back to 0.0.
        reading = 0.0
    return reading

# ==========================================
# 2. DATA & MODEL
# ==========================================

class OralCancerDataset(Dataset):
    """Image/label dataset backed by a DataFrame and an id-to-path mapping.

    Each row of ``dataframe`` provides an ``'id'`` (looked up in ``path_map``
    to locate the image file) and a ``'label'`` (converted to ``int``).
    """

    def __init__(self, dataframe, path_map, transform=None):
        """Store the row table, the id -> file path mapping and the optional transform."""
        self.data = dataframe
        self.path_map = path_map
        self.transform = transform

    def __len__(self):
        """Number of rows in the backing DataFrame."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at positional index ``idx``.

        If the row's id has no entry in ``path_map``, a newly created
        96x96 RGB image is used in place of a file on disk. The transform,
        when set, is applied to the image either way.
        """
        row = self.data.iloc[idx]
        img_id = row['id']
        label = int(row['label'])

        img_path = self.path_map.get(img_id)
        if img_path is not None:
            image = Image.open(img_path).convert('RGB')
        else:
            # Placeholder for ids that were not found on disk.
            image = Image.new('RGB', (96, 96))

        if self.transform:
            image = self.transform(image)
        return image, label

class SimpleCNN(nn.Module):
    """Small CNN: three conv -> batch-norm -> ReLU -> 2x2 max-pool stages, then two linear layers.

    The flatten step expects 128 * 12 * 12 features per sample, which is what
    three 2x2 poolings produce from a 96x96 input (the 3x3 convolutions use
    padding=1 and keep the spatial size). The network outputs one raw logit
    per sample.
    """

    def __init__(self):
        super().__init__()
        # Attribute names and registration order are kept as-is so that
        # state_dict keys remain compatible with existing checkpoints.
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(128 * 12 * 12, 256)
        self.dropout = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(256, 1)

    def forward(self, x):
        """Run the three conv stages, flatten, and apply the classifier head."""
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        for conv, norm in stages:
            x = self.pool(F.relu(norm(conv(x))))

        # fc1.in_features == 128 * 12 * 12
        flat = x.view(-1, self.fc1.in_features)
        hidden = self.dropout(F.relu(self.fc1(flat)))
        return self.fc2(hidden)

# ==========================================
# 3. UNIVERSAL TRAINER (With Checkpointing)
# ==========================================

def ddp_train_process(rank, world_size, df, path_map, batch_size, num_epochs, use_amp, shared_list, port):
    """Train and validate SimpleCNN under DDP on one GPU, checkpointing every epoch.

    Meant to be launched once per GPU (e.g. via ``mp.spawn``). Every rank
    builds the same stratified 80/20 train/validation split, trains on its
    shard, and the per-epoch metrics are summed across ranks with
    ``all_reduce``. Rank 0 alone writes the checkpoint, appends a metrics
    dict to ``shared_list`` and appends the same row to the backup CSV.

    If a checkpoint for this (world_size, batch_size, precision) combination
    already exists, model / optimizer / scaler state is restored and training
    resumes from the epoch after the one stored in the checkpoint.

    Args:
        rank: index of this process; also used as the CUDA device index.
        world_size: number of participating processes / GPUs.
        df: DataFrame with at least the 'id' and 'label' columns.
        path_map: mapping from image id to image file path.
        batch_size: per-process batch size for both loaders.
        num_epochs: total number of epochs the experiment should reach.
        use_amp: when True, run forward passes under CUDA autocast with a
            GradScaler.
        shared_list: list-like object that rank 0 appends one metrics dict
            per epoch to.
        port: rendezvous port forwarded to ``setup``.
    """
    # Bind this process to its own GPU before any NCCL work so collectives
    # and default CUDA allocations do not land on another rank's device.
    torch.cuda.set_device(rank)
    setup(rank, world_size, port)

    # --- Checkpoint Directory Setup ---
    ckpt_dir = "checkpoints_for_ddp"
    if rank == 0:
        os.makedirs(ckpt_dir, exist_ok=True)

    # Create a unique filename for this specific configuration
    # e.g., ckpt_gpu2_batch128_amp.pth
    precision_tag = "amp" if use_amp else "fp32"
    ckpt_filename = f"ckpt_gpu{world_size}_batch{batch_size}_{precision_tag}.pth"
    ckpt_path = os.path.join(ckpt_dir, ckpt_filename)

    # --- Hardware Info ---
    gpu_name = torch.cuda.get_device_name(rank)
    gpu_props = torch.cuda.get_device_properties(rank)
    total_mem_mb = gpu_props.total_memory / (1024**2)
    precision_str = "AMP" if use_amp else "FP32"

    # --- Data ---
    IMG_SIZE = 96
    train_transforms = transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.5]*3, [0.5]*3)
    ])
    val_transforms = transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize([0.5]*3, [0.5]*3)
    ])

    # Fixed random_state => every rank computes the identical split.
    train_df, val_df = train_test_split(df, test_size=0.2, random_state=42, stratify=df['label'])
    train_dataset = OralCancerDataset(train_df, path_map, transform=train_transforms)
    val_dataset = OralCancerDataset(val_df, path_map, transform=val_transforms)

    train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
    val_sampler = DistributedSampler(val_dataset, num_replicas=world_size, rank=rank, shuffle=False)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False, num_workers=0, sampler=train_sampler)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0, sampler=val_sampler)

    # --- Model & Optimizer ---
    model = SimpleCNN().to(rank)
    model = DDP(model, device_ids=[rank])
    loss_function = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    scaler = torch.cuda.amp.GradScaler() if use_amp else None

    # --- CHECKPOINT LOADING LOGIC ---
    start_epoch = 0
    if os.path.exists(ckpt_path):
        # We load to the specific GPU device (rank)
        map_location = f"cuda:{rank}"
        checkpoint = torch.load(ckpt_path, map_location=map_location)

        model.module.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        if use_amp and checkpoint.get('scaler_state_dict'):
            scaler.load_state_dict(checkpoint['scaler_state_dict'])

        # The checkpoint stores the last completed (0-based) epoch.
        start_epoch = checkpoint['epoch'] + 1

        if rank == 0:
            print(f"   [Rank 0] Found checkpoint {ckpt_filename}. Resuming from Epoch {start_epoch+1}")

    # Ensure all ranks wait for the load to finish
    dist.barrier()

    if start_epoch < num_epochs:
        if rank == 0:
            print(f"   [Rank {rank}] Ready | {precision_str} | {gpu_name}")
    else:
        if rank == 0:
            print(f"   [Rank {rank}] Experiment finished in previous run. Skipping.")

    # --- Training Loop ---
    for epoch in range(start_epoch, num_epochs):
        # Re-seed the sampler so each epoch uses a different shuffle.
        train_sampler.set_epoch(epoch)
        torch.cuda.reset_peak_memory_stats(rank)

        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0
        start_time = time.time()

        for images, labels in train_loader:
            images = images.to(rank)
            # BCEWithLogitsLoss needs float targets shaped like the (N, 1) logits.
            labels = labels.float().unsqueeze(1).to(rank)
            optimizer.zero_grad()

            if use_amp:
                with torch.cuda.amp.autocast():
                    outputs = model(images)
                    loss = loss_function(outputs, labels)
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
            else:
                outputs = model(images)
                loss = loss_function(outputs, labels)
                loss.backward()
                optimizer.step()

            # Weight the batch-mean loss by batch size so the epoch average is per-sample.
            train_loss += loss.item() * images.size(0)
            probs = torch.sigmoid(outputs)
            preds = probs >= 0.5
            train_correct += (preds == labels).sum().item()
            train_total += labels.size(0)

        epoch_time = time.time() - start_time

        # Stats
        max_mem_mb = torch.cuda.max_memory_allocated(rank) / (1024**2)
        mem_util_percent = (max_mem_mb / total_mem_mb) * 100
        # NOTE(review): this is a single nvidia-smi sample taken after the
        # training loop has finished, and it passes `rank` as the nvidia-smi
        # device id; presumably CUDA_VISIBLE_DEVICES is not remapping devices.
        gpu_util_percent = get_gpu_utilization(rank)
        util_str = f"GPU:{gpu_util_percent:.1f}% / Mem:{mem_util_percent:.1f}%"

        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for images, labels in val_loader:
                images = images.to(rank)
                labels = labels.float().unsqueeze(1).to(rank)

                if use_amp:
                    with torch.cuda.amp.autocast():
                        outputs = model(images)
                        loss = loss_function(outputs, labels)
                else:
                    outputs = model(images)
                    loss = loss_function(outputs, labels)

                val_loss += loss.item() * images.size(0)
                probs = torch.sigmoid(outputs)
                preds = probs >= 0.5
                val_correct += (preds == labels).sum().item()
                val_total += labels.size(0)

        # Sync Metrics
        # float64 keeps the summed sample counts exact (float32 only
        # represents integers exactly up to 2**24) and the loss sums precise.
        metrics_tensor = torch.tensor(
            [train_loss, train_correct, train_total, val_loss, val_correct, val_total],
            dtype=torch.float64,
            device=rank,
        )
        dist.all_reduce(metrics_tensor, op=dist.ReduceOp.SUM)

        total_samples = metrics_tensor[2].item()
        val_samples = metrics_tensor[5].item()
        not_available = float('nan')  # reported instead of dividing by zero

        global_train_loss = metrics_tensor[0].item() / total_samples if total_samples else not_available
        global_train_acc  = metrics_tensor[1].item() / total_samples if total_samples else not_available
        global_val_loss   = metrics_tensor[3].item() / val_samples if val_samples else not_available
        global_val_acc    = metrics_tensor[4].item() / val_samples if val_samples else not_available
        # Global sample count divided by this rank's wall-clock epoch time.
        train_throughput = total_samples / epoch_time if epoch_time > 0 else not_available

        if rank == 0:
            print(f"   >>> [GLOBAL] {precision_str} | Epoch {epoch+1}: Acc={global_train_acc:.4f} | Throughput={train_throughput:.1f} img/s")

            # --- SAVE CHECKPOINT ---
            print(f"   ... Saving checkpoint to {ckpt_filename} ...")
            # Write to a temporary file and swap it in, so an interruption
            # mid-save cannot leave a truncated checkpoint at ckpt_path.
            tmp_ckpt_path = ckpt_path + ".tmp"
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.module.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'scaler_state_dict': scaler.state_dict() if use_amp else None,
            }, tmp_ckpt_path)
            os.replace(tmp_ckpt_path, ckpt_path)

            # --- LOG TO LIST ---
            shared_list.append({
                'precision': precision_str,
                'gpu_count': world_size,
                'batch_size': batch_size,
                'epoch': epoch + 1,
                'train_acc': global_train_acc,
                'val_acc': global_val_acc,
                'train_loss': global_train_loss,
                'val_loss': global_val_loss,
                'train_throughput': train_throughput,
                'epoch_time': epoch_time,
                'gpu_util/mem_util': util_str,
                'gpu_name': gpu_name,
                'each_gpu_memory': f"{total_mem_mb:.0f}MB"
            })

            # --- INCREMENTAL CSV BACKUP (Optional but good) ---
            # This appends to a file so you don't lose data if the full script crashes later
            backup_file = "benchmark_master_results_for_ddp_backup.csv"
            header = not os.path.exists(backup_file)
            temp_df = pd.DataFrame([shared_list[-1]])
            temp_df.to_csv(backup_file, mode='a', header=header, index=False)

    cleanup()

# ==========================================
# 4. MAIN LOOP
# ==========================================

if __name__ == '__main__':
    # Driver: runs ddp_train_process for every (GPU count, batch size, AMP)
    # combination and writes the metrics collected in this run to a master CSV.
    print("Loading Data...")
    df = pd.read_csv('oral_cancer_balanced.csv')

    # Build a lookup of image file name -> full path under 'Data',
    # skipping any 'val' directory.
    path_map = {}
    for root, dirs, files in os.walk('Data'):
        # Removing 'val' from dirs in place stops os.walk from descending into it.
        if 'val' in dirs:
            dirs.remove('val')
        for file in files:
            if file.lower().endswith(('.jpg', '.jpeg')) and 'val' not in root:
                path_map[file] = os.path.join(root, file)

    max_gpus = torch.cuda.device_count()

    # Benchmark grid; GPU counts above what this machine has are dropped.
    target_gpu_counts = [1, 2, 4]
    gpu_counts = [g for g in target_gpu_counts if g <= max_gpus]
    batch_sizes = [64, 128, 512]
    amp_settings = [False]
    epochs_to_test = 10

    # Manager-backed list that the spawned worker processes append their metrics to.
    manager = mp.Manager()
    shared_metrics = manager.list()

    print(f"Available GPUs: {max_gpus}")
    print("Starting Combined Benchmark Loop with Checkpointing...")

    for n_gpus in gpu_counts:
        for b_size in batch_sizes:
            for use_amp in amp_settings:

                # --- AUTO-FIND FREE PORT ---
                current_port = find_free_port()

                mode_name = "DDP+AMP" if use_amp else "DDP (Standard)"
                print(f"\n--- Running: {n_gpus} GPU(s) | Batch {b_size} | {mode_name} | Port {current_port} ---")

                try:
                    # mp.spawn passes the process index (rank) as the first argument.
                    mp.spawn(
                        ddp_train_process,
                        args=(n_gpus, df, path_map, b_size, epochs_to_test, use_amp, shared_metrics, current_port),
                        nprocs=n_gpus,
                        join=True
                    )
                    time.sleep(3)

                except Exception as e:
                    # Best effort: report the failure and continue with the next configuration.
                    print(f"Error: {e}")

    print("\nSaving Master CSV...")
    # NOTE: shared_metrics only holds epochs executed in this invocation;
    # configurations skipped because their checkpoint was already complete
    # contribute no rows here (their rows are in the backup CSV).
    if len(shared_metrics) > 0:
        results_df = pd.DataFrame(list(shared_metrics))
        cols = [
            'precision', 'gpu_count', 'batch_size', 'epoch',
            'train_acc', 'val_acc', 'train_loss', 'val_loss',
            'train_throughput', 'epoch_time', 'gpu_util/mem_util',
            'gpu_name', 'each_gpu_memory'
        ]
        results_df = results_df[cols]
        results_df.to_csv('benchmark_master_results_for_ddp.csv', index=False)
        print("Saved to 'benchmark_master_results_for_ddp.csv'")
        print(results_df.head(10))
    else:
        # The file name must match the backup file written by ddp_train_process.
        print("No results collected due to errors. Check 'benchmark_master_results_for_ddp_backup.csv' for partial data.")

Overwriting ddp_benchmark.py


In [2]:
!python ddp_benchmark.py

Loading Data...
Available GPUs: 4
Starting Combined Benchmark Loop with Checkpointing...

--- Running: 1 GPU(s) | Batch 64 | DDP (Standard) | Port 40137 ---
   [Rank 0] Found checkpoint ckpt_gpu1_batch64_fp32.pth. Resuming from Epoch 11
   [Rank 0] Experiment finished in previous run. Skipping.

--- Running: 1 GPU(s) | Batch 128 | DDP (Standard) | Port 34735 ---
   [Rank 0] Found checkpoint ckpt_gpu1_batch128_fp32.pth. Resuming from Epoch 11
   [Rank 0] Experiment finished in previous run. Skipping.

--- Running: 1 GPU(s) | Batch 512 | DDP (Standard) | Port 55445 ---
   [Rank 0] Found checkpoint ckpt_gpu1_batch512_fp32.pth. Resuming from Epoch 2
   [Rank 0] Ready | FP32 | Tesla P100-PCIE-12GB
   >>> [GLOBAL] FP32 | Epoch 2: Acc=0.8438 | Throughput=556.8 img/s
   ... Saving checkpoint to ckpt_gpu1_batch512_fp32.pth ...
   >>> [GLOBAL] FP32 | Epoch 3: Acc=0.8605 | Throughput=572.6 img/s
   ... Saving checkpoint to ckpt_gpu1_batch512_fp32.pth ...
   >>> [GLOBAL] FP32 | Epoch 4: Acc=0.8711