# fastgpu

> A queue service for quickly developing scripts that use all your GPUs efficiently

- TOC: false

In [None]:
# default_exp core

In [None]:
#export
from fastcore.all import *
import pynvml

In [None]:
#hide
from nbdev.showdoc import *

## Overview

Here's what fastgpu does:

1. poll `to_run`
1. find first file
1. check there's an available worker id
1. move it to `running`
1. handle the script
   1. create lock file
   1. redirect stdout/err to `out`
   1. run it
   1. when done, move it to `complete` or `fail`
   1. unlock

For demonstrating how to use `fastgpu`, we first create a directory to store our scripts and outputs:

In [None]:
path = Path('data')
path.mkdir(exist_ok=True)

In [None]:
#export
def setup_dirs(path):
    "Create and return the following subdirs of `path`: to_run running complete fail out"
    path = Path(path)  # also accept a plain string
    dirs = L(path/o for o in 'to_run running complete fail out'.split())
    # `parents=True` so this also works when `path` itself does not exist yet
    for o in dirs: o.mkdir(parents=True, exist_ok=True)
    return dirs

These are all the subdirectories that are created for us. Your scripts go in `to_run`.

In [None]:
path_run,path_running,path_complete,path_fail,path_out = setup_dirs(path)

Let's create a scripts directory with a couple of "scripts" (actually symlinks for this demo) in it.

In [None]:
def _setup_test_env():
    "Reset `path` to a fresh layout with two symlinked scripts and one directory in `to_run`"
    # Start from a clean slate, but don't fail if the directory is already gone
    if path.exists(): shutil.rmtree(path)
    path.mkdir(exist_ok=True)
    # `setup_dirs` returns the subdirs in creation order; the first one is `to_run`
    to_run = setup_dirs(path)[0]
    os.symlink('/bin/ls', to_run/'ls')
    os.symlink('/bin/false', to_run/'false')
    # A non-file entry in `to_run`; presumably here so the tests cover the
    # "files only" filter in `find_next_script`
    (to_run/'test_dir').mkdir(exist_ok=True)

In [None]:
_setup_test_env()

## Helper functions for scripts

These functions are used to find and run scripts, and move scripts to the appropriate subdirectory at the appropriate time.

In [None]:
#export
def find_next_script(p):
    "Return the first regular file in `p` (by sorted order), or `None` if there is none"
    for candidate in p.ls().sorted():
        # Directories and other non-file entries are never treated as scripts
        if candidate.is_file(): return candidate
    return None

In [None]:
# 'false' sorts before 'ls'; the `test_dir` directory is ignored because it is not a file
test_eq(find_next_script(path_run).name, 'false')
# `complete` was just recreated empty, so there is no script to return
assert not find_next_script(path_complete)

In [None]:
#export
def safe_rename(file, dest):
    "Move `file` into dir `dest`, appending a random uuid to its name if there's a name conflict; returns the new path"
    to_name = dest/file.name
    if to_name.exists():
        # Never clobber an existing entry of the same name: keep both under distinct names
        to_name = dest/f'{file.name}-{uuid4()}'
        warnings.warn(f'Using unique name {to_name}')
    file.replace(to_name)
    return to_name

## ResourcePoolBase -

