Merge pull request #532 from dimitri-yatsenko/attachments
Attachments and configurable blobs
eywalker committed Feb 8, 2019
2 parents 2d21899 + afeadb1 commit 26fc8e1
Showing 21 changed files with 549 additions and 185 deletions.
25 changes: 11 additions & 14 deletions datajoint/__init__.py
@@ -14,30 +14,27 @@
http://dx.doi.org/10.1101/031658
"""

__author__ = "Dimitri Yatsenko, Edgar Y. Walker, and Fabian Sinz at Baylor College of Medicine"
__date__ = "Nov 15, 2018"
__author__ = "DataJoint Contributors"
__date__ = "February 7, 2019"
__all__ = ['__author__', '__version__',
'config', 'conn', 'kill', 'Table',
'Connection', 'Heading', 'FreeTable', 'Not', 'schema',
'config', 'conn', 'Connection',
'schema', 'create_virtual_module', 'get_schema_names',
'Table', 'FreeTable',
'Manual', 'Lookup', 'Imported', 'Computed', 'Part',
'AndList', 'ERD', 'U', 'key',
'DataJointError', 'DuplicateError',
'set_password', 'create_virtual_module']
'Not', 'AndList', 'U', 'ERD',
'set_password', 'kill',
'DataJointError', 'DuplicateError', 'key']


# ------------- flatten import hierarchy -------------------------
from .version import __version__
from .settings import config
from .connection import conn, Connection
from .table import FreeTable, Table
from .schema import Schema as schema
from .schema import create_virtual_module, get_schema_names
from .table import Table, FreeTable
from .user_tables import Manual, Lookup, Imported, Computed, Part
from .expression import Not, AndList, U
from .heading import Heading
from .schema import Schema as schema
from .schema import create_virtual_module
from .erd import ERD
from .admin import set_password, kill
from .errors import DataJointError, DuplicateError
from .fetch import key
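
The flattened import hierarchy keeps every public name reachable directly from the package. A minimal sketch of the reorganized namespace in use (the database names are assumptions for illustration):

import datajoint as dj

schema = dj.schema('my_database')                     # Schema class, aliased as dj.schema
vmod = dj.create_virtual_module('vmod', 'other_db')   # re-exported from .schema
print(dj.get_schema_names())                          # helper newly added to __all__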


28 changes: 28 additions & 0 deletions datajoint/attach.py
@@ -0,0 +1,28 @@
"""
functionality for attaching files
"""
from os import path
from itertools import count, chain


def load(local_path):
""" make an attachment from a local file """
with open(local_path, mode='rb') as f: # b is important -> binary
contents = f.read()
return str.encode(path.basename(local_path)) + b'\0' + contents


def save(buffer, save_path='.'):
""" save attachment from memory buffer into the save_path """
p = buffer.find(b'\0')
file_path = path.abspath(path.join(save_path, buffer[:p].decode()))

if path.isfile(file_path):
# generate a new filename
file, ext = path.splitext(file_path)
file_path = next(f for f in ('%s_%04x%s' % (file, n, ext) for n in count())
if not path.isfile(f))

with open(file_path, mode='wb') as f:
f.write(buffer[p+1:])
return file_path
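
An attachment buffer is simply the file's base name, a null byte, and the raw contents, so load and save round-trip cleanly. A short sketch (paths are hypothetical; on a name collision, save appends a hex counter such as _0000 before the extension):

from datajoint import attach

buffer = attach.load('/data/report.pdf')   # b'report.pdf\x00<raw bytes...>'
saved = attach.save(buffer, '.')           # absolute path to ./report.pdf, or ./report_0000.pdf if taken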
55 changes: 31 additions & 24 deletions datajoint/declare.py
@@ -197,7 +197,6 @@ def declare(full_table_name, definition, context):
:param definition: DataJoint table definition
:param context: dictionary of objects that might be referred to in the table.
"""

table_name = full_table_name.strip('`').split('.')[1]
if len(table_name) > MAX_TABLE_NAME_LENGTH:
raise DataJointError(
@@ -271,12 +270,13 @@ def compile_attribute(line, in_key, foreign_key_sql):
match['default'] = ''
match = {k: v.strip() for k, v in match.items()}
match['nullable'] = match['default'].lower() == 'null'
accepted_datatype = r'time|date|year|enum|(var)?char|float|real|double|decimal|numeric|' \
r'(tiny|small|medium|big)?int|bool|' \
r'(tiny|small|medium|long)?blob|external|attach'
blob_datatype = r'(tiny|small|medium|long)?blob'
accepted_datatype = (
r'time|date|year|enum|(var)?char|float|real|double|decimal|numeric|'
r'(tiny|small|medium|big)?int|bool|external|attach|' + blob_datatype)
if re.match(accepted_datatype, match['type'], re.I) is None:
raise DataJointError('DataJoint does not support datatype "{type}"'.format(**match))

is_blob = bool(re.match(blob_datatype, match['type'], re.I))
literals = ['CURRENT_TIMESTAMP'] # not to be enclosed in quotes
if match['nullable']:
if in_key:
@@ -285,38 +285,45 @@
else:
if match['default']:
quote = match['default'].upper() not in literals and match['default'][0] not in '"\''
match['default'] = ('NOT NULL DEFAULT ' +
('"%s"' if quote else "%s") % match['default'])
match['default'] = 'NOT NULL DEFAULT ' + ('"%s"' if quote else "%s") % match['default']
else:
match['default'] = 'NOT NULL'
match['comment'] = match['comment'].replace('"', '\\"') # escape double quotes in comment

is_external = match['type'].startswith('external')
is_attachment = match['type'].startswith('attachment')
if not is_external:
sql = ('`{name}` {type} {default}' + (' COMMENT "{comment}"' if match['comment'] else '')).format(**match)
else:
# process externally stored attribute
is_configurable = match['type'].startswith(('external', 'blob-', 'attach'))
is_external = False
if is_configurable:
if in_key:
raise DataJointError('External attributes cannot be primary in:\n%s' % line)
raise DataJointError('Configurable attributes cannot be primary in:\n%s' % line)
match['comment'] = ':{type}:{comment}'.format(**match) # insert configurable type into comment
store_name = match['type'].split('-')
if store_name[0] != 'external':
raise DataJointError('External store types must be specified as "external" or "external-<name>"')
if store_name[0] not in ('external', 'blob', 'attach'):
raise DataJointError('Configurable types must be in the form blob-<store> or attach-<store> in:\n%s' % line)
store_name = '-'.join(store_name[1:])
if store_name != '' and not store_name.isidentifier():
if store_name and not store_name.isidentifier():
raise DataJointError(
'The external store name `{type}` is invalid. Make like a python identifier.'.format(**match))
if len(store_name) > STORE_NAME_LENGTH:
raise DataJointError(
'The external store name `{type}` is too long. Must be <={max_len} characters.'.format(
max_len=STORE_NAME_LENGTH, **match))
if not match['default'] in ('DEFAULT NULL', 'NOT NULL'):
raise DataJointError('The only acceptable default value for an external field is null in:\n%s' % line)
if match['type'] not in config:
raise DataJointError('The external store `{type}` is not configured.'.format(**match))
spec = config.get_store_spec(store_name)
is_external = spec['protocol'] in {'s3', 'file'}
if not is_external:
is_blob = re.match(blob_datatype, spec['protocol'], re.I)
if not is_blob:
raise DataJointError('Invalid protocol {protocol} in external store in:\n{line}'.format(
line=line, **spec))
match['type'] = spec['protocol']

if (is_external or is_blob) and match['default'] not in ('DEFAULT NULL', 'NOT NULL'):
raise DataJointError(
'The default value for a blob or attachment can only be NULL in:\n%s' % line)

# append external configuration name to the end of the comment
sql = '`{name}` {hash_type} {default} COMMENT ":{type}:{comment}"'.format(
if not is_external:
sql = ('`{name}` {type} {default}' + (' COMMENT "{comment}"' if match['comment'] else '')).format(**match)
else:
# add hash field with a dependency on the ~external table
sql = '`{name}` {hash_type} {default} COMMENT "{comment}"'.format(
hash_type=HASH_DATA_TYPE, **match)
foreign_key_sql.append(
"FOREIGN KEY (`{name}`) REFERENCES {{external_table}} (`hash`) "
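
With compile_attribute reworked, a table definition can name a store directly in the datatype. A hypothetical definition the new parser accepts, assuming a store called "shared" is configured; configurable attributes cannot be primary and may only default to null:

import datajoint as dj

schema = dj.schema('demo')

@schema
class Scan(dj.Manual):
    definition = """
    scan_id : int
    ---
    image = null : blob-shared   # blob kept in the configured "shared" store
    report : attach-shared       # file attachment in the same store
    legacy : external            # pre-existing external syntax still accepted
    """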
5 changes: 2 additions & 3 deletions datajoint/erd.py
@@ -50,8 +50,8 @@ class ERD:
"""

def __init__(self, *args, **kwargs):
warnings.warn('ERD functionality depends on matplotlib and pygraphviz. Please install both of these '
'libraries to enable the ERD feature.')
warnings.warn('ERD functionality depends on matplotlib and pygraphviz. '
'Please install both of these libraries to enable the ERD feature.')
else:
class ERD(nx.DiGraph):
"""
@@ -228,7 +228,6 @@ def _make_graph(self):
return graph

def make_dot(self):
import networkx as nx

graph = self._make_graph()
graph.nodes()
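
The rewrapped warning fires only when the plotting stack is absent; with both libraries installed the diagram renders as before. A one-line usage reminder (the schema object is assumed to exist):

import datajoint as dj

dj.ERD(schema).draw()   # requires matplotlib and pygraphviz, per the warning above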
90 changes: 50 additions & 40 deletions datajoint/external.py
@@ -1,15 +1,21 @@
import os
from tqdm import tqdm
import itertools
from .settings import config
from .errors import DataJointError
from .hash import long_hash
from .blob import pack, unpack
from .table import Table
from .declare import STORE_HASH_LENGTH, HASH_DATA_TYPE
from .s3 import Folder as S3Folder
from . import s3
from .utils import safe_write


def subfold(name, folds):
"""
subfolding for external storage: e.g. subfold('abcdefg', (2, 3)) --> ['ab','cde']
"""
return (name[:folds[0]].lower(),) + subfold(name[folds[0]:], folds[1:]) if folds else ()


class ExternalTable(Table):
"""
The table tracking externally stored objects.
@@ -42,15 +48,15 @@ def definition(self):
def table_name(self):
return '~external'

def put(self, store, obj):
def put(self, store, blob):
"""
put an object in external store
"""
spec = self._get_store_spec(store)
blob = pack(obj)
blob_hash = long_hash(blob) + store[len('external-'):]
store = ''.join(store.split('-')[1:])
spec = config.get_store_spec(store)
blob_hash = long_hash(blob) + store
if spec['protocol'] == 'file':
folder = os.path.join(spec['location'], self.database)
folder = os.path.join(spec['location'], self.database, *subfold(blob_hash, spec['subfolding']))
full_path = os.path.join(folder, blob_hash)
if not os.path.isfile(full_path):
try:
@@ -59,9 +65,10 @@ def put(self, store, obj):
os.makedirs(folder)
safe_write(full_path, blob)
elif spec['protocol'] == 's3':
S3Folder(database=self.database, **spec).put(blob_hash, blob)
folder = '/'.join(subfold(blob_hash, spec['subfolding']))
s3.Folder(database=self.database, **spec).put('/'.join((folder, blob_hash)), blob)
else:
raise DataJointError('Unknown external storage protocol {protocol} for {store}'.format(
raise DataJointError('Unknown external storage protocol {protocol} in store "-{store}"'.format(
store=store, protocol=spec['protocol']))

# insert tracking info
@@ -80,31 +87,33 @@ def get(self, blob_hash):
"""
if blob_hash is None:
return None
store = blob_hash[STORE_HASH_LENGTH:]
store = 'external' + ('-' if store else '') + store

cache_folder = config.get('cache', None)

# attempt to get object from cache
blob = None
cache_folder = config.get('cache', None)
if cache_folder:
try:
with open(os.path.join(cache_folder, blob_hash), 'rb') as f:
blob = f.read()
except FileNotFoundError:
pass

# attempt to get object from store
if blob is None:
spec = self._get_store_spec(store)
store = blob_hash[STORE_HASH_LENGTH:]
spec = config.get_store_spec(store)
if spec['protocol'] == 'file':
full_path = os.path.join(spec['location'], self.database, blob_hash)
subfolders = os.path.join(*subfold(blob_hash, spec['subfolding']))
full_path = os.path.join(spec['location'], self.database, subfolders, blob_hash)
try:
with open(full_path, 'rb') as f:
blob = f.read()
except FileNotFoundError:
raise DataJointError('Lost access to external blob %s.' % full_path) from None
elif spec['protocol'] == 's3':
try:
blob = S3Folder(database=self.database, **spec).get(blob_hash)
subfolder = '/'.join(subfold(blob_hash, spec['subfolding']))
blob = s3.Folder(database=self.database, **spec).get('/'.join((subfolder, blob_hash)))
except TypeError:
raise DataJointError('External store {store} configuration is incomplete.'.format(store=store))
else:
@@ -115,7 +124,7 @@ def get(self, blob_hash):
os.makedirs(cache_folder)
safe_write(os.path.join(cache_folder, blob_hash), blob)

return unpack(blob)
return blob

@property
def references(self):
@@ -156,34 +165,35 @@ def delete_garbage(self):
for ref in self.references) or "TRUE")
print('Deleted %d items' % self.connection.query("SELECT ROW_COUNT()").fetchone()[0])

def clean_store(self, store, display_progress=True):
def clean_store(self, store, verbose=True):
"""
Clean unused data in an external storage repository from unused blobs.
This must be performed after delete_garbage during low-usage periods to reduce risks of data loss.
"""
spec = self._get_store_spec(store)
progress = tqdm if display_progress else lambda x: x
spec = config.get_store_spec(store)
in_use = set(x for x in (self & '`hash` LIKE "%%{store}"'.format(store=store)).fetch('hash'))
if spec['protocol'] == 'file':
folder = os.path.join(spec['location'], self.database)
delete_list = set(os.listdir(folder)).difference(self.fetch('hash'))
print('Deleting %d unused items from %s' % (len(delete_list), folder), flush=True)
for f in progress(delete_list):
os.remove(os.path.join(folder, f))
count = itertools.count()
print('Deleting...')
deleted_folders = set()
for folder, dirs, files in os.walk(os.path.join(spec['location'], self.database), topdown=False):
if dirs and files:
raise DataJointError('Invalid repository with files in non-terminal folder %s' % folder)
dirs = set(d for d in dirs if os.path.join(folder, d) not in deleted_folders)
if not dirs:
files_not_in_use = [f for f in files if f not in in_use]
for f in files_not_in_use:
filename = os.path.join(folder, f)
next(count)
if verbose:
print(filename)
os.remove(filename)
if len(files_not_in_use) == len(files):
os.rmdir(folder)
deleted_folders.add(folder)
print('Deleted %d objects' % next(count))
elif spec['protocol'] == 's3':
try:
S3Folder(database=self.database, **spec).clean(self.fetch('hash'))
failed_deletes = s3.Folder(database=self.database, **spec).clean(in_use, verbose=verbose)
except TypeError:
raise DataJointError('External store {store} configuration is incomplete.'.format(store=store))

@staticmethod
def _get_store_spec(store):
try:
spec = config[store]
except KeyError:
raise DataJointError('Storage {store} is requested but not configured'.format(store=store)) from None
if 'protocol' not in spec:
raise DataJointError('Storage {store} config is missing the protocol field'.format(store=store))
if spec['protocol'] not in {'file', 's3'}:
raise DataJointError(
'Unknown external storage protocol "{protocol}" in "{store}"'.format(store=store, **spec))
return spec
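
The new subfold helper and config.get_store_spec tie the on-disk layout to the store configuration: with subfolding (2, 2), a blob whose hash starts with "abcd" lands in <location>/<database>/ab/cd/<hash>. This section of the diff does not show settings.py, so the configuration keys below are assumptions modeled on the pre-existing external / external-<name> convention:

import datajoint as dj

dj.config['external'] = {              # default store, used by plain "external" / "attach"
    'protocol': 'file',
    'location': '/data/external',
    'subfolding': (2, 2),
}
dj.config['external-shared'] = {       # referenced as blob-shared / attach-shared
    'protocol': 's3',
    'location': 'mybucket/dj-store',
    'subfolding': (2, 2),
    # real s3 stores also need credential/endpoint fields not shown in this diff
}
dj.config['cache'] = '/tmp/dj-cache'   # optional read-through cache used by ExternalTable.get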
