In [68]:
from argparse import ArgumentParser
import sys
import os
import shutil
import subprocess
from pathlib import Path
import re
from filelock import FileLock
from datetime import datetime

import datalad.api as dl
import pandas as pd

In [121]:
job_name = 'test'
job_id = '1111'
status_csv = '/misc/geminis2/ramirezd/fb_test/code/status.csv'
status_lockfile = '/misc/geminis2/ramirezd/fb_test/code/status_job.lock'
super_ds_id = 'f5db201b-6eed-46ee-8c97-b586f3f694e6'
clone_target = '/misc/geminis2/ramirezd/test_bet/.fairlybig/input_ria/'
push_target = '/misc/geminis2/ramirezd/test_bet/.fairlybig/output_ria/'
push_lockfile = '/misc/geminis2/ramirezd/fb_test/code/status_job.lock'
inputs = 'inputs/mri-raw/sub-001A/anat/sub-001A_T1w.nii.gz'
outputs = 'outputs/sub-001A_T1w_bet.nii.gz'
output_datasets = None
preget_inputs = None
is_explicit = False
dl_cmd = 'bet inputs/mri-raw/sub-001A/anat/sub-001A_T1w.nii.gz outputs/sub-001A_T1w_bet.nii.gz'
commit = None
container = 'fsl-6-0-4'
message = None
ephemeral_locations = ['/tmp', '/misc/{host}[0-9]/ramirezd']
req_disk_gb = 40

host = os.uname().nodename
user= os.getenv('USER')

status_lock = FileLock(status_lockfile)
push_lock = FileLock(push_lockfile)

In [138]:
# Functions for disk space management
def get_locations(location_list):
    """
    Return tmp and non_tmp locations from the list of location patterns.
    """
    
    tmp=[]
    not_tmp_patterns=[]
    not_tmp_locations=[]

    # tmp and non_tmp list
    for location in location_list:
        if location == '/tmp' or location == '/tmp/':
            tmp.append(location)
        else:
            not_tmp_patterns.append(location)
    
    # get non_tmp locations according to the non_tmp_patterns (the script can accept multiple location patterns)
    for not_tmp_pattern in not_tmp_patterns:
        
        for index, part in enumerate(Path(not_tmp_pattern).parts):
            if host in part:
                break

        # node location
        mount_pattern = str(Path(*list(Path(not_tmp_pattern).parts[:index+1])))
        # location inside node
        after_pattern = str(Path(*list(Path(not_tmp_pattern).parts[index+1:])))
        
        # make sure those locations are within the node with the /etc/mtab file
        with open('/etc/mtab', 'r') as mtab:
            for line in mtab.readlines():
                # which mount pattern is within the node
                pattern = re.search(f'{mount_pattern} ', line)
                # which directories (after mount pattern) are within that mount
                if pattern:
                    pattern_glob = Path(pattern.group().strip()).glob(after_pattern) 
                else:
                    continue
                # after mount pattern could retrieve multiple locations
                if pattern_glob: 
                    not_tmp_locations += [str(pg) for pg in pattern_glob]
    
    return tmp, not_tmp_locations


def get_free_disk(location):
    """
    Return location's free disk space in gb.
    """
    
    _total, _used, free = shutil.disk_usage(location)
    # transform to gb
    return free // (2**30)


def get_used_disk(location):
    """
    Return location's used disk space in gb.
    """
    
    _total, used, _free = shutil.disk_usage(location)
    # transform to gb
    return used // (2**30)


def get_available_disk_resource(location, host, status_csv):
    """
    Return available disk space (in gb) resource.
    """
    
    total_req_disk_others_gb = (pd.read_csv(status_csv)
    .query("location == @location and status == 'ongoing' and host == @host")
    .assign(
        used_disk_gb = lambda df_: 
            df_['location'].apply(lambda x_: get_used_disk(x_)),
        req_disk_gb = lambda df_: 
            (df_['req_disk_gb'] - df_['used_disk_gb'])
    )
    .assign(
        req_disk_gb = lambda df_: 
            df_['req_disk_gb'].mask(df_['req_disk_gb'] < 0, 0)
    )
    ['req_disk_gb']
    .sum()
    )
        
    current_free_gb = get_free_disk(location)
    
    return current_free_gb - total_req_disk_others_gb


