In [None]:
import os
from tqdm import tqdm_notebook
import subprocess
import tempfile

In [None]:
# These settings let the pipeline run non-interactively.
os.environ.update({
    'HEADASNOQUERY': '',
    'HEADASPROMPT': '/dev/null',
})

In [None]:
def get_immediate_subdirectories(a_dir):
    '''Return the paths of the directories located directly inside a_dir.

    Every returned path is a_dir joined with the entry name. Entries are
    reported in the order os.listdir yields them; nothing is sorted.
    '''
    subdirs = []
    for entry in os.listdir(a_dir):
        candidate = os.path.join(a_dir, entry)
        if os.path.isdir(candidate):
            subdirs.append(candidate)
    return subdirs

def get_immediate_subfiles(a_dir):
    '''Return the paths of the regular files located directly inside a_dir.

    Every returned path is a_dir joined with the entry name. Entries are
    reported in the order os.listdir yields them; sub-directories are
    left out.
    '''
    files = []
    for entry in os.listdir(a_dir):
        candidate = os.path.join(a_dir, entry)
        if os.path.isfile(candidate):
            files.append(candidate)
    return files

In [None]:
import multiprocessing
import traceback, functools

def error(msg, *args):
    '''Log msg at ERROR level through the multiprocessing logger.

    Parameters
    ----------
    msg : str
        Message (or %-style format string) to log.
    *args
        Values interpolated into msg by the logging machinery.

    Returns
    -------
    None
        Whatever Logger.error returns.
    '''
    logger = multiprocessing.get_logger()
    # log_to_stderr() attaches a *new* StreamHandler each time it is called,
    # so calling it unconditionally makes every message appear once per
    # earlier call to error(). Attach the handler only when none is present.
    if not logger.handlers:
        multiprocessing.log_to_stderr()
    return logger.error(msg, *args)

def trace_unhandled_exceptions(func):
    '''Decorator that logs the traceback of any exception raised by func.

    The wrapped function forwards all positional and keyword arguments to
    func and returns func's result. If func raises an Exception, the
    formatted traceback is passed to error() and the exception is re-raised
    unchanged.

    Parameters
    ----------
    func : callable
        The function to wrap.

    Returns
    -------
    callable
        The wrapper, carrying func's metadata via functools.wraps.
    '''
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            # Propagate the result; without the return the caller (and any
            # pool callback) would always receive None.
            return func(*args, **kwargs)
        except Exception:
            error(traceback.format_exc())
            raise

    return wrapped_func

class AsyncFactory:
    '''Submit calls of one function to a multiprocessing pool.

    The pool is created with as many worker processes as
    multiprocessing.cpu_count() reports.
    '''

    def __init__(self, func, cb_func):
        '''Remember the work function and its callback, then start the pool.'''
        self.func = func
        self.cb_func = cb_func
        n_workers = multiprocessing.cpu_count()
        self.pool = multiprocessing.Pool(processes=n_workers)

    def call(self, *args, **kwargs):
        '''Schedule func(*args, **kwargs) on the pool without waiting for it.

        cb_func is registered as the completion callback of the submission.
        '''
        self.pool.apply_async(
            self.func,
            args=args,
            kwds=kwargs,
            callback=self.cb_func,
        )

    def wait(self):
        '''Stop accepting new work and block until the workers have exited.'''
        pool = self.pool
        pool.close()
        pool.join()

def cb_func(f):
    '''Completion callback that deliberately does nothing with its argument f.'''
    # Debugging aid, disabled:
    # print("PID: %d \t Value: %s completed" % (os.getpid(), f))
    return None

