In [None]:
import sys
import os
import json
import tempfile
import shutil
import tarfile
import subprocess
from urllib import request
import ctypes
import ctypes.util

# First we'll write a very simple function that takes in a 

In [None]:
registry_base = "https://registry-1.docker.io/v2/library"
auth_base = "https://auth.docker.io"

In [None]:

# Load necessary libraries
libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
libcgroup = ctypes.CDLL(ctypes.util.find_library('cgroup'), use_errno=True)

# Define constants for namespaces
CLONE_NEWNS = 0x00020000
CLONE_NEWUTS = 0x04000000
CLONE_NEWIPC = 0x08000000
CLONE_NEWPID = 0x20000000
CLONE_NEWNET = 0x40000000

In [None]:
def get_auth_token(service: str) -> str:
    uri = f"{auth_base}/token?service=registry.docker.io&scope=repository:library/{service}:pull"
    resp = request.urlopen(request.Request(uri, method="GET"))
    return json.loads(resp.read(8096).decode("utf-8"))["token"]

In [None]:
def get_image_blobs(service: str, tag: str, auth_token: str) -> list[str]:
    uri = f"{registry_base}/{service}/manifests/{tag}"
    req = request.Request(
        uri, method="GET", headers={"Authorization": f"Bearer {auth_token}"}
    )
    resp = request.urlopen(req)
    resp = json.loads(resp.read().decode("utf-8"))
    blobs = [layer["blobSum"] for layer in resp["fsLayers"]]
    return blobs

In [None]:
def pull_image_layers(service: str, blobs: list[str], auth_token: str, output_dir: str):
    for blob in blobs:
        uri = f"{registry_base}/{service}/blobs/{blob}"
        req = request.Request(
            uri, method="GET", headers={"Authorization": f"Bearer {auth_token}"}
        )
        with tempfile.TemporaryDirectory() as tmp_dir:
            with open(os.path.join(tmp_dir, f"{blob}.tar"), "wb") as f:
                with request.urlopen(req) as resp:
                    f.write(resp.read())
            for file in os.listdir(tmp_dir):
                ff = tarfile.open(os.path.join(tmp_dir, file))
                ff.extractall(output_dir)

In [None]:
def setup_cgroups(container_id: str):
    cgroup_path = f"/sys/fs/cgroup/unified/{container_id}"
    os.makedirs(cgroup_path, exist_ok=True)
    
    # Set CPU limit (e.g., 50% of one CPU core)
    with open(f"{cgroup_path}/cpu.max", "w") as f:
        f.write("50000 100000")
    
    # Set memory limit (e.g., 512MB)
    with open(f"{cgroup_path}/memory.max", "w") as f:
        f.write("536870912")
    
    # Add current process to the cgroup
    with open(f"{cgroup_path}/cgroup.procs", "w") as f:
        f.write(str(os.getpid()))

In [None]:
def setup_network():
    # Create a new network namespace
    if libc.unshare(CLONE_NEWNET) != 0:
        raise OSError(ctypes.get_errno(), "Failed to unshare network namespace")
    
    # Set up a virtual ethernet pair
    subprocess.run(["ip", "link", "add", "veth0", "type", "veth", "peer", "name", "veth1"])
    subprocess.run(["ip", "link", "set", "veth1", "up"])
    subprocess.run(["ip", "addr", "add", "172.17.0.2/16", "dev", "veth1"])
    
    # Set up NAT for outgoing connections
    subprocess.run(["iptables", "-t", "nat", "-A", "POSTROUTING", "-o", "eth0", "-j", "MASQUERADE"])
    subprocess.run(["iptables", "-A", "FORWARD", "-i", "eth0", "-o", "veth1", "-m", "state", "--state", "RELATED,ESTABLISHED", "-j", "ACCEPT"])
    subprocess.run(["iptables", "-A", "FORWARD", "-i", "veth1", "-o", "eth0", "-j", "ACCEPT"])


In [None]:
def container_process(command: str, args: list):
    # Set up mount namespace
    if libc.unshare(CLONE_NEWNS) != 0:
        raise OSError(ctypes.get_errno(), "Failed to unshare mount namespace")
    
    # Set up UTS namespace (hostname)
    if libc.unshare(CLONE_NEWUTS) != 0:
        raise OSError(ctypes.get_errno(), "Failed to unshare UTS namespace")
    
    # Set up IPC namespace
    if libc.unshare(CLONE_NEWIPC) != 0:
        raise OSError(ctypes.get_errno(), "Failed to unshare IPC namespace")
    
    # Set up PID namespace
    if libc.unshare(CLONE_NEWPID) != 0:
        raise OSError(ctypes.get_errno(), "Failed to unshare PID namespace")
    
    # Set up network
    setup_network()
    
    # Execute the command
    os.execvp(command, [command] + args)

In [None]:
def main():
    image = sys.argv[2]
    tag = "latest" if ":" not in image else image.split(":")[1]
    command = sys.argv[3]
    args = sys.argv[4:]
    
    container_id = f"container_{os.getpid()}"
    
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Pull and set up the container filesystem
        auth_token = get_auth_token(image)
        blobs = get_image_blobs(image, tag, auth_token)
        pull_image_layers(image, blobs, auth_token, tmp_dir)
        
        # Set up cgroups
        setup_cgroups(container_id)
        
        # Fork a new process for the container
        pid = os.fork()
        if pid == 0:  # Child process
            os.chroot(tmp_dir)
            os.chdir("/")
            container_process(command, args)
        else:  # Parent process
            _, status = os.waitpid(pid, 0)
            sys.exit(os.WEXITSTATUS(status))
            
main()