diff --git a/connector/connector.py b/connector/connector.py index 8e74898c5..a7f18850f 100644 --- a/connector/connector.py +++ b/connector/connector.py @@ -19,11 +19,15 @@ # ############################################################################## +import hashlib import logging +import struct + from contextlib import contextmanager from openerp import models, fields from .deprecate import log_deprecate, DeprecatedClass +from .exception import RetryableJobError _logger = logging.getLogger(__name__) @@ -227,6 +231,33 @@ def get_binder_for_model(self, model=None): log_deprecate('renamed to binder_for()') return self.binder_for(model=model) + @contextmanager + def try_advisory_lock(self, lock, retry_seconds=1): + """ Context manager, tries to acquire a Postgres transactional + advisory lock. + + If the lock cannot be acquired, it raises a + ``RetryableJobError`` so the jobs is retried after n + ``retry_seconds``. + + See :func:``openerp.addons.connector.connector.pg_try_advisory_lock`` + for details. + + :param lock: The lock name. Can be anything convertible to a + string. It needs to represents what should not be synchronized + concurrently so usually the string will contain at least: the + action, the backend type, the backend id, the model name, the + external id + :param retry_seconds: number of seconds after which a job should + be retried when the lock cannot be acquired. + """ + if pg_try_advisory_lock(self.env, lock): + yield + else: + raise RetryableJobError('Could not acquire advisory lock', + seconds=retry_seconds, + ignore_retry=True) + class ConnectorEnvironment(object): """ Environment used by the different units for the synchronization. @@ -466,3 +497,74 @@ def unwrap_model(self): 'Cannot unwrap model %s, because it has no %s fields' % (self.model._name, self._openerp_field)) return column.comodel_name + + +def pg_try_advisory_lock(env, lock): + """ Try to acquire a Postgres transactional advisory lock. + + The function tries to acquire a lock, returns a boolean indicating + if it could be obtained or not. An acquired lock is released at the + end of the transaction. + + A typical use is to acquire a lock at the beginning of an importer + to prevent 2 jobs to do the same import at the same time. Since the + record doesn't exist yet, we can't put a lock on a record, so we put + an advisory lock. + + Example: + - Job 1 imports Partner A + - Job 2 imports Partner B + - Partner A has a category X which happens not to exist yet + - Partner B has a category X which happens not to exist yet + - Job 1 import category X as a dependency + - Job 2 import category X as a dependency + + Since both jobs are executed concurrently, they both create a record + for category X so we have duplicated records. With this lock: + + - Job 1 imports Partner A, it acquires a lock for this partner + - Job 2 imports Partner B, it acquires a lock for this partner + - Partner A has a category X which happens not to exist yet + - Partner B has a category X which happens not to exist yet + - Job 1 import category X as a dependency, it acquires a lock for + this category + - Job 2 import category X as a dependency, try to acquire a lock + but can't, Job 2 is retried later, and when it is retried, it + sees the category X created by Job 1. + + The lock is acquired until the end of the transaction. + + Usage example: + + :: + + lock_name = 'import_record({}, {}, {}, {})'.format( + self.backend_record._name, + self.backend_record.id, + self.model._name, + self.lefac_id, + ) + if pg_try_advisory_lock(lock_name): + # do sync + else: + raise RetryableJobError('Could not acquire advisory lock', + seconds=2, + ignore_retry=True) + + :param env: the Odoo Environment + :param lock: The lock name. Can be anything convertible to a + string. It needs to represents what should not be synchronized + concurrently so usually the string will contain at least: the + action, the backend type, the backend id, the model name, the + external id + :return True/False whether lock was acquired. + """ + hasher = hashlib.sha1() + hasher.update('{}'.format(lock)) + # pg_lock accepts an int8 so we build an hash composed with + # contextual information and we throw away some bits + int_lock = struct.unpack('q', hasher.digest()[:8]) + + env.cr.execute('SELECT pg_try_advisory_xact_lock(%s);', (int_lock,)) + acquired = env.cr.fetchone()[0] + return acquired diff --git a/connector/tests/test_connector.py b/connector/tests/test_connector.py index 0d5393cf0..2d4fec1ec 100644 --- a/connector/tests/test_connector.py +++ b/connector/tests/test_connector.py @@ -3,13 +3,31 @@ import mock import unittest2 +from openerp import api +from openerp.modules.registry import RegistryManager from openerp.tests import common from openerp.addons.connector import connector -from openerp.addons.connector.connector import (ConnectorUnit, - ConnectorEnvironment) +from openerp.addons.connector.exception import RetryableJobError +from openerp.addons.connector.connector import ( + ConnectorEnvironment, + ConnectorUnit, + pg_try_advisory_lock, +) from openerp.addons.connector.session import ConnectorSession +def mock_connector_unit(env): + session = ConnectorSession(env.cr, env.uid, + context=env.context) + backend_record = mock.Mock(name='BackendRecord') + backend = mock.Mock(name='Backend') + backend_record.get_backend.return_value = backend + connector_env = connector.ConnectorEnvironment(backend_record, + session, + 'res.users') + return ConnectorUnit(connector_env) + + class ConnectorHelpers(unittest2.TestCase): def test_openerp_module_name(self): @@ -122,15 +140,7 @@ def test_instance(self): class ModelUnit(ConnectorUnit): _model_name = 'res.users' - session = ConnectorSession(self.env.cr, self.env.uid, - context=self.env.context) - backend_record = mock.Mock(name='BackendRecord') - backend = mock.Mock(name='Backend') - backend_record.get_backend.return_value = backend - connector_env = connector.ConnectorEnvironment(backend_record, - session, - 'res.users') - unit = ConnectorUnit(connector_env) + unit = mock_connector_unit(self.env) self.assertEqual(unit.model, self.env['res.users']) self.assertEqual(unit.env, self.env) self.assertEqual(unit.localcontext, self.env.context) @@ -177,3 +187,48 @@ def __init__(self, backend_record, session, model_name, api=None): self.assertEqual(type(new_env), MyConnectorEnvironment) self.assertEqual(new_env.api, api) + + +class TestAdvisoryLock(common.TransactionCase): + + def setUp(self): + super(TestAdvisoryLock, self).setUp() + self.registry2 = RegistryManager.get(common.get_db_name()) + self.cr2 = self.registry2.cursor() + self.env2 = api.Environment(self.cr2, self.env.uid, {}) + + @self.addCleanup + def reset_cr2(): + # rollback and close the cursor, and reset the environments + self.env2.reset() + self.cr2.rollback() + self.cr2.close() + + def test_concurrent_lock(self): + """ 2 concurrent transactions cannot acquire the same lock """ + lock = 'import_record({}, {}, {}, {})'.format( + 'backend.name', + 1, + 'res.partner', + '999999', + ) + acquired = pg_try_advisory_lock(self.env, lock) + self.assertTrue(acquired) + inner_acquired = pg_try_advisory_lock(self.env2, lock) + self.assertFalse(inner_acquired) + + def test_concurrent_import_lock(self): + """ A 2nd concurrent transaction must retry """ + lock = 'import_record({}, {}, {}, {})'.format( + 'backend.name', + 1, + 'res.partner', + '999999', + ) + connector_unit = mock_connector_unit(self.env) + with connector_unit.try_advisory_lock(lock): + connector_unit2 = mock_connector_unit(self.env2) + with self.assertRaises(RetryableJobError) as cm: + with connector_unit2.try_advisory_lock(lock, retry_seconds=3): + pass + self.assertEquals(cm.exception.seconds, 3)