In [None]:
# default_exp train.local_object_store

# Local Object Store

This module is the namespace for the local object store functions. I'm trying a functional approach instead of wrapping everything in a one-type-use class.

In [None]:
#hide
# If the kernel was started inside the `nbs` directory, move up one level so
# the rest of the notebook runs from the directory above it.
pwd = %pwd
if pwd.split('/')[-1] == 'nbs':
    %cd ..

/Users/davidrichards/codes/hydra/lab


In [None]:
from lab.util.test_functions import *

from pathlib import Path
import pathlib
import re
import shutil
import os

In [None]:
#export

# Lower-cased environment variable names that merge_config keeps; every other
# environment variable is dropped by the white list filter.
WHITE_LIST_CONFIG = ['root', 'name', 'filter', 'force']

def white_list(keywords, keys, require_keys=False, expand_missing=False):
    """Build a new dict holding only the entries of `keywords` named in `keys`.

    keywords: mapping to pick values from.
    keys: iterable of the keys to keep; the result follows its order.
    require_keys: when true, every key must be present in `keywords`; a
        missing key raises KeyError. Takes precedence over `expand_missing`.
    expand_missing: when true, keys absent from `keywords` appear in the
        result with the value None.

    With both flags false, keys absent from `keywords` are simply left out.
    """
    if require_keys:
        # Plain indexing on purpose, so a missing key surfaces as KeyError.
        return dict((wanted, keywords[wanted]) for wanted in keys)
    if expand_missing:
        return dict((wanted, keywords.get(wanted)) for wanted in keys)
    return {wanted: keywords[wanted] for wanted in keys if wanted in keywords}

def merge_config(**kw):
    """Combine the local environment variables with keyword overrides.

    Environment variable names are lower-cased and then reduced to the
    entries listed in WHITE_LIST_CONFIG. The keywords passed in are laid
    over that result, so an explicit keyword always wins over the
    environment.

    Right now only the environment variables go through the white list;
    the keywords are passed through unfiltered, though that may change.
    """
    lowered = {}
    for var_name, value in os.environ.items():
        lowered[var_name.lower()] = value
    config = white_list(lowered, WHITE_LIST_CONFIG)
    # white_list returned a fresh dict, so updating it in place is safe.
    config.update(kw)
    return config

In [None]:
d = dict(a=1, b=2)

# Default mode: keys that were not requested are dropped.
assert 'b' not in white_list(d, ['a'])
# expand_missing: requested keys that are absent show up with the value None.
assert 'c' in white_list(d, ['a', 'c'], expand_missing=True)
assert white_list(d, ['c'], expand_missing=True)['c'] is None

# require_keys: asking for an absent key is expected to raise
# (check_raises is presumably a helper from lab.util.test_functions -- confirm).
with check_raises():
    white_list(d, ['c'], require_keys=True)
    
# Every configuration key read by the helpers in this notebook must be white listed.
known_keys = ['force', 'filter', 'name', 'root']
for key in known_keys:
    assert key in WHITE_LIST_CONFIG


Bringing in the environment variables is deliberate: in a Docker container, environment variables are the best way to handle configuration.

In [None]:
#export

# Fallback root directory, used by root() when the merged configuration has no 'root' entry.
ROOT = '/tmp'
def root(**kw):
    """Return the root directory for buckets as a Path.

    Uses the 'root' entry of the merged configuration (keywords over
    environment variables) and falls back to ROOT when there is none.
    """
    config = merge_config(**kw)
    location = config['root'] if 'root' in config else ROOT
    return Path(location)

def name(**kw):
    """Return the bucket name taken from the merged configuration.

    The 'name' entry is used when present, otherwise the 'filter' entry.
    A compiled pattern yields its `pattern` attribute, a string or None is
    returned unchanged, and any other value is converted with str().
    """
    config = merge_config(**kw)
    value = config['name'] if 'name' in config else config.get('filter', None)
    if isinstance(value, re.Pattern):
        return value.pattern
    if value is not None and not isinstance(value, str):
        return str(value)
    return value

def bucket_filter(**kw):
    """Return a compiled regular expression used to select buckets by name.

    The 'name' entry of the merged configuration is used when present,
    otherwise the 'filter' entry, otherwise the empty string. A compiled
    pattern is returned as is. Any other value is placed between ".*" on
    both sides and compiled, so with neither entry the pattern is ".*.*".
    """
    config = merge_config(**kw)
    selector = config['name'] if 'name' in config else config.get('filter', '')
    if not isinstance(selector, re.Pattern):
        selector = re.compile(f".*{selector}.*")
    return selector

