In [7]:
from pycondor import Job
from pathlib import Path
from pipelines.tasks.apptainer import CondorApptainerTask
import luigi
from pipelines.configs import aframe

import os
from luigi.util import inherits

In [23]:
class TaskA(luigi.Task):
    a = luigi.Parameter()
    d = luigi.Parameter()

    def run(self):
        self.output().open('w').close()
    
   
    def output(self):
        return luigi.LocalTarget('/tmp/log-{t.param_a}'.format(t=self))

@inherits(TaskA)
class TaskB(luigi.Task):
    a = luigi.Parameter()
    b = luigi.Parameter()
    c = luigi.Parameter()

    def requires(self):
        return self.clone(TaskA)
    
    def run(self):
        print('TaskA', self.a, self.b, self.c, self.d)


In [24]:
task = TaskA(a='a', b='b', c='c', d='d')
luigi.build([task], local_scheduler=True)

DEBUG: Checking if TaskA(d=d, a=a, b=b, c=c) is complete
  is_complete = task.complete()
DEBUG: Checking if TaskB(a=a, d=d) is complete
  is_complete = task.complete()
INFO: Informed scheduler that task   TaskA_a_b_c_1b91662a0f   has status   PENDING
INFO: Informed scheduler that task   TaskB_a_d_fd5abb946c   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 32686] Worker Worker(salt=2670283623, workers=1, host=ldas-pcdev5, username=ethan.marx, pid=32686) running   TaskB(a=a, d=d)
INFO: [pid 32686] Worker Worker(salt=2670283623, workers=1, host=ldas-pcdev5, username=ethan.marx, pid=32686) done      TaskB(a=a, d=d)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   TaskB_a_d_fd5abb946c   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 32686] Worker Worker(salt=2670283623, workers=1, host=ldas-pcdev5

TaskB d a


False

In [4]:
class QueryBackground(CondorApptainerTask):
    data_dir = luigi.Parameter(default=os.getenv("DATA_DIR", ""))
    start = luigi.FloatParameter()
    stop = luigi.FloatParameter()
    state_flag = luigi.Parameter()
    minimum_length = luigi.FloatParameter()
    ifos = luigi.ListParameter(default=aframe().ifos)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
    
    @property
    def name(self):
        return 'query_background'

    @property
    def queue(self):
        return f"queue start,stop from {self.input()}"

    def requires(self):
        Segments(
            data_dir=self.data_dir,
            start=self.start,
            stop=self.stop,
            state_flag=self.state_flag,
            minimum_length=self.minimum_length,
            ifos=self.ifos
        )

    @property
    def command(self):
        command = f"""
            python /opt/aframe/aframe/datagen/datagen/background.py
                --start $(start)
                --stop $(stop)
                --state-flag {self.state_flag}
                --minimum-length {self.minimum_length}
                --ifos {' '.join(self.ifos)}
                --data-dir {self.data_dir}
        """
        return command
    
    @property
    def image(self) -> str:
        default = os.path.expanduser("~/aframe/images")
        root = os.environ.get("AFRAME_CONTAINER_ROOT", default)
        return os.path.join(root, "datagen.sif")

In [5]:
job = QueryBackground(start=0, stop=1, state_flag='H1:DMT-ANALYSIS_READY:1', minimum_length=4, submit_dir = str(Path.cwd()))
job.run()

In [12]:
submit_dir = str(Path.cwd())
job = Job(
    name="datagen", 
    executable="/opt/aframe/apptainer",
    submit = submit_dir,
    output = submit_dir,
    error = submit_dir ,
    log = submit_dir,
    arguments = ["exec --bind test:test --nv"],
    extra_lines = ["environment = APPTAINERENV_TEST=5"]
)
job.build(fancyname=False)

Job(name=datagen, executable=apptainer, _built=True, args=[JobArg(arg='exec --bind test:test --nv', name=None, retry=None)], error=/home/ethan.marx/public_html/forks/BBHNet/aframe/pipelines/pipelines/tasks, error_file=/home/ethan.marx/public_html/forks/BBHNet/aframe/pipelines/pipelines/tasks/datagen.error, extra_lines=['environment = APPTAINERENV_TEST=5'], log=/home/ethan.marx/public_html/forks/BBHNet/aframe/pipelines/pipelines/tasks, log_file=/home/ethan.marx/public_html/forks/BBHNet/aframe/pipelines/pipelines/tasks/datagen.log, output=/home/ethan.marx/public_html/forks/BBHNet/aframe/pipelines/pipelines/tasks, output_file=/home/ethan.marx/public_html/forks/BBHNet/aframe/pipelines/pipelines/tasks/datagen.output, submit=/home/ethan.marx/public_html/forks/BBHNet/aframe/pipelines/pipelines/tasks, submit_file=/home/ethan.marx/public_html/forks/BBHNet/aframe/pipelines/pipelines/tasks/datagen.submit, submit_name=datagen)