# Submitting and Managing Jobs

Launch this tutorial in a Jupyter Notebook on Binder: 
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/htcondor/htcondor-python-bindings-tutorials/master?urlpath=lab/tree/users/Submitting-and-Managing-Jobs.ipynb)

In this module, we will learn how to submit and manage jobs from Python. 
We will learn how to submit jobs with various toy executables, how to ask HTCondor for information about them, and how to tell HTCondor to do things with them.

We start by importing the relevant modules:

In [None]:
import htcondor  # for submitting jobs, querying HTCondor, etc.
import classad   # ClassAds are HTCondor's internal data format

## Submitting a Simple Job

To submit a job, we must first describe it.
A submit description is held in a `Submit` object.
`Submit` objects consist of key-value pairs, and generally behave like Python dictionaries.
If you're familiar with HTCondor's submit file syntax, you should think of each line in the submit file as a single key-value pair in the `Submit` object.
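
To make the "one line, one key-value pair" idea concrete, here is a minimal sketch in plain Python that splits submit-file text into a dictionary. The helper `submit_file_to_dict` is hypothetical (it is not part of the `htcondor` module), and it only handles simple `key = value` lines, not the full submit file syntax.

```python
def submit_file_to_dict(text):
    """Turn simple 'key = value' lines into a dict (hypothetical helper, illustration only)."""
    description = {}
    for line in text.splitlines():
        line = line.strip()
        # skip blank lines, comments, and the queue statement
        if not line or line.startswith("#") or line.lower().startswith("queue"):
            continue
        key, _, value = line.partition("=")
        description[key.strip()] = value.strip()
    return description


submit_file = """
executable = /bin/hostname
output = hostname.out
request_memory = 128MB
queue
"""

description = submit_file_to_dict(submit_file)
print(description)
```

A dictionary shaped like this is what we will hand to `htcondor.Submit` in the next cell.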

Let's start by writing a `Submit` object that describes a job that executes the `hostname` command on an execute node, which prints out the "name" of the node.
Since `hostname` prints its results to standard output (stdout), we will capture stdout and bring it back to the submit machine so we can see the name.

In [None]:
hostname_job = htcondor.Submit({
    "executable": "/bin/hostname",  # the program to run on the execute node
    "output": "hostname.out",       # anything the job prints to standard output will end up in this file
    "error": "hostname.err",        # anything the job prints to standard error will end up in this file
    "log": "hostname.log",          # this file will contain a record of what happened to the job
    "request_cpus": "1",            # how many CPU cores we want
    "request_memory": "128MB",      # how much memory we want
    "request_disk": "128MB",        # how much disk space we want
})

print(hostname_job)

The available descriptors are documented in the `condor_submit` [manual](https://htcondor.readthedocs.io/en/latest/man-pages/condor_submit.html).

Note that we gave it several relative filepaths.
These paths are relative to the directory containing this Jupyter notebook (or, more generally, the current working directory).
When we run the job, you should see those files appear in the file browser on the left as HTCondor creates them.

Now that we have a description, let's submit a job.
To do so, we must ask the HTCondor scheduler to open a transaction.
Once we have the transaction, we can "queue" a job via the `Submit` object.

In [None]:
schedd = htcondor.Schedd()          # get the Python representation of the scheduler
with schedd.transaction() as txn:   # open a transaction, represented by `txn`
    cluster_id = hostname_job.queue(txn)     # queues one job in the current transaction; returns job's ClusterID
    
print(cluster_id)

The number returned by the `queue` method is the `ClusterID` for the submission.
It uniquely identifies this submission.
Later in this module, we will use it to ask the scheduler for information about our jobs.

It isn't important to understand the transaction mechanics for now; think of it as boilerplate.
(There are advanced use cases where it might be useful.)

By now, our job has hopefully finished running.
You should be able to see the files in the file browser on the left.
Try opening one of them and seeing what's inside.

We can also look at the output from inside Python:

In [None]:
with open('hostname.out', mode = 'r') as f:
    print(f.read())

If the file doesn't exist, your job has not run (or has not finished running yet).
If you got some text, it worked!
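
If you would rather not hit a `FileNotFoundError` while the job is still waiting, a small guard like the following sketch can help. The helper name `read_output` is our own invention for illustration; it uses only the standard library.

```python
from pathlib import Path


def read_output(path):
    """Return the file's text, or None if the file does not exist yet."""
    path = Path(path)
    if not path.exists():
        return None
    return path.read_text()


contents = read_output("hostname.out")
if contents is None:
    print("hostname.out is not there yet; the job may not have run")
else:
    print(contents)
```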

## Submitting Multiple Jobs

By default, each `queue` will submit a single job.
A more common use case is to submit many jobs at once, often sharing some base submit description.
Let's write a new submit description which runs `sleep`.

When we have multiple **jobs** in a single **cluster**, each job will be identified not just by its **ClusterID** but also by a **ProcID**.
We can use the ProcID to separate the output and error files for each individual job.
Anything that looks like `$(...)` in a submit description is a **macro**, which will be expanded later by HTCondor.
The ProcID expands to a series of incrementing integers, starting at 0.
So the first job in a cluster will have ProcID 0, the next will have ProcID 1, etc.
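
To see what that expansion looks like, here is a rough imitation in plain Python. HTCondor performs the real expansion itself; `expand_procid` is a hypothetical helper that only mimics the `$(ProcID)` case.

```python
def expand_procid(template, count):
    """Mimic $(ProcID) expansion for `count` jobs (illustration only)."""
    return [template.replace("$(ProcID)", str(proc_id)) for proc_id in range(count)]


output_files = expand_procid("sleep-$(ProcID).out", 3)
print(output_files)  # ['sleep-0.out', 'sleep-1.out', 'sleep-2.out']
```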

In [None]:
sleep_job = htcondor.Submit({
    "executable": "/bin/sleep",
    "arguments": "1m",                # sleep for 1 minute
    "output": "sleep-$(ProcID).out",  # output and error separated by job, using the $(ProcID) macro
    "error": "sleep-$(ProcID).err",
    "log": "sleep.log",               # the HTCondor log is shared: every job in the cluster writes to this one file
    "request_cpus": "1",
    "request_memory": "128MB",
    "request_disk": "128MB",
})

print(sleep_job)

We will submit 10 of these jobs.
All we need to change from our previous `queue` call is to add the `count` keyword argument.

In [None]:
schedd = htcondor.Schedd()                
with schedd.transaction() as txn:       
    cluster_id = sleep_job.queue(txn, count=10)  # submit 10 jobs
print(cluster_id)

Now that we have a bunch of jobs in flight, we might want to check how they're doing.
We can ask the scheduler about jobs by using its `query` method.
We give it a **constraint**, which tells it which jobs to look for, and a **projection** (called `attr_list` for historical reasons), which tells it what information to return.

In [None]:
schedd.query(
    constraint='ClusterId=={}'.format(cluster_id),
    attr_list=["ClusterId", "ProcId", "JobStatus", "Out"],
)

There are a few things to notice here:
- Depending on how long it took you to run the cell, you may only get a few of your 10 jobs in the query. Jobs that have finished **leave the queue**, and will no longer show up in queries. To see those jobs, you must use the `history` method instead, which behaves like `query`, but **only** looks at jobs that have left the queue.
- The results most likely did not come back in ProcID-sorted order. If you want to order the results, you must do so yourself.
- Attributes are often renamed between the submit description and the actual job description in the queue. See [the manual](https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html) for a description of the job attribute names.
- The objects returned by the query are instances of `ClassAd`. ClassAds are the common data exchange format used by HTCondor. In Python, they mostly behave like dictionaries.
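
Since the results behave like dictionaries, ordering them is a one-liner with `sorted`. The sketch below uses plain dictionaries with made-up values as stand-ins for the ClassAds a query might return, so it runs without a scheduler.

```python
# stand-ins for the ClassAds a query might return, in arbitrary order
ads = [
    {"ClusterId": 7, "ProcId": 2, "JobStatus": 1},
    {"ClusterId": 7, "ProcId": 0, "JobStatus": 2},
    {"ClusterId": 7, "ProcId": 1, "JobStatus": 2},
]

# order the results by ProcId ourselves
sorted_ads = sorted(ads, key=lambda ad: ad["ProcId"])

for ad in sorted_ads:
    print(ad["ProcId"], ad["JobStatus"])
```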

## Using Itemdata to Vary Over Parameters

By varying some part of the submit description using the ProcID, we can change how each individual job behaves.
Perhaps it will use a different input file, or a different argument.
However, we often want more flexibility than that.
Perhaps our input files are named after different cities, or by timestamp, or whatever other naming scheme already exists.

To use such information in the submit description, we need to use **itemdata**.
Itemdata lets us pass arbitrary extra information when we queue, which we can reference with macros inside the submit description.
This lets us use the full power of Python to generate the submit descriptions for our jobs.

Let's mock this situation out by generating some files with randomly-chosen names.

In [None]:
import pathlib
import random
import string
import shutil

def random_string(length):
    """Produce a random lowercase ASCII string with the given length."""
    return ''.join(random.choices(string.ascii_lowercase, k = length))

# make a directory to hold the input files; don't worry about this code
input_dir = pathlib.Path.cwd() / "inputs"
shutil.rmtree(input_dir, ignore_errors = True)
input_dir.mkdir()

# make 10 input files
for idx in range(10):
    input_file = input_dir / "{}.txt".format(random_string(5))
    input_file.write_text("Hello from job {}".format(idx))

Now we'll get a list of all the files in the input directory, for later use:

In [None]:
input_files = [path.as_posix() for path in input_dir.iterdir()]

for path in input_files:
    print(path)

Now we'll make our submit description.
Our goal is just to print out the text held in each file, which we can do using `cat`.

In [None]:
cat_job = htcondor.Submit({
    "executable": "/bin/cat",      
    "arguments": "$(input_file)",             # we will pass in the value for this macro via itemdata
    "transfer_input_files": "$(input_file)",  # we also need HTCondor to move the file to the execute node
    "output": "cat-$(ProcID).out",  
    "error": "cat-$(ProcID).err",  
    "log": "cat.log",              
    "request_cpus": "1",             
    "request_memory": "128MB",       
    "request_disk": "128MB",           
})

print(cat_job)

The itemdata should be passed as a list of dictionaries, where the keys are the macro names to replace in the submit description.
In our case, the key should be `input_file`, and we should have a list of 10 dictionaries, each with one entry.

In [None]:
itemdata = [{'input_file': path} for path in input_files]

for item in itemdata:
    print(item)

Now we'll submit the jobs, using `queue_with_itemdata` instead of `queue`:

In [None]:
schedd = htcondor.Schedd()                
with schedd.transaction() as txn:         
    submit_result = cat_job.queue_with_itemdata(txn, itemdata = iter(itemdata))  # submit one job for each item in the itemdata
    
print(submit_result.cluster())

Note that `queue_with_itemdata` returns a "submit result", not just the ClusterID.
The ClusterID can be retrieved from the submit result with its `cluster()` method.
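
Because each itemdata entry is a dictionary, it seems natural for an entry to carry more than one macro. The sketch below builds such entries in plain Python. The paths and the extra `name` key are hypothetical, and we are assuming that every dictionary has the same keys and that a submit description could then reference `$(name)` (for example, in an output file name).

```python
from pathlib import PurePosixPath

# hypothetical paths standing in for the generated input files
example_paths = ["inputs/abcde.txt", "inputs/fghij.txt"]

# each dictionary carries two macros: the full path and the file's stem
example_itemdata = [
    {"input_file": path, "name": PurePosixPath(path).stem}
    for path in example_paths
]

for item in example_itemdata:
    print(item)
```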

## Managing Jobs

Once a job is in the queue, the scheduler will try its best to execute it to completion.
There are several cases where you may want to interrupt the normal flow of jobs. 
Perhaps the results are no longer needed; perhaps the job needs to be edited to correct a submission error. 
These actions fall under the purview of **job management**.

There are two `Schedd` methods dedicated to job management:

* `edit()`: Change an attribute for a set of jobs.
* `act()`: Change the state of a job (remove it from the queue, hold it, suspend it, etc.).

The `act` method takes an argument from the `JobAction` enum.
Commonly-used values include:

* `Hold`: Put a job on hold, vacating a running job if necessary. A job will stay in the hold
  state until told otherwise.
* `Release`: Release a job from the hold state, returning it to Idle.
* `Remove`: Remove a job from the queue. If it is running, it will stop running.
  This requires the execute node to acknowledge it has successfully vacated the job, so `Remove` may
  not be instantaneous.
* `Vacate`: Cause a running job to be killed on the remote resource and return to the Idle state. With
  `Vacate`, jobs may be given significant time to cleanly shut down.

To play with this, let's bring back our sleep submit description, but increase the sleep time significantly so that we have time to interact with the jobs.

In [None]:
long_sleep_job = htcondor.Submit({
    "executable": "/bin/sleep",      
    "arguments": "10m",                # sleep for 10 minutes
    "output": "sleep-$(ProcID).out", 
    "error": "sleep-$(ProcID).err",  
    "log": "sleep.log", 
    "request_cpus": "1",             
    "request_memory": "128MB",       
    "request_disk": "128MB",           
})

print(long_sleep_job)

In [None]:
schedd = htcondor.Schedd()
with schedd.transaction() as txn:
    cluster_id = long_sleep_job.queue(txn, 5)

As an experiment, let's set an arbitrary attribute on the jobs and check that it worked.
When we're really working, we could do things like change the amount of memory a job has requested by editing its attributes.

In [None]:
# sets attribute foo to the string "bar" for all of our jobs
schedd.edit("ClusterID == {}".format(cluster_id), "foo", '"bar"')

# do a query to check the value of attribute foo
schedd.query(
    constraint='ClusterId == {}'.format(cluster_id),
    attr_list=["ClusterId", "ProcId", "JobStatus", "foo"],
)

Although the job status appears to be an attribute, we cannot `edit` it directly.
As mentioned above, we must instead `act` on the job.
Let's hold the first two jobs so that they stop running, but leave the others going.

In [None]:
# hold the first two jobs
schedd.act(htcondor.JobAction.Hold, "ClusterID=={} && ProcID <= 1".format(cluster_id))

# check the status of the jobs
schedd.query(
    constraint='ClusterId == {}'.format(cluster_id),
    attr_list=["ClusterId", "ProcId", "JobStatus"],
)

The various job statuses are represented by numbers. `1` means `Idle`, `2` means `Running`, and `5` means `Held`. If you see some `JobStatus = 5` above, then we succeeded!
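
If the bare numbers are hard to read, a small lookup table can translate them. This is only a sketch: it covers just the three codes mentioned above (other codes exist), and the example ads are made-up stand-ins for query results.

```python
# only the codes mentioned in this tutorial; other codes exist
JOB_STATUS_NAMES = {1: "Idle", 2: "Running", 5: "Held"}


def status_name(code):
    """Return a readable name for a JobStatus code, or a placeholder for other codes."""
    return JOB_STATUS_NAMES.get(int(code), "Other ({})".format(code))


# made-up stand-ins for query results
example_ads = [
    {"ProcId": 0, "JobStatus": 5},
    {"ProcId": 1, "JobStatus": 5},
    {"ProcId": 2, "JobStatus": 2},
]

for ad in example_ads:
    print(ad["ProcId"], status_name(ad["JobStatus"]))
```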

The opposite of `JobAction.Hold` is `JobAction.Release`.
Let's release those jobs and let them go back to `Idle`.

In [None]:
schedd.act(htcondor.JobAction.Release, "ClusterID=={}".format(cluster_id))

schedd.query(
    constraint='ClusterId == {}'.format(cluster_id),
    attr_list=["ClusterId", "ProcId", "JobStatus"],
)

Note that we simply released all the jobs in the cluster. Releasing a job that is not held doesn't do anything, so we don't have to be extremely careful.
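
The constraints we passed to `query`, `edit`, and `act` are ordinary strings, so they can be assembled by a small helper if you find yourself writing them repeatedly. `cluster_constraint` below is a hypothetical convenience function, not something provided by `htcondor`.

```python
def cluster_constraint(cluster_id, max_proc_id=None):
    """Build a constraint string for a cluster, optionally limited to ProcIDs <= max_proc_id."""
    constraint = "ClusterID == {}".format(cluster_id)
    if max_proc_id is not None:
        constraint += " && ProcID <= {}".format(max_proc_id)
    return constraint


print(cluster_constraint(42))                  # ClusterID == 42
print(cluster_constraint(42, max_proc_id=1))   # ClusterID == 42 && ProcID <= 1
```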

# Exercises

Now let's practice what we've learned.

- In each exercise, you will be given a piece of code and a test that does not yet pass.
- Modify the code, or add new code to it, to pass the test.
- You can run the test by running the block it is in.
- Feel free to look at the test for clues as to how to modify the code.
- Many of the exercises can be solved either by using Python to generate inputs, or by using advanced features of the [ClassAd language](https://htcondor.readthedocs.io/en/latest/misc-concepts/classad-mechanism.html#htcondor-s-classad-mechanism). Either way is valid!
- Don't modify the test. That's cheating!

## Exercise 1: Incrementing Sleeps

Submit five jobs which sleep for `5`, `6`, `7`, `8`, and `9` seconds, respectively.

In [None]:
# MODIFY OR ADD TO THIS BLOCK...

incrementing_sleep = htcondor.Submit({
    "executable": "/bin/sleep",      
    "arguments": "1",
    "output": "ex1-$(ProcID).out",
    "error": "ex1-$(ProcID).err",  
    "log": "ex1.log",
    "request_cpus": "1",
    "request_memory": "128MB",
    "request_disk": "128MB",
})

schedd = htcondor.Schedd()
with schedd.transaction() as txn:
    cluster_id = incrementing_sleep.queue(txn, 5)

In [None]:
# ... TO MAKE THIS TEST PASS

expected = [str(i) for i in range(5, 10)]
print('Expected ', expected)

ads = schedd.query("ClusterID == {}".format(cluster_id), attr_list = ["Args"])
arguments = sorted(ad["Args"] for ad in ads)
print('Got      ', arguments)

assert arguments == expected, "Arguments were not what we expected!"
print("The test passed. Good job!")

## Exercise 2: Holding Odds

Hold all of the odd-numbered jobs in this large cluster.

- Note that the test block **removes all of the jobs you own** when it runs, to prevent these long-running jobs from corrupting other tests!

In [None]:
# MODIFY OR ADD TO THIS BLOCK...

long_sleep = htcondor.Submit({
    "executable": "/bin/sleep",      
    "arguments": "10m",
    "output": "ex2-$(ProcID).out",
    "error": "ex2-$(ProcID).err",  
    "log": "ex2.log",
    "request_cpus": "1",
    "request_memory": "128MB",
    "request_disk": "128MB", 
})

schedd = htcondor.Schedd()
with schedd.transaction() as txn:
    cluster_id = long_sleep.queue(txn, 100)

In [None]:
# ... TO MAKE THIS TEST PASS

ads = schedd.query("ClusterID == {}".format(cluster_id), attr_list = ["ProcID", "JobStatus"])
proc_to_status = {int(ad['ProcID']): ad['JobStatus'] for ad in sorted(ads, key = lambda ad: ad['ProcID'])}

for proc, status in proc_to_status.items():
    print("Proc {} has status {}".format(proc, status))

schedd.act(htcondor.JobAction.Remove, 'true')
assert len(proc_to_status) == 100, "Wrong number of jobs (perhaps you need to resubmit them?)."
# JobStatus comes back as an integer, so compare against the number 5 (Held), not the string "5"
assert all(status == 5 for proc, status in proc_to_status.items() if proc % 2 != 0), "Not all odd jobs were held."
assert all(status != 5 for proc, status in proc_to_status.items() if proc % 2 == 0), "An even job was held."
    
print("The test passed. Good job!")

## Exercise 3: Echo to Target

Run a job that makes the text `Echo to Target` appear in the job's standard output file `ex3.out`.

In [None]:
# MODIFY OR ADD TO THIS BLOCK...

echo = htcondor.Submit({
    "output": "ex3.out",
    "error": "ex3.err",  
    "log": "ex3.log",
    "request_cpus": "1",
    "request_memory": "128MB",
    "request_disk": "128MB", 
})

In [None]:
# ... TO MAKE THIS TEST PASS

import os.path

does_file_exist = os.path.exists('ex3.out')
assert does_file_exist, "ex3.out does not exist!"

expected = 'Echo to Target'
print('Expected ', expected)

with open('ex3.out', mode = 'r') as f:
    contents = f.read().strip()
print('Got      ', contents)

assert expected in contents, "Contents were not what we expected!"

print("The test passed. Good job!")