In [None]:
# NOTE: merge_config reads ROOT / NAME / FILTER from the environment, so the
# argument-free assertions below assume none of those variables are set.

# root: an explicit keyword wins, otherwise the ROOT constant is used.
assert root(root='a') == Path('a')
assert root() == Path(ROOT)

# name: taken from 'name' or 'filter'; patterns and other values come back as strings.
assert name() is None
assert name(name='name') == 'name'
assert name(filter='name') == 'name'
r = re.compile('name')
assert name(name=r) == 'name'
assert name(filter=r) == 'name'
assert name(name=42) == '42'
assert name(filter=42) == '42'

# bucket_filter: with no name or filter the pattern matches everything.
assert bucket_filter().pattern == '.*.*'
assert bucket_filter().match('')
assert bucket_filter().match('anything')
assert bucket_filter(name='foo').match('foo')
assert bucket_filter(name=re.compile('foo')).match('foo')
assert bucket_filter(filter='foo').match('foo')
assert bucket_filter(filter=re.compile('foo')).match('foo')

Getting root, a name, and a filter for buckets. This is mostly just establishing a convention.

In [None]:
#export

def list_paths(**kw):
    """Return the bucket directories under the root whose names match the bucket filter.

    The root comes from root(**kw) and the pattern from bucket_filter(**kw).
    Only directories are returned; the pattern is applied to the directory
    name with `match`.

    Returns a sorted list of Path objects. The list is empty when nothing
    matches or when the root itself is missing or is not a directory.
    """
    base = root(**kw)
    pattern = bucket_filter(**kw)
    # iterdir() raises on a missing root; treat that as "no buckets" so that
    # callers such as exists() and is_empty() get an answer instead of a crash.
    if not base.is_dir():
        return []
    # iterdir() yields entries in arbitrary order; sort so that "first" is
    # the same bucket on every call.
    return sorted(
        entry for entry in base.iterdir()
        if entry.is_dir() and pattern.match(entry.name)
    )

def list_buckets(**kw):
    """Return the directory names of the paths that list_paths finds for the same keywords."""
    bucket_names = []
    for bucket_path in list_paths(**kw):
        bucket_names.append(bucket_path.name)
    return bucket_names

def get_slice(seq, key, default=None):
    """Return the item at zero-based position `key` of `seq`, or `default`.

    seq: any iterable; it is scanned from the start.
    key: the position wanted. Positions count up from 0, so a negative or
        non-matching key never selects anything.
    default: value returned when no item sits at that position.

    The scan stops as soon as the position is reached, so an iterator is
    only consumed up to and including the item returned.
    """
    for position, item in enumerate(seq):
        if position == key:
            return item
    return default

def first_path(**kw):
    """Return the first path reported by list_paths for these keywords, or None when there is none."""
    candidates = list_paths(**kw)
    return get_slice(candidates, 0)

def first_bucket(**kw):
    """Return the first bucket name reported by list_buckets for these keywords, or None when there is none."""
    bucket_names = list_buckets(**kw)
    return get_slice(bucket_names, 0)

def exists(**kw):
    """Tell whether a bucket matching the keywords is present on disk.

    Returns False when first_path finds nothing, otherwise whether the
    path found exists.
    """
    candidate = first_path(**kw)
    if candidate is None:
        return False
    return candidate.exists()

def is_empty(**kw):
    """Tell whether the bucket matching the keywords has no entries.

    A bucket that does not exist counts as empty. Otherwise the first
    matching path is globbed with '*' and the bucket is empty when that
    yields nothing.
    """
    if exists(**kw):
        entries = list(first_path(**kw).glob('*'))
        return not entries
    return True

def can_remove_bucket(**kw):
    """Decide whether the bucket selected by the keywords may be removed.

    An empty (or missing) bucket can always be removed. A bucket with
    contents can only be removed when the 'force' entry of the merged
    configuration is set.

    'force' may come from an environment variable, in which case it is a
    string. The strings '', '0', 'false', 'no' and 'off' (any case,
    surrounding whitespace ignored) count as not forcing; any other string
    counts as forcing. Non-string values are judged by their truthiness.

    Returns True or False.
    """
    kw = merge_config(**kw)
    if is_empty(**kw):
        return True
    force = kw.get('force', False)
    if isinstance(force, str):
        # Environment values are always strings, so "false" or "0" would be
        # truthy if tested directly.
        return force.strip().lower() not in ('', '0', 'false', 'no', 'off')
    return bool(force)

