Skip to content

Commit

Permalink
added check(), auto_add
Browse files Browse the repository at this point in the history
  • Loading branch information
David Marin committed Jul 8, 2011
1 parent cc7ab3c commit 10b305b
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 36 deletions.
2 changes: 1 addition & 1 deletion bin/create-doloop-table
Expand Up @@ -19,7 +19,7 @@ def main(args):
print

def make_option_parser():
usage = '%prog [options] table [table ...]'
usage = '%prog [options] table [table ...] | mysql -D dbname'
description = ('Print SQL to create one or more task loop tables.')
parser = optparse.OptionParser(usage=usage, description=description)

Expand Down
139 changes: 104 additions & 35 deletions doloop.py
Expand Up @@ -11,8 +11,8 @@
foo_ids = doloop.get(dbconn, 'foo_loop', 100)
for foo_id in foo_ids:
# run your updating function
rebarify_foo(foo_id)
# update foo_id
...
doloop.did(dbconn, 'foo_loop', foo_ids)
"""
Expand Down Expand Up @@ -71,7 +71,7 @@ def _trans(dbconn):
yield cursor

dbconn.commit()

except:
dbconn.rollback()
raise
Expand All @@ -89,8 +89,7 @@ def create(cursor, table, id_type='INT'):
`last_updated` INT DEFAULT NULL,
`lock_until` INT DEFAULT NULL,
PRIMARY KEY (`id`),
INDEX (`lock_until`, `last_updated`),
INDEX (`last_updated`)
INDEX (`lock_until`, `last_updated`)
) ENGINE=InnoDB
* *id* is the ID of the thing you want to update. It can refer to anything that has a unique ID (doesn't need to be another table in this database). It also need not be an ``INT``; see *id_type*, below.
Expand All @@ -117,8 +116,7 @@ def create_sql(table, id_type='INT'):
`last_updated` INT default NULL,
`lock_until` INT default NULL,
PRIMARY KEY (`id`),
INDEX (`lock_until`, `last_updated`),
INDEX (`last_updated`)
INDEX (`lock_until`, `last_updated`)
) ENGINE=InnoDB""" % (table, id_type)


Expand All @@ -138,17 +136,34 @@ def add(dbconn, table, id_or_ids, updated=False):
if not ids:
return 0

with _trans(dbconn) as cursor:
_add(cursor, table, ids, updated=updated)
return cursor.rowcount


def _add(cursor, table, ids, updated=False):
"""Helper function to ``INSERT IGNORE`` IDs into the table. By default,
``last_updated`` and ``lock_until`` will be ``NULL``.
:param cursor: a database cursor inside an open transaction (e.g. the one yielded by ``_trans()``)
:param str table: name of your task loop table
:param ids: non-empty list of IDs to add
:param updated: Set ``last_updated`` to the current time rather than ``NULL``.
"""
assert ids
assert isinstance(ids, list)

if updated:
cols = '(`id`, `last_updated`)'
row_sql = '(%s, UNIX_TIMESTAMP())'
else:
row_sql = '(%s, NULL)'
cols = '(`id`)'
row_sql = '(%s)'

sql = ('INSERT IGNORE INTO `%s` (`id`, `last_updated`)'
' VALUES %s' % (table, ', '.join(row_sql for _ in ids)))
sql = ('INSERT IGNORE INTO `%s` %s VALUES %s' %
(table, cols, ', '.join(row_sql for _ in ids)))

with _trans(dbconn) as cursor:
cursor.execute(sql, ids)
return cursor.rowcount
cursor.execute(sql, ids)


def remove(dbconn, table, id_or_ids):
Expand All @@ -171,6 +186,7 @@ def remove(dbconn, table, id_or_ids):
cursor.execute(sql)
return cursor.rowcount


### Getting and running tasks ###

