Skip to content

Commit

Permalink
Merge pull request #138 from guewen/advisory-lock-common
Browse files Browse the repository at this point in the history
Context manager to acquire Postgres advisory locks
  • Loading branch information
pedrobaeza committed Nov 2, 2015
2 parents 2c5720d + 6e23779 commit f68890c
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 11 deletions.
102 changes: 102 additions & 0 deletions connector/connector.py
Expand Up @@ -19,11 +19,15 @@
#
##############################################################################

import hashlib
import logging
import struct

from contextlib import contextmanager
from openerp import models, fields

from .deprecate import log_deprecate, DeprecatedClass
from .exception import RetryableJobError

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -227,6 +231,33 @@ def get_binder_for_model(self, model=None):
log_deprecate('renamed to binder_for()')
return self.binder_for(model=model)

@contextmanager
def try_advisory_lock(self, lock, retry_seconds=1):
""" Context manager, tries to acquire a Postgres transactional
advisory lock.
If the lock cannot be acquired, it raises a
``RetryableJobError`` so the jobs is retried after n
``retry_seconds``.
See :func:``openerp.addons.connector.connector.pg_try_advisory_lock``
for details.
:param lock: The lock name. Can be anything convertible to a
string. It needs to represents what should not be synchronized
concurrently so usually the string will contain at least: the
action, the backend type, the backend id, the model name, the
external id
:param retry_seconds: number of seconds after which a job should
be retried when the lock cannot be acquired.
"""
if pg_try_advisory_lock(self.env, lock):
yield
else:
raise RetryableJobError('Could not acquire advisory lock',
seconds=retry_seconds,
ignore_retry=True)