def set_status(status_csv, job_name, job_id, req_disk_gb, host, location, job_dir, status, start):
    """
    Add a new job status.
    """
    
    status_df = pd.read_csv(status_csv)
    
    new_status = {
        'job_name':[job_name],
        'job_id':[job_id],
        'req_disk_gb':[req_disk_gb],
        'host':[host],
        'location':[location],
        'job_dir':[job_dir],
        'status':[status],
        'start':[start],
        'update':[None],
        'traceback':[None]
        }
    
    new_status = pd.DataFrame(new_status)
    
    status_df = pd.concat([status_df, new_status])
    
    status_df.to_csv(status_csv, index=False)
    
    return status_df


def update_status(status_csv, job_name, id, host, location, status, update, traceback=None):
    """
    Update an existing job status.
    """
    
    status_df = pd.read_csv(status_csv)
    
    is_job = (
    (status_df['job_name'] == job_name) and
    (status_df['id'] == id) and
    (status_df['host'] == host) and
    (status_df['location'] == location) 
    )
    
    status_df = (status_df
    .assign(
        status = lambda df_: df_['status'].mask(is_job, status),
        update = update,
        traceback = lambda df_: df_['traceback'].mask(is_job, traceback)
        )
    )
    
    status_df.to_csv(status_csv, index=False)
    
    return status_df


# Functions for cloning and checking out
def do_dead_annex(dpath='cwd'):
    """
    Set cwd as dead annex or submodules as dead annex.
    """
    if dpath == 'cwd':
        cmd = ['git', 'annex', 'dead', 'here']
    else: 
        cmd = ['git', 'submodule', 'foreach', '--recursive', 'git', 'annex', 'dead', 'here']
    subprocess.run(cmd)
    

def do_checkout(job_name, dpath='cwd'):
    """
    Change to a job branch.
    """
    if dpath == 'cwd':
        cmd = ['git', 'checkout', '-b', job_name]
    else:
        cmd = ['git', '-C', dpath ,'checkout', '-b', job_name]
    
    subprocess.run(cmd)
    
def get_private_subdataset(clone_target, sd_path, sd_id):
    # Assume clone_target is a RIA store
    clone_path = str(Path(clone_target) / Path(sd_id[:3]) / Path(sd_id[3:]))
    
    ampersand = ['&&']
    git_clone_command = ['git', 'clone', clone_path, sd_path]
    git_config_annex_private = ['git', '-C', sd_path, 'config', 'annex.private', 'true']
    git_annex_init = ['git', '-C', sd_path, 'annex', 'init']
    
    cmd = git_clone_command + ampersand + git_config_annex_private + ampersand + git_annex_init
    
    subprocess.run(cmd, shell=True)

def git_add_remote(push_path, dpath='cwd'):
    if dpath == 'cwd':
        cmd = ['git', 'remote', 'add', 'outputstore', push_path]
    else:
        cmd = ['git', '-C', dpath, 'remote', 'add', 'outputstore', push_path]
        
    subprocess.run(cmd)

def git_push(dpath='cwd'):
    if dpath == 'cwd':
        cmd = ['git', 'push', 'outputstore']
    else:
        cmd = ['git', '-C', dpath, 'push', 'outputstore']
    
    subprocess.run(cmd)


# cleanup and exception handling
def cleanup(job_dir):
    subprocess.run(['chmod', '-R', '+w', job_dir])
    subprocess.run(['rm', '-rf', job_dir])

def excepthook(exctype, value, tb):
    with status_lock:
        update_status(status_csv, job_name, id, host, location, status='error', traceback=tb)
    print('Type:', exctype)
    print('Value:', value)
    print('Traceback:', tb)


In [132]:
subprocess.run('ls')

CHANGELOG.md
code
inputs
outputs
README.md


CompletedProcess(args='ls', returncode=0)

In [73]:
tmp, not_tmp_locations = get_locations(ephemeral_locations)

# manage available disk space
if req_disk_gb is None:
    req_disk_gb = 0
    
