In [None]:

import subprocess, sys, time, math, os

def run(x, timeout=8):
    """Run a shell command and print whatever it wrote to stdout.

    Best-effort diagnostic helper: a failure (for example a timeout or a
    command that cannot be launched) is reported with a one-line note instead
    of being raised or silently dropped. The command's stderr and exit status
    are not inspected.

    Args:
        x: Command line, executed through the shell (``shell=True``). Only
            pass trusted, hard-coded commands.
        timeout: Seconds to wait for the command before giving up.

    Returns:
        None. Output is printed, stripped of surrounding whitespace.
    """
    try:
        p = subprocess.run(x, shell=True, capture_output=True, text=True, timeout=timeout)
        if p.stdout:
            print(p.stdout.strip())
    except Exception as exc:
        # Stay non-fatal, but tell the reader why there is no output.
        print(f"[run] {x!r} failed: {type(exc).__name__}: {exc}")

# Defaults that are reported when PyTorch cannot be imported or sees no CUDA device.
gpu, backend, has_cuda = "None", None, False

try:
    import torch

    # Opt in to TF32 matmuls; any failure while setting these knobs is ignored.
    try:
        torch.set_float32_matmul_precision("high")
        import torch.backends.cuda
        torch.backends.cuda.matmul.allow_tf32 = True
    except Exception:
        pass

    has_cuda = torch.cuda.is_available()
    if has_cuda:
        gpu, backend = torch.cuda.get_device_name(0), "torch"
except Exception:
    # Leave the defaults above in place if anything in the probe fails.
    pass

# Report what was detected, then ask nvidia-smi for its device list (best effort).
print(f"Backend: {backend}")
print(f"GPU: {gpu}")
run("nvidia-smi -L")


In [None]:

import time, math

import torch

def pick_sizes():
    """Choose the GEMM edge length N and the FFT edge length M.

    Without CUDA the fixed pair ``(4096, 2048)`` is returned. With CUDA, N is
    derived from a memory budget (the smaller of 40% of free and 25% of total
    device memory) assuming three N x N matrices at 4 bytes per element, then
    clamped to [2048, 8192] and rounded down to a multiple of 128. M is N
    clamped to [1024, 4096], also rounded down to a multiple of 128.

    Returns:
        Tuple ``(N, M)`` of ints.
    """
    if not torch.cuda.is_available():
        # Safe CPU-only defaults
        return 4096, 2048

    free_bytes, total_bytes = torch.cuda.mem_get_info()
    budget = int(min(free_bytes * 0.4, total_bytes * 0.25))

    # fp32 worst case (4 bytes) with 3 matrices resident at once.
    raw_edge = int((budget / (3 * 4)) ** 0.5)
    gemm_edge = max(2048, min(raw_edge, 8192))
    gemm_edge -= gemm_edge % 128

    # FFT works on complex64, so its size is capped lower than the GEMM size.
    fft_edge = max(1024, min(4096, gemm_edge))
    fft_edge -= fft_edge % 128

    return gemm_edge, fft_edge

def gflops_mm(N, seconds):
    """Convert the time of one N x N by N x N matmul into GFLOP/s.

    The operation count is taken as 2 * N**3.

    Args:
        N: Edge length of the square matrices.
        seconds: Time for a single multiplication.

    Returns:
        Throughput in units of 1e9 floating-point operations per second.
    """
    flop_count = 2.0 * N * N * N
    giga_flop = flop_count / 1e9
    return giga_flop / seconds

def bench_mm(device, N, repeats=5, dtype=torch.float32, amp_dtype=None):
    """Time a square matrix multiplication and report seconds and GFLOP/s.

    Two N x N random matrices are multiplied ``repeats`` times after two
    warm-up multiplications. When ``amp_dtype`` is given and the device is
    CUDA, every multiplication (warm-up included) runs inside
    ``torch.autocast`` with that dtype; on any other device ``amp_dtype`` is
    ignored.

    Both the autocast and the plain path use the same timing scheme (queue
    all repeats, synchronize once, divide by ``repeats``) so that their
    results are directly comparable.

    Args:
        device: ``torch.device`` to allocate and compute on.
        N: Edge length of the square matrices.
        repeats: Number of timed multiplications.
        dtype: Storage dtype of the input matrices.
        amp_dtype: Optional autocast dtype (CUDA only).

    Returns:
        Tuple ``(seconds_per_matmul, gflops)``.
    """
    on_cuda = device.type == "cuda"
    use_amp = amp_dtype is not None and on_cuda

    a = torch.randn((N, N), device=device, dtype=dtype)
    b = torch.randn((N, N), device=device, dtype=dtype)

    def sync():
        # CUDA kernels are asynchronous; wait for them before reading the clock.
        if on_cuda:
            torch.cuda.synchronize()

    def matmul():
        if use_amp:
            with torch.autocast(device_type="cuda", dtype=amp_dtype):
                return a @ b
        return a @ b

    sync()
    # Warmup under the same settings as the timed runs, so one-time setup for
    # the autocast path is not charged to the measurement.
    for _ in range(2):
        matmul()
    sync()

    t0 = time.perf_counter()
    for _ in range(repeats):
        matmul()
    sync()
    per = (time.perf_counter() - t0) / repeats
    return per, gflops_mm(N, per)

