In [40]:
import subprocess, time, re, os, shlex, shlex, textwrap


# Directory inside the container under which logs/ and results/ are created.
CONTAINER_DATA_DIR = "/data"
# Substring looked for in `docker ps` container names to find the controller.
SERVICE_HINT = "slurmctld"
# Seconds slept between two job-state polls in wait_for_completion().
POLL_INTERVAL = 2.0
LOCAL_FILE = "./test.py"  # change if needed
# Path inside the container that LOCAL_FILE is copied to.
REMOTE_FILE = f"{CONTAINER_DATA_DIR}/test.py"


In [2]:
def run(cmd, input_bytes=None, check=True):
    """Run a command given as an argv list (no shell) and return ``(stdout, stderr, rc)``.

    Args:
        cmd: Sequence of program arguments handed directly to ``subprocess.run``.
        input_bytes: Optional bytes written to the process's stdin.
        check: When true, a non-zero exit status raises ``RuntimeError``.

    Returns:
        Tuple ``(stdout, stderr, returncode)`` with both streams decoded to ``str``.

    Raises:
        RuntimeError: If ``check`` is true and the command exits with a non-zero status.
    """
    p = subprocess.run(
        cmd,
        input=input_bytes,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    # Output of external tools is not guaranteed to be valid UTF-8 (e.g. tailing a
    # binary or partially written log); substitute undecodable bytes instead of
    # raising UnicodeDecodeError and losing the whole result.
    out = p.stdout.decode(errors="replace")
    err = p.stderr.decode(errors="replace")
    if check and p.returncode != 0:
        # map(str, ...) keeps the error report usable when argv holds non-str items.
        raise RuntimeError(f"cmd failed: {' '.join(map(str, cmd))}\nstdout:\n{out}\nstderr:\n{err}")
    return out, err, p.returncode

def find_slurmctld_container():
    """Return the name of the first running container whose name contains SERVICE_HINT.

    Returns:
        The first matching name in the order printed by ``docker ps``.

    Raises:
        RuntimeError: If ``docker ps`` fails or no running container name matches.
    """
    out, _, _ = run(["docker", "ps", "--format", "{{.Names}}"])
    names = [n for n in out.strip().splitlines() if SERVICE_HINT in n]
    if not names:
        # Report the hint that was actually searched for, so the message stays
        # accurate if SERVICE_HINT is changed in the config cell.
        raise RuntimeError(
            f"Could not find a running container with name containing {SERVICE_HINT!r}. Is the cluster up?"
        )
    # Prefer the first match
    return names[0]

In [3]:
# Resolve the controller container once; the cells below pass `container` around.
container = find_slurmctld_container()
# Bare expression so the notebook displays the resolved name.
container

'slurmctld'

In [4]:
def dexec(container, inner_cmd, stdin_bytes=None, check=True):
    """Run ``inner_cmd`` inside ``container`` via ``docker exec`` (non-interactive, login shell).

    Args:
        container: Name of the target container.
        inner_cmd: Command line interpreted by ``bash -lc`` inside the container.
        stdin_bytes: Optional bytes piped to the command's stdin.
        check: Forwarded to ``run``. Pass ``False`` to get the return code back
            instead of an exception when the command exits non-zero, which is
            needed by callers that want to inspect ``rc`` themselves.

    Returns:
        The ``(stdout, stderr, rc)`` tuple produced by ``run``.
    """
    cmd = ["docker", "exec", "-i", container, "bash", "-lc", inner_cmd]
    return run(cmd, input_bytes=stdin_bytes, check=check)

def ensure_dirs(container):
    """Create the ``logs`` and ``results`` folders under CONTAINER_DATA_DIR inside the container."""
    base = shlex.quote(CONTAINER_DATA_DIR)
    dexec(container, f"mkdir -p {base}/logs {base}/results")

In [5]:
# Create the logs/ and results/ directories in the controller container.
ensure_dirs(container)

In [18]:
def current_partition(container):
    """Return the first partition name printed by ``sinfo``, or ``None`` if there is none.

    Only the first whitespace-separated token of the first output line is used;
    anything after a comma is dropped and trailing ``*`` characters are removed.
    """
    stdout, _, _ = dexec(container, "sinfo -h -o '%P' | head -n1")
    tokens = stdout.split()
    if not tokens:
        return None
    # First token only, cut at the first comma, without the trailing '*'.
    name = tokens[0].split(',')[0].rstrip('*')
    return name if name else None

In [7]:
# `part` is None when no partition could be read; the print below shows '(default)' then.
part = current_partition(container) or None
print(f"Using controller container: {container}")
print(f"Using partition: {part or '(default)'}")

Using controller container: slurmctld
Using partition: normal*


In [19]:
def submit_job(container, job_name="hello_py", minutes=1, partition=None, cpus=1, mem_mb=256):
    """Submit a small demo batch job and return its numeric Slurm job id.

    The generated script prints the host name, job id and working directory and
    writes a one-line text file into the ``results`` directory.

    Args:
        container: Name of the container in which ``sbatch`` is executed.
        job_name: Slurm job name; also used as prefix of the result file.
        minutes: Wall-clock limit in whole minutes (values >= 60 are carried into hours).
        partition: Optional partition; a trailing ``*`` and anything after the
            first comma are dropped. ``None``/empty omits the ``-p`` directive.
        cpus: CPUs per task (``-c``).
        mem_mb: Value passed to ``--mem``.

    Returns:
        The job id parsed from sbatch's "Submitted batch job N" line.

    Raises:
        RuntimeError: If sbatch fails or its output contains no job id.
    """
    if partition:
        # Strip whitespace before removing '*': a trailing space or newline would
        # otherwise shield the marker from rstrip('*').
        partition = partition.strip().split(',')[0].strip().rstrip('*').strip() or None

    # Carry minutes into the hours field instead of emitting e.g. "00:90:00".
    hours, mins = divmod(minutes, 60)
    # Same base directory that tail_job_output() reads the logs from.
    data_dir = CONTAINER_DATA_DIR

    directives = [
        f"#SBATCH -J {job_name}",
        f"#SBATCH -D {data_dir}",
        f"#SBATCH -o {data_dir}/logs/slurm-%j.out",
        f"#SBATCH -e {data_dir}/logs/slurm-%j.err",
        f"#SBATCH -t {hours:02d}:{mins:02d}:00",
        f"#SBATCH -c {cpus}",
        f"#SBATCH --mem={mem_mb}",
    ]
    if partition:
        directives.append(f"#SBATCH -p {partition}")

    body = [
        "set -euo pipefail",
        'echo "Running on: $(hostname)"',
        'echo "SLURM_JOB_ID=$SLURM_JOB_ID"',
        'echo "pwd=$(pwd)"',
        f'mkdir -p "{data_dir}/results"',
        f'echo "$(date) : hello from Slurm job $SLURM_JOB_ID" > "{data_dir}/results/{job_name}_$SLURM_JOB_ID.txt"',
        "sleep 1",
    ]
    script = "\n".join(["#!/bin/bash", *directives, *body]) + "\n"

    # sbatch reads the batch script from stdin when no file name is given, so no
    # temporary file has to be named or cleaned up. (A name built as
    # "$RANDOM_$RANDOM" expands the unset variable RANDOM_ plus one $RANDOM,
    # which is why a temp-file name is avoided here altogether.)
    out, err, _ = dexec(container, "sbatch", stdin_bytes=script.encode())
    m = re.search(r"Submitted batch job (\d+)", out)
    if not m:
        raise RuntimeError(f"Could not parse job id.\nstdout:\n{out}\nstderr:\n{err}")
    return int(m.group(1))

In [20]:
# Submit the built-in demo job; `job_id` is reused by the polling and log cells below.
job_name = "hello_py"
job_id = submit_job(container, job_name=job_name, minutes=1, partition=part, cpus=1, mem_mb=256)
print(f"Submitted job {job_id}")


Submitted job 6


In [21]:
def job_state(container, job_id):
    """Return the state of ``job_id`` as reported by ``squeue`` or, failing that, ``sacct``.

    Args:
        container: Name of the container in which the Slurm commands run.
        job_id: Slurm job id.

    Returns:
        The state text printed by ``squeue`` (e.g. ``PENDING``, ``RUNNING``) when
        it prints anything; otherwise the first line printed by ``sacct``;
        otherwise the literal ``"COMPLETED?"`` when neither command gave an answer.
    """
    def _query(inner_cmd):
        # dexec raises RuntimeError on a non-zero exit status. Both commands may
        # legitimately fail (squeue for an id it no longer knows, sacct when
        # accounting is not set up), so a failure is treated as "no answer" to
        # let the next fallback run instead of aborting the poll.
        try:
            stdout, _, _ = dexec(container, inner_cmd)
        except RuntimeError:
            return ""
        return stdout.strip()

    # 1) if present in squeue, return its live state
    state = _query(f"squeue -h -j {job_id} -o '%T'")
    if state:
        return state  # e.g., PENDING, RUNNING
    # 2) try sacct for a terminal state (requires accounting to be registered)
    acct = _query(f"sacct -X -n -j {job_id} -o State%20")
    if acct:
        return acct.splitlines()[0].strip()
    return "COMPLETED?"  # fallback when sacct is not available

def wait_for_completion(container, job_id, timeout=None):
    """Poll ``job_state`` until the job leaves every non-terminal state.

    Args:
        container: Name of the container in which the Slurm commands run.
        job_id: Slurm job id to wait for.
        timeout: Optional maximum number of seconds to wait. ``None`` (default)
            waits indefinitely.

    Returns:
        The first state string that is not one of the non-terminal states.

    Raises:
        TimeoutError: If ``timeout`` is given and the job is still in a
            non-terminal state once it has elapsed.
    """
    # States in which the job may still make progress; anything else ends the wait.
    active_states = {
        "PENDING", "RUNNING", "COMPLETING", "CONFIGURING",
        "SUSPENDED", "STOPPED", "SIGNALING", "STAGE_OUT", "RESIZING",
        "REQUEUED", "REQUEUE_HOLD", "REQUEUE_FED", "RESV_DEL_HOLD",
    }
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        st = job_state(container, job_id)
        if st not in active_states:
            return st
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} still {st} after {timeout} seconds")
        time.sleep(POLL_INTERVAL)


