In [None]:
# Cell 1: Imports, configuration, GPU detection, and GPU sampler
import os
import math
import time
import csv
import random
import threading
import subprocess
from datetime import datetime, timezone
from typing import List, Dict, Optional

import torch
import torch.nn as nn
import torch.optim as optim

import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning, module="tqdm")
warnings.filterwarnings("ignore", category=DeprecationWarning, module="tqdm")

from torch.utils.data import Dataset, DataLoader
from torch.cuda.amp import autocast, GradScaler
from tqdm.notebook import tqdm

# -----------------------------
# PARAMS (tunable in one place)
# -----------------------------
PARAMS = {
    "EPOCHS": 5,                        # number of passes train() makes over the dataset
    "BATCH_SIZE": 512,                  # base batch size; scaled per epoch by the multipliers below
    "IMAGE_SIZE": 224,                  # side length of the square 3-channel synthetic images
    "NUM_CLASSES": 1000,                # label range and classifier output width
    "NUM_SAMPLES": 200_000,             # reported length of the synthetic dataset
    "MODEL_SIZE": "small",              # TinyCNN preset: "tiny", "small", "base" or "large"
    "USE_AMP": True,                    # mixed precision (autocast is only enabled when CUDA is available)
    "GRAD_ACCUM_STEPS": 1,              # batches accumulated per optimizer step
    "NUM_GPUS_TO_USE": None,            # None -> use every visible GPU
    "WARMUP_STEPS": 500,                # linear LR warmup length, in batches
    "BASE_LR": 0.1,                     # LR reached at the end of warmup (cosine decay afterwards)
    "WEIGHT_DECAY": 1e-4,               # SGD weight decay
    "MOMENTUM": 0.9,                    # SGD momentum
    "AUGMENT_NOISE": True,              # add 0.01-scaled Gaussian noise to each generated image
    "LABEL_SMOOTHING": 0.05,            # passed to nn.CrossEntropyLoss(label_smoothing=...)
    "VARY_BATCH_SIZE_SCHEDULE": True,   # rebuild the DataLoader per epoch using the multipliers below
    "BATCH_SCHEDULE_MULTIPLIERS": [1, 1, 2, 2, 3],  # indexed by epoch % len(list)
    "UTIL_SAMPLING_INTERVAL_SEC": 1.0,  # pause between GPUSampler polls
    "CSV_PATH": "/opt/app-root/src/data/gpu_samples.csv",  # destination for the GPU sample export
    "SEED": 42                          # passed to set_seed()
}

def set_seed(seed=42):
    """Seed Python's ``random`` module and PyTorch's CPU and CUDA generators.

    Args:
        seed: Integer seed applied to every generator.
    """
    seeders = (random.seed, torch.manual_seed, torch.cuda.manual_seed_all)
    for apply_seed in seeders:
        apply_seed(seed)

# Seed all RNGs first so everything created below is reproducible.
set_seed(PARAMS["SEED"])

# Runtime flags consulted by the rest of the notebook.
IS_CUDA = torch.cuda.is_available()
WORLD_SIZE = int(os.getenv("WORLD_SIZE", "1"))
USING_DDP = WORLD_SIZE >= 2

# cuDNN benchmark mode is only switched on when a CUDA device is present.
if IS_CUDA:
    torch.backends.cudnn.benchmark = True

def gpu_inventory():
    """Print CUDA availability plus each visible GPU's name and total memory.

    Returns:
        The number of visible CUDA devices (0 when CUDA is unavailable).
    """
    count = 0
    if IS_CUDA:
        count = torch.cuda.device_count()
    print(f"CUDA available: {IS_CUDA} | Visible GPUs: {count}")

    # range(0) is empty, so no explicit "any GPUs?" guard is needed.
    for i in range(count):
        props = torch.cuda.get_device_properties(i)
        total_mem_gb = props.total_memory / (1024**3)
        print(f"  GPU {i}: {props.name} | {total_mem_gb:.2f} GB")
    return count

# Resolve how many GPUs this run will drive.
VISIBLE_GPUS = gpu_inventory()
if PARAMS["NUM_GPUS_TO_USE"] is None:
    # None means "use everything that is visible".
    PARAMS["NUM_GPUS_TO_USE"] = VISIBLE_GPUS

