In [None]:
# Install the CV dependencies listed in requirements/cv.txt into this environment using uv.
# NOTE(review): the requirements path below is absolute and specific to one machine —
# adjust it before running this cell anywhere else.
!uv pip install -r /Users/amisra/dev/ERA-v4/requirements/cv.txt

In [None]:
import atexit
import gc
import sys

import torch


def _nb_cleanup_on_exit() -> None:
    """Release accelerator caches and collect garbage; intended as an exit hook.

    The sequence is: garbage-collect, flush the MPS backend if it is available,
    flush the CUDA backend if it is available, garbage-collect again. Each
    accelerator call is individually guarded, so a failing call never prevents
    the following ones from running. Anything that still escapes is reported on
    stderr instead of being raised.
    """

    def _quietly(step) -> None:
        # Run a single cleanup step and ignore any ordinary exception it raises.
        try:
            step()
        except Exception:
            pass

    def _drain_mps() -> None:
        # Apple-silicon backend: wait for pending work, then drop cached memory.
        if not (hasattr(torch, "backends") and torch.backends.mps.is_available()):
            return
        _quietly(lambda: torch.mps.synchronize())
        _quietly(lambda: torch.mps.empty_cache())

    def _drain_cuda() -> None:
        # Same treatment for CUDA, in case this notebook is reused on CUDA machines.
        if not (hasattr(torch, "cuda") and torch.cuda.is_available()):
            return
        _quietly(lambda: torch.cuda.synchronize())
        _quietly(lambda: torch.cuda.empty_cache())

    try:
        # Collect first so dropped tensors are gone before the caches are emptied.
        gc.collect()
        _quietly(_drain_mps)
        _quietly(_drain_cuda)
        gc.collect()
    except Exception as problem:
        print(f"Cleanup on exit encountered an issue: {problem}", file=sys.stderr)


# Register best-effort cleanup when kernel stops
# NOTE: every execution of this cell adds another registration with atexit.
atexit.register(_nb_cleanup_on_exit)
print("Registered kernel atexit cleanup handler.")

In [None]:
# Stop here when the MPS backend is missing. An explicit raise is used instead of
# `assert` because assertions are removed when Python runs with -O / PYTHONOPTIMIZE,
# which would let the later MPS cells fail with a less helpful error.
if not torch.backends.mps.is_available():
    raise RuntimeError(
        "MPS acceleration not available on this Mac. Ensure PyTorch with MPS support is installed and Metal is enabled."
    )
print("MPS acceleration available: True")

In [None]:
# Bind the notebook-wide `device` name to the MPS backend; the sanity-check cell uses it.
device = torch.device("mps")
print(f"Using device: {device}")

In [None]:
# Quick tensor sanity check on MPS
# Two 1024x1024 random matrices are created directly on `device` and multiplied there.
a = torch.randn(1024, 1024, device=device)
b = torch.randn(1024, 1024, device=device)
c = a @ b
# Reports the device the result tensor lives on.
print(f"Matmul successful on {c.device}")

In [None]:
import torch

# CPU baseline inputs for the timing cell that follows: two 10000x10000 float32
# tensors (10^8 elements each, i.e. 400 MB apiece).
# Note that this rebinds the notebook-wide `device`, `x` and `y` names.
device = torch.device("cpu")
x = torch.rand((10000, 10000), dtype=torch.float32)
y = torch.rand((10000, 10000), dtype=torch.float32)
x = x.to(device)
y = y.to(device)

In [None]:
%%timeit
# Time the elementwise product of the `x` and `y` tensors prepared in the previous cell.
x * y

In [None]:
import torch

# MPS inputs for the timing cell that follows: same shapes and dtype as the CPU run.
# The tensors are generated first and then moved to `device` with `.to(...)`.
# Note that this rebinds the notebook-wide `device`, `x` and `y` names.
device = torch.device("mps")
x = torch.rand((10000, 10000), dtype=torch.float32)
y = torch.rand((10000, 10000), dtype=torch.float32)
x = x.to(device)
y = y.to(device)