In [22]:
# Blocks until job_state() reports something other than a pending/running state.
final_state = wait_for_completion(container, job_id)
print(f"Job {job_id} finished with state: {final_state}")

Job 6 finished with state: COMPLETED


In [30]:
def get_job_node(container, job_id):
    """Return the name of a node the job ran on, or ``""`` when it cannot be determined.

    ``sacct`` is asked first; if it fails, prints nothing, prints a placeholder
    or prints a bracketed host-list expression, ``scontrol show job`` is parsed
    instead (``BatchHost`` preferred over ``NodeList``).

    Args:
        container: Name of the container in which the Slurm commands run.
        job_id: Slurm job id.
    """
    def _query(inner_cmd):
        # dexec raises on a non-zero exit; a failing sacct must not prevent the
        # scontrol fallback, so failures simply count as empty output.
        try:
            stdout, _, _ = dexec(container, inner_cmd)
        except RuntimeError:
            return ""
        return stdout.strip()

    # Try sacct first
    acct = _query(f"sacct -X -n -j {job_id} -o NodeList")
    first = acct.splitlines()[0].strip() if acct else ""
    # "None assigned" is a placeholder, and "c[1-3]" is a host-list expression
    # that cannot be used as a single node name; fall through in both cases.
    if first and first != "None assigned" and "[" not in first:
        return first.split(",")[0]

    # Fallback to scontrol
    info = _query(f"scontrol show job {job_id}")
    # \b keeps "NodeList=" from matching inside "ReqNodeList="/"ExcNodeList=".
    m = re.search(r"\bBatchHost=(\S+)", info) or re.search(r"\bNodeList=(\S+)", info)
    node = m.group(1) if m else ""
    return "" if node == "(null)" else node

