In [73]:
#| default_exp job_runners.LSF

In [97]:
#| export
from pydantic import BaseModel, PrivateAttr
from typing import List
from fastcore.meta import delegates
from fastcore.basics import basic_repr
#from prefect_shell import shell_run_command
from prefect import flow
# from corradin_ovp_utils.prefect_flows.step1 import SNPPairInfo
from rich.logging import logging
import subprocess
from subprocess import Popen, PIPE
import re
import pandas as pd
from datetime import datetime
import os
from pathlib import Path
from tqdm.auto import tqdm

In [71]:
#| export

def fix_submit_time_format(row):
    submit_time = ("-").join(row[-3:])
    new_row = row[:-3] + [submit_time]
    return new_row

In [124]:
#| export

valid_attributes = ["queue_name", "job_group", "dry_run", "n_cpus", "email"]

class JobRunnerGeneralClass:
    pass

class LSFJobRunner(JobRunnerGeneralClass):
    """
    valid_attributes = ["queue_name", "job_group", "dry_run", "n_cpus", "email"]
    """
    __repr__ = basic_repr("submit_command")
    
    def __init__(self, **kwargs):
        for attr_name in valid_attributes:
            setattr(self, f"_{attr_name}", None)
        for attr_name, attr_val in kwargs.items():
            if attr_name in valid_attributes:
                setattr(self, f"_{attr_name}", attr_val)
            else:
                raise ValueError("Unrecognized attribute: ", attr_name)
    
    def create_LSF_command(self, queue_name = None, job_group = None, n_cpus=None, email=None):
        flag_list = [self.create_queue_flag(queue_name),
                     self.create_job_group_flag(job_group),
                     self.create_n_cpus_flag(n_cpus),
                     self.create_email_flag(email)]
        if not self.create_queue_flag(queue_name):
            raise ValueError("Must specify `queue_name` either in runner object or as argument in run function")
            
        all_processed_flags = [flag for flag in flag_list if flag]
        full_LSF_command = f"{self._base_command} {(' ').join(all_processed_flags)}" 
        return full_LSF_command
    
    @property
    def submit_command(self):
        return self.create_LSF_command()
        
    @property
    def _base_command(self):
        return "bsub"
        
    def create_queue_flag(self, queue_name = None):
        queue = self._check_runtime_option(queue_name,self._queue_name)
        return self.create_option_str("-q", queue)
    
    def create_email_flag(self, email = None):
        email = self._check_runtime_option(email, self._email)
        if email:
            return f"-u {email}"
        else:
            return "-o /dev/null -e /dev/null"
    
    def create_n_cpus_flag(self, n_cpus = None):
        n_cpus = self._check_runtime_option(n_cpus, self._n_cpus)
        return self.create_option_str("-n", n_cpus)
    
    
    def create_job_group_flag(self, job_group = None):
        job_group = self._check_runtime_option(job_group,self._job_group)
        return self.create_option_str("-g", job_group)
                              
        
    def _check_runtime_option(self, runtime_val, default_val):
        if runtime_val is not None:
            val = runtime_val
        else:
            val = default_val
        return val
                              
    def create_option_str(self, flag, val, default_return_val = None):
        if val is None:
            if default_return_val:
                return default_return_val
            else:
                return ""
        else:
            return f"{flag} {val}"
        
    @delegates(create_LSF_command)    
    def run(self, job, dry_run=True, **kwargs):
        """
        Queue this job in LSF.
        :param job: The Job object containing the command we want to run.
        :return: job
        """
#         if job.command is None:
#             raise ValueError("Job command cannot be None. Please check rendered_command method or your input")

#         filled_outside_folder_struct = self.outside_folder_struct_template.format(GWAS_rsid=job.rsid_pair[0], outside_rsid=job.rsid_pair[1])
#         # save template for debugging using email
#         #-o {filled_outside_folder_struct}/myStdOut.out -e {filled_outside_folder_struct}/myStdErr.err

#         #save template for putting error in outside folder
#         #cmd = "bsub -o /dev/null -e {filled_outside_folder_struct}/myStdErr.err -q {queue_name} -g {job_group} -J {job_name} {job_command}".format(filled_outside_folder_struct=filled_outside_folder_struct,queue_name = self.queue_name, job_group=self.job_group, job_name=job.name, rsid_pair=job.rsid_pair, job_command=job.command)

