Skip to content

Commit

Permalink
Merge branch 'development'
Browse files Browse the repository at this point in the history
  • Loading branch information
dimitri-yatsenko committed Aug 14, 2015
2 parents f2f2164 + 2149b23 commit e0aadc0
Show file tree
Hide file tree
Showing 13 changed files with 367 additions and 142 deletions.
3 changes: 2 additions & 1 deletion datajoint/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
'Connection', 'Heading', 'Relation', 'FreeRelation', 'Not',
'Relation', 'schema',
'Manual', 'Lookup', 'Imported', 'Computed',
'conn']
'conn', 'kill']

# define an object that identifies the primary key in RelationalOperand.__getitem__
class PrimaryKey:
    """Marker class that identifies the primary key in RelationalOperand.__getitem__."""
Expand Down Expand Up @@ -58,3 +58,4 @@ class DataJointError(Exception):
from .relational_operand import Not
from .heading import Heading
from .schema import schema
from .kill import kill
28 changes: 17 additions & 11 deletions datajoint/autopopulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import datetime
from .relational_operand import RelationalOperand
from . import DataJointError
from .relation import Relation, FreeRelation
from . import jobs
from .relation import FreeRelation

# noinspection PyExceptionInherit,PyCallingNonCallable

Expand Down Expand Up @@ -47,6 +46,10 @@ def _make_tuples(self, key):

@property
def target(self):
    """
    Relation to be populated.

    AutoPopulate is typically mixed into a Relation object, in which case the
    target is the object itself; this implementation simply returns self.
    """
    populated_relation = self
    return populated_relation

def populate(self, restriction=None, suppress_errors=False, reserve_jobs=False):
Expand All @@ -68,7 +71,7 @@ def populate(self, restriction=None, suppress_errors=False, reserve_jobs=False):

jobs = self.connection.jobs[self.target.database]
table_name = self.target.table_name
unpopulated = (self.populated_from - self.target) & restriction
unpopulated = (self.populated_from & restriction) - self.target.project()
for key in unpopulated.fetch.keys():
if not reserve_jobs or jobs.reserve(table_name, key):
self.connection.start_transaction()
Expand All @@ -95,14 +98,17 @@ def populate(self, restriction=None, suppress_errors=False, reserve_jobs=False):
jobs.complete(table_name, key)
return error_list

def progress(self, restriction=None, display=True):
    """
    Report progress of populating this table.

    :param restriction: restriction applied to self.populated_from before counting
    :param display: if True, print a one-line summary prefixed with the class name
    :return: remaining, total -- tuples to be populated
    """
    # build the restricted source once and reuse it for both counts
    to_populate = self.populated_from & restriction
    total = len(to_populate)
    remaining = len(to_populate - self.target.project())
    if display:
        # nothing to populate counts as fully complete; avoids dividing by zero
        percent_done = 100 - 100 * remaining / total if total else 100.0
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('%-20s' % self.__class__.__name__, flush=True, end=': ')
        print('Completed %d of %d (%2.1f%%) %s' %
              (total - remaining, total, percent_done, timestamp), flush=True)
    return remaining, total
14 changes: 7 additions & 7 deletions datajoint/connection.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
"""
This module hosts the Connection class that manages the connection to the mysql database via
`pymysql`, and the `conn` function that provides access to a persistent connection in datajoint.
This module hosts the Connection class that manages the connection to the mysql database,
and the `conn` function that provides access to a persistent connection in datajoint.
"""