with status_lock:
    
    found_location=False
    
    if tmp:
        tmp = '/tmp'
        available_disk = get_available_disk_resource(tmp, host, status_csv)
        if req_disk_gb < available_disk:
            found_location=True
            location=tmp
            
    
    elif not_tmp_locations and not found_location:
        not_tmp_df = (
            pd.DataFrame({'location':not_tmp_locations})
            .assign(available_disk = lambda df_: 
                df_['location'].apply(lambda x_: get_available_disk_resource(x_, host, status_csv))
                )
            .sort_values('free_space', ascending=False)
            )

        if req_disk_gb < not_tmp_df['available_disk'].iat[0]:
            found_location = True
            location = not_tmp_df['location'].iat[0]
            
            
    if found_location:
        job_dir = str(Path(location) / f'job-{job_name}-{user}')
        set_status(status_csv, job_name, job_id, req_disk_gb, host, location, job_dir, status='ongoing', start=datetime.today().strftime("%Y/%m/%d %H:%M:%S"))
    else:
        set_status(status_csv, job_name, job_id, req_disk_gb, host, location, job_dir=None, status='no-space', start=datetime.today().strftime("%Y/%m/%d %H:%M:%S"))
        raise Exception("Coulnd't find a place with enough disk space.")

In [74]:
try:
    clone_ria_prefix = re.search(r'ria\+\w+:\/{2}', clone_target).group()
    clone_target = clone_target.replace(clone_ria_prefix, '')
except:
    # assume ria requires a file protocol if no protocol in the job_config
    clone_ria_prefix = 'ria+file://'
try:
    push_target = re.sub(r'ria\+\w+:\/{2}', '', push_target)
except:
    pass
    

In [79]:
super_clone_target = f'{clone_ria_prefix}{clone_target}#{super_ds_id}'

dl.clone(source=super_clone_target, path=job_dir, git_clone_opts=['-c annex.private=true'])
os.chdir(job_dir)

push_path = str(Path(push_target) / Path(super_ds_id[:3]) / Path(super_ds_id[3:]))
git_add_remote(push_path, 'cwd')

ds = dl.Dataset(job_dir)
sd = pd.DataFrame(ds.subdatasets())


[INFO] Attempting a clone into /tmp/job-test-ramirezd 
[INFO] Attempting to clone from file:///misc/geminis2/ramirezd/test_bet/.fairlybig/input_ria/f5d/b201b-6eed-46ee-8c97-b586f3f694e6 to /tmp/job-test-ramirezd 
[INFO] Completed clone attempts for Dataset(/tmp/job-test-ramirezd) 
[INFO] Reconfigured input_ria-storage for ria+file:///misc/geminis2/ramirezd/test_bet/.fairlybig/input_ria/ 
[INFO] Configure additional publication dependency on "input_ria-storage" 


configure-sibling(ok): . (sibling)
install(ok): /tmp/job-test-ramirezd (dataset)
action summary:
  configure-sibling (ok: 1)
  install (ok: 1)
subdataset(ok): inputs/mri-raw (dataset)


In [127]:
if output_datasets is None:
    output_datasets = []

In [84]:
if output_datasets and  (pd.Series(output_datasets).isin(sd['gitmodule_name']).all()):
    raise Exception("Not all output datasets are found.")

In [85]:
for output_dataset in output_datasets:
    sd_id = sd.query("gitmodule_name == @output_dataset")['gitmodule_datalad-id'].iat[0]
    get_private_subdataset(clone_target, output_dataset, sd_id)
    
    push_path = str(Path(push_target) / Path(sd_id[:3]) / Path(sd_id[3:]))
    git_add_remote(push_path, output_dataset)

In [87]:
# Checkout to job branch
branch_name = f'job-{job_name}'

for output_dataset in output_datasets:
    do_checkout(output_dataset, branch_name)
    
do_checkout(branch_name, 'cwd')

Switched to a new branch 'job-test'


In [88]:
if preget_inputs is None or None in preget_inputs:
    preget_inputs = []
for preget_input in preget_inputs:
    dl.get(preget_input)

In [115]:
sys.path

['/misc/geminis2/ramirezd/fb_test',
 '/misc/geminis/ramirezd/miniconda3/envs/py-minis/lib/python311.zip',
 '/misc/geminis/ramirezd/miniconda3/envs/py-minis/lib/python3.11',
 '/misc/geminis/ramirezd/miniconda3/envs/py-minis/lib/python3.11/lib-dynload',
 '',
 '/misc/geminis/ramirezd/miniconda3/envs/py-minis/lib/python3.11/site-packages',
 '/home/inb/soporte/lanirem_software/apptainer/bin/apptainer',
 '/home/inb/soporte/lanirem_software/apptainer/bin/apptainer']

In [108]:
import sys
sys.path.append('/home/inb/soporte/lanirem_software/apptainer/bin/apptainer')

In [109]:
%load_ext autoreload