def full_path(**kw):
    """Return the bucket location: root(**kw) joined with name(**kw)."""
    base = root(**kw)
    leaf = name(**kw)
    return base / leaf
    
def find_or_create_bucket(**kw):
    """Make sure the bucket directory at full_path(**kw) exists and return its name.

    Missing parent directories are created as well, and an already
    existing directory is not an error.
    """
    bucket_dir = full_path(**kw)
    bucket_dir.mkdir(exist_ok=True, parents=True)
    return bucket_dir.name

def remove_bucket(**kw):
    """Delete the bucket directory at full_path(**kw) together with its contents.

    Nothing is deleted unless can_remove_bucket(**kw) allows it.

    Returns True when the directory tree was removed, and False when removal
    was not allowed or failed (for example because the directory does not
    exist).
    """
    # NOTE(review): can_remove_bucket inspects the first path matching the
    # bucket filter, which is not necessarily the directory full_path points
    # at -- confirm that the two always agree for the callers of this function.
    if not can_remove_bucket(**kw):
        return False
    try:
        shutil.rmtree(full_path(**kw))
    except Exception:
        # Deliberately best effort: any ordinary failure is reported as
        # False. Only Exception is caught, so KeyboardInterrupt and
        # SystemExit still propagate.
        return False
    return True

In [None]:
# NOTE: these checks touch the real filesystem under the configured root and
# depend on their order. Names are matched as a substring pattern (see
# bucket_filter), so they assume no other directory under the root contains
# 'junk', and that no ROOT environment variable overrides the default root.

# Listing with no arguments returns a list.
assert isinstance(list_buckets(), list)

# Start from a clean slate: force-remove any 'junk' bucket left by a previous run.
remove_bucket(name='junk', force=True)
assert full_path(name='junk') == Path(ROOT)/'junk'
assert not exists(name='junk')
assert first_bucket(name='junk') is None
assert first_path(name='junk') is None
assert list_buckets(name='junk') == []
# A bucket that does not exist counts as empty, so it is removable.
assert can_remove_bucket(name='junk')

# Create the bucket and check that it is listed and still empty.
find_or_create_bucket(name='junk')

assert len(list_buckets(name='junk')) == 1
assert is_empty(name='junk')

Bucket management is kind of cool. We can force the removal of a bucket even when it has contents, but I need to create objects in a bucket first; I'll come back to that afterwards.

In [None]:
def item_filter(**kw):
    """Return the glob pattern used to select items inside a bucket.

    The configuration is merged but not used yet; the pattern is always '*'.
    """
    _config = merge_config(**kw)  # TODO: derive the pattern from the configuration
    pattern = '*'
    return pattern
    
def find_items(**kw):
    """Return the files inside the first bucket matching the keywords.

    The bucket is globbed with the pattern from item_filter(**kw) and only
    regular files are kept. Returns an empty list when no bucket matches.
    """
    bucket = first_path(**kw)
    if bucket is None:
        return []
    files = []
    for entry in bucket.glob(item_filter(**kw)):
        if entry.is_file():
            files.append(entry)
    return files

def put(**kw):
    """Placeholder, not implemented yet: accepts any keywords, does nothing and returns None."""
    return None

def get(**kw):
    """Placeholder, not implemented yet: accepts any keywords, does nothing and returns None."""
    return None

def get_stats(**kw):
    """Placeholder, not implemented yet: accepts any keywords, does nothing and returns None."""
    return None

def copy(**kw):
    """Placeholder, not implemented yet: accepts any keywords, does nothing and returns None."""
    return None

def remove(**kw):
    """Placeholder, not implemented yet: accepts any keywords, does nothing and returns None."""
    return None


In [None]:
find_items()

[]

TODO:

* what is the thing? (open file reference, filename, url, content)
* item filter syntax (include white list above)
* underscore second-class functions (still exported, but not meant for the main interface)
* put an object by its reference
* deal with pickle or better-than-pickle

In [None]:
first_path()

PosixPath('/tmp/powerlog')

In [None]:
path = Path('/tmp')

In [None]:
path.is_file()

False

In [None]:
re.Pattern

re.Pattern