In [None]:
@trace_unhandled_exceptions
def run_pipeline(psz):
    '''Run xrtpipeline on every not-yet-reduced observation of one field.

    Each sub-directory of psz other than 'reduced' is treated as one
    observation. An observation is skipped when a directory with the same
    name already exists under psz/reduced. For the others, xrtpipeline is
    run with its standard output written to psz/reduced/<ob_id>_reduce.log.

    The command runs with PFILES pointing at a parameter-file directory
    keyed on the current process id, followed by $HEADAS/syspfiles.

    Parameters
    ----------
    psz : str
        Path of the field directory holding the observation directories.

    Returns
    -------
    str or None
        psz, or None when psz has no sub-directories at all.

    Raises
    ------
    KeyError
        If there is work to do and the HEADAS environment variable is unset.
    FileNotFoundError
        If the xrtpipeline executable cannot be found on PATH.
    '''
    print(psz)
    # set up environment: parameter-file directory keyed on this process id
    pfiles_dir = f'/tmp/{os.getpid()}.tmp/pfiles'
    os.makedirs(pfiles_dir, exist_ok=True)

    # get a list of the XRT obs.
    obs = get_immediate_subdirectories(psz)

    # if there aren't any observations, keep going
    if not obs:
        return

    reduced_dir = f'{psz}/reduced'
    os.makedirs(reduced_dir, exist_ok=True)

    # Names to skip: observations that already have an output directory,
    # plus the output directory itself. Computed once, not per observation.
    skip_ids = {os.path.basename(r)
                for r in get_immediate_subdirectories(reduced_dir)}
    skip_ids.add('reduced')

    pending = [ob_dir for ob_dir in obs
               if os.path.basename(ob_dir) not in skip_ids]
    if not pending:
        return psz

    # Hand PFILES to the child through its environment instead of writing
    # an `export` line into a temporary shell script.
    env = dict(os.environ)
    env['PFILES'] = f'{pfiles_dir};{os.environ["HEADAS"]}/syspfiles'

    for ob_dir in pending:
        ob_id = os.path.basename(ob_dir)
        # Argument list (no shell), so paths containing spaces or shell
        # metacharacters reach xrtpipeline intact.
        pipe_cmd = ['xrtpipeline',
                    f'indir={ob_dir}',
                    f'outdir={reduced_dir}/{ob_id}',
                    f'steminputs=sw{ob_id}',
                    'srcra=OBJECT', 'srcdec=OBJECT', 'datamode=PC',
                    'cleanup=yes', 'vigflag=yes', 'clobber=yes']
        log_path = f'{reduced_dir}/{ob_id}_reduce.log'
        with open(log_path, 'w') as log:
            status = subprocess.run(pipe_cmd, env=env, stdout=log).returncode
        # Report failures but keep going with the remaining observations.
        if status != 0:
            print(f'{ob_dir}: xrtpipeline exited with status {status}, '
                  f'see {log_path}')
    return psz

In [None]:
def reduce_parallel(data_dir='./data_full'):
    '''Run run_pipeline on every field directory under data_dir in parallel.

    Each immediate sub-directory of data_dir is submitted to an
    AsyncFactory pool; the call returns once all submitted jobs finished.

    Parameters
    ----------
    data_dir : str, optional
        Directory whose immediate sub-directories are the PSZ fields.
        Defaults to './data_full'.
    '''
    # get a list of the PSZ fields *before* starting the pool, so a missing
    # data directory does not leave idle worker processes behind
    PSZs = get_immediate_subdirectories(data_dir)
    async_worker = AsyncFactory(run_pipeline, cb_func)
    try:
        for psz in PSZs:
            async_worker.call(psz)
    finally:
        # always close and join the pool, even if submitting a job failed
        async_worker.wait()