In [32]:
def tail_job_output(container, job_id, n=50, which="out"):
    """Return the last ``n`` lines of a job's ``slurm-<job_id>.out`` / ``.err`` log.

    The file is looked up in ``container`` first and, if it is missing there, in
    the container named after the node returned by ``get_job_node``.

    Args:
        container: Name of the controller container.
        job_id: Slurm job id used in the log file name.
        n: Number of trailing lines to return.
        which: ``"err"`` selects the stderr log; any other value selects stdout.

    Returns:
        The tail text, or a parenthesised explanatory message when the file
        cannot be located.
    """
    ext = "err" if which == "err" else "out"
    path = f"{CONTAINER_DATA_DIR}/logs/slurm-{job_id}.{ext}"
    quoted = shlex.quote(path)
    count = int(n)  # also guarantees that only a number reaches the shell command

    # Try controller first
    out, _, _ = dexec(container, f"test -f {quoted} && tail -n {count} {quoted} || echo __MISSING__")
    # Compare the whole output with the sentinel: a log that merely mentions
    # "__MISSING__" somewhere must not be mistaken for an absent file.
    if out.strip() != "__MISSING__":
        return out

    # Fall back to the node the job ran on
    node = get_job_node(container, job_id)
    if not node:
        label = "stderr" if ext == "err" else "stdout"
        return f"({label} path {path} missing and job node unknown)"
    node_out, _, _ = dexec(node, f"tail -n {count} {quoted} || echo '(not found on node either)'")
    return node_out