In [None]:
#export
class ResourcePoolBase():
    # NB: the class docstring and the public methods' docstrings are attached by the
    # `add_docs(ResourcePoolBase, ...)` call that follows this class.
    def __init__(self, path):
        # `path` is the base directory: lockfiles live directly in it, and the
        # `to_run`/`running`/`complete`/`fail`/`out` subdirs are looked up under it.
        self.path = Path(path)
        self.path.mkdir(parents=True, exist_ok=True)

    def _lockpath(self,ident): return self.path/f'{ident}.lock'
    def _is_locked(self,ident): return self._lockpath(ident).exists()
    def lock(self,ident): self._lockpath(ident).write_text("locked")
    def unlock(self,ident):
        # Just try to remove the lockfile; it being absent already is not an error
        try: self._lockpath(ident).unlink()
        except FileNotFoundError: pass
    def is_available(self,ident): return not self._is_locked(ident)
    def all_ids(self): raise NotImplementedError
    def find_next(self): return first(o for o in self.all_ids() if self.is_available(o))
    def lock_next(self):
        ident = self.find_next()
        if ident is None: return
        self.lock(ident)
        return ident

    def _run(self, script, ident):
        "Run `script`, capture its output under `out`, move it to `complete` or `fail`, and always unlock `ident`"
        out = self.path/'out'
        res = None  # stays `None` if the script could not be launched at all
        try:
            with (out/f'{script.name}.stderr').open("w") as stderr:
                with (out/f'{script.name}.stdout').open("w") as stdout:
                    try: res = subprocess.call(str(script), stdout=stdout, stderr=stderr)
                    except Exception as e:
                        # Launch failure (e.g. not executable): record why instead of losing the error
                        stderr.write(f'{type(e).__name__}: {e}\n')
            # Contains the integer exit status, or the text `None` when the launch itself failed
            (out/f'{script.name}.exitcode').write_text(str(res))
            # Only a zero exit status counts as success
            dest = self.path/('complete' if res == 0 else 'fail')
            safe_rename(script, dest)
        finally:
            # Release the resource even if something above raised, so it is not locked forever
            self.unlock(ident)

    def run(self, script, ident):
        thread = Thread(target=self._run, args=(script, ident))
        thread.start()
        # NOTE: joining immediately means this call blocks until the script has finished
        thread.join()

    def poll_scripts(self, poll_interval=0.1, n_polls=10):
        # `n_polls` is the number of polling rounds; each round sleeps `poll_interval`
        # seconds and then starts at most one script.
        for _ in range(n_polls):
            sleep(poll_interval)
            script = find_next_script(self.path/'to_run')
            if script is None: continue
            ident = self.lock_next()
            if ident is None: continue
            # Move the script out of `to_run` first so a later poll cannot pick it up again
            run_name = safe_rename(script, self.path/'running')
            self.run(run_name, ident)

In [None]:
#export
# Attach the class docstring (second argument) and one docstring per public method
# (keyword name = method name); these are the texts `show_doc` renders for each method.
add_docs(ResourcePoolBase, "Base class for locked access to list of idents",
         unlock="Remove lockfile for `ident`",
         lock="Create lockfile for `ident`",
         is_available="Is `ident` available",
         all_ids="All idents (abstract method)",
         find_next="Finds next available resource, or None",
         lock_next="Locks an available resource and returns its ident, or None",
         run="Run `script` using resource `ident`",
         poll_scripts="Poll `to_run` for scripts and run in parallel on available resources")

This abstract class locks and unlocks resources using lockfiles. Override `all_ids` to provide the list of resource idents. See `FixedWorkerPool` for a simple example and details on each method.

In [None]:
#export
class FixedWorkerPool(ResourcePoolBase):
    "Resource pool whose idents are a fixed list supplied by the caller"
    def __init__(self, worker_ids, path):
        # Remember the idents, then let the base class prepare the lockfile directory
        self.worker_ids = worker_ids
        super().__init__(path)

    def all_ids(self):
        "Return the idents given to the constructor"
        return self.worker_ids

The simplest possible `ResourcePoolBase` subclass - the resources are just a list of ids. For instance:

In [None]:
_setup_test_env()
wp = FixedWorkerPool(L.range(4), path)

In [None]:
show_doc(FixedWorkerPool.unlock)

<h4 id="ResourcePoolBase.unlock" class="doc_header"><code>ResourcePoolBase.unlock</code><a href="__main__.py#L10" class="source_link" style="float:right">[source]</a></h4>

> <code>ResourcePoolBase.unlock</code>(**`ident`**)

Remove lockfile for `ident`

If there are no locks, this does nothing:

In [None]:
wp.unlock(0)

In [None]:
show_doc(FixedWorkerPool.find_next)

<h4 id="ResourcePoolBase.find_next" class="doc_header"><code>ResourcePoolBase.find_next</code><a href="__main__.py#L13" class="source_link" style="float:right">[source]</a></h4>

> <code>ResourcePoolBase.find_next</code>()

Finds next available resource, or None

Initially all resources are available (unlocked), so the first from the provided list will be returned:

In [None]:
test_eq(wp.find_next(), 0)

In [None]:
show_doc(FixedWorkerPool.lock)