In [None]:
def reduce_serial(data_dir='./data_full'):
    '''Run xrtpipeline on the fields under data_dir one after another.

    For each field directory, every sub-directory other than 'reduced' is
    treated as one observation. Observations that already have a directory
    of the same name under <field>/reduced are skipped; for the others
    xrtpipeline is run with its standard output written to
    <field>/reduced/<ob_id>_reduce.log.

    Parameters
    ----------
    data_dir : str, optional
        Directory whose immediate sub-directories are the PSZ fields.
        Defaults to './data_full'.

    Raises
    ------
    FileNotFoundError
        If the xrtpipeline executable cannot be found on PATH.
    '''
    # get a list of the PSZ fields
    PSZs = get_immediate_subdirectories(data_dir)

    # tqdm_notebook is the progress bar this file imports; a bare `tqdm`
    # name is not in scope and raised NameError here.
    for psz in tqdm_notebook(PSZs):
        # get a list of the XRT obs.
        obs = get_immediate_subdirectories(psz)

        # NOTE(review): run_pipeline only skips fields with zero
        # sub-directories, while this skips fields with fewer than two --
        # confirm which threshold is intended.
        if len(obs) < 2:
            continue

        reduced_dir = f'{psz}/reduced'
        os.makedirs(reduced_dir, exist_ok=True)

        # Names to skip: observations that already have an output
        # directory, plus the output directory itself. Computed once per
        # field rather than once per observation.
        skip_ids = {os.path.basename(r)
                    for r in get_immediate_subdirectories(reduced_dir)}
        skip_ids.add('reduced')

        for ob_dir in obs:
            ob_id = os.path.basename(ob_dir)
            if ob_id in skip_ids:
                continue
            # Argument list (no shell), so paths containing spaces or shell
            # metacharacters reach xrtpipeline intact.
            pipe_cmd = ['xrtpipeline',
                        f'indir={ob_dir}',
                        f'outdir={reduced_dir}/{ob_id}',
                        f'steminputs=sw{ob_id}',
                        'srcra=OBJECT', 'srcdec=OBJECT', 'datamode=PC',
                        'cleanup=yes', 'vigflag=yes', 'clobber=yes']
            log_path = f'{reduced_dir}/{ob_id}_reduce.log'
            with open(log_path, 'w') as log:
                status = subprocess.run(pipe_cmd, stdout=log).returncode
            # Report failures but keep going with the remaining observations.
            if status != 0:
                print(f'{ob_dir}: xrtpipeline exited with status {status}, '
                      f'see {log_path}')

In [None]:
def validate(data_dir='./data_full'):
    '''Report observations whose reduced output is incomplete.

    For every field under data_dir and every observation directory in it
    (all sub-directories except 'reduced'), the files directly inside
    <field>/reduced/<ob_id> are checked for a path containing
    'xpcw3po_cl.evt' (events) and one containing 'xpcw3po_ex.img'
    (exposure map). If either is missing, or the reduced directory does
    not exist, '<field>/reduced/<ob_id> NOT VALID' is printed.

    Parameters
    ----------
    data_dir : str, optional
        Directory whose immediate sub-directories are the PSZ fields.
        Defaults to './data_full'.
    '''
    PSZs = get_immediate_subdirectories(data_dir)
    for psz in tqdm_notebook(PSZs):
        # get a list of the XRT obs.
        for ob_dir in get_immediate_subdirectories(psz):
            ob_id = os.path.basename(ob_dir)
            if ob_id == 'reduced':
                continue

            reduced_ob = f'{psz}/reduced/{ob_id}'
            # An observation that was never reduced has no output directory;
            # treat it as having no products instead of letting listdir raise.
            if os.path.isdir(reduced_ob):
                products = get_immediate_subfiles(reduced_ob)
            else:
                products = []

            # check for events and exposure map
            evts = any('xpcw3po_cl.evt' in f for f in products)
            expm = any('xpcw3po_ex.img' in f for f in products)

            # Both products are required. The previous `not evts and expm`
            # parsed as `(not evts) and expm` and missed the cases where
            # the exposure map, or both files, were absent.
            if not (evts and expm):
                print(f'{psz}/reduced/{ob_id} NOT VALID')

In [None]:
# Reduce every field under ./data_full, one pool job per field directory.
reduce_parallel()

In [None]:
# Check the reduced output of each observation and print the ones flagged NOT VALID.
validate()