In [34]:
# Show the tail of the demo job's stdout log.
print("\n=== Job stdout (tail) ===")
print(tail_job_output(container, job_id, n=50))

# Same for the stderr log.
print("\n=== Job stderr (tail) ===")
print(tail_job_output(container, job_id, n=50, which="err"))

print("\nCheck your mount for files under 'results/' and 'logs/'.")



=== Job stdout (tail) ===
Running on: c1
SLURM_JOB_ID=6
pwd=/data


=== Job stderr (tail) ===


Check your mount for files under 'results/' and 'logs/'.


In [35]:
def submit_existing_file(container, path_in_container, job_name="job", minutes=2, cpus=1, mem_mb=512, args=None, partition=None):
    """Submit ``/usr/bin/python3 <path_in_container> <args...>`` through ``sbatch --wrap``.

    Args:
        container: Name of the container in which ``sbatch`` is executed.
        path_in_container: Path of the Python file as seen inside the container.
        job_name: Slurm job name; also the prefix of the log file names.
        minutes: Wall-clock limit in whole minutes (values >= 60 are carried into hours).
        cpus: CPUs per task (``-c``).
        mem_mb: Value passed to ``--mem``.
        args: Optional iterable of command-line arguments for the Python file.
        partition: Optional partition; a trailing ``*`` and anything after the
            first comma are dropped.

    Returns:
        The job id parsed from sbatch's "Submitted batch job N" line.

    Raises:
        RuntimeError: If sbatch fails or its output contains no job id.
    """
    # sanitize partition (avoid trailing '*'); strip whitespace first so it
    # cannot shield the marker from rstrip('*')
    if partition:
        partition = partition.strip().split(',')[0].strip().rstrip('*').strip() or None

    # Carry minutes into the hours field instead of emitting e.g. "00:90:00".
    hours, mins = divmod(minutes, 60)
    opts = [
        "-J", job_name,
        "-D", "/data",
        "-o", f"/data/logs/{job_name}-%j.out",
        "-e", f"/data/logs/{job_name}-%j.err",
        "-t", f"00:{mins:02d}:00" if not hours else f"{hours:02d}:{mins:02d}:00",
        "-c", str(cpus),
        f"--mem={mem_mb}",
    ]
    if partition:
        opts += ["-p", partition]

    # Quote every word of the wrapped command, including the script path, so a
    # path containing spaces or shell metacharacters stays a single argument.
    wrapped = " ".join(
        shlex.quote(str(word)) for word in ["/usr/bin/python3", path_in_container, *(args or [])]
    )
    cmd = "sbatch " + " ".join(shlex.quote(x) for x in opts) + " --wrap " + shlex.quote(wrapped)
    out, err, _ = dexec(container, cmd)
    m = re.search(r"Submitted batch job (\d+)", out)
    if not m:
        raise RuntimeError(f"Could not parse job id.\nstdout:\n{out}\nstderr:\n{err}")
    return int(m.group(1))

In [36]:
def tail_logs(container, job_name, job_id, n=50):
    """Return ``(stdout_tail, stderr_tail)`` for the ``<job_name>-<job_id>`` log pair.

    Each element is the last ``n`` lines of the corresponding file under
    ``/data/logs``, or a "(no ... yet)" placeholder when the file is absent.
    """
    def _tail(stream, placeholder):
        # One docker exec per stream: print the tail if the file exists, else the placeholder.
        quoted = shlex.quote(f"/data/logs/{job_name}-{job_id}.{stream}")
        text, _, _ = dexec(container, f"test -f {quoted} && tail -n {n} {quoted} || echo '{placeholder}'")
        return text

    stdout_tail = _tail("out", "(no stdout yet)")
    stderr_tail = _tail("err", "(no stderr yet)")
    return stdout_tail, stderr_tail