<h4 id="ResourcePoolBase.lock" class="doc_header"><code>ResourcePoolBase.lock</code><a href="__main__.py#L9" class="source_link" style="float:right">[source]</a></h4>

> <code>ResourcePoolBase.lock</code>(**`ident`**)

Create lockfile for `ident`

After locking the first resource, it is no longer returned next:

In [None]:
wp.lock(0)
test_eq(wp.find_next(), 1)

In [None]:
show_doc(FixedWorkerPool.lock_next)

<h4 id="ResourcePoolBase.lock_next" class="doc_header"><code>ResourcePoolBase.lock_next</code><a href="__main__.py#L14" class="source_link" style="float:right">[source]</a></h4>

> <code>ResourcePoolBase.lock_next</code>()

Locks an available resource and returns its ident, or None

This is the normal way to access a resource - it simply combines `find_next` and `lock`:

In [None]:
wp.lock_next()
test_eq(wp.find_next(), 2)

In [None]:
show_doc(FixedWorkerPool.run)

<h4 id="ResourcePoolBase.run" class="doc_header"><code>ResourcePoolBase.run</code><a href="__main__.py#L31" class="source_link" style="float:right">[source]</a></h4>

> <code>ResourcePoolBase.run</code>(**`script`**, **`ident`**)

Run `script` using resource `ident`

In [None]:
_setup_test_env()
f = find_next_script(path_run)
wp.run(f, 0)

test_eq(find_next_script(path_run), path_run/'ls')
test_eq((path_out/'false.exitcode').read_text(), '1')
assert (path_fail/'false').exists()

In [None]:
show_doc(FixedWorkerPool.poll_scripts)

<h4 id="ResourcePoolBase.poll_scripts" class="doc_header"><code>ResourcePoolBase.poll_scripts</code><a href="__main__.py#L36" class="source_link" style="float:right">[source]</a></h4>

> <code>ResourcePoolBase.poll_scripts</code>(**`poll_interval`**=*`0.1`*)

Poll `to_run` for scripts and run in parallel on available resources

In [None]:
_setup_test_env()
wp.poll_scripts()

In [None]:
# No script files are left in `to_run` (only `test_dir`, which is not a file)
assert not find_next_script(path_run), find_next_script(path_run)
# Each script's exit status was recorded under `out`
test_eq((path_out/'false.exitcode').read_text(), '1')
test_eq((path_out/'ls.exitcode').read_text(), '0')
# The non-zero exit went to `fail`, the zero exit to `complete`
assert not (path_run/'false').exists()
assert (path_fail/'false').exists()
assert (path_complete/'ls').exists()
# `ls` ran without arguments, so its stdout lists the current working directory;
# this assumes the notebook is run from a directory that contains README.md
assert 'README.md' in (path_out/'ls.stdout').read_text()

## GPU

In [None]:
#export
class NVIDIAGPUs(ResourcePoolBase):
    "Vends locked access to NVIDIA GPUs"
    def __init__(self, path, max_used_bytes=30_000_000):
        # `max_used_bytes`: a GPU whose used memory is above this is treated as busy.
        # The default keeps the previous fixed threshold of 30*1e6.
        super().__init__(path)
        pynvml.nvmlInit()
        self.device_count = pynvml.nvmlDeviceGetCount()
        self.max_used_bytes = max_used_bytes

    def is_available(self,ident):
        "A GPU is available if it is not locked and its used memory is at most `max_used_bytes`"
        # Check the lockfile first so locked GPUs are rejected without querying the driver
        if not super().is_available(ident): return False
        device = pynvml.nvmlDeviceGetHandleByIndex(ident)
        meminfo = pynvml.nvmlDeviceGetMemoryInfo(device)
        return meminfo.used <= self.max_used_bytes

    def all_ids(self):
        "All GPUs"
        return range(self.device_count)

In [None]:
_setup_test_env()
g = NVIDIAGPUs(path)
g.is_available(0)

True

In [None]:
lockedID = g.lock_next()
lockedID

0

In [None]:
nextLockedID = g.lock_next()
nextLockedID

1

In [None]:
g.unlock(lockedID)
g.is_available(0)

### Export -

In [None]:
#hide
from nbdev.export import notebook2script
notebook2script()

Converted 00_core.ipynb.
Converted 99_index.ipynb.
