# Automated Handling of Super Computer Jobs with Python
When working with a large data set, we want to be able to upload jobs to the supercomputer, run them, and then download the results. The process can be done manually, but it is time-consuming and tedious. We can automate it with Python using the `scp` and `paramiko` libraries.

In [10]:
import logging
import multiprocessing as m
import multiprocessing as mp
import pathlib as pt
import shlex
import time
from functools import wraps

import paramiko as po
import scp

'''
personal config file example, config.py:

host     = ****
username = ****
password = ****
'''
import config

Let us create a client class that lets us copy files to and from the supercomputer, as well as run commands on it.

In [11]:
class Copy_a_to_b_decorator:
    """Decorator factory adding logging and reconnect-and-retry to a copy method.

    The decorated method must have the signature
    ``method(obj, path_a, path_b, retries=0)``. ``obj`` must expose
    ``logger``, ``ssh`` (with ``get_transport()``), ``connect()`` and
    ``retry_limit``.

    Args:
        path_a_type: Label for the source location used in log messages
            (e.g. "Local").
        path_b_type: Label for the destination location used in log
            messages (e.g. "Remote").
    """

    def __init__(self, path_a_type, path_b_type):
        self.path_a_type = path_a_type
        self.path_b_type = path_b_type

    def __call__(self, function):

        @wraps(function)
        def decorator(obj: "Client", path_a: pt.Path, path_b: pt.Path, retries=0):
            """Run the wrapped copy, reconnecting and retrying on ConnectionAbortedError.

            Returns:
                True once the copy has completed.

            Raises:
                ConnectionError: when the attempt count exceeds ``obj.retry_limit``.
            """
            path_a = pt.Path(path_a)
            path_b = pt.Path(path_b)
            template = '%s %s To %s %s'%(self.path_a_type, path_a, self.path_b_type, path_b)
            while True:
                retries += 1
                # only announce the copy on the very first attempt
                if retries == 1:
                    obj.logger.info('Copying %s'%template)
                try:
                    function(obj, path_a, path_b, retries=retries)
                except ConnectionAbortedError as err:
                    if retries > obj.retry_limit:
                        raise ConnectionError(
                            "Number of retries, %d, greater than retry limit %d"% (retries, obj.retry_limit)
                        ) from err
                    # get_transport() returns None when the client was never connected
                    transport = obj.ssh.get_transport()
                    if transport is None or not transport.is_active():
                        obj.connect()
                    obj.logger.info('Retrying Copy %s'%template)
                else:
                    obj.logger.info('Completed Copy %s'%template)
                    return True

        return decorator

