Merged

27 commits
9822fc4
Add multiprocessing to AutoPopulate
mspacek Nov 18, 2019
6074900
Update docs
mspacek Nov 18, 2019
a7e4c2e
Rename max_processes -> multiprocess, accept bool or int
mspacek Nov 18, 2019
24de484
Fix reserved jobs
mspacek Nov 18, 2019
28aae7a
Use is instead of ==
mspacek Nov 21, 2019
236e62e
Replace assertion with DataJointError
mspacek Nov 21, 2019
9266668
Remove extra blank line
mspacek Nov 21, 2019
4f30328
fix #700
dimitri-yatsenko Nov 17, 2019
8d7d7d3
fix #700 differently - bypass the restriction against python-native d…
dimitri-yatsenko Nov 17, 2019
3e3d688
fix typo from previous commit
dimitri-yatsenko Nov 18, 2019
ad02fe1
tests/{schema,test_jobs}.py: add tests/test_jobs.py:test_suppress_dj_…
Nov 19, 2019
53d073f
cleanup
dimitri-yatsenko Nov 19, 2019
df27ada
minor syntax improvement
dimitri-yatsenko Nov 19, 2019
8f46e05
minor syntax
dimitri-yatsenko Nov 19, 2019
6ed83c6
fix #675
dimitri-yatsenko Nov 19, 2019
91556d5
Fix #698
dimitri-yatsenko Nov 19, 2019
2c8aedf
fix #699 -- add table definition to doc string
dimitri-yatsenko Nov 19, 2019
bc69123
Table doc strings now display reverse-engineered table declarations
dimitri-yatsenko Nov 19, 2019
1034797
update error message for un-upgraded external stores.
dimitri-yatsenko Nov 19, 2019
d934b7e
improve table definition in the doc string
dimitri-yatsenko Nov 19, 2019
2b8588c
minor improvement in display of table doc strings
dimitri-yatsenko Nov 19, 2019
c7b7989
replace .describe() with .definition to augment the table docstring
dimitri-yatsenko Nov 20, 2019
ca775da
improve unit test for definition in docstring
dimitri-yatsenko Nov 21, 2019
b646061
minor
dimitri-yatsenko Nov 21, 2019
60c952b
update CHANGELOG for version 0.12.3
dimitri-yatsenko Nov 22, 2019
95c2249
update docs for release 0.12.3
dimitri-yatsenko Nov 22, 2019
b3ee8d9
blob now accepts native complex scalars
dimitri-yatsenko Nov 22, 2019
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
## Release notes

### 0.12.3 -- Nov 22, 2019
* Bugfix #675 (PR #705) networkx 2.4+ is now supported
* Bugfix #698 and #699 (PR #706) display table definition in doc string and help
* Bugfix #701 (PR #702) job reservation works with native python datatype support disabled

### 0.12.2 -- Nov 11, 2019
* Bugfix - Convoluted error thrown if there is a reference to a non-existent table attribute (#691)
* Bugfix - Insert into external does not trim leading slash if defined in `dj.config['stores']['<store>']['location']` (#692)
163 changes: 115 additions & 48 deletions datajoint/autopopulate.py
@@ -10,12 +10,26 @@
from .errors import DataJointError
from .table import FreeTable
import signal
import multiprocessing as mp

# noinspection PyExceptionInherit,PyCallingNonCallable

logger = logging.getLogger(__name__)


def initializer(table):
"""Save pickled copy of (disconnected) table to the current process,
then reconnect to server. For use by call_make_key()"""
mp.current_process().table = table
table.connection.connect() # reconnect

def call_make_key(key):
"""Call current process' table.make_key()"""
table = mp.current_process().table
error = table.make_key(key)
return error


class AutoPopulate:
"""
AutoPopulate is a mixin class that adds the method populate() to a Relation class.
@@ -103,29 +117,36 @@ def _jobs_to_do(self, restrictions):

def populate(self, *restrictions, suppress_errors=False, return_exception_objects=False,
reserve_jobs=False, order="original", limit=None, max_calls=None,
display_progress=False):
display_progress=False, multiprocess=False):
"""
rel.populate() calls rel.make(key) for every primary key in self.key_source
for which there is not already a tuple in rel.
:param restrictions: a list of restrictions, each restricting (rel.key_source - target.proj())
:param suppress_errors: if True, do not terminate execution.
:param return_exception_objects: return error objects instead of just error messages
:param reserve_jobs: if true, reserves job to populate in asynchronous fashion
:param reserve_jobs: if True, reserve jobs to populate in asynchronous fashion
:param order: "original"|"reverse"|"random" - the order of execution
:param limit: if not None, check at most this many keys
:param max_calls: if not None, populate at most this many keys
:param display_progress: if True, report progress_bar
:param limit: if not None, checks at most that many keys
:param max_calls: if not None, populates at max that many keys
:param multiprocess: if True, use as many processes as CPU cores, or use the integer
number of processes specified
"""
if self.connection.in_transaction:
raise DataJointError('Populate cannot be called during a transaction.')

valid_order = ['original', 'reverse', 'random']
if order not in valid_order:
raise DataJointError('The order argument must be one of %s' % str(valid_order))
error_list = [] if suppress_errors else None
jobs = self.connection.schemas[self.target.database].jobs if reserve_jobs else None

# define and setup signal handler for SIGTERM
self._make_key_kwargs = {'suppress_errors': suppress_errors,
'return_exception_objects': return_exception_objects,
'reserve_jobs': reserve_jobs,
'jobs': jobs,
}

# define and set up signal handler for SIGTERM:
if reserve_jobs:
def handler(signum, frame):
logger.info('Populate terminated by SIGTERM')
@@ -138,55 +159,101 @@ def handler(signum, frame):
elif order == "random":
random.shuffle(keys)

call_count = 0
logger.info('Found %d keys to populate' % len(keys))

make = self._make_tuples if hasattr(self, '_make_tuples') else self.make
if max_calls is not None:
keys = keys[:max_calls]
nkeys = len(keys)

for key in (tqdm(keys) if display_progress else keys):
if max_calls is not None and call_count >= max_calls:
break
if not reserve_jobs or jobs.reserve(self.target.table_name, self._job_key(key)):
self.connection.start_transaction()
if key in self.target: # already populated
self.connection.cancel_transaction()
if reserve_jobs:
jobs.complete(self.target.table_name, self._job_key(key))
if multiprocess: # True or int, presumably
if multiprocess is True:
nproc = mp.cpu_count()
else:
if not isinstance(multiprocess, int):
raise DataJointError("multiprocess can be False, True or a positive integer")
nproc = multiprocess
else:
nproc = 1
nproc = min(nproc, nkeys) # no sense spawning more than can be used
error_list = []
if nproc > 1: # spawn multiple processes
# prepare to pickle self:
self.connection.close() # disconnect parent process from MySQL server
del self.connection._conn.ctx # SSLContext is not picklable
print('*** Spawning pool of %d processes' % nproc)
# send pickled copy of self to each process,
# each worker process calls initializer(*initargs) when it starts
with mp.Pool(nproc, initializer, (self,)) as pool:
if display_progress:
with tqdm(total=nkeys) as pbar:
for error in pool.imap(call_make_key, keys, chunksize=1):
if error is not None:
error_list.append(error)
pbar.update()
else:
logger.info('Populating: ' + str(key))
call_count += 1
self.__class__._allow_insert = True
try:
make(dict(key))
except (KeyboardInterrupt, SystemExit, Exception) as error:
try:
self.connection.cancel_transaction()
except OperationalError:
pass
error_message = '{exception}{msg}'.format(
exception=error.__class__.__name__,
msg=': ' + str(error) if str(error) else '')
if reserve_jobs:
# show error name and error message (if any)
jobs.error(
self.target.table_name, self._job_key(key),
error_message=error_message, error_stack=traceback.format_exc())
if not suppress_errors or isinstance(error, SystemExit):
raise
else:
logger.error(error)
error_list.append((key, error if return_exception_objects else error_message))
else:
self.connection.commit_transaction()
if reserve_jobs:
jobs.complete(self.target.table_name, self._job_key(key))
finally:
self.__class__._allow_insert = False
for error in pool.imap(call_make_key, keys):
if error is not None:
error_list.append(error)
self.connection.connect() # reconnect parent process to MySQL server
else: # use single process
for key in tqdm(keys) if display_progress else keys:
error = self.make_key(key)
if error is not None:
error_list.append(error)

# place back the original signal handler
del self._make_key_kwargs # clean up

# restore original signal handler:
if reserve_jobs:
signal.signal(signal.SIGTERM, old_handler)
return error_list

if suppress_errors:
return error_list

def make_key(self, key):
make = self._make_tuples if hasattr(self, '_make_tuples') else self.make

kwargs = self._make_key_kwargs
suppress_errors = kwargs['suppress_errors']
return_exception_objects = kwargs['return_exception_objects']
reserve_jobs = kwargs['reserve_jobs']
jobs = kwargs['jobs']

if not reserve_jobs or jobs.reserve(self.target.table_name, self._job_key(key)):
self.connection.start_transaction()
if key in self.target: # already populated
self.connection.cancel_transaction()
if reserve_jobs:
jobs.complete(self.target.table_name, self._job_key(key))
else:
logger.info('Populating: ' + str(key))
self.__class__._allow_insert = True
try:
make(dict(key))
except (KeyboardInterrupt, SystemExit, Exception) as error:
try:
self.connection.cancel_transaction()
except OperationalError:
pass
error_message = '{exception}{msg}'.format(
exception=error.__class__.__name__,
msg=': ' + str(error) if str(error) else '')
if reserve_jobs:
# show error name and error message (if any)
jobs.error(
self.target.table_name, self._job_key(key),
error_message=error_message, error_stack=traceback.format_exc())
if not suppress_errors or isinstance(error, SystemExit):
raise
else:
logger.error(error)
return (key, error if return_exception_objects else error_message)
else:
self.connection.commit_transaction()
if reserve_jobs:
jobs.complete(self.target.table_name, self._job_key(key))
finally:
self.__class__._allow_insert = False

def progress(self, *restrictions, display=True):
"""
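The net effect in autopopulate.py is a new `multiprocess` argument on `populate()`: the parent process disconnects from MySQL (an open connection and its SSLContext cannot be pickled), ships a pickled copy of the table to each worker via the pool initializer, and each worker reconnects and runs `make_key()` per key. A minimal usage sketch, with `MyComputed` as a hypothetical computed table:

import datajoint as dj

# MyComputed is assumed to be a dj.Computed subclass bound to a schema.
MyComputed.populate()                   # single process (default)
MyComputed.populate(multiprocess=True)  # one worker per CPU core
MyComputed.populate(
    multiprocess=4,         # cap the pool at 4 worker processes
    reserve_jobs=True,      # coordinate workers through the jobs table
    display_progress=True,  # tqdm progress bar over all keys
)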
6 changes: 2 additions & 4 deletions datajoint/blob.py
@@ -152,10 +152,8 @@ def pack_blob(self, obj):
return self.pack_array(np.array(obj))
if isinstance(obj, (bool, np.bool, np.bool_)):
return self.pack_array(np.array(obj))
if isinstance(obj, float):
return self.pack_array(np.array(obj, dtype=np.float64))
if isinstance(obj, int):
return self.pack_array(np.array(obj, dtype=np.int64))
if isinstance(obj, (float, int, complex)):
return self.pack_array(np.array(obj))
if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
return self.pack_datetime(obj)
if isinstance(obj, Decimal):
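With this change, `pack_blob()` routes native Python `float`, `int`, and `complex` scalars through numpy's default dtype inference instead of forcing float64/int64, which is what lets native complex scalars serialize. A quick sketch using the module-level helpers (assuming `pack`/`unpack` are exposed as in 0.12):

import datajoint.blob as blob

packed = blob.pack(3.0 + 4.0j)  # native complex scalar, previously rejected
print(blob.unpack(packed))      # (3+4j), possibly as a numpy scalar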
5 changes: 2 additions & 3 deletions datajoint/connection.py
@@ -39,10 +39,10 @@ def translate_query_error(client_error, query):
return errors.DuplicateError(*client_error.args[1:])
if isinstance(client_error, client.err.IntegrityError) and client_error.args[0] == 1452:
return errors.IntegrityError(*client_error.args[1:])
# Syntax Errors
# Syntax errors
if isinstance(client_error, client.err.ProgrammingError) and client_error.args[0] == 1064:
return errors.QuerySyntaxError(client_error.args[1], query)
# Existence Errors
# Existence errors
if isinstance(client_error, client.err.ProgrammingError) and client_error.args[0] == 1146:
return errors.MissingTableError(client_error.args[1], query)
if isinstance(client_error, client.err.InternalError) and client_error.args[0] == 1364:
@@ -286,4 +286,3 @@ def transaction(self):
raise
else:
self.commit_transaction()

6 changes: 3 additions & 3 deletions datajoint/diagram.py
@@ -236,8 +236,8 @@ def _make_graph(self):
for name in self.nodes_to_show:
foreign_attributes = set(
attr for p in self.in_edges(name, data=True) for attr in p[2]['attr_map'] if p[2]['primary'])
self.node[name]['distinguished'] = (
'primary_key' in self.node[name] and foreign_attributes < self.node[name]['primary_key'])
self.nodes[name]['distinguished'] = (
'primary_key' in self.nodes[name] and foreign_attributes < self.nodes[name]['primary_key'])
# include aliased nodes that are sandwiched between two displayed nodes
gaps = set(nx.algorithms.boundary.node_boundary(self, self.nodes_to_show)).intersection(
nx.algorithms.boundary.node_boundary(nx.DiGraph(self).reverse(), self.nodes_to_show))
@@ -307,7 +307,7 @@ def make_dot(self):
props = graph.get_edge_data(src, dest)
edge.set_color('#00000040')
edge.set_style('solid' if props['primary'] else 'dashed')
master_part = graph.node[dest]['node_type'] is Part and dest.startswith(src+'.')
master_part = graph.nodes[dest]['node_type'] is Part and dest.startswith(src+'.')
edge.set_weight(3 if master_part else 1)
edge.set_arrowhead('none')
edge.set_penwidth(.75 if props['multi'] else 2)
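These `.node` -> `.nodes` renames track networkx 2.4, which removed the long-deprecated `Graph.node` attribute view (fix #675). A minimal illustration of the API difference:

import networkx as nx

g = nx.DiGraph()
g.add_node('a', node_type='Manual')  # arbitrary node attribute
print(g.nodes['a'])  # {'node_type': 'Manual'} on networkx 2.x, including 2.4+
# g.node['a']        # AttributeError on networkx 2.4+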
5 changes: 4 additions & 1 deletion datajoint/heading.py
@@ -247,7 +247,10 @@ def init_from_database(self, conn, database, table_name, context):
category = next(c for c in SPECIAL_TYPES if TYPE_PATTERN[c].match(attr['type']))
except StopIteration:
if attr['type'].startswith('external'):
raise DataJointError('Legacy datatype `{type}`.'.format(**attr)) from None
url = "https://docs.datajoint.io/python/admin/5-blob-config.html" \
"#migration-between-datajoint-v0-11-and-v0-12"
raise DataJointError('Legacy datatype `{type}`. Migrate your external stores to '
'datajoint 0.12: {url}'.format(url=url, **attr)) from None
raise DataJointError('Unknown attribute type `{type}`'.format(**attr)) from None
if category == 'FILEPATH' and not _support_filepath_types():
raise DataJointError("""
32 changes: 18 additions & 14 deletions datajoint/jobs.py
@@ -2,7 +2,8 @@
import os
import platform
from .table import Table
from .errors import DuplicateError, IntegrityError
from .settings import config
from .errors import DuplicateError

ERROR_MESSAGE_LENGTH = 2047
TRUNCATION_APPENDIX = '...truncated'
@@ -76,7 +77,8 @@ def reserve(self, table_name, key):
key=key,
user=self._user)
try:
self.insert1(job, ignore_extra_fields=True)
with config(enable_python_native_blobs=True):
self.insert1(job, ignore_extra_fields=True)
except DuplicateError:
return False
return True
@@ -101,15 +103,17 @@ def error(self, table_name, key, error_message, error_stack=None):
"""
if len(error_message) > ERROR_MESSAGE_LENGTH:
error_message = error_message[:ERROR_MESSAGE_LENGTH-len(TRUNCATION_APPENDIX)] + TRUNCATION_APPENDIX
job_key = dict(table_name=table_name, key_hash=key_hash(key))
self.insert1(
dict(job_key,
status="error",
host=platform.node(),
pid=os.getpid(),
connection_id=self.connection.connection_id,
user=self._user,
key=key,
error_message=error_message,
error_stack=error_stack),
replace=True, ignore_extra_fields=True)
with config(enable_python_native_blobs=True):
self.insert1(
dict(
table_name=table_name,
key_hash=key_hash(key),
status="error",
host=platform.node(),
pid=os.getpid(),
connection_id=self.connection.connection_id,
user=self._user,
key=key,
error_message=error_message,
error_stack=error_stack),
replace=True, ignore_extra_fields=True)
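Both inserts are now wrapped in a `config(...)` context so the job record, whose `key` column is a blob, can always be written even when the user has python-native blob support disabled (fix #701). The same pattern works on the public `dj.config` object; a sketch with a hypothetical table:

import datajoint as dj

with dj.config(enable_python_native_blobs=True):
    my_table.insert1(row)  # hypothetical table and row; native python objects allowed in blobs here
# the previous setting is restored when the block exits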
4 changes: 4 additions & 0 deletions datajoint/schema.py
@@ -191,6 +191,10 @@ def process_table_class(self, table_class, context, assert_declared=False):
instance.declare(context)
is_declared = is_declared or instance.is_declared

# add table definition to the doc string
if isinstance(table_class.definition, str):
table_class.__doc__ = (table_class.__doc__ or "") + "\nTable definition:\n\n" + table_class.definition

# fill values in Lookup tables from their contents property
if isinstance(instance, Lookup) and hasattr(instance, 'contents') and is_declared:
contents = list(instance.contents)
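With the definition appended to the class doc string, the declared structure becomes visible through ordinary introspection (fixes #698 and #699). For a hypothetical table class MyTable:

help(MyTable)            # help text now ends with "Table definition:" plus the declaration
print(MyTable.__doc__)   # the same text, printed directly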