In [1]:
# default_exp watchdog

In [2]:
from fastcore.all import *

In [3]:
from nbdev.showdoc import *

## Requirements

Optional xtras

- 2nd line of script, if a comment, is added as a description to the log

In [4]:
shutil.rmtree('data')
path = Path('data')
path.mkdir(exist_ok=True)

In [5]:
def setup_dirs(path):
    path_run = path/'to_run'
    path_running = path/'running'
    path_complete = path/'complete'
    path_fail = path/'fail'
    path_out = path/'out'
    for o in path_run,path_running,path_complete,path_fail,path_out:
        o.mkdir(exist_ok=True)
    return path_run,path_running,path_complete,path_fail,path_out

In [6]:
path_run,path_running,path_complete,path_fail,path_out = setup_dirs(path)

In [7]:
def find_next_script(p):
    files = p.ls().sorted().filter(Self.is_file())
    if files: return files[0]

In [8]:
def _create_tests():
    os.symlink('/bin/ls', path_run/'ls')
    os.symlink('/bin/false', path_run/'false')
    (path_run/'test_dir').mkdir(exist_ok=True)

_create_tests()
test_eq(find_next_script(path_run).name, 'false')
assert not find_next_script(path_complete)

In [9]:
def is_available(i, p): return not (p/f'{i}.lock').exists()

def find_next_workerid(worker_ids, p):
    ids = worker_ids.filter(is_available, p=path)
    if ids: return ids[0]

def lock_worker(p, wid, details=""):
    (p/f'{wid}.lock').write_text(str(details))

def unlock_worker(p, wid):
    try: (p/f'{wid}.lock').unlink()
    except FileNotFoundError: pass

In [10]:
unlock_worker(path, 0)
worker_ids = L.range(4)
test_eq(find_next_workerid(worker_ids, path), 0)
(path/'0.lock').write_text('hi');
test_eq(find_next_workerid(worker_ids, path), 1)

In [11]:
f = path_run.ls()[0]

1. X poll a directory
1. X find first file
1. X check there's an available worker id
1. X move it to `running`
1. run it
   1. X create lock file
   1. X redirect stdout/err
   1. X run it
   1. X when done, move it to `complete` or `failed`
   1. X unlock

In [12]:
def safe_rename(file, dest):
    to_name = dest/file.name
    if to_name.exists():
        u = uuid4()
        to_name = dest/f'{name}-{u}'
        warnings.warn(f'Using unique name {to_name}')
    file.replace(to_name)
    return to_name

In [13]:
# TODO increase interval
def poll_scripts(worker_ids, p, poll_interval=0.01):
    path_run,path_running,path_complete,path_fail,path_out = setup_dirs(p)
    for i in range(10):
        sleep(poll_interval)
        script = find_next_script(path_run)
        if script is None: continue
        wid = find_next_workerid(worker_ids, p)
        if wid is None: continue
        run_name = safe_rename(script, path_running)
        lock_worker(p, wid)

In [14]:
shutil.rmtree('data')
path.mkdir(exist_ok=True)
path_run,path_running,path_complete,path_fail,path_out = setup_dirs(path)
_create_tests()

In [15]:
import threading

In [16]:
def run_1script(script, path_complete, path_fail, path_out, path, wid):
    with (path_out/f'{script.name}.stderr').open("w") as stderr:
        with (path_out/f'{script.name}.stdout').open("w") as stdout:
            res = subprocess.call(str(script), stdout=stdout, stderr=stderr)
    (path_out/f'{script.name}.exitcode').write_text(str(res))
    # TODO: handle failure
    finish_name = safe_rename(script, path_complete)
    unlock_worker(path, wid)

In [22]:
f = find_next_script(path_run)

In [23]:
thread = threading.Thread(target=run_1script, args=(f, path_complete, path_fail, path_out, path, 0))
thread.start()
thread.join()

In [24]:
find_next_script(path_run)

In [21]:
find_next_script(path_complete)

PosixPath('data/complete/false')

In [35]:
run_1script(f)

1

In [98]:
poll_scripts(worker_ids, path)

In [99]:
find_next_workerid(worker_ids, path)

0

In [100]:
path.ls()

(#5) [data/complete,data/fail,data/running,data/to_run,data/out]

In [101]:
path_running.ls()

(#0) []

In [102]:
path_complete.ls()

(#2) [data/complete/nvidia-smi,data/complete/ls]

In [103]:
print(Path('stderr').read_text())




In [104]:
print(Path('stdout').read_text())

00_core.ipynb
99_index.ipynb
build
data
dist
docs
examples
fastscript
fastscript.egg-info
LICENSE
Makefile
myfile
__pycache__
README.md
settings.ini
setup.py
stderr
stdout

