# Create and submits a mobotix job to the Waggle scheduler.

It is expected to monitors or trigger by an event (`scan_event`) to submit the jobs.

My idea is to,

0. A model or radar analyzer running in the HPC will know about the detected/expected weather system.
1. This code will then dynamically generates a YAML file for the specified direction for IR scanning with direction to scan are passed as a list (e.g., ["NW", "SW", "NE"]) for the my plugin arguments.
2. Then submits the job using `sesctl`.
    - Sets the SES environment variables (`SES_HOST` and `SES_USER_TOKEN`).
    - Creates the job and the `job_id` is extracted from the response.
    - The job is then submitted to the scheduler using the `job_id`.

3. The driving function or the main script will monitor the `scan_event` variables or ENV or may be a thread something like that. If it is `True`, it calls the job creation and submission functions.

## Setting Up `sesctl`

1. **Download `sesctl`**:
    - Visit the [Waggle Edge Scheduler Releases](https://github.com/waggle-sensor/edge-scheduler/releases/) page.
    - For Mac users, download the [sesctl-darwin-amd64](https://github.com/waggle-sensor/edge-scheduler/releases/download/0.27.2/sesctl-darwin-amd64) executable.
2.  `sesctl` is a stand alone executable program that runs from terminal.
3. **Set Up Environment Variables**:
    - Open your terminal and set the following environment variables:

      ```sh
      export SES_HOST=https://es.sagecontinuum.org
      export SES_USER_TOKEN=<<VALID TOKEN>>
      ```


In [1]:
import subprocess
import os
import logging

If we have multiple configurations created then we do not need this function. However, for the custom configuration for each events, like camera scan in perticular direction, we can modify the and  make it more flexible.

In [2]:
import yaml
from datetime import datetime
import os

def create_job_file(directions, nodes, ip, username, password, south, 
                    job_path="./", job_name=None):
    """
    Creates a job file dynamically for the Waggle scheduler.

    Parameters:
        directions (list): List of directions for scanning (e.g., ["NEH", "NEB", "NEG"]).
        nodes (dict): Dictionary of node names and their statuses (e.g., {"W020": True}).
        ip (str): IP address of the camera (e.g., "camera-mobotix-thermal").
        username (str): Username for the camera (e.g., "admin").
        password (str): Password for the camera (e.g., "wagglesage").
        south (str): South parameter value (e.g., "22").
        filepath (str): The path of the output YAML file.

    Returns:
        str: The name of the generated job file.
    """

    job = {
        "name": job_name,
        "plugins": [
            {
                "name": job_name,
                "pluginSpec": {
                    "image": "registry.sagecontinuum.org/bhupendraraut/mobotix-scan:0.24.8.20",
                    "args": [
                        "--ip",
                        ip,
                        "--mode",
                        "direction",
                        "-south",
                        south,
                        "-pt",
                        f"{','.join(directions)}",
                        "-u",
                        username,
                        "-p",
                        password
                    ]
                }
            }
        ],
        "nodes": nodes,
        "scienceRules": [
            f'schedule({job_name}): cronjob("{job_name}", "* * * * *")'
        ],
        "successCriteria": []
    }

    out_file = os.path.join(job_path, f"{job_name}.yaml")
    with open(out_file, "w") as file:
        yaml.dump(job, file, default_flow_style=False)
    print(f"Job file {out_file} created successfully.")
    return out_file

In [3]:
directions = ["NEH", "NEB", "NEG", "EH", "EB", "EG", "SEH", "SEB", "SEG", "SH", "SB", "SG", "SWH", "SWB", "SWG"]
nodes = {"V032": True}
ip = "camera-mobotix-thermal"
username = "admin"
password = "meinsm"
south = "15"
directions = directions[1:3]

# Create a job file with new naming convention
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
job_name = f"sdl-mobo-{timestamp}"
job_path = "/Users/bhupendra/projects/ADAM-SDL-EESS/code/Python/self-scan/jobs/"
os.makedirs(job_path, exist_ok=True)
job_file = create_job_file(directions, nodes, ip, username, password, south, 
                           job_path=job_path, job_name=job_name)
print(f"Generated job file: {job_file}")

Job file /Users/bhupendra/projects/ADAM-SDL-EESS/code/Python/self-scan/jobs/sdl-mobo-20251204-124805.yaml created successfully.
Generated job file: /Users/bhupendra/projects/ADAM-SDL-EESS/code/Python/self-scan/jobs/sdl-mobo-20251204-124805.yaml


In [4]:
import os
import subprocess
import logging
from dotenv import load_dotenv

load_dotenv()  # Loads .env file if present

def submit_job(filename):
    try:
        token = os.environ.get('SES_USER_TOKEN')
        host = os.environ.get('SES_HOST')
        if not token or not host:
            raise ValueError("SES_USER_TOKEN or SES_HOST not set in environment")
        
        logging.info("Creating job.")
        result = subprocess.run([
            "./sesctl-darwin-amd64", "create", "--file-path", filename,
            "--token", token, "--server", host
        ], check=True, capture_output=True, text=True)
        logging.info(f"Job creation response: {result.stdout}")
        
        job_id = yaml.safe_load(result.stdout).get("job_id")
        if not job_id:
            raise ValueError("Job ID not found in the response.")
        
        logging.info(f"Submitting job with job_id: {job_id}.")
        result = subprocess.run([
            "./sesctl-darwin-amd64", "submit", "--job-id", job_id,
            "--token", token, "--server", host
        ], check=True, capture_output=True, text=True)
        logging.info(f"Job submission response: {result.stdout}")
    except Exception as e:
        logging.error(f"An error occurred: {e}")

In [5]:
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def submit_job(filename):
    try:
        logging.info("Setting SES environment variables.")
        subprocess.run(["export", "SES_HOST=https://es.sagecontinuum.org"])
        logging.info(os.getenv("SES_HOST"))

        logging.info("Creating job.")
        result = subprocess.run(["./sesctl-darwin-amd64", "create", "--file-path", filename], check=True, capture_output=True, text=True)
        logging.info(f"Job creation response: {result.stdout}")
        
        logging.info("Extracting job_id from the response.")
        job_id = yaml.safe_load(result.stdout).get("job_id")
        if not job_id:
            raise ValueError("Job ID not found in the response.")
        
        logging.info(f"Submitting job with job_id: {job_id}.")
        result = subprocess.run(["sesctl", "submit", "--job-id", job_id], check=True, capture_output=True, text=True)
        logging.info(f"Job submission response: {result.stdout}")
    except subprocess.CalledProcessError as e:
        logging.error(f"Error during job submission: {e.stderr}")
    except Exception as e:
        logging.error(f"An error occurred: {e}")


In [6]:
job_file = os.path.join(job_path, f"{job_name}.yaml")
submit_job(job_file)

2025-12-04 12:48:05,521 - INFO - Setting SES environment variables.
2025-12-04 12:48:05,525 - ERROR - An error occurred: [Errno 2] No such file or directory: 'export'
2025-12-04 12:48:05,525 - ERROR - An error occurred: [Errno 2] No such file or directory: 'export'


In [7]:
import os
print("SES_USER_TOKEN:", os.environ.get("SES_USER_TOKEN"))
print("SES_HOST:", os.environ.get("SES_HOST"))

SES_USER_TOKEN: None
SES_HOST: None


My environment was not set so this did not work, but it submitted the job before when the nv was set properly. I need to fix the part where e are setting up the environment.