class Client:
    """SSH connection plus SCP channel to a remote machine.

    Connects on construction. Provides helpers to copy folders in both
    directions and to delete a remote directory.

    Args:
        host: Hostname of the remote machine.
        username: SSH user name.
        password: SSH password.
        port: SSH port (default 22).
        retry_limit: Maximum number of retries for connecting and for copies.
    """

    def __init__(self, host, username, password, port=22, retry_limit=10):

        self.port        = port
        self.host        = host
        self.username    = username
        self.password    = password
        self.retry_limit = retry_limit
        self.logger      = logging.getLogger(f"{self.__class__.__name__}:{self.username}{self.host}")
        self.ssh         = po.SSHClient()
        # only hosts already present in the system known_hosts file are accepted
        self.ssh.load_system_host_keys()
        self.connect()

    def connect(self, *args, retries=0, **kwargs):
        """(Re)open the SSH connection and the SCP channel, retrying on failure.

        Extra positional/keyword arguments are accepted for backward
        compatibility and ignored.

        Raises:
            ConnectionError: if connecting still fails after ``retry_limit`` retries.
        """
        while True:
            retries += 1
            try:
                self.ssh.connect(
                    self.host, port=self.port, username=self.username, password=self.password
                )
                self.transport = self.ssh.get_transport()
                self.scp       = scp.SCPClient(self.transport)
                return
            except (po.SSHException, OSError, EOFError) as err:
                if retries > self.retry_limit:
                    raise ConnectionError(
                        "Number of retries, %d, greater than retry limit %d, "
                        "cannot connect" % (retries, self.retry_limit)
                    ) from err
                self.logger.info('Retrying Connect')

    def disconnect(self):
        """Close the SCP channel, then the SSH client (which also closes its transport)."""
        self.scp.close()
        self.ssh.close()

    @Copy_a_to_b_decorator("Local", "Remote")
    def copy_local_to_remote(self, local_folder, remote_folder, retries=0):
        """Recursively upload ``local_folder`` into ``remote_folder``."""
        self.scp.put(local_folder.as_posix(), remote_path=remote_folder.as_posix(),recursive=True)

    @Copy_a_to_b_decorator("Remote", "Local")
    def copy_remote_to_local(self, remote_folder, local_folder, retries=0):
        """Recursively download ``remote_folder`` into ``local_folder``."""
        self.scp.get(remote_folder.as_posix(), local_path=local_folder.as_posix(), recursive=True)

    # utility function for deleting a remote directory
    def delete_remote_directory(self, remote_folder):
        """Recursively delete ``remote_folder`` on the remote host and wait for completion.

        A non-zero exit status from ``rm`` is logged as a warning and does not raise.

        Raises:
            ValueError: if the path resolves to ``.`` or ``/``. Deleting those
                would wipe the login directory or the whole filesystem.
        """
        remote_folder = pt.Path(remote_folder)
        posix_path = remote_folder.as_posix()
        if posix_path in (".", "/"):
            raise ValueError("Refusing to delete remote path %r" % posix_path)
        self.logger.info('Removing %s from %s'%(remote_folder,self.host))
        # quote the path so spaces/metacharacters cannot alter the shell command
        _, stdout, stderr = self.ssh.exec_command("rm -rf %s" % shlex.quote(posix_path))
        # exec_command returns immediately; block until rm has actually finished
        status = stdout.channel.recv_exit_status()
        if status != 0:
            self.logger.warning(
                'Removing %s from %s exited with status %d: %s',
                remote_folder, self.host, status, stderr.read().decode("utf-8").strip(),
            )
            return
        self.logger.info('Completed Removing %s from %s'%(remote_folder,self.host))


Now that we have created the client class, let's create some helper functions that submit and download jobs. Since we want to submit and manage multiple jobs at once, we need to keep track of each job's ID. A dictionary shared between processes maps each job ID to the job's remote folder.

In [12]:
logging.basicConfig(level=logging.INFO)

def submit_job(folder: pt.Path, remote_destination: pt.Path, manager_dict: dict, log_path=None):
    """Upload a case folder, submit its ``openfoam_job.sh`` with ``qsub`` and record the job.

    Args:
        folder: Local case folder to upload.
        remote_destination: Remote parent folder the case is copied into.
        manager_dict: Shared mapping, updated with ``job_id -> remote case folder``.
        log_path: Optional local file that the job id is written to (plain text).

    Raises:
        RuntimeError: if ``qsub`` does not print a job id.
    """
    folder             = pt.Path(folder)
    remote_destination = pt.Path(remote_destination)
    remote_case        = remote_destination / folder.name
    # `&&` so nothing runs in the wrong directory if the cd fails
    cd_case            = "cd %s &&" % shlex.quote(remote_case.as_posix())

    client = Client(config.host, config.username, config.password)
    try:
        client.copy_local_to_remote(folder, remote_destination)

        # if the script was written on Windows, ensure it has unix line endings;
        # exec_command does not block, so wait before submitting
        _, stdout, _ = client.ssh.exec_command(f"{cd_case} dos2unix openfoam_job.sh")
        stdout.channel.recv_exit_status()

        # submit the job
        _, stdout, stderr = client.ssh.exec_command(f"{cd_case} qsub openfoam_job.sh")
        # keep track of the job id: the part of qsub's output before the first "."
        output = stdout.read().decode("utf-8").strip()
        job_id = output.split(".")[0]
        if not job_id:
            raise RuntimeError(
                f"qsub did not return a job id for {folder.name}: "
                f"{stderr.read().decode('utf-8').strip()}"
            )
        if log_path is not None:
            pt.Path(log_path).write_text(job_id, encoding="utf-8")
        client.logger.info(f"{folder.name} is running with JobID: {job_id}")

        manager_dict[job_id] = remote_case
    finally:
        client.disconnect()