# Clamp to [0, VISIBLE_GPUS]. Without the lower bound a negative request would
# slip past every `== 0` check below and elsewhere (no status message, the
# sampler would still start, memory snapshots would print an empty header).
NUM_GPUS_TO_USE = max(0, min(PARAMS["NUM_GPUS_TO_USE"], VISIBLE_GPUS)) if IS_CUDA else 0

if USING_DDP and NUM_GPUS_TO_USE > 0:
    print("DDP environment detected. DataParallel skipped.")

if NUM_GPUS_TO_USE > 1 and not USING_DDP:
    print(f"Using DataParallel across {NUM_GPUS_TO_USE} GPUs.")
elif NUM_GPUS_TO_USE == 1:
    print("Running on single GPU.")
elif NUM_GPUS_TO_USE == 0:
    print("Running on CPU.")

# NOTE(review): under DDP every process would still select cuda:0 here;
# confirm whether a per-rank device is intended.
PRIMARY_DEVICE = torch.device("cuda:0") if NUM_GPUS_TO_USE > 0 else torch.device("cpu")

class GPUSampler:
    """Background poller that records per-GPU utilization and memory usage.

    Polling runs on a daemon thread every ``interval_sec`` seconds. PyNVML is
    used when it can be imported and initialised; otherwise the ``nvidia-smi``
    CLI is tried; if neither works, sampling is disabled and ``start()`` is a
    no-op. Every sample is a dict with the keys listed in ``_FIELDNAMES``.
    """

    # Column order shared by the in-memory samples and the CSV export.
    _FIELDNAMES = ["ts", "gpu_index", "utilization_pct", "mem_used_mib", "mem_total_mib"]

    def __init__(self, interval_sec: float = 1.0):
        """Create the sampler and probe for a usable backend.

        Args:
            interval_sec: Pause between two polls, in seconds.
        """
        self.interval = interval_sec
        self._stop = threading.Event()
        self.samples: List[Dict] = []
        self._thread: Optional[threading.Thread] = None
        self.use_pynvml = False
        self.nvidia_smi_ok = False
        # Diagnostics for polls that raised: sampling stays best-effort, but
        # failures are no longer invisible.
        self.failed_polls = 0
        self.last_error: Optional[Exception] = None
        self._init_backends()

    def _init_backends(self):
        """Select PyNVML, then nvidia-smi, then give up; sets the backend flags."""
        if not IS_CUDA or torch.cuda.device_count() == 0:
            return
        try:
            import pynvml
            pynvml.nvmlInit()
            self._pynvml = pynvml
            # NOTE(review): handles are looked up by torch's device index;
            # NVML numbering may differ from torch's when CUDA_VISIBLE_DEVICES
            # is set - confirm for such setups.
            self._nvml_handles = [
                pynvml.nvmlDeviceGetHandleByIndex(i)
                for i in range(torch.cuda.device_count())
            ]
            self.use_pynvml = True
            print("GPUSampler: Using PyNVML.")
            return
        except Exception:
            # Missing package or NVML failure: fall through to nvidia-smi.
            self.use_pynvml = False
        try:
            subprocess.run(["nvidia-smi", "-L"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
            self.nvidia_smi_ok = True
            print("GPUSampler: Using nvidia-smi fallback.")
        except (OSError, subprocess.SubprocessError):
            self.nvidia_smi_ok = False
            print("GPUSampler: GPU sampling disabled.")

    def start(self):
        """Launch the polling thread unless no backend/GPU is available or it already runs."""
        if (not self.use_pynvml and not self.nvidia_smi_ok) or NUM_GPUS_TO_USE == 0:
            return
        if self._thread is not None and self._thread.is_alive():
            # A second start() must not spawn a duplicate poller.
            return
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the polling thread to exit and wait for it; no-op if never started."""
        if self._thread is None:
            return
        self._stop.set()
        self._thread.join()
        self._thread = None

    def _poll_pynvml(self, timestamp: str) -> List[Dict]:
        """Read utilization and memory for every NVML handle."""
        mib = 1024**2
        rows = []
        for i, handle in enumerate(self._nvml_handles):
            util = self._pynvml.nvmlDeviceGetUtilizationRates(handle).gpu
            mem = self._pynvml.nvmlDeviceGetMemoryInfo(handle)
            rows.append({
                "ts": timestamp, "gpu_index": i,
                "utilization_pct": float(util),
                "mem_used_mib": float(mem.used) / mib,
                "mem_total_mib": float(mem.total) / mib,
            })
        return rows

    def _poll_nvidia_smi(self, timestamp: str) -> List[Dict]:
        """Run one nvidia-smi CSV query and parse a row per GPU."""
        result = subprocess.run(
            ["nvidia-smi", "--query-gpu=index,utilization.gpu,memory.used,memory.total",
             "--format=csv,noheader,nounits"],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True
        )
        rows = []
        for line in result.stdout.strip().splitlines():
            idx, util, mem_used, mem_total = [x.strip() for x in line.split(",")]
            rows.append({
                "ts": timestamp, "gpu_index": int(idx),
                "utilization_pct": float(util),
                "mem_used_mib": float(mem_used),
                "mem_total_mib": float(mem_total),
            })
        return rows

    def _run(self):
        """Thread body: poll the active backend until ``stop()`` is requested."""
        while not self._stop.is_set():
            timestamp = datetime.now(timezone.utc).isoformat()
            try:
                if self.use_pynvml:
                    rows = self._poll_pynvml(timestamp)
                elif self.nvidia_smi_ok:
                    rows = self._poll_nvidia_smi(timestamp)
                else:
                    rows = []
            except Exception as exc:
                # Best-effort: a failed poll must not kill the thread, but it
                # is counted so callers can tell why samples are missing.
                self.failed_polls += 1
                self.last_error = exc
            else:
                # Rows are committed together, so a poll that fails midway
                # does not leave a partial set of GPUs for that timestamp.
                self.samples.extend(rows)
            # Event.wait returns as soon as stop() sets the flag, so stop()
            # is not held up for a whole sampling interval.
            self._stop.wait(self.interval)

    def save_csv(self, path: str):
        """Write all collected samples to ``path`` as CSV; does nothing without samples.

        Args:
            path: Destination file. Missing parent directories are created.
        """
        if not self.samples:
            return
        parent = os.path.dirname(path)
        if parent:
            # os.makedirs("") raises, so only create a directory when the
            # path actually has a directory component.
            os.makedirs(parent, exist_ok=True)
        with open(path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=self._FIELDNAMES)
            writer.writeheader()
            # Copy first so a still-running poller cannot grow the list mid-write.
            writer.writerows(list(self.samples))

    def summarize(self):
        """Aggregate the samples per GPU.

        Returns:
            ``{gpu_index: {"avg_util_pct", "max_util_pct", "avg_mem_used_mib",
            "max_mem_used_mib", "mem_total_mib"}}``, or ``{}`` without samples.
            ``mem_total_mib`` is taken from the most recent sample of that GPU.
        """
        if not self.samples:
            return {}
        by_gpu: Dict[int, List[Dict]] = {}
        for row in self.samples:
            by_gpu.setdefault(row["gpu_index"], []).append(row)
        summary = {}
        for gpu_idx, rows in by_gpu.items():
            utils = [r["utilization_pct"] for r in rows]
            mems = [r["mem_used_mib"] for r in rows]
            summary[gpu_idx] = {
                "avg_util_pct": sum(utils) / len(rows),
                "max_util_pct": max(utils),
                "avg_mem_used_mib": sum(mems) / len(rows),
                "max_mem_used_mib": max(mems),
                "mem_total_mib": rows[-1]["mem_total_mib"],
            }
        return summary

# Module-level sampler: train() starts and stops it, and the final cell
# exports and summarizes its samples.
gpu_sampler = GPUSampler(interval_sec=PARAMS["UTIL_SAMPLING_INTERVAL_SEC"])
print("Setup complete.")

In [None]:
# Cell 2: Model, synthetic dataset, dataloaders (with variable batch-size schedule support)

# -----------------------------
# Synthetic dataset
# -----------------------------
class RandomImageDataset(Dataset):
    """Synthetic dataset that fabricates a fresh random image/label pair on every access.

    Nothing is stored: ``__getitem__`` ignores the index and draws a new
    ``(3, image_size, image_size)`` standard-normal tensor and a uniformly
    random integer label in ``[0, num_classes)``.
    """

    def __init__(self, num_samples, image_size, num_classes, augment_noise=True):
        self.num_samples, self.image_size = num_samples, image_size
        self.num_classes, self.augment_noise = num_classes, augment_noise

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        side = self.image_size
        image = torch.randn(3, side, side)
        if self.augment_noise:
            # Extra low-amplitude Gaussian perturbation on top of the base image.
            image = image + 0.01 * torch.randn_like(image)
        # Labels are re-drawn on every access, so a given index has no fixed target.
        label = torch.randint(0, self.num_classes, (1,)).item()
        return image, label

# -----------------------------
# Simple scalable ConvNet
# -----------------------------
class ConvBlock(nn.Module):
    """Bias-free Conv2d followed by BatchNorm2d and an in-place ReLU.

    Args:
        in_ch: Input channel count.
        out_ch: Output channel count.
        k: Convolution kernel size.
        s: Convolution stride.
        p: Convolution padding.
    """

    def __init__(self, in_ch, out_ch, k=3, s=1, p=1):
        super().__init__()
        conv = nn.Conv2d(in_ch, out_ch, kernel_size=k, stride=s, padding=p, bias=False)
        norm = nn.BatchNorm2d(out_ch)
        activation = nn.ReLU(inplace=True)
        self.net = nn.Sequential(conv, norm, activation)

    def forward(self, x):
        return self.net(x)

class TinyCNN(nn.Module):
    """Stack of ConvBlocks followed by global average pooling and a linear classifier.

    Args:
        size: Preset name ("tiny", "small", "base" or "large") selecting the
            base width and the number of ConvBlocks. Unknown names raise KeyError.
        num_classes: Output width of the final linear layer.
    """

    def __init__(self, size="small", num_classes=1000):
        super().__init__()
        # Preset name -> (base channel width, number of ConvBlocks).
        presets = {
            "tiny":  (32, 3),
            "small": (64, 4),
            "base":  (96, 6),
            "large": (128, 8),
        }
        width, depth = presets[size]

        blocks = []
        prev_ch = 3
        for i in range(depth):
            # Channel width doubles every second block; odd-indexed blocks
            # use stride 2, even-indexed ones stride 1.
            next_ch = width * (2 ** (i // 2))
            block_stride = 2 if i % 2 else 1
            blocks.append(ConvBlock(prev_ch, next_ch, k=3, s=block_stride, p=1))
            prev_ch = next_ch

        self.features = nn.Sequential(*blocks)
        pool = nn.AdaptiveAvgPool2d((1, 1))
        classifier = nn.Linear(prev_ch, num_classes)
        self.head = nn.Sequential(pool, nn.Flatten(), classifier)

    def forward(self, x):
        return self.head(self.features(x))

# -----------------------------
# Dataloader factory (supports epoch-specific batch size)
# -----------------------------
def make_loader(dataset, batch_size, use_cuda, num_workers: Optional[int] = None):
    """Build a shuffling DataLoader that drops the last partial batch.

    Args:
        dataset: Map-style dataset to draw batches from.
        batch_size: Number of samples per batch.
        use_cuda: When true, pinned host memory is requested and the default
            worker count is 4 instead of 2.
        num_workers: Optional override for the worker-process count. ``None``
            keeps the defaults described above; ``0`` loads in the main process.

    Returns:
        The configured ``DataLoader``.
    """
    if num_workers is None:
        num_workers = 4 if use_cuda else 2
    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
        num_workers=num_workers,
        pin_memory=use_cuda
    )

dataset = RandomImageDataset(
    num_samples=PARAMS["NUM_SAMPLES"],
    image_size=PARAMS["IMAGE_SIZE"],
    num_classes=PARAMS["NUM_CLASSES"],
    augment_noise=PARAMS["AUGMENT_NOISE"],
)

# Prepare initial loader (may be replaced per-epoch if schedule enabled)
current_batch_size = PARAMS["BATCH_SIZE"]
train_loader = make_loader(dataset, current_batch_size, IS_CUDA)

# Build model
model = TinyCNN(size=PARAMS["MODEL_SIZE"], num_classes=PARAMS["NUM_CLASSES"])

# DataParallel wrapping if multi-GPU (and not DDP)
if NUM_GPUS_TO_USE > 1 and not USING_DDP:
    device_ids = list(range(NUM_GPUS_TO_USE))
    model = nn.DataParallel(model, device_ids=device_ids)
model = model.to(PRIMARY_DEVICE)

# Optimizer and Loss. The LR given here is only the initial value; the
# training loop overwrites it on every step.
optimizer = optim.SGD(model.parameters(),
                      lr=PARAMS["BASE_LR"],
                      momentum=PARAMS["MOMENTUM"],
                      weight_decay=PARAMS["WEIGHT_DECAY"])
criterion = nn.CrossEntropyLoss(label_smoothing=PARAMS["LABEL_SMOOTHING"])

# AMP scaler. Gated on CUDA exactly like the autocast context in the training
# loop, so a CPU-only run does not construct an enabled scaler (which warns
# and disables itself).
scaler = GradScaler(enabled=PARAMS["USE_AMP"] and IS_CUDA)

print("Model and dataloader ready.")


In [None]:
# Cell 3: Training loop with progress bar, LR schedule, grad accumulation, memory/metering

def cosine_after_warmup(step, warmup_steps, total_steps, base_lr):
    """Learning rate for ``step``: linear warmup, then cosine decay to zero.

    Args:
        step: Zero-based global step index.
        warmup_steps: Number of steps over which the rate ramps linearly up
            to ``base_lr`` (reached on the last warmup step).
        total_steps: Step at which the cosine decay reaches zero.
        base_lr: Peak learning rate.

    Returns:
        The learning rate as a float. For ``step >= total_steps`` the result
        stays at zero.
    """
    if step < warmup_steps:
        return base_lr * float(step + 1) / float(max(1, warmup_steps))
    # cosine from warmup_end..total_steps
    decay_span = float(max(1, total_steps - warmup_steps))
    # Clamp the progress fraction: past 1.0 the cosine would swing back up and
    # the rate would start rising again.
    t = min(1.0, max(0.0, (step - warmup_steps) / decay_span))
    return base_lr * 0.5 * (1.0 + math.cos(math.pi * t))

def cuda_sync():
    """Call ``torch.cuda.synchronize()`` when CUDA is available; otherwise do nothing."""
    if not IS_CUDA:
        return
    torch.cuda.synchronize()

def mem_snapshot_per_gpu(tag: str):
    """Print current and peak CUDA allocator statistics for every GPU in use.

    Args:
        tag: Label shown in the snapshot header line.
    """
    if not (IS_CUDA and NUM_GPUS_TO_USE != 0):
        return

    mib = 1024**2
    counters = (
        torch.cuda.memory_allocated,
        torch.cuda.memory_reserved,
        torch.cuda.max_memory_allocated,
        torch.cuda.max_memory_reserved,
    )
    print(f"[Memory Snapshot] {tag}")
    for i in range(NUM_GPUS_TO_USE):
        # Synchronize the device before reading its counters.
        torch.cuda.synchronize(i)
        allocated, reserved, max_alloc, max_resv = (query(i) / mib for query in counters)
        print(f"  GPU {i}: allocated={allocated:.1f} MiB | reserved={reserved:.1f} MiB | "
              f"max_alloc={max_alloc:.1f} MiB | max_reserved={max_resv:.1f} MiB")

def _batch_size_for_epoch(epoch):
    """Return the batch size the multiplier schedule assigns to 0-based ``epoch``."""
    multipliers = PARAMS["BATCH_SCHEDULE_MULTIPLIERS"]
    mult = multipliers[epoch % len(multipliers)]
    return max(1, PARAMS["BATCH_SIZE"] * mult)


def _planned_total_steps():
    """Count the batches the whole run will process, honouring the batch-size schedule."""
    epochs = PARAMS["EPOCHS"]
    if not PARAMS["VARY_BATCH_SIZE_SCHEDULE"]:
        return len(train_loader) * epochs
    # make_loader drops the last partial batch, so an epoch with batch size b
    # yields len(dataset) // b steps.
    return sum(len(dataset) // _batch_size_for_epoch(e) for e in range(epochs))


def train():
    """Run the full training loop and return summary statistics.

    Per epoch this optionally rebuilds the DataLoader with the scheduled batch
    size, resets the CUDA peak-memory counters, trains with warmup+cosine LR,
    optional AMP and gradient accumulation, then prints a summary and a memory
    snapshot. The module-level GPU sampler runs for the duration of the loop
    and is stopped even when training raises.

    Returns:
        dict with ``total_time_sec``, ``total_images`` and ``last_epoch``
        (stats of the final epoch; empty dict if no epoch ran).
    """
    global train_loader, current_batch_size

    # Guard against 0/negative settings, which would make the modulo below fail.
    accum_steps = max(1, int(PARAMS["GRAD_ACCUM_STEPS"]))

    # The cosine horizon is computed once for the whole schedule. Re-estimating
    # it whenever the batch size changes (assuming that size for all remaining
    # epochs) shifts the decay curve and makes the LR jump at epoch boundaries.
    total_steps = _planned_total_steps()
    step_idx = 0

    # Start GPU sampler
    gpu_sampler.start()

    overall_start = time.time()
    running_total_images = 0
    last_epoch_stats = {}

    try:
        for epoch in range(PARAMS["EPOCHS"]):
            # Optionally vary batch size to demonstrate memory changes
            if PARAMS["VARY_BATCH_SIZE_SCHEDULE"]:
                new_bs = _batch_size_for_epoch(epoch)
                if new_bs != current_batch_size:
                    current_batch_size = new_bs
                    train_loader = make_loader(dataset, current_batch_size, IS_CUDA)
                    steps_per_epoch = len(train_loader)
                    print(f"[Epoch {epoch+1}] Adjusted batch size to {current_batch_size}. Steps/epoch: {steps_per_epoch}")

            model.train()
            epoch_loss_sum = 0.0
            epoch_images = 0
            num_batches = len(train_loader)

            # Reset peak memory stats each epoch for clearer snapshots
            if IS_CUDA:
                for i in range(NUM_GPUS_TO_USE):
                    torch.cuda.reset_peak_memory_stats(i)

            pbar = tqdm(enumerate(train_loader), total=num_batches, desc=f"Epoch {epoch+1}/{PARAMS['EPOCHS']}")
            epoch_start = time.time()

            optimizer.zero_grad(set_to_none=True)
            for step_in_epoch, (images, targets) in pbar:
                images = images.to(PRIMARY_DEVICE, non_blocking=True) if IS_CUDA else images
                targets = targets.to(PRIMARY_DEVICE, non_blocking=True) if IS_CUDA else targets

                # Compute scheduled LR
                lr = cosine_after_warmup(step_idx, PARAMS["WARMUP_STEPS"], total_steps, PARAMS["BASE_LR"])
                for pg in optimizer.param_groups:
                    pg["lr"] = lr

                # Forward + loss (AMP optional)
                with autocast(enabled=PARAMS["USE_AMP"] and IS_CUDA):
                    outputs = model(images)
                    loss = criterion(outputs, targets)

                # Metrics use the un-normalised per-batch loss.
                loss_to_report = loss.detach().item()
                epoch_loss_sum += loss_to_report
                batch_size_effective = images.size(0)
                epoch_images += batch_size_effective
                running_total_images += batch_size_effective

                # Average (rather than sum) gradients over the accumulation
                # window so the update magnitude does not grow with accum_steps.
                loss_for_backward = loss if accum_steps == 1 else loss / accum_steps
                scaler.scale(loss_for_backward).backward()

                # Step at the end of each window, and on the epoch's final batch
                # so a trailing partial window is applied instead of being
                # discarded by the next epoch's zero_grad.
                is_last_batch = (step_in_epoch + 1) == num_batches
                if (step_in_epoch + 1) % accum_steps == 0 or is_last_batch:
                    scaler.step(optimizer)
                    scaler.update()
                    optimizer.zero_grad(set_to_none=True)

                # Progress bar metrics
                # Compute instantaneous imgs/sec (sync for accurate timing)
                cuda_sync()
                elapsed_epoch = max(1e-6, time.time() - epoch_start)
                imgs_per_sec = epoch_images / elapsed_epoch
                pbar.set_postfix({
                    "loss": f"{loss_to_report:.4f}",
                    "lr": f"{lr:.4e}",
                    "ips": f"{imgs_per_sec:.1f}"
                })

                step_idx += 1

            epoch_time = time.time() - epoch_start
            avg_loss = epoch_loss_sum / max(1, num_batches)
            cuda_sync()
            imgs_per_sec_epoch = epoch_images / max(1e-6, epoch_time)

            print(f"\n[Epoch {epoch+1} Summary] "
                  f"time={epoch_time:.2f}s | avg_loss={avg_loss:.4f} | throughput={imgs_per_sec_epoch:.1f} img/s "
                  f"| batch_size={current_batch_size}")

            mem_snapshot_per_gpu(f"End of Epoch {epoch+1}")

            last_epoch_stats = {
                "epoch": epoch + 1,
                "time_sec": epoch_time,
                "avg_loss": avg_loss,
                "throughput_img_per_sec": imgs_per_sec_epoch,
                "batch_size": current_batch_size
            }

        # Taken before the sampler is stopped so the join does not count as training time.
        total_time = time.time() - overall_start
    finally:
        # Always stop the background poller, including on errors or interrupts.
        gpu_sampler.stop()

    return {
        "total_time_sec": total_time,
        "total_images": running_total_images,
        "last_epoch": last_epoch_stats
    }

# Run the training loop; the returned stats feed the final summary cell.
train_stats = train()
print("Training run complete.")


In [None]:
# Cell 4: Save GPU samples, print per-GPU summary and final metrics

# Save CSV (if any samples)
gpu_sampler.save_csv(PARAMS["CSV_PATH"])

summary = gpu_sampler.summarize()
if summary:
    print("\nPer-GPU Utilization & Memory Summary:")
    for gpu_idx in sorted(summary):
        s = summary[gpu_idx]
        print(f"  GPU {gpu_idx}: "
              f"avg_util={s['avg_util_pct']:.1f}% | max_util={s['max_util_pct']:.1f}% | "
              f"avg_mem={s['avg_mem_used_mib']:.0f} MiB | max_mem={s['max_mem_used_mib']:.0f} MiB "
              f"(total={s['mem_total_mib']:.0f} MiB)")

    # Effective average across used GPUs
    used_gpu_indices = [i for i in summary if i < NUM_GPUS_TO_USE]
    if used_gpu_indices:
        eff_avg_util = sum(summary[i]['avg_util_pct'] for i in used_gpu_indices) / len(used_gpu_indices)
        print(f"\nEffective average utilization across {len(used_gpu_indices)} GPU(s): {eff_avg_util:.1f}%")
else:
    if NUM_GPUS_TO_USE == 0:
        print("\nNo GPU detected; utilization sampling not available.")
    else:
        print("\nGPU sampler produced no samples (unexpected).")

# Final training summary
total_time = train_stats["total_time_sec"]
total_images = train_stats["total_images"]
last_epoch = train_stats["last_epoch"]
imgs_per_sec_total = total_images / max(1e-6, total_time)

print("\n=== Final Training Summary ===")
print(f"Total time: {total_time:.2f}s")
print(f"Total images seen: {total_images}")
print(f"Overall avg throughput: {imgs_per_sec_total:.1f} img/s")
if last_epoch:
    print(f"Last epoch: #{last_epoch['epoch']} | time={last_epoch['time_sec']:.2f}s | "
          f"avg_loss={last_epoch['avg_loss']:.4f} | throughput={last_epoch['throughput_img_per_sec']:.1f} img/s "
          f"| batch_size={last_epoch['batch_size']}")
else:
    # train() returns an empty dict here when no epoch ran; indexing it would raise KeyError.
    print("Last epoch: none (no epochs were run).")

# The CSV is only written when samples exist, so only announce the path in that case.
if summary:
    print(f"\nGPU samples CSV: {PARAMS['CSV_PATH']}")
else:
    print("\nGPU samples CSV: not written (no samples collected).")