def bench_elem(device, N, repeats=5, dtype=torch.float32):
    """Time an element-wise ``sin(x) + exp(x) + tanh(x)`` over an N x N tensor.

    Every iteration, warm-up included, evaluates the expression on the same
    random input. Feeding each result back into the next iteration would push
    the values through ``exp`` repeatedly until they overflow to inf/NaN, so
    later iterations would no longer measure the same work.

    Args:
        device: ``torch.device`` to allocate and compute on.
        N: Edge length of the square input tensor.
        repeats: Number of timed evaluations.
        dtype: Dtype of the input tensor.

    Returns:
        Average seconds per evaluation.
    """
    on_cuda = device.type == "cuda"
    x = torch.randn((N, N), device=device, dtype=dtype)
    if on_cuda:
        torch.cuda.synchronize()
    # Warmup
    _ = torch.sin(x) + torch.exp(x) + torch.tanh(x)
    if on_cuda:
        torch.cuda.synchronize()
    t0 = time.perf_counter()
    for _ in range(repeats):
        _ = torch.sin(x) + torch.exp(x) + torch.tanh(x)
    if on_cuda:
        torch.cuda.synchronize()
    per = (time.perf_counter() - t0) / repeats
    return per

def bench_fft2(device, M, repeats=3, complex_dtype=torch.complex64):
    """Time a 2-D FFT of an M x M complex tensor.

    The input is random float32 data converted to ``complex_dtype`` (zero
    imaginary part). The conversion alone produces that tensor, so no extra
    zero-valued complex temporaries are allocated.

    Args:
        device: ``torch.device`` to allocate and compute on.
        M: Edge length of the square input.
        repeats: Number of timed transforms (one warm-up transform runs first).
        complex_dtype: Complex dtype of the FFT input.

    Returns:
        Average seconds per transform.
    """
    on_cuda = device.type == "cuda"
    x = torch.randn((M, M), device=device, dtype=torch.float32).to(complex_dtype)
    if on_cuda:
        torch.cuda.synchronize()
    # Warmup
    _ = torch.fft.fft2(x)
    if on_cuda:
        torch.cuda.synchronize()
    t0 = time.perf_counter()
    for _ in range(repeats):
        _ = torch.fft.fft2(x)
    if on_cuda:
        torch.cuda.synchronize()
    per = (time.perf_counter() - t0) / repeats
    return per

