Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
5fb5989
implemented default populate relation (fixed issue #59)
dimitri-yatsenko Jun 26, 2015
194f5c2
renamed populate_relation into populated_from
dimitri-yatsenko Jun 26, 2015
0b9add9
erd.load_dependencies no longer needs the primary_key argument
dimitri-yatsenko Jun 26, 2015
d990b35
implemented job reservations for distributed computing (issue #5)
dimitri-yatsenko Jun 26, 2015
13fc228
simplified dj.conn()
dimitri-yatsenko Jun 26, 2015
ad98158
minor bugfix
dimitri-yatsenko Jun 26, 2015
e5d8544
implemented recursive drop: Fixed issue #16.
dimitri-yatsenko Jun 29, 2015
69a28f3
group_by
fabiansinz Jun 29, 2015
ed5dd2a
removed alter methods from Relation in preparation for issue #110.
dimitri-yatsenko Jun 29, 2015
0ef3f82
added sorted
fabiansinz Jun 29, 2015
36e360f
now clearing dependencies when a table is dropped.
dimitri-yatsenko Jul 1, 2015
7c461ce
fixed nosetests
dimitri-yatsenko Jul 1, 2015
94a859d
cleaned up tests, made Connection.transaction a property since it wil…
dimitri-yatsenko Jul 1, 2015
6d0f473
tables are now declared when relation.heading is requested.
dimitri-yatsenko Jul 1, 2015
96bcc6b
fixed clearing and reloading of dependencies when tables are dropped.
dimitri-yatsenko Jul 1, 2015
9aaa373
Merge branch 'master' of https://github.com/dimitri-yatsenko/datajoin…
fabiansinz Jul 4, 2015
0cdf456
more documentation
fabiansinz Jul 5, 2015
bac8d4d
fixed merge conflict in transaction
fabiansinz Jul 6, 2015
ba92501
changed insert to insert1.
fabiansinz Jul 6, 2015
719907a
added positional insert of tuples (attributes are identified by posit…
dimitri-yatsenko Jul 8, 2015
7da3539
Merge branch 'master' of github.com:fabiansinz/datajoint-python
dimitri-yatsenko Jul 8, 2015
ac82e1a
added prepare and content
fabiansinz Jul 9, 2015
cd31213
Merge branch 'master' of https://github.com/dimitri-yatsenko/datajoin…
fabiansinz Jul 9, 2015
1d96ebf
content -> contents
fabiansinz Jul 9, 2015
2e8e46d
converted size_on_disk into a property
dimitri-yatsenko Jul 9, 2015
efe6bc1
Merge github.com:fabiansinz/datajoint-python
dimitri-yatsenko Jul 9, 2015
1dd616d
minor cleanup. size_on_disk in now in bytes.
dimitri-yatsenko Jul 9, 2015
e07efe8
bugfix in job reservations
dimitri-yatsenko Jul 9, 2015
7f5d623
bugfix in error of jobs
fabiansinz Jul 9, 2015
6aed9b7
before batches
fabiansinz Jul 10, 2015
bf1e670
implemented batch processing in populate
fabiansinz Jul 10, 2015
1f71a2d
refactored job reservations and the schema decorator.
dimitri-yatsenko Jul 10, 2015
a54af98
schema now has the attribute `jobs` containing the job reservation table
dimitri-yatsenko Jul 10, 2015
9169efc
tuples iterator
fabiansinz Jul 10, 2015
3a6dc9d
fixed conflict from merge with dimitri's fork
fabiansinz Jul 10, 2015
a8ccb78
introduced new iterators values() and keys()
fabiansinz Jul 10, 2015
67f311b
bugfix in relational algebra. Projections with renames can now be res…
dimitri-yatsenko Jul 10, 2015
5c4b9e9
Merge branch 'master' of github.com:fabiansinz/datajoint-python
dimitri-yatsenko Jul 10, 2015
2ece673
minor
dimitri-yatsenko Jul 10, 2015
b01bede
bugfix in relational algebra (semijoin)
dimitri-yatsenko Jul 10, 2015
c79fcc1
updated nose tests
dimitri-yatsenko Jul 15, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions datajoint/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
"""
DataJoint for Python is a high-level programming interface for MySQL databases
to support data processing chains in science labs. DataJoint is built on the
foundation of the relational data model and prescribes a consistent method for
organizing, populating, and querying data.

DataJoint is free software under the LGPL License. In addition, we request
that any use of DataJoint leading to a publication be acknowledged in the publication.
"""

import logging
import os

__author__ = "Dimitri Yatsenko, Edgar Walker, and Fabian Sinz at Baylor College of Medicine"
__version__ = "0.2"
__all__ = ['__author__', '__version__',
'config',
'Connection', 'Heading', 'Relation', 'FreeRelation', 'Not',
'Relation',
'Relation', 'schema',
'Manual', 'Lookup', 'Imported', 'Computed',
'AutoPopulate', 'conn', 'DataJointError', 'blob']
'conn', 'DataJointError']


class DataJointError(Exception):
Expand Down Expand Up @@ -39,8 +50,6 @@ class DataJointError(Exception):
from .connection import conn, Connection
from .relation import Relation
from .user_relations import Manual, Lookup, Imported, Computed, Subordinate
from .autopopulate import AutoPopulate
from . import blob
from .relational_operand import Not
from .heading import Heading
from .relation import schema
from .schema import schema
94 changes: 54 additions & 40 deletions datajoint/autopopulate.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from .relational_operand import RelationalOperand
from . import DataJointError, Relation
"""autopopulate containing the dj.AutoPopulate class. See `dj.AutoPopulate` for more info."""
import abc
import logging
from .relational_operand import RelationalOperand
from . import DataJointError
from .relation import Relation, FreeRelation
from . import jobs

# noinspection PyExceptionInherit,PyCallingNonCallable

Expand All @@ -12,17 +15,25 @@ class AutoPopulate(metaclass=abc.ABCMeta):
"""
AutoPopulate is a mixin class that adds the method populate() to a Relation class.
Auto-populated relations must inherit from both Relation and AutoPopulate,
must define the property pop_rel, and must define the callback method make_tuples.
must define the property populated_from, and must define the callback method _make_tuples.
"""
_jobs = None

@abc.abstractproperty
def populate_relation(self):
@property
def populated_from(self):
"""
Derived classes must implement the read-only property populate_relation, which is the
relational expression that defines how keys are generated for the populate call.
By default, populate relation is the join of the primary dependencies of the table.
:return: the relation whose primary key values are passed, sequentially, to the
`_make_tuples` method when populate() is called. The default value is the
join of the parent relations. Users may override to change the granularity
or the scope of populate() calls.
"""
pass
parents = [FreeRelation(self.target.connection, rel) for rel in self.target.parents]
if not parents:
raise DataJointError('A relation must have parent relations to be able to be populated')
ret = parents.pop(0)
while parents:
ret *= parents.pop(0)
return ret

@abc.abstractmethod
def _make_tuples(self, key):
Expand All @@ -39,52 +50,55 @@ def target(self):

def populate(self, restriction=None, suppress_errors=False, reserve_jobs=False):
"""
rel.populate() calls rel._make_tuples(key) for every primary key in self.populate_relation
rel.populate() calls rel._make_tuples(key) for every primary key in self.populated_from
for which there is not already a tuple in rel.

:param restriction: restriction on rel.populate_relation - target
:param restriction: restriction on rel.populated_from - target
:param suppress_errors: suppresses error if true
:param reserve_jobs: if True, reserve each key in the jobs table before processing it and record its completion or error there
:param batch: batch size of a single job
"""

assert not reserve_jobs, NotImplemented # issue #5
error_list = [] if suppress_errors else None
if not isinstance(self.populate_relation, RelationalOperand):
raise DataJointError('Invalid populate_relation value')
if not isinstance(self.populated_from, RelationalOperand):
raise DataJointError('Invalid populated_from value')

self.connection.cancel_transaction() # rollback previous transaction, if any
if self.connection.in_transaction:
raise DataJointError('Populate cannot be called during a transaction.')

if not isinstance(self, Relation):
raise DataJointError(
'AutoPopulate is a mixin for Relation and must therefore subclass Relation')

unpopulated = (self.populate_relation - self.target) & restriction
jobs = self.connection.jobs[self.target.database]
table_name = self.target.table_name
unpopulated = (self.populated_from - self.target) & restriction
for key in unpopulated.project():
self.connection.start_transaction()
if key in self.target: # already populated
self.connection.cancel_transaction()
else:
logger.info('Populating: ' + str(key))
try:
self._make_tuples(dict(key))
except Exception as error:
if not reserve_jobs or jobs.reserve(table_name, key):
self.connection.start_transaction()
if key in self.target: # already populated
self.connection.cancel_transaction()
if not suppress_errors:
raise
else:
logger.error(error)
error_list.append((key, error))
if reserve_jobs:
jobs.complete(table_name, key)
else:
self.connection.commit_transaction()
logger.info('Done populating.')
logger.info('Populating: ' + str(key))
try:
self._make_tuples(dict(key))
except Exception as error:
self.connection.cancel_transaction()
if reserve_jobs:
jobs.error(table_name, key, error_message=str(error))
if not suppress_errors:
raise
else:
logger.error(error)
error_list.append((key, error))
else:
self.connection.commit_transaction()
if reserve_jobs:
jobs.complete(table_name, key)
return error_list


def progress(self):
"""
report progress of populating this table
"""
total = len(self.populate_relation)
remaining = len(self.populate_relation - self.target)
print('Remaining %d of %d (%2.1f%%)' % (remaining, total, 100*remaining/total)
total = len(self.populated_from)
remaining = len(self.populated_from - self.target)
print('Completed %d of %d (%2.1f%%)' % (total - remaining, total, 100 - 100 * remaining / total)
if remaining else 'Complete', flush=True)
18 changes: 16 additions & 2 deletions datajoint/blob.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
"""
Provides serialization methods for numpy.ndarrays that ensure compatibility with Matlab.
"""

import zlib
from collections import OrderedDict
import numpy as np
Expand Down Expand Up @@ -29,7 +33,10 @@

def pack(obj):
"""
packs an object into a blob to be compatible with mym.mex
Packs an object into a blob to be compatible with mym.mex

:param obj: object to be packed
:type obj: numpy.ndarray
"""
if not isinstance(obj, np.ndarray):
raise DataJointError("Only numpy arrays can be saved in blobs")
Expand Down Expand Up @@ -58,9 +65,16 @@ def pack(obj):

def unpack(blob):
"""
unpack blob into a numpy array
Unpacks blob data into a numpy array.

:param blob: mysql blob
:returns: unpacked data
:rtype: numpy.ndarray
"""
# decompress if necessary
if blob is None:
return None

if blob[0:5] == b'ZL123':
blob_length = np.fromstring(blob[6:14], dtype=np.uint64)[0]
blob = zlib.decompress(blob[14:])
Expand Down
107 changes: 66 additions & 41 deletions datajoint/connection.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,42 @@
"""
This module hosts the Connection class that manages the connection to the mysql database via
`pymysql`, and the `conn` function that provides access to a persistent connection in datajoint.

"""
from contextlib import contextmanager
import pymysql
from . import DataJointError
import logging
from . import config
from collections import defaultdict
from . import DataJointError, config
from .erd import ERD
from .jobs import JobManager

logger = logging.getLogger(__name__)


def conn_container():
def conn(host=None, user=None, passwd=None, init_fun=None, reset=False):
"""
creates a persistent connections for everyone to use
Returns a persistent connection object to be shared by multiple modules.
If the connection is not yet established or reset=True, a new connection is set up.
If connection information is not provided, it is taken from config which takes the
information from dj_local_conf.json. If the password is not specified in that file
datajoint prompts for the password.

:param host: hostname
:param user: mysql user
:param passwd: mysql password
:param init_fun: initialization function
:param reset: whether the connection should be reset or not
"""
_connection = None # persistent connection object used by dj.conn()

def conn_function(host=None, user=None, passwd=None, init_fun=None, reset=False): # TODO: thin wrapping layer to mimic singleton
"""
Manage a persistent connection object.
This is one of several ways to configure and access a datajoint connection.
Users may customize their own connection manager.

Set reset=True to reset the persistent connection object
"""
nonlocal _connection
if not _connection or reset:
host = host if host is not None else config['database.host']
user = user if user is not None else config['database.user']
passwd = passwd if passwd is not None else config['database.password']

if passwd is None: passwd = input("Please enter database password: ")

init_fun = init_fun if init_fun is not None else config['connection.init_function']
_connection = Connection(host, user, passwd, init_fun)
return _connection

return conn_function

# The function conn is used by others to obtain a connection object
conn = conn_container()
if not hasattr(conn, 'connection') or reset:
host = host if host is not None else config['database.host']
user = user if user is not None else config['database.user']
passwd = passwd if passwd is not None else config['database.password']
if passwd is None:
passwd = input("Please enter database password: ")
init_fun = init_fun if init_fun is not None else config['connection.init_function']
conn.connection = Connection(host, user, passwd, init_fun)
return conn.connection


class Connection:
Expand All @@ -51,7 +50,6 @@ class Connection:
:param user: user name
:param passwd: password
:param init_fun: initialization function

"""

def __init__(self, host, user, passwd, init_fun=None):
Expand All @@ -69,6 +67,7 @@ def __init__(self, host, user, passwd, init_fun=None):
raise DataJointError('Connection failed.')
self._conn.autocommit(True)
self._in_transaction = False
self.jobs = JobManager(self)

def __del__(self):
logger.info('Disconnecting {user}@{host}:{port}'.format(**self.conn_info))
Expand All @@ -89,16 +88,15 @@ def __repr__(self):
return "DataJoint connection ({connected}) {user}@{host}:{port}".format(
connected=connected, **self.conn_info)

def __del__(self):
logger.info('Disconnecting {user}@{host}:{port}'.format(**self.conn_info))
self._conn.close()

def query(self, query, args=(), as_dict=False):
"""
Execute the specified query and return the tuple generator.

If as_dict is set to True, the returned cursor objects returns
query results as dictionary.
:param query: mysql query
:param args: additional arguments for the pymysql.cursor
:param as_dict: If as_dict is set to True, the returned cursor object returns
query results as dictionary.
"""
cursor = pymysql.cursors.DictCursor if as_dict else pymysql.cursors.Cursor
cur = self._conn.cursor(cursor=cursor)
Expand All @@ -108,39 +106,66 @@ def query(self, query, args=(), as_dict=False):
cur.execute(query, args)
return cur

# ---------- transaction processing ------------------
# ---------- transaction processing
@property
def in_transaction(self):
"""
:return: True if there is an open transaction.
"""
self._in_transaction = self._in_transaction and self.is_connected
return self._in_transaction

def start_transaction(self):
"""
Starts a transaction.

:raise DataJointError: if there is an ongoing transaction.
"""
if self.in_transaction:
raise DataJointError("Nested connections are not supported.")
self.query('START TRANSACTION WITH CONSISTENT SNAPSHOT')
self._in_transaction = True
logger.info("Transaction started")

def cancel_transaction(self):
"""
Cancels the current transaction and rolls back all changes made during the transaction.

"""
self.query('ROLLBACK')
self._in_transaction = False
logger.info("Transaction cancelled. Rolling back ...")

def commit_transaction(self):
"""
Commit all changes made during the transaction and close it.

"""
self.query('COMMIT')
self._in_transaction = False
logger.info("Transaction committed and closed.")


#-------- context manager for transactions
# -------- context manager for transactions
@property
@contextmanager
def transaction(self):
"""
Context manager for transactions. Opens a transaction and closes it after the with statement.
If an error is caught during the transaction, the commits are automatically rolled back. All
errors are raised again.

Example:
>>> import datajoint as dj
>>> with dj.conn().transaction as conn:
>>> # transaction is open here


"""
try:
self.start_transaction()
yield self
except:
self.cancel_transaction()
raise
else:
self.commit_transaction()

self.commit_transaction()
Loading