class ConnectorEnvironment(object):
""" Environment used by the different units for the synchronization.
Expand Down Expand Up @@ -466,3 +497,74 @@ def unwrap_model(self):
'Cannot unwrap model %s, because it has no %s fields'
% (self.model._name, self._openerp_field))
return column.comodel_name


def pg_try_advisory_lock(env, lock):
""" Try to acquire a Postgres transactional advisory lock.
The function tries to acquire a lock, returns a boolean indicating
if it could be obtained or not. An acquired lock is released at the
end of the transaction.
A typical use is to acquire a lock at the beginning of an importer
to prevent 2 jobs to do the same import at the same time. Since the
record doesn't exist yet, we can't put a lock on a record, so we put
an advisory lock.
Example:
- Job 1 imports Partner A
- Job 2 imports Partner B
- Partner A has a category X which happens not to exist yet
- Partner B has a category X which happens not to exist yet
- Job 1 import category X as a dependency
- Job 2 import category X as a dependency
Since both jobs are executed concurrently, they both create a record
for category X so we have duplicated records. With this lock:
- Job 1 imports Partner A, it acquires a lock for this partner
- Job 2 imports Partner B, it acquires a lock for this partner
- Partner A has a category X which happens not to exist yet
- Partner B has a category X which happens not to exist yet
- Job 1 import category X as a dependency, it acquires a lock for
this category
- Job 2 import category X as a dependency, try to acquire a lock
but can't, Job 2 is retried later, and when it is retried, it
sees the category X created by Job 1.
The lock is acquired until the end of the transaction.
Usage example:
::
lock_name = 'import_record({}, {}, {}, {})'.format(
self.backend_record._name,
self.backend_record.id,
self.model._name,
self.lefac_id,
)
if pg_try_advisory_lock(lock_name):
# do sync
else:
raise RetryableJobError('Could not acquire advisory lock',
seconds=2,
ignore_retry=True)
:param env: the Odoo Environment
:param lock: The lock name. Can be anything convertible to a
string. It needs to represents what should not be synchronized
concurrently so usually the string will contain at least: the
action, the backend type, the backend id, the model name, the
external id
:return True/False whether lock was acquired.
"""
hasher = hashlib.sha1()
hasher.update('{}'.format(lock))
# pg_lock accepts an int8 so we build an hash composed with
# contextual information and we throw away some bits
int_lock = struct.unpack('q', hasher.digest()[:8])

env.cr.execute('SELECT pg_try_advisory_xact_lock(%s);', (int_lock,))
acquired = env.cr.fetchone()[0]
return acquired
77 changes: 66 additions & 11 deletions connector/tests/test_connector.py
Expand Up @@ -3,13 +3,31 @@
import mock
import unittest2

from openerp import api
from openerp.modules.registry import RegistryManager
from openerp.tests import common
from openerp.addons.connector import connector
from openerp.addons.connector.connector import (ConnectorUnit,
ConnectorEnvironment)
from openerp.addons.connector.exception import RetryableJobError
from openerp.addons.connector.connector import (
ConnectorEnvironment,
ConnectorUnit,
pg_try_advisory_lock,
)
from openerp.addons.connector.session import ConnectorSession


def mock_connector_unit(env):
session = ConnectorSession(env.cr, env.uid,
context=env.context)
backend_record = mock.Mock(name='BackendRecord')
backend = mock.Mock(name='Backend')
backend_record.get_backend.return_value = backend
connector_env = connector.ConnectorEnvironment(backend_record,
session,
'res.users')
return ConnectorUnit(connector_env)


class ConnectorHelpers(unittest2.TestCase):

def test_openerp_module_name(self):
Expand Down Expand Up @@ -122,15 +140,7 @@ def test_instance(self):
class ModelUnit(ConnectorUnit):
_model_name = 'res.users'

session = ConnectorSession(self.env.cr, self.env.uid,
context=self.env.context)
backend_record = mock.Mock(name='BackendRecord')
backend = mock.Mock(name='Backend')
backend_record.get_backend.return_value = backend
connector_env = connector.ConnectorEnvironment(backend_record,
session,
'res.users')
unit = ConnectorUnit(connector_env)
unit = mock_connector_unit(self.env)
self.assertEqual(unit.model, self.env['res.users'])
self.assertEqual(unit.env, self.env)
self.assertEqual(unit.localcontext, self.env.context)
Expand Down Expand Up @@ -177,3 +187,48 @@ def __init__(self, backend_record, session, model_name, api=None):

self.assertEqual(type(new_env), MyConnectorEnvironment)
self.assertEqual(new_env.api, api)


class TestAdvisoryLock(common.TransactionCase):

def setUp(self):
super(TestAdvisoryLock, self).setUp()
self.registry2 = RegistryManager.get(common.get_db_name())
self.cr2 = self.registry2.cursor()
self.env2 = api.Environment(self.cr2, self.env.uid, {})

@self.addCleanup
def reset_cr2():
# rollback and close the cursor, and reset the environments
self.env2.reset()
self.cr2.rollback()
self.cr2.close()

def test_concurrent_lock(self):
""" 2 concurrent transactions cannot acquire the same lock """
lock = 'import_record({}, {}, {}, {})'.format(
'backend.name',
1,
'res.partner',
'999999',
)
acquired = pg_try_advisory_lock(self.env, lock)
self.assertTrue(acquired)
inner_acquired = pg_try_advisory_lock(self.env2, lock)
self.assertFalse(inner_acquired)

def test_concurrent_import_lock(self):
""" A 2nd concurrent transaction must retry """
lock = 'import_record({}, {}, {}, {})'.format(
'backend.name',
1,
'res.partner',
'999999',
)
connector_unit = mock_connector_unit(self.env)
with connector_unit.try_advisory_lock(lock):
connector_unit2 = mock_connector_unit(self.env2)
with self.assertRaises(RetryableJobError) as cm:
with connector_unit2.try_advisory_lock(lock, retry_seconds=3):
pass
self.assertEquals(cm.exception.seconds, 3)

0 comments on commit f68890c

Please sign in to comment.