def run_all():
    """Run every micro-benchmark, print the results, and return them.

    The CPU benchmarks always run. When CUDA is available the same sizes are
    benchmarked on the GPU (plain fp32 matmul, element-wise, FFT, and a bf16
    autocast matmul) and CPU/GPU speedup ratios are added.

    Returns:
        Dict mapping a result label to a float (seconds, GF/s, or a speedup
        ratio), in the order the results were produced.
    """
    N, M = pick_sizes()
    print(f"Using SAME sizes on CPU and GPU for fairness: N={N}, M={M}")

    cuda_ok = torch.cuda.is_available()
    cpu = torch.device("cpu")
    accel = torch.device("cuda" if cuda_ok else "cpu")

    results = {}

    # CPU baseline (float32 / complex64)
    secs, rate = bench_mm(cpu, N, repeats=3, dtype=torch.float32)
    results["CPU GEMM (fp32) s"] = secs
    results["CPU GEMM (fp32) GF/s"] = rate
    results["CPU Elem (fp32) s"] = bench_elem(cpu, N, repeats=2, dtype=torch.float32)
    results["CPU FFT2 (c64) s"] = bench_fft2(cpu, M, repeats=2, complex_dtype=torch.complex64)

    if cuda_ok:
        # GPU with float32 inputs
        secs, rate = bench_mm(accel, N, repeats=5, dtype=torch.float32)
        results["GPU GEMM (fp32/TF32) s"] = secs
        results["GPU GEMM (fp32/TF32) GF/s"] = rate
        results["GPU Elem (fp32) s"] = bench_elem(accel, N, repeats=3, dtype=torch.float32)
        results["GPU FFT2 (c64) s"] = bench_fft2(accel, M, repeats=3, complex_dtype=torch.complex64)

        # GPU matmul under bfloat16 autocast
        secs, rate = bench_mm(accel, N, repeats=6, dtype=torch.float32, amp_dtype=torch.bfloat16)
        results["GPU GEMM (bf16 autocast) s"] = secs
        results["GPU GEMM (bf16) GF/s"] = rate

        # Speedups: CPU seconds divided by GPU seconds for each workload.
        ratio_specs = (
            ("Speedup GEMM fp32", "CPU GEMM (fp32) s", "GPU GEMM (fp32/TF32) s"),
            ("Speedup GEMM bf16", "CPU GEMM (fp32) s", "GPU GEMM (bf16 autocast) s"),
            ("Speedup Elem fp32", "CPU Elem (fp32) s", "GPU Elem (fp32) s"),
            ("Speedup FFT2", "CPU FFT2 (c64) s", "GPU FFT2 (c64) s"),
        )
        for label, cpu_key, gpu_key in ratio_specs:
            results[label] = results[cpu_key] / results[gpu_key]

    for label, value in results.items():
        shown = f"{value:.4f}" if isinstance(value, float) else value
        print(f"{label}: {shown}")
    return results

# Run the whole suite now. run_all() prints each result itself, so the
# returned dict is bound to `_` and not used further in this cell.
_ = run_all()


In [None]:

# Optional: Tiny CNN throughput (images/sec) to demonstrate ML relevance.
import torch, time

def cnn_throughput(device, batch=128, steps=30, amp=False):
    """Measure and print forward-pass throughput of a tiny CNN in images/s.

    The model (7x7 stride-2 conv, ReLU, global average pool, linear head) is
    fed one fixed random batch of 3x224x224 images. Five warm-up passes run
    before ``steps`` timed passes. All passes run in eval mode with autograd
    disabled, since only inference is being measured.

    bf16 autocast is applied only when ``amp`` is true AND the device is CUDA;
    the printed label reflects what actually ran, so ``amp=True`` on a CPU
    device is reported as fp32.

    Args:
        device: ``torch.device`` to run on.
        batch: Images per forward pass.
        steps: Number of timed forward passes.
        amp: Request bf16 autocast (honoured on CUDA only).

    Returns:
        None. The result, or a build-failure message, is printed.
    """
    try:
        from torch import nn
        model = nn.Sequential(
            nn.Conv2d(3, 64, 7, stride=2, padding=3),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(64, 10),
        ).to(device)
    except Exception as e:
        print("Model build failed:", e)
        return

    on_cuda = device.type == "cuda"
    use_amp = bool(amp) and on_cuda

    x = torch.randn(batch, 3, 224, 224, device=device)
    model.eval()

    def sync():
        # CUDA kernels are asynchronous; wait for them before reading the clock.
        if on_cuda:
            torch.cuda.synchronize()

    def forward():
        if use_amp:
            with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
                return model(x)
        return model(x)

    # Inference only: do not record an autograd graph for the parameters.
    with torch.no_grad():
        sync()
        # Warmup under the same settings as the timed passes.
        for _ in range(5):
            forward()
        sync()

        t0 = time.perf_counter()
        for _ in range(steps):
            forward()
        sync()
        elapsed = time.perf_counter() - t0

    imgs = batch * steps / elapsed
    print(f"{'GPU' if on_cuda else 'CPU'} CNN throughput ({'bf16 amp' if use_amp else 'fp32'}): {imgs:.1f} images/s")

# CPU baseline: smaller batch and fewer steps than the GPU runs below.
device_cpu = torch.device("cpu")
cnn_throughput(device=device_cpu, batch=64, steps=20, amp=False)

# GPU runs, first in fp32 and then with bf16 autocast requested.
if torch.cuda.is_available():
    device_gpu = torch.device("cuda")
    cnn_throughput(device=device_gpu, batch=128, steps=40, amp=False)
    cnn_throughput(device=device_gpu, batch=128, steps=40, amp=True)