def get(dbconn, table, limit, lock_for=ONE_HOUR, min_loop_time=ONE_HOUR):
Expand All @@ -196,15 +212,15 @@ def get(dbconn, table, limit, lock_for=ONE_HOUR, min_loop_time=ONE_HOUR):
"""
if not limit:
return []

min_loop_time = min_loop_time or 0

select1 = ('SELECT `id` FROM `%s`'
' WHERE `lock_until` <= UNIX_TIMESTAMP()'
' ORDER BY `lock_until` ASC, `last_updated` ASC'
' LIMIT %%s'
' FOR UPDATE' % (table,))

select2 = ('SELECT `id` FROM `%s`'
' WHERE `lock_until` IS NULL'
' AND `last_updated` IS NULL'
Expand All @@ -226,7 +242,7 @@ def update_sql():
return ('UPDATE `%s` SET `lock_until` = UNIX_TIMESTAMP() + %%s'
' WHERE `id` IN (%s)' %
(table, ', '.join('%s' for _ in ids)))

with _trans(dbconn) as cursor:
cursor.execute(select1, [limit])
ids.extend(row[0] for row in cursor.fetchall())
Expand All @@ -247,7 +263,7 @@ def update_sql():
return ids


def did(dbconn, table, id_or_ids):
def did(dbconn, table, id_or_ids, auto_add=True):
"""Mark IDs as updated and unlock them.
Usually, these will be IDs that you grabbed using :py:func:`~doloop.get`,
Expand All @@ -257,6 +273,7 @@ def did(dbconn, table, id_or_ids):
:param dbconn: a :py:mod:`MySQLdb` connection object
:param str table: name of your task loop table
:param id_or_ids: ID or list of IDs that we just updated
:param bool auto_add: Add any IDs that are not already in the table.
:return: number of rows updated (mostly useful as a sanity check)
"""
Expand All @@ -270,11 +287,14 @@ def did(dbconn, table, id_or_ids):
', '.join('%s' for _ in ids)))

with _trans(dbconn) as cursor:
if auto_add:
_add(cursor, table, ids)

cursor.execute(sql, ids)
return cursor.rowcount


def unlock(dbconn, table, id_or_ids):
def unlock(dbconn, table, id_or_ids, auto_add=True):
"""Unlock IDs without marking them updated.
Useful if you :py:func:`~doloop.get` IDs, but are then unable or unwilling
Expand All @@ -283,6 +303,7 @@ def unlock(dbconn, table, id_or_ids):
:param dbconn: a :py:mod:`MySQLdb` connection object
:param str table: name of your task loop table
:param id_or_ids: ID or list of IDs
:param bool auto_add: Add any IDs that are not already in the table.
:return: number of rows updated (mostly useful as a sanity check)
"""
Expand All @@ -294,13 +315,23 @@ def unlock(dbconn, table, id_or_ids):
' WHERE `id` IN (%s)' % (table, ', '.join('%s' for _ in ids)))

with _trans(dbconn) as cursor:
rowcount = 0

# newly added rows already have lock_until set to NULL, so these
# rows won't get hit by the UPDATE statement below
if auto_add:
_add(cursor, table, ids)
rowcount += cursor.rowcount

cursor.execute(sql, ids)
return cursor.rowcount
rowcount += cursor.rowcount

return rowcount


### Prioritization ###

def bump(dbconn, table, id_or_ids, lock_for=0):
def bump(dbconn, table, id_or_ids, lock_for=0, auto_add=True):
"""Bump priority of IDs.
Normally we set ``lock_until`` to the current time, which gives them
Expand All @@ -322,7 +353,8 @@ def bump(dbconn, table, id_or_ids, lock_for=0):
:param str table: name of your task loop table
:param id_or_ids: ID or list of IDs
:param lock_for: Number of seconds that the IDs should stay locked.
:param bool auto_add: Add any IDs that are not already in the table.
:return: number of IDs bumped (mostly useful as a sanity check)
"""
ids = _to_list(id_or_ids)
Expand All @@ -337,9 +369,43 @@ def bump(dbconn, table, id_or_ids, lock_for=0):
(table, ', '.join('%s' for _ in ids)))

with _trans(dbconn) as cursor:
if auto_add:
_add(cursor, table, ids)

cursor.execute(sql, [lock_for, lock_for] + ids)
return cursor.rowcount



### Auditing ###

def check(dbconn, table, id_or_ids):
    """Report how long ago each ID was updated and how long it stays locked.

    :param dbconn: a :py:mod:`MySQLdb` connection object
    :param str table: name of your task loop table
    :param id_or_ids: ID or list of IDs

    Returns a dictionary mapping ID to a tuple of ``(since_updated,
    locked_for)``, that is, the current time minus ``last_updated``, and
    ``lock_until`` minus the current time (both of these in seconds). Only
    IDs that have a row in the table appear in the result; an empty
    *id_or_ids* yields an empty dictionary without querying the database.

    This function does not require write access to your database.
    """
    ids = _to_list(id_or_ids)
    if not ids:
        return {}

    # one '%s' placeholder per ID so the driver handles quoting/escaping
    placeholders = ', '.join('%s' for _ in ids)

    sql = ('SELECT `id`,'
           ' UNIX_TIMESTAMP() - `last_updated` as `since_updated`,'
           ' `lock_until` - UNIX_TIMESTAMP() as `locked_for`'
           ' FROM `%s` WHERE `id` IN (%s)' %
           (table, placeholders))

    status = {}
    with _trans(dbconn) as cursor:
        cursor.execute(sql, ids)
        for id_, since_updated, locked_for in cursor.fetchall():
            status[id_] = (since_updated, locked_for)

    return status


### Object-Oriented version ###

Expand All @@ -354,10 +420,10 @@ class DoLoop(object):
foo_ids = foo_loop.get(100)
for foo_id in foo_ids:
# run your updating function
rebarify_foo(foo_id)
# update foo_id
...
doloop.did(foo_ids)
foo_loop.did(foo_ids)
"""

def __init__(self, dbconn, table):
Expand Down Expand Up @@ -395,28 +461,31 @@ def get(self, limit, lock_for=ONE_HOUR, min_loop_time=ONE_HOUR):
return get(
self._make_dbconn(), self._table, limit, lock_for, min_loop_time)

def did(self, id_or_ids):
def did(self, id_or_ids, auto_add=True):
"""Mark IDs as updated and unlock them.
See :py:func:`~doloop.did` for details.
"""
return did(self._make_dbconn(), self._table, id_or_ids)
return did(self._make_dbconn(), self._table, id_or_ids, auto_add)

def unlock(self, id_or_ids):
def unlock(self, id_or_ids, auto_add=True):
"""Unlock IDs without marking them updated.
See :py:func:`~doloop.unlock` for details.
"""
return unlock(self._make_dbconn(), self._table, id_or_ids)
return unlock(self._make_dbconn(), self._table, id_or_ids, auto_add)

def bump(self, id_or_ids, lock_for=0):
def bump(self, id_or_ids, lock_for=0, auto_add=True):
"""Bump priority of IDs.
See :py:func:`~doloop.bump` for details.
"""
return bump(self._make_dbconn(), self._table, id_or_ids, lock_for)
return bump(
self._make_dbconn(), self._table, id_or_ids, lock_for, auto_add)


def check(self, id_or_ids):
    """Check the status of IDs, using this loop's connection and table.

    See :py:func:`~doloop.check` for details.
    """
    dbconn = self._make_dbconn()
    return check(dbconn, self._table, id_or_ids)

0 comments on commit 10b305b

Please sign in to comment.