In [16]:
import subprocess
import re
from pathlib import Path

In [3]:
# === Configuration ===
data_path = "/xustrg0/2024B8049"
run_number = "222753"
mask_path = "/UserData/fperakis/test_data_2025/utilities/empty_mask.npy"
output_path = "/UserData/fperakis/test_data_2025/processed"
poni_file = "/UserData/fperakis/test_data_2025/utilities/geometry_test.poni"
nbins = 150
n_phi = 72
n_chunks = 10  # total number of chunks

In [5]:
# === Submit array job ===
env_vars = ",".join([
    f"DATA_PATH={data_path}",
    f"RUN_NUMBER={run_number}",
    f"MASK_PATH={mask_path}",
    f"OUTPUT_PATH={output_path}",
    f"PONI_FILE={poni_file}",
    f"NBINS={nbins}",
    f"N_PHI={n_phi}",
    f"N_CHUNKS={n_chunks}"
])

submit_cmd = [
    "qsub", "-J", f"0-{n_chunks-1}",
    "-v", env_vars,
    "submit_chunk.pbs"
]

array_result = subprocess.run(submit_cmd, capture_output=True, text=True)

# === Extract array job ID ===
match = re.search(r"(\d+)", array_result.stdout)
if not match:
    raise RuntimeError("Failed to parse array job ID. Output:\n" + array_result.stdout)

array_job_id = match.group(1)
print(f"\nArray job submitted with ID: {array_job_id}")



Array job submitted with ID: 8420967


In [18]:
# === Submit dependent merge job ===

# Define parameters
output_file = Path(output_path) / f"Iq_{run_number}_combined.h5"
pbs_script = "combine_chunks.pbs"

# Submit the PBS job
merge_cmd = [
    "qsub",
    "-v", f"INPUT_DIR={Path(output_path)},RUN_NUMBER={run_number},OUTPUT_FILE={output_file}",
    pbs_script
]

result = subprocess.run(merge_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

# Output results
if result.returncode == 0:
    merge_job_id = result.stdout.strip()
    print(f"Merge job submitted successfully with job ID: {merge_job_id}")
    #print(f"Check logs: logs/combine_chunks.o{merge_job_id.split('.')[0]}")
else:
    print("Failed to submit merge job.")
    print(result.stderr)

Merge job submitted successfully with job ID: 8420970.fep01
Check logs: logs/combine_chunks.o8420970


In [6]:
print(f"Submitting merge job after array job ID: {array_job_id}")

Submitting merge job after array job ID: 8420967


In [12]:
# === Submit dependent merge job ===
merge_result = subprocess.run(
    ["qsub", "-W", f"depend=afterok:{array_job_id}", "combine_chunks.pbs"],
    capture_output=True,
    text=True
)

print("\nSubmitting merge job...")
print("STDOUT:")
# print(merge_result.stdout)
# print("STDERR:")
# print(merge_result.stderr)

if merge_result.returncode != 0:
    raise RuntimeError("Failed to submit merge job:\n" + merge_result.stderr)

print("Merge job submitted successfully.")


Submitting merge job...
STDOUT:
Merge job submitted successfully.


In [13]:
# Replace or keep these job IDs from earlier submissions
job_ids = [array_job_id]
if 'merge_job_id' in globals():
    job_ids.append(merge_job_id)

# Run qstat and filter for relevant jobs
qstat = subprocess.run(["qstat"], capture_output=True, text=True).stdout
print("Job Status:\n")
for jid in job_ids:
    lines = [line for line in qstat.splitlines() if jid in line]
    print("\n".join(lines) if lines else f"Job {jid} finished or not found.")

Job Status:

Job 8420967 finished or not found.