from contextlib import contextmanager
import pymysql
import pymysql as connector
import logging
from . import config
from . import DataJointError
Expand Down Expand Up @@ -59,7 +59,7 @@ def __init__(self, host, user, passwd, init_fun=None):
else:
port = config['database.port']
self.conn_info = dict(host=host, port=port, user=user, passwd=passwd)
self._conn = pymysql.connect(init_command=init_fun, **self.conn_info)
self._conn = connector.connect(**self.conn_info)
if self.is_connected:
logger.info("Connected {user}@{host}:{port}".format(**self.conn_info))
else:
Expand Down Expand Up @@ -96,11 +96,11 @@ def query(self, query, args=(), as_dict=False):
Execute the specified query and return the tuple generator (cursor).
:param query: mysql query
:param args: additional arguments for the pymysql.cursor
:param args: additional arguments for the connector.cursor
:param as_dict: If as_dict is set to True, the returned cursor objects returns
query results as dictionary.
"""
cursor = pymysql.cursors.DictCursor if as_dict else pymysql.cursors.Cursor
cursor = connector.cursors.DictCursor if as_dict else connector.cursors.Cursor
cur = self._conn.cursor(cursor=cursor)

# Log the query
Expand Down
2 changes: 1 addition & 1 deletion datajoint/declare.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def declare(full_table_name, definition, context):
for line in definition:
if line.startswith('#'): # additional comments are ignored
pass
elif line.startswith('---'):
elif line.startswith('---') or line.startswith('___'):
in_key = False # start parsing dependent attributes
elif line.startswith('->'):
# foreign key
Expand Down
35 changes: 14 additions & 21 deletions datajoint/erd.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,12 @@ def up_down_neighbors(self, node, ups=2, downs=2, _prev=None):
s = {node}
if ups > 0:
for x in self.predecessors_iter(node):
if x == _prev:
continue
s.update(self.up_down_neighbors(x, ups-1, downs, node))
if x != _prev:
s.update(self.up_down_neighbors(x, ups-1, downs, node))
if downs > 0:
for x in self.successors_iter(node):
if x == _prev:
continue
s.update(self.up_down_neighbors(x, ups, downs-1, node))
if x != _prev:
s.update(self.up_down_neighbors(x, ups, downs-1, node))
return s

def n_neighbors(self, node, n, directed=False, prev=None):
Expand All @@ -234,23 +232,20 @@ def n_neighbors(self, node, n, directed=False, prev=None):
Set directed=True to follow only outgoing edges.
"""
s = {node}
if n < 1:
return s
if n == 1:
s.update(self.predecessors(node))
s.update(self.successors(node))
return s
if not directed:
for x in self.predecesors_iter():
if x == prev: # skip prev point
continue
s.update(self.n_neighbors(x, n-1, prev))
for x in self.succesors_iter():
if x == prev:
continue
s.update(self.n_neighbors(x, n-1, prev))
elif n > 1:
if not directed:
for x in self.predecesors_iter():
if x != prev: # skip prev point
s.update(self.n_neighbors(x, n-1, prev))
for x in self.succesors_iter():
if x != prev:
s.update(self.n_neighbors(x, n-1, prev))
return s


class ERM(RelGraph):
"""
Entity Relation Map
Expand Down Expand Up @@ -278,9 +273,7 @@ def update_graph(self, reload=False):
# create primary key foreign connections
for table, parents in self._parents.items():
mod, cls = (x.strip('`') for x in table.split('.'))

self.add_node(table, label=table,
mod=mod, cls=cls)
self.add_node(table, label=table, mod=mod, cls=cls)
for parent in parents:
self.add_edge(parent, table, rel='parent')

Expand Down
9 changes: 7 additions & 2 deletions datajoint/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ def prepare_attributes(relation, item):


def copy_first(f):
"""
decorates methods that return an altered copy of self
"""
@wraps(f)
def ret(*args, **kwargs):
args = list(args)
Expand Down Expand Up @@ -70,7 +73,6 @@ def offset(self, offset):
self.behavior['offset'] = offset
return self


@copy_first
def set_behavior(self, **kwargs):
self.behavior.update(kwargs)
Expand All @@ -90,7 +92,8 @@ def __call__(self, **kwargs):
"""
behavior = dict(self.behavior, **kwargs)
if behavior['limit'] is None and behavior['offset'] is not None:
warnings.warn('Offset set, but no limit. Setting limit to a large number. Consider setting a limit yourself.')
warnings.warn('Offset set, but no limit. Setting limit to a large number. '
'Consider setting a limit explicitly.')
behavior['limit'] = 2*len(self._relation)
cur = self._relation.cursor(**behavior)

Expand Down Expand Up @@ -180,7 +183,9 @@ def __repr__(self):
def __len__(self):
    """Return the length of the wrapped relation (delegates to len(self._relation))."""
    wrapped = self._relation
    return len(wrapped)


class Fetch1:

def __init__(self, relation):
self._relation = relation

Expand Down
47 changes: 47 additions & 0 deletions datajoint/kill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import pymysql
from . import conn


