# -*- coding: utf-8 -*-
# © 2013 Guewen Baconnier, Camptocamp SA, Akretion
# © 2016 Sodexis
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import logging
from contextlib import contextmanager
from datetime import datetime
import psycopg2
import openerp
from openerp import _
from openerp.addons.connector.queue.job import job, related_action
from openerp.addons.connector.unit.synchronizer import Exporter
from openerp.addons.connector.exception import (IDMissingInBackend,
RetryableJobError)
from .import_synchronizer import import_record
from .backend_adapter import MAGENTO_DATETIME_FORMAT
from ..connector import get_environment
from ..related_action import unwrap_binding
_logger = logging.getLogger(__name__)
"""
Exporters for Magento.
In addition to its export job, an exporter has to:
* check in Magento if the record has been updated more recently than the
last sync date and if yes, delay an import
* call the ``bind`` method of the binder to update the last sync date
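
A concrete exporter usually derives from :class:`MagentoExporter` defined
below, is registered with the backend decorator and declares the binding
model it handles. A minimal sketch (the binding model name is only an
illustration, not a model handled by this file)::

    @magento
    class MyThingExporter(MagentoExporter):
        _model_name = ['magento.my.thing']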
"""
class MagentoBaseExporter(Exporter):
""" Base exporter for Magento """
def __init__(self, connector_env):
"""
:param connector_env: current environment (backend, session, ...)
:type connector_env: :class:`connector.connector.ConnectorEnvironment`
"""
super(MagentoBaseExporter, self).__init__(connector_env)
self.binding_id = None
self.magento_id = None
def _delay_import(self):
""" Schedule an import of the record.
Adapt in the sub-classes when the model is not imported
using ``import_record``.
"""
        # force is True because the sync_date will be more recent, so a
        # non-forced import would otherwise be skipped
assert self.magento_id
import_record.delay(self.session, self.model._name,
self.backend_record.id, self.magento_id,
force=True)
def _should_import(self):
""" Before the export, compare the update date
in Magento and the last sync date in OpenERP,
if the former is more recent, schedule an import
to not miss changes done in Magento.
"""
assert self.binding_record
if not self.magento_id:
return False
sync = self.binding_record.sync_date
if not sync:
return True
record = self.backend_adapter.read(self.magento_id,
attributes=['updated_at'])
if not record['updated_at']:
            # in rare cases it can be empty; in that case, do not import
return False
sync_date = openerp.fields.Datetime.from_string(sync)
magento_date = datetime.strptime(record['updated_at'],
MAGENTO_DATETIME_FORMAT)
return sync_date < magento_date
def _get_openerp_data(self):
""" Return the raw OpenERP data for ``self.binding_id`` """
return self.model.browse(self.binding_id)
def run(self, binding_id, *args, **kwargs):
""" Run the synchronization
:param binding_id: identifier of the binding record to export
"""
self.binding_id = binding_id
self.binding_record = self._get_openerp_data()
self.magento_id = self.binder.to_backend(self.binding_id)
try:
should_import = self._should_import()
except IDMissingInBackend:
self.magento_id = None
should_import = False
if should_import:
self._delay_import()
result = self._run(*args, **kwargs)
self.binder.bind(self.magento_id, self.binding_id)
# Commit so we keep the external ID when there are several
# exports (due to dependencies) and one of them fails.
# The commit will also release the lock acquired on the binding
# record
self.session.commit()
self._after_export()
return result
def _run(self):
""" Flow of the synchronization, implemented in inherited classes"""
raise NotImplementedError
def _after_export(self):
""" Can do several actions after exporting a record on magento """
pass
class MagentoExporter(MagentoBaseExporter):
""" A common flow for the exports to Magento """
def __init__(self, connector_env):
"""
:param connector_env: current environment (backend, session, ...)
:type connector_env: :class:`connector.connector.ConnectorEnvironment`
"""
super(MagentoExporter, self).__init__(connector_env)
self.binding_record = None
def _lock(self):
""" Lock the binding record.
        Lock the binding record so that only one export job runs for this
        record at a time. When concurrent jobs try to export the same
        record, the first one acquires the lock and proceeds; the others
        fail to lock and are retried later.
        This behavior also applies when the export becomes multilevel
        with :meth:`_export_dependencies`. Each level will set its own lock
        on the binding record it has to export.
"""
sql = ("SELECT id FROM %s WHERE ID = %%s FOR UPDATE NOWAIT" %
self.model._table)
try:
self.session.cr.execute(sql, (self.binding_id, ),
log_exceptions=False)
except psycopg2.OperationalError:
            _logger.info('A concurrent job is already exporting the same '
                         'record (%s with id %s). The job will be retried '
                         'later.', self.model._name, self.binding_id)
raise RetryableJobError(_(
'A concurrent job is already exporting the same record '
'(%s with id %s). The job will be retried later.') %
(self.model._name, self.binding_id))
def _has_to_skip(self):
""" Return True if the export can be skipped """
return False
@contextmanager
def _retry_unique_violation(self):
""" Context manager: catch Unique constraint error and retry the
job later.
        When several job workers run concurrently, it can happen that
        2 jobs create the same record at the same time (binding
        record created by :meth:`_export_dependency`), resulting in:
IntegrityError: duplicate key value violates unique
constraint "magento_product_product_openerp_uniq"
DETAIL: Key (backend_id, openerp_id)=(1, 4851) already exists.
        In that case, the job will be retried later.
.. warning:: The unique constraint must be created on the
                     binding record to prevent 2 bindings from being
                     created for the same Magento record.
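
        A simplified sketch of the typical usage (mirroring what
        :meth:`_export_dependency` does below)::

            with self._retry_unique_violation():
                binding = self.env[binding_model].create(bind_values)
                self.session.commit()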
"""
try:
yield
except psycopg2.IntegrityError as err:
if err.pgcode == psycopg2.errorcodes.UNIQUE_VIOLATION:
raise RetryableJobError(_(
'A database error caused the failure of the job:\n'
'%s\n\n'
                    'Likely due to 2 concurrent jobs trying to create '
'the same record. The job will be retried later.') % err)
else:
raise
def _export_dependency(self, relation, binding_model, exporter_class=None,
binding_field='magento_bind_ids',
binding_extra_vals=None):
"""
Export a dependency. The exporter class is a subclass of
        ``MagentoExporter``. If a more precise class needs to be defined,
it can be passed to the ``exporter_class`` keyword argument.
.. warning:: a commit is done at the end of the export of each
dependency. The reason for that is that we pushed a record
on the backend and we absolutely have to keep its ID.
So you *must* take care not to modify the OpenERP
                     database during an export, except when writing
                     back the external ID or when storing
external data that we have to keep on this side.
You should call this method only at the beginning
of the exporter synchronization,
in :meth:`~._export_dependencies`.
:param relation: record to export if not already exported
:type relation: :py:class:`openerp.models.BaseModel`
:param binding_model: name of the binding model for the relation
:type binding_model: str | unicode
        :param exporter_class: :py:class:`openerp.addons.connector\
.connector.ConnectorUnit`
class or parent class to use for the export.
By default: MagentoExporter
        :type exporter_class: :py:class:`openerp.addons.connector\
.connector.MetaConnectorUnit`
:param binding_field: name of the one2many field on a normal
record that points to the binding record
(default: magento_bind_ids).
It is used only when the relation is not
a binding but is a normal record.
:type binding_field: str | unicode
        :param binding_extra_vals: In case we want to create a new binding,
                                   pass extra values for this binding
:type binding_extra_vals: dict
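
        Example override of :meth:`_export_dependencies` calling this
        method (a sketch only; the field and binding model names are
        illustrative, not defined in this module)::

            def _export_dependencies(self):
                self._export_dependency(self.binding_record.categ_id,
                                        'magento.product.category')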
"""
if not relation:
return
if exporter_class is None:
exporter_class = MagentoExporter
rel_binder = self.binder_for(binding_model)
# wrap is typically True if the relation is for instance a
# 'product.product' record but the binding model is
# 'magento.product.product'
wrap = relation._model._name != binding_model
if wrap and hasattr(relation, binding_field):
domain = [('openerp_id', '=', relation.id),
('backend_id', '=', self.backend_record.id)]
binding = self.env[binding_model].search(domain)
if binding:
assert len(binding) == 1, (
'only 1 binding for a backend is '
'supported in _export_dependency')
            # we are working with an unwrapped record (e.g.
# product.category) and the binding does not exist yet.
# Example: I created a product.product and its binding
# magento.product.product and we are exporting it, but we need to
# create the binding for the product.category on which it
# depends.
else:
bind_values = {'backend_id': self.backend_record.id,
'openerp_id': relation.id}
if binding_extra_vals:
bind_values.update(binding_extra_vals)
# If 2 jobs create it at the same time, retry
# one later. A unique constraint (backend_id,
# openerp_id) should exist on the binding model
with self._retry_unique_violation():
binding = (self.env[binding_model]
.with_context(connector_no_export=True)
.sudo()
.create(bind_values))
                    # Eager commit to avoid having 2 jobs
                    # exporting at the same time. The unique constraint
                    # will be violated if another job has already created
                    # the same binding. The error is caught and a
                    # RetryableJobError is raised.
self.session.commit()
else:
# If magento_bind_ids does not exist we are typically in a
# "direct" binding (the binding record is the same record).
            # If wrap is False, relation is already a binding record.
binding = relation
if not rel_binder.to_backend(binding):
exporter = self.unit_for(exporter_class, model=binding_model)
exporter.run(binding.id)
def _export_dependencies(self):
""" Export the dependencies for the record"""
return
def _map_data(self):
""" Returns an instance of
:py:class:`~openerp.addons.connector.unit.mapper.MapRecord`
"""
return self.mapper.map_record(self.binding_record)
def _validate_data(self, data):
""" Check if the values to import are correct
Kept for retro-compatibility. To remove in 8.0
Pro-actively check before the ``Model.create`` or ``Model.update``
if some fields are missing or invalid
Raise `InvalidDataError`
"""
_logger.warning('Deprecated: _validate_data is deprecated '
'in favor of validate_create_data() '
'and validate_update_data()')
self._validate_create_data(data)
self._validate_update_data(data)
def _validate_create_data(self, data):
""" Check if the values to import are correct
Pro-actively check before the ``Model.create`` if some fields
are missing or invalid
Raise `InvalidDataError`
"""
return
def _validate_update_data(self, data):
""" Check if the values to import are correct
Pro-actively check before the ``Model.update`` if some fields
are missing or invalid
Raise `InvalidDataError`
"""
return
def _create_data(self, map_record, fields=None, **kwargs):
""" Get the data to pass to :py:meth:`_create` """
return map_record.values(for_create=True, fields=fields, **kwargs)
def _create(self, data):
""" Create the Magento record """
# special check on data before export
self._validate_create_data(data)
return self.backend_adapter.create(data)
def _update_data(self, map_record, fields=None, **kwargs):
""" Get the data to pass to :py:meth:`_update` """
return map_record.values(fields=fields, **kwargs)
def _update(self, data):
""" Update an Magento record """
assert self.magento_id
# special check on data before export
self._validate_update_data(data)
self.backend_adapter.write(self.magento_id, data)
def _run(self, fields=None):
""" Flow of the synchronization, implemented in inherited classes"""
assert self.binding_id
assert self.binding_record
if not self.magento_id:
fields = None # should be created with all the fields
if self._has_to_skip():
return
# export the missing linked resources
self._export_dependencies()
        # prevent other jobs from exporting the same record;
        # the lock will be released on commit (or rollback)
self._lock()
map_record = self._map_data()
if self.magento_id:
record = self._update_data(map_record, fields=fields)
if not record:
return _('Nothing to export.')
self._update(record)
else:
record = self._create_data(map_record, fields=fields)
if not record:
return _('Nothing to export.')
self.magento_id = self._create(record)
return _('Record exported with ID %s on Magento.') % self.magento_id
@job(default_channel='root.magento')
@related_action(action=unwrap_binding)
def export_record(session, model_name, binding_id, fields=None):
""" Export a record on Magento """
record = session.env[model_name].browse(binding_id)
env = get_environment(session, model_name, record.backend_id.id)
exporter = env.get_connector_unit(MagentoExporter)
return exporter.run(binding_id, fields=fields)
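# A typical way to enqueue this job from business code or an event
# consumer; the model name and fields below are shown only as an
# illustration:
#
#     export_record.delay(session, 'magento.product.product', binding_id,
#                         fields=['name'])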