#         cmd = "bsub -o /dev/null -e /lab/corradin_biobank/FOR_AN/OVP/lsf_jobs_err_files/%J -q {queue_name} -g {job_group} -J {job_name} {job_command}".format(filled_outside_folder_struct=filled_outside_folder_struct,queue_name = self.queue_name, job_group=self.job_group, job_name=job.name, rsid_pair=job.rsid_pair, job_command=job.command)

#         logging.info("RUN: %s" % cmd)
#         bsub_output = subprocess.check_output(cmd, shell=True)
#         print(bsub_output)
#         job.jobid = self.__parse_jobid(str(bsub_output))
#         job.status = "running"
#         return job

        if isinstance(job, str):
            job = Job(command = job, runner= self)
            
        if job.command is None:
            raise ValueError("Job command cannot be None. Please check rendered_command method or your input")
        if job.command.split(" ")[0] != "bsub":
            if job.job_name:
                cmd = f"{self.create_LSF_command(**kwargs)} -J {job.job_name} {job.command}"
            else:
                cmd = f"{self.create_LSF_command(**kwargs)} {job.command}"
        else:
            cmd = job.command
            
        logging.info("RUN: %s" % cmd)
        job.command = cmd
        job.runner = self
        
        if not dry_run:
            bsub_output = subprocess.check_output(cmd, shell=True)
            print(bsub_output)
            job.jobid = self.__parse_jobid(str(bsub_output))
            job.status
            #job.status = "running"
        return job
    
    #@classmethod
    def check_job_status(self, job):
        """
        Determine the job status from LSF.
        :param jobid:
        :return: "running" | "complete" | "failed" | "unknown"
        """
        
        jobid = job.jobid 
        # try:
        bjobs_cmd = ["bjobs", jobid]
        logging.info("\tbjobs command: '%s'" % bjobs_cmd)
        p = Popen(bjobs_cmd, stdout=PIPE, stderr=PIPE)
        bjobs_stdout, bjobs_stderr = p.communicate()
        #bjobs_stdout = (stdout)
        #bjobs_stderr = str(stderr)

        logging.debug("\tbjobs stdout: %s" % (bjobs_stdout))
        logging.debug("\tbjobs stderr: %s" % (bjobs_stderr))
        if re.compile("Job <\\d+> is not found").search(str(bjobs_stderr)) is None:
            # Found in bjobs output
            #lines = str(bjobs_stdout).split("\\n")
            lines = subprocess.run(f"bjobs -w {jobid}", shell=True, capture_output = True).stdout.decode().split("\n")#(, return_all = True)
            lines = [[col for col in line.split(" ") if col] for line in lines if line]
            #print(lines)
            header, *rows= lines
            header += ["Checked at"]
            rows = [fix_submit_time_format(row) + [datetime.now().strftime("%d/%m/%Y %H:%M:%S")] for row in rows]
            for i, row in enumerate(rows):
                #print(row)
                jobid, user, stat, queue, from_host, exec_host, *job_name, submit_time, checked_at = row
                rows[i] = [jobid, user, stat, queue, from_host, exec_host, (" ").join(job_name), submit_time, checked_at]
                
            status_df = pd.DataFrame(rows, columns = header)
            
            current_stat = status_df["STAT"].item()
            
            if job.status_df is None: 
                job.status_df = status_df
            else:
                if job.status_df.tail(1)["STAT"].item() != current_stat:
                    job.status_df = pd.concat([job.status_df, status_df]).reset_index(drop=True)
            logging.info("lines: %s" %  lines)
            logging.info("len lines: %d" %  len(lines))
            
            if len(lines)>=2:
                # JOBID   USER    STAT  QUEUE      FROM_HOST   EXEC_HOST   JOB_NAME   SUBMIT_TIME
                # 5612457 andy    RUN   normal     tak4        it-c03b05   sleep 1000 Jan  8 15:46
                # statline = str(lines[1])
                # statline_arr = re.compile("\s+").split(statline)
                # logging.debug(statline_arr)
                # stat = statline_arr[2]
                logging.debug("line1: %s" % lines[1])
                logging.debug("stat: %s" % current_stat)
                if current_stat == "PEND":
                    return "running"
                elif current_stat == "RUN":
                    return "running"
                elif current_stat == "DONE":
                    return "complete"
                elif current_stat == "EXIT":
                    return "failed"
        else:
            # # Not found in bjobs output
            # # Either too old or invalid id
            # # check bhist
            # logging.debug("\tNot found with bjobs")

            # bhist_cmd = ["bhist", "-b", jobid]
            # logging.debug("\tbhist command: '%s'" % bhist_cmd)
            # p = Popen(bhist_cmd, stdout=PIPE, stderr=PIPE)
            # stdout, stderr = p.communicate()
            # bhist_stdout = str(stdout)
            # bhist_stderr = str(stderr)
            # logging.debug("\tbhist stdout: %s" % (bhist_stdout))
            # logging.debug("\tbhist stderr: %s" % (bhist_stderr))

            # if re.compile("Done successfully\.").search(bhist_stdout):
            # 	return "complete"
            
            logging.debug("\tCannot find job in bjobs checking for \'finished\' file in job folder")
            job_file_path =  Path(job.file_path_to_check)
            if job_file_path.exists() and job_file_path.is_file():
                return "complete"
            else:
                return "failed"
            