def kill(restriction=None, connection=None):
    """
    View and kill database connections interactively.

    Repeatedly lists the rows of information_schema.processlist (excluding the
    current connection) and prompts for the ID of a process to kill, until "q"
    is entered. Empty and non-numeric responses are ignored.

    :param restriction: restriction to be applied to processlist
    :param connection: a datajoint.Connection object. Default calls datajoint.conn()

    Restrictions are specified as strings and can involve any of the attributes of
    information_schema.processlist: ID, USER, HOST, DB, COMMAND, TIME, STATE, INFO.

    Examples:
        dj.kill('HOST LIKE "%compute%"') lists only connections from hosts containing "compute".
        dj.kill('TIME > 600') lists only connections older than 10 minutes.
    """
    if connection is None:
        connection = conn()

    # NOTE: the restriction is interpolated verbatim into the SQL text; it is meant to be
    # a condition written by the operator, never untrusted input.
    query = 'SELECT * FROM information_schema.processlist WHERE id <> CONNECTION_ID()'
    if restriction is not None:
        query += ' AND (%s)' % restriction

    row_format = '{ID:>4d} {USER:<12s} {STATE:<12s} {TIME:>5d} {INFO}'
    # the header uses the same column widths as row_format so the titles line up with the rows
    header = '{ID:>4s} {USER:<12s} {STATE:<12s} {TIME:>5s} {INFO}'.format(
        ID='ID', USER='USER', STATE='STATE', TIME='TIME', INFO='INFO')

    while True:
        print(header)
        print('+--+ +----------+ +-----------+ +--+')
        for process in connection.query(query, as_dict=True).fetchall():
            try:
                print(row_format.format(**process))
            except TypeError:
                # a value that does not fit the fixed-width spec (e.g. None)
                # is shown as the raw record instead
                print(process)

        response = input('process to kill or "q" to quit > ').strip()
        if response == 'q':
            break
        if response:
            try:
                process_id = int(response)
            except ValueError:
                continue  # ignore non-numeric input
            try:
                connection.query('kill %d' % process_id)
            except pymysql.err.InternalError:
                print('Process not found')
43 changes: 23 additions & 20 deletions datajoint/relation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections import Mapping
from collections import Mapping, OrderedDict
import numpy as np
import logging
import abc
Expand Down Expand Up @@ -108,12 +108,14 @@ def descendants(self):
"""
:return: list of relation objects for all children and references, recursively,
in order of dependence.
Does not include self.
This is helpful for cascading delete or drop operations.
"""
relations = (FreeRelation(self.connection, table)
for table in self.connection.erm.get_descendants(self.full_table_name))
return [relation for relation in relations if relation.is_declared]


def _repr_helper(self):
return "%s.%s()" % (self.__module__, self.__class__.__name__)

Expand Down Expand Up @@ -206,32 +208,33 @@ def delete(self):
"""
relations = self.descendants
restrict_by_me = set()
rel_by_name = {r.full_table_name:r for r in relations}
for r in relations:
for ref in r.references:
restrict_by_me.add(ref)
restrict_by_me.update(r.references)
relations = OrderedDict((r.full_table_name, r) for r in relations)

if self.restrictions:
restrict_by_me.add(self.full_table_name)
rel_by_name[self.full_table_name] &= self.restrictions
relations[self.full_table_name] &= self.restrictions

for r in relations:
for name in relations:
r = relations[name]
for dep in (r.children + r.references):
rel_by_name[dep] &= r.project() if r.full_table_name in restrict_by_me else r.restrictions

if config['safemode']:
do_delete = False # indicate if there is anything to delete
print('The contents of the following tables are about to be deleted:')
for relation in relations:
count = len(relation)
if count:
do_delete = True
relations[dep] &= r.project() if name in restrict_by_me else r.restrictions

do_delete = False # indicate if there is anything to delete
print('The contents of the following tables are about to be deleted:')
for relation in relations.values():
count = len(relation)
if count:
do_delete = True
if config['safemode']:
print(relation.full_table_name, '(%d tuples)' % count)
if not do_delete or user_choice("Proceed?", default='no') != 'yes':
return
with self.connection.transaction:
while relations:
relations.pop().delete_quick()
else:
relations.pop(relation.full_table_name)
if do_delete and (not config['safemode'] or user_choice("Proceed?", default='no') == 'yes'):
with self.connection.transaction:
for r in reversed(list(relations.values())):
r.delete_quick()

def drop_quick(self):
"""
Expand Down
Loading

0 comments on commit e0aadc0

Please sign in to comment.