In [None]:
%%timeit
x * y

In [None]:
import gc
import multiprocessing
import os
import signal

# Make `get_ipython` callable whether or not IPython can be imported.
try:
    from IPython import get_ipython  # type: ignore
except Exception:
    # Import failed: define a stand-in that always reports "no shell" by returning None.

    def get_ipython() -> None:  # fallback
        return None


import torch


def cleanup_resources(verbose: bool = True, join_timeout: float = 1.0) -> None:
    """Free accelerator caches, stop lingering child processes, and run GC.

    Every step is best-effort: an exception raised by one step is swallowed so
    the remaining steps still run.

    Args:
        verbose: When true, print one line before and one line after cleanup.
        join_timeout: Seconds to wait for each child process to exit after it
            has been terminated, and again after it has been killed.
    """

    def _attempt(step, *args) -> None:
        # Run one cleanup step; a failure must not abort the rest of the cleanup.
        try:
            step(*args)
        except Exception:
            pass

    def _flush_mps() -> None:
        # MPS: wait for pending work, then release cached memory.
        if hasattr(torch, "backends") and torch.backends.mps.is_available():
            _attempt(lambda: torch.mps.synchronize())
            _attempt(lambda: torch.mps.empty_cache())

    def _flush_cuda() -> None:
        # CUDA (for portability): same two calls as for MPS.
        if hasattr(torch, "cuda") and torch.cuda.is_available():
            _attempt(lambda: torch.cuda.synchronize())
            _attempt(lambda: torch.cuda.empty_cache())

    def _stop_child(child) -> None:
        child.terminate()
        # Join so the terminated process is actually reaped; without this the
        # child may still be alive (or left as a zombie) when we return.
        child.join(join_timeout)
        if child.is_alive():
            # The child did not exit after terminate(); escalate to kill().
            child.kill()
            child.join(join_timeout)

    def _stop_children() -> None:
        # Children that might linger (e.g., DataLoader workers).
        for child in multiprocessing.active_children():
            _attempt(_stop_child, child)

    if verbose:
        print("Cleaning up resources...")

    # Drop unreachable Python objects first so their accelerator memory can be
    # returned when the caches are emptied.
    _attempt(gc.collect)
    _attempt(_flush_mps)
    _attempt(_flush_cuda)
    _attempt(_stop_children)

    gc.collect()
    if verbose:
        print("Cleanup complete.")


# Maps the names "SIGINT" and "SIGTERM" to the corresponding `signal` module constants.
# NOTE(review): nothing in the visible code reads this mapping — confirm it is still needed.
essential_signals = {"SIGINT": signal.SIGINT, "SIGTERM": signal.SIGTERM}


def shutdown_kernel(cleanup_first: bool = True, restart: bool = False) -> None:
    """Stop the process hosting this notebook kernel, on a best-effort basis.

    When ``cleanup_first`` is true, ``cleanup_resources(verbose=False)`` runs
    before anything else. The shutdown is then requested through the active
    IPython shell's ``kernel.do_shutdown(restart=restart)``. If there is no
    shell, the shell has no ``kernel`` attribute, or that call raises, the
    function sends SIGTERM to its own process instead. ``restart`` is only
    forwarded to ``do_shutdown``; the signal fallback does not use it.
    """
    if cleanup_first:
        cleanup_resources(verbose=False)

    asked_ipython = False
    try:
        shell = get_ipython()
        if shell is not None and hasattr(shell, "kernel"):
            shell.kernel.do_shutdown(restart=restart)
            asked_ipython = True
    except Exception:
        # Any failure on the IPython path leads to the signal fallback below.
        pass

    if asked_ipython:
        return

    # Fallback: signal this process
    own_pid = os.getpid()
    try:
        os.kill(own_pid, signal.SIGTERM)
    except Exception:
        pass


# Announce that the helper functions defined in this cell can now be called.
print("Manual helpers ready: cleanup_resources(), shutdown_kernel().")