In [37]:
def put_file(container, local_path, remote_path):
    """Copy a local file into the container at ``remote_path``.

    The remote parent directory is created first; after the copy, trailing
    carriage returns are removed from each line and the file is made
    world-readable.

    Raises:
        FileNotFoundError: If ``local_path`` is not an existing regular file.
    """
    source = os.path.abspath(local_path)
    if not os.path.isfile(source):
        raise FileNotFoundError(source)

    parent = os.path.dirname(remote_path)
    if not parent:
        parent = "/"
    dexec(container, f"mkdir -p {shlex.quote(parent)}")

    # docker cp does the actual transfer
    run(["docker", "cp", source, f"{container}:{remote_path}"])

    # strip CR line endings in place, then grant read permission to everyone
    target = shlex.quote(remote_path)
    dexec(container, f"sed -i 's/\\r$//' {target}; chmod a+r {target}")


In [38]:
def ensure_shared_dirs(container):
    """Create ``/data/logs`` and ``/data/results`` in the container and set their mode to 1777."""
    shared = "/data/logs /data/results"
    dexec(container, f"mkdir -p {shared} && chmod 1777 {shared}")

In [41]:
# Create /data/logs and /data/results (mode 1777) in the controller container.
ensure_shared_dirs(container)

In [42]:
# Copy LOCAL_FILE into the controller container at REMOTE_FILE.
put_file(container, LOCAL_FILE, REMOTE_FILE)

In [39]:

def sanitize_partition(p):
    """Reduce raw ``sinfo`` partition text to a bare partition name.

    Takes the first whitespace-separated token, keeps what precedes the first
    comma and removes the trailing ``*`` marker.

    Args:
        p: Raw partition text, e.g. ``"normal*"``; may be ``None`` or empty.

    Returns:
        The cleaned partition name, or ``None`` when nothing usable remains.
    """
    if not p:
        return None
    # Splitting on whitespace first discards surrounding blanks and any further
    # lines, so they cannot hide the trailing '*' from rstrip.
    tokens = p.split()
    if not tokens:
        return None
    name = tokens[0].split(",")[0].strip().rstrip("*").strip()
    return name or None


In [43]:
# Read the first partition listed by sinfo and clean it; `partition` is None if nothing is left.
out, _, _ = dexec(container, "sinfo -h -o '%P' | head -n1")
partition = sanitize_partition(out.strip())

In [46]:
def detect_partition(container):
    """Print and return the first partition listed by ``sinfo`` (``None`` if empty).

    The text before the first comma is used, with trailing ``*`` characters and
    surrounding whitespace removed.
    """
    stdout, _, _ = dexec(container, "sinfo -h -o '%P' | head -n1")
    first = stdout.strip().split(',')[0]
    clean = first.rstrip('*').strip()
    if not clean:
        clean = None
    print("Using partition:", clean or "(default)")
    return clean
def fetch_artifacts_to_host(container, local_dir="./artifacts"):
    """Copy ``/data/logs`` and ``/data/results`` from the container into ``local_dir``.

    Args:
        container: Name of the container to copy from.
        local_dir: Host directory that receives ``logs/`` and ``results/``;
            created if missing.

    The call can be repeated: files are copied into the existing ``logs`` and
    ``results`` directories rather than into a nested ``logs/logs`` or
    ``results/results``.
    """
    os.makedirs(local_dir, exist_ok=True)
    for sub in ("logs", "results"):
        dest = os.path.join(local_dir, sub)
        os.makedirs(dest, exist_ok=True)
        # A source path ending in "/." makes docker cp copy the directory's
        # contents into dest. Without it, an already existing dest would receive
        # the directory itself as a nested sub-directory on every re-run.
        run(["docker", "cp", f"{container}:/data/{sub}/.", dest])
    

In [48]:
# Copy /data/logs and /data/results from the container into ./artifacts on the host.
fetch_artifacts_to_host(container)

In [44]:
# Submit the uploaded script with `--message hello`, wait for it, then show both logs.
jid = submit_existing_file(container, REMOTE_FILE, job_name="hello", minutes=1, cpus=1, mem_mb=256, args=["--message", "hello"], partition=partition)
print("submitted job:", jid)
state = wait_for_completion(container, jid)
print("final state:", state)
# The job name passed here must match the one used at submission, since it is part of the log file names.
out_txt, err_txt = tail_logs(container, "hello", jid)
print("\n=== stdout ===\n", out_txt)
print("\n=== stderr ===\n", err_txt)
print("\nCheck /data/results for output files.")

submitted job: 7
final state: COMPLETED

=== stdout ===
 Wrote: /data/results/hello_7_0.json
Wrote: /data/results/hello_7_0.txt
Listing of /data: logs, results, slurm-1.out, test.py


=== stderr ===
 

Check /data/results for output files.
