Merge pull request #28 from akretion/10.0-refactor-export
[REF] Start to refactor the export by splitting it into two steps.
sebastienbeau committed Aug 14, 2018
2 parents 084f6ba + 4ebfcc4 commit 01e17c1
Showing 14 changed files with 328 additions and 130 deletions.
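
For orientation, here is a minimal sketch of the two steps this refactor introduces. The method names, the data comparison, and the sync_state handling come from the diff below; the standalone signatures are assumptions, not the actual Odoo implementation.

# Minimal sketch of the refactored two-step export. Names are taken
# from the diff below; the standalone signatures are assumptions.

# Step 1 (recompute): rebuild each binding's JSON payload and flag it
# 'to_update' only when the serialized data actually changed.
def recompute_json(binding, mapper, force_export=False):
    data = mapper.map_record(binding).values()
    if binding.data != data or force_export:
        vals = {'data': data}
        if binding.sync_state in ('done', 'new'):
            vals['sync_state'] = 'to_update'
        binding.write(vals)

# Step 2 (export): push the precomputed payloads to the search engine
# and mark the bindings as synchronized.
def export(bindings, adapter):
    bindings.write({'sync_state': 'done'})
    return adapter.index([b.data for b in bindings])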
14 changes: 13 additions & 1 deletion connector_algolia/components/adapter.py
@@ -7,6 +7,8 @@
import logging

from odoo.addons.component.core import Component
from odoo import _
from odoo.exceptions import UserError

_logger = logging.getLogger(__name__)

@@ -29,10 +31,20 @@ def _get_index(self):
backend.algolia_app_id, account._get_password())
return client.initIndex(self.work.index.name)

def add(self, datas):
def index(self, datas):
index = self._get_index()
# Ensure that the objectID is set because Algolia will use it
# for creating or updating the record
for data in datas:
if not data.get('objectID'):
raise UserError(
_('The key objectID is missing in the data %s') % data)
index.add_objects(datas)

def delete(self, binding_ids):
index = self._get_index()
index.delete_objects(binding_ids)

def clear(self):
index = self._get_index()
index.clear_index()
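
As a usage illustration of the renamed adapter API, assuming an adapter instance bound to an Algolia index; the payload values are made up, but the objectID requirement is the one enforced above.

# Hypothetical calls against an adapter bound to an Algolia index.
adapter.index([
    {'objectID': 42, 'name': 'Partner A'},  # objectID drives create-or-update
    {'objectID': 43, 'name': 'Partner B'},
])
adapter.delete([42])  # remove records by their Algolia objectIDs
adapter.clear()       # wipe the whole index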
100 changes: 52 additions & 48 deletions connector_algolia/tests/test_connector_algolia.py
@@ -4,15 +4,14 @@

import mock
from odoo import _
from odoo.addons.queue_job.job import Job

from odoo.addons.queue_job.tests.common import JobMixin

from .common import ConnectorAlgoliaCase
from .common import mock_api
from .models import ResPartner


class TestConnectorAlgolia(ConnectorAlgoliaCase):
class TestConnectorAlgolia(ConnectorAlgoliaCase, JobMixin):

def _init_test_model(self, model_cls):
registry = self.env.registry
@@ -33,14 +32,14 @@ def setUp(self):
ir_model_res_partner = self.env['ir.model'].search(
[('model', '=', 'res.partner')])
res_lang = self.env['res.lang'].search([], limit=1)
exporter_id = self.env.ref('base_jsonify.ir_exp_partner')
self.exporter = self.env.ref('base_jsonify.ir_exp_partner')
self.se_index_model = self.env['se.index']
self.se_index = self.se_index_model.create({
'name': 'partner index',
'backend_id': self.backend.se_backend_id.id,
'model_id': ir_model_res_partner.id,
'lang_id': res_lang.id,
'exporter_id': exporter_id.id
'exporter_id': self.exporter.id
})
# init our test model after the creation of the index since the index
# will be used as the default value
@@ -49,54 +48,59 @@
self.cr.commit = mock.MagicMock()
self._init_test_model(ResPartner)

def test_export_all_indexes(self):
def test_recompute_all_indexes(self):
partner = self.env.ref('base.main_partner')
self.assertEqual(partner.data, {})

with mock_api(self.env) as mocked_api:
self.assertFalse(self.se_index.name in mocked_api.index)
self.se_index_model.export_all_index(delay=False)
self.assertTrue(self.se_index.name in mocked_api.index)
index = mocked_api.index[self.se_index.name]
self.assertEqual(1, len(index._calls))
method, values = index._calls[0]
self.assertEqual('add_objects', method)
self.assertEqual(
self.env['res.partner'].search_count([]),
len(values)
)
jobs = self.job_counter()
self.se_index_model.recompute_all_index(
[('id', '=', self.se_index.id)])

# check that a job has been created for each binding
nbr_binding = self.env['res.partner'].search_count([])
self.assertEqual(jobs.count_created(), nbr_binding)
self.perform_jobs(jobs)

# check that all bindings have been recomputed and set to be updated
nbr_binding_to_update = self.env['res.partner'].search_count(
[('sync_state', '=', 'to_update')])
self.assertEqual(nbr_binding_to_update, nbr_binding)

# Check that the JSON data has been updated
parser = self.exporter.get_json_parser()
data = partner.jsonify(parser)[0]
data['id'] = partner.id # the mapper adds the id of the record
self.assertDictEqual(partner.data, data)

def test_export_jobs(self):
queue_job_model = self.env['queue.job']
existing_jobs = queue_job_model.search([])
with mock_api(self.env) as mocked_api:
self.assertFalse(self.se_index.name in mocked_api.index)
self.se_index_model.export_all_index(delay=True)
# by default the export method creates 2 jobs
# the first one to split the bindings to export into batches
# the second one to export each batch
new_jobs = queue_job_model.search([])
new_job = new_jobs - existing_jobs
self.assertEqual(1, len(new_job))
job = Job.load(self.env, new_job.uuid)
# Set partners to be updated with fake vals in data
partners = self.env['res.partner'].search([])
partners.write({
'sync_state': 'to_update',
'data': {'objectID': 'foo'},
})
count = len(partners)

# Generate Batch export job
jobs = self.job_counter()
self.se_index_model.generate_batch_export_per_index(
[('id', '=', self.se_index.id)])
self.assertEqual(jobs.count_created(), 1)
self.assertEqual(
_('Prepare a batch export of indexes'),
job.description)
# at this stage the mocked_api is not yet called
self.assertFalse(self.se_index.name in mocked_api.index)
# perform the job
existing_jobs = new_jobs
job.perform()
new_jobs = queue_job_model.search([])
new_job = new_jobs - existing_jobs
self.assertEqual(1, len(new_job))
job = Job.load(self.env, new_job.uuid)
count = self.env['res.partner'].search_count([])
_("Prepare a batch export of index '%s'") % self.se_index.name,
jobs.search_created().name)

# Run batch export and check that the export job has been created
jobs2 = self.job_counter()
self.perform_jobs(jobs)
self.assertEqual(jobs2.count_created(), 1)
self.assertEqual(
_("Export %d records of %d for index 'partner index'") % (
count, count),
job.description)
self.assertFalse(self.se_index.name in mocked_api.index)
# the last job is the one performing the export
job = Job.load(self.env, new_job.uuid)
jobs2.search_created().name)

# Run export job and check that Algolia has been called
with mock_api(self.env) as mocked_api:
job.perform()
self.assertTrue(self.se_index.name in mocked_api.index)
self.assertFalse(self.se_index.name in mocked_api.index)
self.perform_jobs(jobs2)
self.assertTrue(self.se_index.name in mocked_api.index)
5 changes: 4 additions & 1 deletion connector_algolia/views/se_backend_algolia.xml
@@ -23,7 +23,8 @@
<field name="name"/>
<field name="lang_id"/>
<field name="model_id"/>
<field name="exporter_id"/>
<field name="exporter_id" context="{'form_view_ref': 'connector_algolia.view_ir_exports'}"/>
<field name="batch_size"/>
<button name="export_all"
icon="fa-refresh"
type="object"/>
@@ -35,6 +36,8 @@
</field>
</record>



<record model="ir.ui.view" id="se_backend_algolia_tree_view">
<field name="name">se.backend.algolia.tree (in connector_algolia)</field>
<field name="model">se.backend.algolia</field>
1 change: 1 addition & 0 deletions connector_search_engine/__manifest__.py
@@ -17,6 +17,7 @@
],
'data': [
'views/se_backend.xml',
'views/se_index.xml',
'views/se_menu.xml',
'data/ir_cron.xml',
'security/ir.model.access.csv',
5 changes: 4 additions & 1 deletion connector_search_engine/components/adapter.py
@@ -15,8 +15,11 @@ class SeAdapter(AbstractComponent):
def match(cls, session, model):
return True # We are a generic exporter; how cool is that?

def add(self, datas):
def index(self, datas):
return NotImplemented

def delete(self, binding_ids):
return NotImplemented

def clear(self):
return NotImplemented
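
The abstract component above only fixes the contract (index, delete, clear). A plain-Python sketch of a backend filling it in, with an in-memory store assumed purely for illustration:

# In-memory stand-in for a search-engine backend implementing the
# SeAdapter contract; illustrative only, no Odoo component plumbing.
class InMemorySeAdapter(object):

    def __init__(self):
        self._store = {}

    def index(self, datas):
        for data in datas:
            self._store[data['objectID']] = data

    def delete(self, binding_ids):
        for binding_id in binding_ids:
            self._store.pop(binding_id, None)

    def clear(self):
        self._store.clear()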
29 changes: 6 additions & 23 deletions connector_search_engine/components/exporter.py
@@ -13,28 +13,11 @@ class SeExporter(Component):
_base_mapper_usage = 'se.export.mapper'
_base_backend_adapter_usage = 'se.backend.adapter'

def __init__(self, environment):
"""
:param environment: current environment (backend, session, ...)
:type environment: :py:class:`connector.connector.Environment`
"""
super(SeExporter, self).__init__(environment)
self.bindings = None

def _add(self, data):
""" Create the SolR record """
return self.backend_adapter.add(data)

def _export_data(self):
return NotImplemented
def _index(self, data):
""" Index the record """
return self.backend_adapter.index(data)

def run(self):
""" Run the synchronization
:param binding_id: identifier of the binding record to export
"""
datas = []
lang = self.work.index.lang_id.code
for record in self.work.records.with_context(lang=lang):
map_record = self.mapper.map_record(record)
datas.append(map_record.values())
return self._add(datas)
""" Run the synchronization """
self.work.records.write({'sync_state': 'done'})
return self._index([record.data for record in self.work.records])
19 changes: 16 additions & 3 deletions connector_search_engine/data/ir_cron.xml
@@ -1,16 +1,29 @@
<?xml version="1.0" encoding="utf-8"?>
<odoo noupdate="1">

<record forcecreate="True" id="ir_cron_export_all_index" model="ir.cron">
<field name="name">Export All index</field>
<record forcecreate="True" id="ir_cron_recompute_all_index" model="ir.cron">
<field name="name">Search engine: recompute all index</field>
<field eval="True" name="active"/>
<field name="user_id" ref="base.user_root"/>
<field name="interval_number">1</field>
<field name="interval_type">days</field>
<field name="numbercall">-1</field>
<field eval="False" name="doall"/>
<field eval="'se.index'" name="model"/>
<field eval="'export_all_index'" name="function"/>
<field eval="'recompute_all_index'" name="function"/>
<field eval="'()'" name="args"/>
</record>

<record forcecreate="True" id="ir_cron_export_binding" model="ir.cron">
<field name="name">Search engine: Generate job for exporting binding per index</field>
<field eval="True" name="active"/>
<field name="user_id" ref="base.user_root"/>
<field name="interval_number">1</field>
<field name="interval_type">days</field>
<field name="numbercall">-1</field>
<field eval="False" name="doall"/>
<field eval="'se.index'" name="model"/>
<field eval="'generate_batch_export_per_index'" name="function"/>
<field eval="'()'" name="args"/>
</record>

83 changes: 41 additions & 42 deletions connector_search_engine/models/se_binding.py
@@ -16,53 +16,32 @@ class SeBinding(models.AbstractModel):
'se.index',
string="Index",
required=True)
sync_state = fields.Selection(
[('to_update', 'To update'),
('scheduled', 'Scheduled'),
('done', 'Done')],
default='to_update',
sync_state = fields.Selection([
('new', 'New'),
('to_update', 'To update'),
('scheduled', 'Scheduled'),
('done', 'Done'),
],
default='new',
readonly=True)
date_modified = fields.Date(readonly=True)
date_syncronized = fields.Date(readonly=True)
data = fields.Serialized()

@job(default_channel='root.search_engine')
@api.model
def _scheduler_export(self, batch_size=5000, domain=None, delay=True):
if domain is None:
domain = []
domain.append(('sync_state', '=', 'to_update'))
records = self.search(domain)
if delay:
description = _('Prepare a batch export of indexes')
records = records.with_delay(description=description)
return records.export_batch(batch_size, delay=delay)
def create(self, vals):
record = super(SeBinding, self).create(vals)
record._jobify_recompute_json()
return record

@job(default_channel='root.search_engine')
@api.multi
def export_batch(self, batch_size=5000, delay=True):
datas = self.read_group(
[('id', 'in', self.ids)], ['index_id'], ['index_id'])
for data in datas:
bindings = self.search(data['__domain'])
bindings_count = len(bindings)
while bindings:
todo = processing = bindings[0:batch_size]
bindings = bindings[batch_size:]
if delay:
description = _(
"Export %d records of %d for index '%s'") % (
len(processing),
bindings_count,
data['index_id'][1])
todo = processing.with_delay(description=description)
todo.export()
processing.with_context(connector_no_export=True).write({
'sync_state': 'scheduled',
})
def _jobify_recompute_json(self, force_export=False):
description = _('Recompute %s JSON and check if it needs update'
% self._name)
for record in self:
record.with_delay(description=description).recompute_json(
force_export=force_export)

@job(default_channel='root.search_engine')
@api.multi
def export(self):
def _work_by_index(self):
for backend in self.mapped('se_backend_id'):
for index in self.mapped('index_id'):
bindings = self.filtered(
@@ -72,5 +51,25 @@ def export(self):
with specific_backend.work_on(
self._name, records=bindings, index=index
) as work:
exporter = work.component(usage='se.record.exporter')
exporter.run()
yield work

# TODO: maybe we need to add a lock (to be checked)
@job(default_channel='root.search_engine.recompute_json')
def recompute_json(self, force_export=False):
for work in self._work_by_index():
mapper = work.component(usage='se.export.mapper')
lang = work.index.lang_id.code
for record in work.records.with_context(lang=lang):
data = mapper.map_record(record).values()
if record.data != data or force_export:
vals = {'data': data}
if record.sync_state in ('done', 'new'):
vals['sync_state'] = 'to_update'
record.write(vals)

@job(default_channel='root.search_engine')
@api.multi
def export(self):
for work in self._work_by_index():
exporter = work.component(usage='se.record.exporter')
exporter.run()
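
To summarize the sync_state lifecycle implied by this diff; the 'to_update' to 'scheduled' transition presumably happens in the batch-export code on se.index, which is among the files not shown.

# sync_state transitions inferred from this diff; illustrative only.
SYNC_STATE_FLOW = [
    ('new', 'to_update'),        # recompute_json: first payload computed
    ('done', 'to_update'),       # recompute_json: payload changed
    ('to_update', 'scheduled'),  # a batch export job picked the binding
    ('scheduled', 'done'),       # SeExporter.run() pushed it to the engine
]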
(Diff truncated: 6 more changed files not shown.)