#             logging.debug("\tThe current folder is {}".format(os.getcwd()))
#             logging.debug("\tThe job folder is {}".format(job_folder))

#             try:
#                 with cd(job_folder):
#                     if os.path.isfile("finished"):
#                         logging.debug("\tfound file \'finished\'")
#                         return "complete"
#                     else:
#                         logging.debug("\tcannot find file \'finished\' in directory {}. Marking job as failed".format(
#                             os.getcwd()))
#                         return "failed"

#             except FileNotFoundError:
#                 logging.info("\tin except clause b/c FileNotFoundError exception in check_job_status_function, returning failed".format(
#                     os.getcwd()))
#                 return "failed"
        # except FileNotFoundError:
        # 	# Probably no bjobs.
        # 	logging.warn("\tFileNotFound when checking status. Probably no bjobs command. Returning unknown")
        # 	return "unknown"
        # logging.warn("\tUnhandled bjobs output. Returning unknown.")
        return "unknown"  # parse bstatus
    def check_job_list_status(self, jobs, show_progress = True):
        if show_progress:
            for job in tqdm(jobs):
                self.check_job_status(job)
        else:
            for job in jobs:
                self.check_job_status(job)
        return pd.concat([job.status_df.tail(1) for job in jobs])
    
    def __parse_jobid(self, bsub_output):
        jobid = str(bsub_output).split(" ")[1].strip("<>")
        return jobid

    def cleanup_job(self, job):
        cmd = f"bkill {job.jobid}"
        print(cmd)
        bsub_output = subprocess.check_output(cmd, shell=True)
        print(bsub_output)

class Job(BaseModel):
    command: str = None
    jobid: str = None
    runner: JobRunnerGeneralClass = None
    file_path_to_check: str = None
    job_name: str = None
    #status:str = None
    status_df: pd.DataFrame = None #= PrivateAttr(default_factory= None)
    retries:int = 0
    _created_at: datetime = PrivateAttr(default_factory=datetime.now)
    __repr__ = basic_repr("command, status, runner_cls, jobid, has_file_path")

    @property
    def has_file_path(self):
        if self.file_path_to_check is not None:
            return True
        else:
            return False
    
    @property
    def runner_cls(self):
        return self.runner.__class__ if self.runner is not None else None
    
    def run(self, **kwargs):
        """
        valid_attributes = ["queue_name", "job_group", "dry_run", "n_cpus", "email"]
        """
        @delegates(self.runner.run)
        def _run(**kwargs):
            return self.runner.run(self, **kwargs)
        return _run(**kwargs)
    
    def rerun(self):
        self = Job(command = self.command,
                   runner = self.runner,
                   file_path_to_check=self.file_path_to_check,
                   retries = self.retries + 1)
    
    @property
    def status(self):
        if self.jobid is None:
            return "Not submitted"
        else:
            return self.runner.check_job_status(self)
        
    class Config:
        arbitrary_types_allowed = True
        
        
    def stub():
        """
        This is just so we can subclass SimpleNamespace.
        Do don't have any custom methods needed yet.
        """
        return False


In [122]:
!ls /tmp

7103184.tmpdir		   pim.info.c2b9
7107941.tmpdir		   pip-uninstall-ozdhv20u
7114287.tmpdir		   python-languageserver-cancellation
7114288.tmpdir		   quarto-session5967e56f
7114291.tmpdir		   tmp19fp1s71
copier.vcs.clone.evmgf6g_  tmp8shkicux
libtpu_lockfile		   tmpai2ld9k9
outdated_cache_outdated    tmpuhd_xc6p
outdated_cache_pingouin    vscode-typescript4638


In [112]:
test = subprocess.run(f"rm -rf ~/tmp/".split(" "), capture_output=True)
test.stdout