In [118]:
%set_env PATH=/home/inb/soporte/lanirem_software/go_1.20.6/bin:/home/inb/soporte/lanirem_software/apptainer/bin:/opt/sge/bin:/opt/sge/bin/lx-amd64:/opt/sge/bin:/opt/sge/bin/lx-amd64:/home/inb/ramirezd/.local/bin:/home/inb/soporte/lanirem_software/ANTs_2.4.4/Scripts:/home/inb/soporte/lanirem_software/ANTs_2.4.4/bin:/home/inb/soporte/lanirem_software/fsl_6.0.7.4/share/fsl/bin:/misc/geminis/ramirezd/miniconda3/envs/py-minis/bin:/misc/geminis/ramirezd/miniconda3/condabin:/opt/sge/bin:/opt/sge/bin/lx-amd64:/home/inb/soporte/inb_tools:/home/inb/ramirezd/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/snap/bin:/misc/geminis2/hcp_workbench/bin_linux64:/misc/geminis2/hcp_workbench/bin_linux64:/misc/geminis2/hcp_workbench/bin_linux64

env: PATH=/home/inb/soporte/lanirem_software/go_1.20.6/bin:/home/inb/soporte/lanirem_software/apptainer/bin:/opt/sge/bin:/opt/sge/bin/lx-amd64:/opt/sge/bin:/opt/sge/bin/lx-amd64:/home/inb/ramirezd/.local/bin:/home/inb/soporte/lanirem_software/ANTs_2.4.4/Scripts:/home/inb/soporte/lanirem_software/ANTs_2.4.4/bin:/home/inb/soporte/lanirem_software/fsl_6.0.7.4/share/fsl/bin:/misc/geminis/ramirezd/miniconda3/envs/py-minis/bin:/misc/geminis/ramirezd/miniconda3/condabin:/opt/sge/bin:/opt/sge/bin/lx-amd64:/home/inb/soporte/inb_tools:/home/inb/ramirezd/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/snap/bin:/misc/geminis2/hcp_workbench/bin_linux64:/misc/geminis2/hcp_workbench/bin_linux64:/misc/geminis2/hcp_workbench/bin_linux64


In [123]:
outputs

'outputs/sub-001A_T1w_bet.nii.gz'

In [124]:
if message is None:
    message = branch_name

if commit:
    dl.rerun(
        revision=commit,
        explicit=is_explicit
    )
    
elif container:
    dl.containers_run(
        dl_cmd,
        container_name=container,
        inputs=inputs,
        outputs=outputs,
        message=message,
        explicit=is_explicit
    )
    
else:
    dl.run(
        dl_cmd,
        inputs=inputs,
        outputs=outputs,
        message=message,
        explicit=is_explicit
    )

[INFO] Making sure inputs are available (this may take some time) 
[INFO] == Command start (output follows) ===== 
[INFO] == Command exit (modification check follows) ===== 


run(ok): /tmp/job-test-ramirezd (dataset) [apptainer run -e .datalad/environments/f...]
add(ok): outputs/sub-001A_T1w_bet.nii.gz (file)
save(ok): . (dataset)
action summary:
  add (ok: 1)
  get (notneeded: 3)
  run (ok: 1)
  save (notneeded: 1, ok: 1)


In [126]:
output_datasets

In [129]:

# push annex data
dl.push(
    path='.',
    to='output_ria-storage',
)

for output_dataset in output_datasets:
    dl.push(
        path=output_dataset,
        to='output_ria-storage',
    )
    
# push git data
with push_lock:
    git_push('cwd')
    for output_dataset in output_datasets:
        git_push(output_dataset)
    
    

[INFO] Determine push target 
[INFO] Push refspecs 
[INFO] Transfer data 
[INFO] Finished push of Dataset(/tmp/job-test-ramirezd) 


action summary:
  


To /misc/geminis2/ramirezd/test_bet/.fairlybig/output_ria/f5d/b201b-6eed-46ee-8c97-b586f3f694e6
 * [new branch]      job-test -> job-test


In [135]:
job_dir

'/tmp/job-test-ramirezd'

In [139]:
cleanup(job_dir)

ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().

In [140]:
with status_lock:
    
    update_status(status_csv, 
                    job_name, 
                    job_id, 
                    host, 
                    location, 
                    status='completed', 
                    update=datetime.today().strftime("%Y/%m/%d %H:%M:%S"), 
                    traceback=None
                    )

print("Job completed succesfully.")

ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().