def download_job(remote_folder: pt.Path, local_destination: pt.Path):
    """Copy a finished remote case folder back to the local machine, then delete it remotely.

    The remote folder is deleted only if the download returned without raising.
    The connection is always closed, even on error.

    Args:
        remote_folder: Remote case folder to download.
        local_destination: Local parent folder the case is copied into.
    """
    remote_folder = pt.Path(remote_folder)
    client = Client(config.host, config.username, config.password)
    try:
        client.logger.info(f"{remote_folder.name} has finished running")
        client.copy_remote_to_local(remote_folder, local_destination)
        client.delete_remote_directory(remote_folder)
    finally:
        client.disconnect()


Now let's process the jobs.

In [None]:
# we will use the same naming scheme, however we will have a different parent folder
local_main_folder  = pt.Path("../scripts/generate_steady_foam_cases/foam_cases")
remote_main_folder = pt.Path("/scratch/m45/cm5094/projects/bifurcation_steady_state")
# maximum number of jobs to have on the cluster at any one time
# NOTE(review): `max_cases` was never defined in the original; 10 is a placeholder,
# tune it to your queue limits
max_cases          = 10
# seconds to wait between completion checks when nothing new was submitted
poll_interval      = 30

client = Client(config.host, config.username, config.password)

# get a list of all the case folders in our job folder
all_cases     = [case for case in local_main_folder.glob("*") if case.is_dir()]
# create a shared object (visible to the worker processes) to keep track of all the running jobs
manager       = mp.Manager()
running_cases = manager.dict()

try:
    # finish once every case has been submitted and downloaded
    while all_cases or running_cases:
        submitted = False

        # copy over the desired number of cases to the supercomputer using a separate process
        if all_cases and len(running_cases) < max_cases:
            folder = all_cases.pop()
            p = mp.Process(target=submit_job, args=(folder, remote_main_folder, running_cases))
            p.start()
            p.join()
            submitted = True
            if p.exitcode != 0:
                client.logger.warning(f"Submitting {folder.name} failed (exit code {p.exitcode})")

        # the long-lived connection may drop while we wait between polls
        if not client.transport.is_active():
            client.connect()

        # for each running job check whether it is complete and copy it back to the local system
        for job_id in list(running_cases.keys()):
            remote_folder = running_cases[job_id]
            # the job script should create a dummy file called completed.tmp when it finishes;
            # checking for it avoids overloading the PBS server with lots of qstat calls
            marker = shlex.quote((remote_folder / "completed.tmp").as_posix())
            _, stdout, _ = client.ssh.exec_command(f"[ -e {marker} ] && echo 1 || echo 0")
            if stdout.read().decode("utf-8").strip() != "1":
                continue
            # download the job
            p = mp.Process(target=download_job, args=(remote_folder, local_main_folder))
            p.start()
            p.join()
            if p.exitcode != 0:
                client.logger.warning(
                    f"Downloading {remote_folder.name} failed (exit code {p.exitcode})"
                )
            # now remove the job from the dictionary
            running_cases.pop(job_id)

        # avoid hammering the server with back-to-back checks while jobs are running
        if not submitted and running_cases:
            time.sleep(poll_interval)
finally:
    client.disconnect()
    manager.shutdown()