b'JOBID   USER    STAT  QUEUE      FROM_HOST   EXEC_HOST   JOB_NAME   SUBMIT_TIME\n7107941 anhoang RUN   corradin   c2b9        c2b9        jupyter_lab_webapp Jan 12 07:48\n'

In [13]:
test = subprocess.run(f"bjobs -w 7107941".split(" "), capture_output=True, shell=True)#, return_all=True))()
test

CompletedProcess(args=['bjobs', '-w', '7107941'], returncode=0, stdout=b'JOBID   USER    STAT  QUEUE      FROM_HOST   EXEC_HOST   JOB_NAME   SUBMIT_TIME\n6123600 anhoang RUN   corradin   c2b9        c3b1        *p 0.0.0.0 Jul  1 07:45\n6257800 anhoang RUN   corradin   c2b9        c4b10       *o-browser Aug  4 20:44\n7084129 anhoang RUN   all_corrad c2b9        c2b9        *p 0.0.0.0 Dec  8 14:19\n7084523 anhoang RUN   corradin   c2b9        c2b9        *s_Jupyter Dec  9 09:38\n7103184 anhoang RUN   corradin   c2b9        c2b9        *bled True Dec 23 13:24\n7107941 anhoang RUN   corradin   c2b9        c2b9        *ab_webapp Jan 12 07:48\n', stderr=b'')

In [98]:
runner = LSFJobRunner()
test_job = Job(command = "ls", runner = LSFJobRunner())
test_job2 = Job(command = "pwd")

test_job.run(queue_name = "all_corradin", no_email = True, dry_run = False)
runner.run(test_job2,dry_run= False, queue_name = "all_corradin", no_email = True)


b'Job <7108675> is submitted to queue <all_corradin>.\n'
b'Job <7108676> is submitted to queue <all_corradin>.\n'


__main__.Job(command='bsub -q all_corradin -o /dev/null -e /dev/null pwd', status='running', runner_cls=<class '__main__.LSFJobRunner'>, jobid='7108676', has_file_path=False)

In [99]:
runner.check_job_list_status([test_job,test_job2])

  0%|          | 0/2 [00:00<?, ?it/s]

Unnamed: 0,JOBID,USER,STAT,QUEUE,FROM_HOST,EXEC_HOST,JOB_NAME,SUBMIT_TIME,Checked at
0,7108675,anhoang,PEND,all_corradin,c2b9,-,ls,Jan-12-11:32,12/01/2023 11:32:06
0,7108676,anhoang,PEND,all_corradin,c2b9,-,pwd,Jan-12-11:32,12/01/2023 11:32:06


In [66]:
test_job.status #query status
test_job.status_df

Unnamed: 0,JOBID,USER,STAT,QUEUE,FROM_HOST,EXEC_HOST,JOB_NAME,SUBMIT_TIME,Checked at
0,7107949,anhoang,PEND,all_corradin,c2b9,-,ls,Jan-12-08:16,12/01/2023 08:16:38
1,7107949,anhoang,DONE,all_corradin,c2b9,c2b9,ls,Jan-12-08:16,12/01/2023 08:16:51


In [67]:
test_job.run(dry_run = False) # this works because the job already has 

b'Job <7107950> is submitted to queue <all_corradin>.\n'


__main__.Job(command='bsub -q all_corradin -o /dev/null -e /dev/null ls', status='running', runner_cls=<class '__main__.LSFJobRunner'>, jobid='7107950', has_file_path=False)

In [364]:
test_job.runner.__class__

__main__.LSFJobRunner

In [75]:
test = LSFJobRunner()
job = test.run("ls", dry_run = False, queue_name ="all_corradin", no_email = True)
job

b'Job <7107951> is submitted to queue <all_corradin>.\n'


__main__.Job(command='bsub -q all_corradin -o /dev/null -e /dev/null ls', status='running', runner_cls=<class '__main__.LSFJobRunner'>, jobid='7107951', has_file_path=False)

In [76]:
job.status

'running'

In [77]:
job.status_df

Unnamed: 0,JOBID,USER,STAT,QUEUE,FROM_HOST,EXEC_HOST,JOB_NAME,SUBMIT_TIME,Checked at
0,7107951,anhoang,PEND,all_corradin,c2b9,-,ls,Jan-12-08:28,12/01/2023 08:28:03


In [372]:
Job(jobid = "5080872", runner= LSFJobRunner())

NameError: name 'job_folder' is not defined