Merge 9673e71 into 10c00ed

tschanzt committed Aug 29, 2019
2 parents 10c00ed + 9673e71 commit 874db65
Showing 20 changed files with 1,366 additions and 16 deletions.
3 changes: 3 additions & 0 deletions connector_importer/components/__init__.py
@@ -1,5 +1,8 @@
 from . import base
 from . import tracker
 from . import odoorecord
+from . import odoorecord_csv_std
 from . import importer
+from . import importer_csv_std
 from . import mapper
+from . import automapper
17 changes: 17 additions & 0 deletions connector_importer/components/automapper.py
@@ -0,0 +1,17 @@
# Copyright 2019 Camptocamp SA
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl)

from odoo.addons.component.core import Component
from odoo.addons.connector.components.mapper import mapping


class AutoMapper(Component):
    _name = 'importer.mapper.auto'
    _inherit = 'importer.base.mapper'
    _usage = 'importer.automapper'

    @mapping
    def auto_mapping(self, record):
        """Generate the values automatically by removing internal keys."""
        result = {k: v for k, v in record.items() if not k.startswith('_')}
        return result
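What the auto-mapping does, in a minimal standalone sketch (illustration only, not part of this commit; `_line_nr` is the kind of internal bookkeeping key the importer adds to each parsed line):

# Keys starting with an underscore are dropped, everything else is
# passed through as-is.
record = {'id': '__import__.partner_1', 'name': 'ACME', '_line_nr': 3}
values = {k: v for k, v in record.items() if not k.startswith('_')}
assert values == {'id': '__import__.partner_1', 'name': 'ACME'}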
6 changes: 4 additions & 2 deletions connector_importer/components/importer.py
@@ -63,19 +63,21 @@ class RecordImporter(Component):
     # log and report errors
     # do not make the whole import fail
     _break_on_error = False
+    _record_handler_usage = 'odoorecord.handler'
+    _tracking_handler_usage = 'tracking.handler'
     # a unique key (field name) to retrieve the odoo record
     odoo_unique_key = ''
 
     def _init_importer(self, recordset):
         self.recordset = recordset
         # record handler is responsible for create/write on odoo records
-        self.record_handler = self.component(usage='odoorecord.handler')
+        self.record_handler = self.component(usage=self._record_handler_usage)
         self.record_handler._init_handler(
             importer=self,
             unique_key=self.odoo_unique_key
         )
         # tracking handler is responsible for logging and chunk reports
-        self.tracker = self.component(usage='tracking.handler')
+        self.tracker = self.component(usage=self._tracking_handler_usage)
         self.tracker._init_handler(
             model_name=self.model._name,
             logger_name=LOGGER_NAME,
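The two new `_*_usage` class attributes make the handler components pluggable: a specialized importer can now swap in its own record handler without copying `_init_importer()`. A hedged sketch of such an override (the component name, model and handler usage below are made up for illustration):

from odoo.addons.component.core import Component


class MyRecordImporter(Component):
    # hypothetical importer bound to a specific model
    _name = 'my.record.importer'
    _inherit = 'importer.record'
    _apply_on = 'res.partner'
    # picked up by _init_importer() instead of the default
    # 'odoorecord.handler' usage
    _record_handler_usage = 'my.custom.handler'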
170 changes: 170 additions & 0 deletions connector_importer/components/importer_csv_std.py
@@ -0,0 +1,170 @@
# Copyright 2019 Camptocamp SA
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl)

from odoo.addons.component.core import Component

from ..log import logger


class RecordImporterCSVStd(Component):
    """Importer for standard Odoo CSV files.

    This importer imports standard CSV files using the `load()`
    method of Odoo.
    """

    _name = 'importer.record.csv.std'
    _inherit = ['importer.record']
    _break_on_error = True  # we want the import to stop if an error occurs
    _apply_on = None
    _use_xmlid = True
    _record_handler_usage = 'odoorecord.handler.csv'

    @property
    def mapper(self):
        if not self._mapper:
            self._mapper = self.component(usage='importer.automapper')
        return self._mapper
    def prepare_load_params(self, lines):
        """Prepare the parameters for the standard `load()` method.

        Returns the list of fieldnames plus the list of corresponding
        rows of values.
        """
        fieldnames = list(lines[0].keys())

        data = [
            [line[fieldname] for fieldname in fieldnames]
            for line in lines
        ]
        return fieldnames, data

    def run(self, record, is_last_importer=True, **kw):
        """Run the import job for the given record.

        Steps:

        * for each line, check whether it has already been imported
          and reference it as created or updated accordingly
        * launch the import with the standard `load()` method
        * analyse the error messages returned by `load()`, remove the
          matching references from the first step and log an error
          for each of them
        * produce a report and store it on the recordset
        """

        self.record = record
        if not self.record:
            # maybe deleted???
            msg = 'NO RECORD FOUND, maybe deleted? Check your jobs!'
            logger.error(msg)
            return msg

        self._init_importer(self.record.recordset_id)

        dataset = []
        tracker_data = {
            'created': {
                # line_nr: (values, line, odoo_record),
            },
            'updated': {
                # line_nr: (values, line, odoo_record),
            },
        }
        lines = self._record_lines()
        # The standard `load()` method works on the whole dataset.
        # First we prepare all the lines with the mapper
        # (so you can still customize the imported data if needed)
        # and we build the dataset to pass to `load()`.
        for i, line in enumerate(lines):
            line = self.prepare_line(line)
            options = self._load_mapper_options()
            try:
                with self.env.cr.savepoint():
                    values = self.mapper.map_record(line).values(**options)
                    logger.debug(values)
            except Exception as err:
                values = {}
                self.tracker.log_error(values, line, message=err)
                if self._break_on_error:
                    raise
                continue
            # Collect the tracker data for later.
            # We store the parameters for the
            # chunk_report.track_{created,updated} functions, except the
            # odoo_record, which cannot be known yet for newly created
            # records.
            odoo_record_exists = self.record_handler.odoo_exists(
                values, line, use_xmlid=self._use_xmlid)
            if odoo_record_exists:
                odoo_record = self.record_handler.odoo_find(
                    values, line, use_xmlid=self._use_xmlid)
                tracker_data['updated'][i] = [values, line, odoo_record]
            else:
                tracker_data['created'][i] = [values, line]

            # handle forced skipping
            skip_info = self.skip_it(values, line)
            if skip_info:
                self.tracker.log_skipped(values, line, skip_info)
                continue
            dataset.append(values)
        if dataset:
            try:
                with self.env.cr.savepoint():
                    fieldnames, data = self.prepare_load_params(dataset)
                    load_res = self.model.load(fieldnames, data)

                    # In case of errors `load()` returns a list of
                    # messages with the cause and the affected rows range.
                    # Here we map these messages to the tracked data and
                    # update the references to be able to provide a
                    # precise report.
                    for message in load_res['messages']:
                        if message.get('rows'):
                            line_numbers = range(
                                message['rows']['from'],
                                message['rows']['to'] + 1)
                            for line_nr in line_numbers:
                                # First we remove the entry from the
                                # tracker data
                                tracker_data['created'].pop(line_nr, None)
                                tracker_data['updated'].pop(line_nr, None)
                                # We add 2 because the tracker counts
                                # lines starting from 1 and the file has
                                # a header line
                                line = {'_line_nr': line_nr + 2}
                                self.tracker.log_error(
                                    {}, line, message=message['message'])
                        else:
                            line = {'_line_nr': 0}
                            self.tracker.log_error(
                                {}, line, message=message['message'])
            except Exception as err:
                line = {'_line_nr': 0}
                self.tracker.log_error({}, line, message=err)
                if self._break_on_error:
                    raise

        for arguments in tracker_data['created'].values():
            self.tracker.log_created(*arguments)
        for arguments in tracker_data['updated'].values():
            self.tracker.log_updated(*arguments)

        # update the report
        self._do_report()

        # log chunk finished
        msg = ' '.join([
            'CHUNK FINISHED',
            '[created: {created}]',
            '[updated: {updated}]',
            '[skipped: {skipped}]',
            '[errored: {errored}]',
        ]).format(**self.tracker.get_counters())
        self.tracker._log(msg)

        # TODO
        # chunk_finished_event.fire(
        #     self.env, self.model._name, self.record)
        return 'ok'
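For context, `load()` is Odoo's standard import entry point (the one behind the UI importer): it takes the list of field names plus the rows of values, and reports failures through a `messages` list whose entries may carry a `rows` range, which is exactly what the error-mapping loop above consumes. A hedged sketch of the call shape (model and values are illustrative; run e.g. in an Odoo shell where `env` is available):

fieldnames = ['id', 'name']
data = [
    ['__import__.partner_1', 'ACME'],
    ['__import__.partner_2', 'Foo Ltd'],
]
res = env['res.partner'].load(fieldnames, data)
# on success, roughly: {'ids': [42, 43], 'messages': []}
# on failure, roughly:
# {'ids': False,
#  'messages': [{'type': 'error',
#                'message': '...cause...',
#                'rows': {'from': 0, 'to': 0}}]}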
37 changes: 37 additions & 0 deletions connector_importer/components/odoorecord_csv_std.py
@@ -0,0 +1,37 @@
# Copyright 2019 Camptocamp SA
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl)

from odoo.addons.component.core import Component


class OdooRecordHandlerCSVStd(Component):
    """Interact w/ odoo importable records from standard Odoo CSV files."""

    _name = 'importer.odoorecord.handler.csv.std'
    _inherit = 'importer.odoorecord.handler'
    _usage = 'odoorecord.handler.csv'
    xmlid_key = 'id'  # CSV field containing the record XML-ID

    def odoo_find(self, values, orig_values, use_xmlid=False):
        """Find any existing item in odoo based on the XML-ID."""
        if use_xmlid:
            if not self.xmlid_key:
                return self.model
            item = self.env.ref(
                values[self.xmlid_key], raise_if_not_found=False)
            return item
        return super().odoo_find(values, orig_values)

    def odoo_exists(self, values, orig_values, use_xmlid=False):
        """Return True if the item exists."""
        return bool(self.odoo_find(values, orig_values, use_xmlid))

    def odoo_create(self, values, orig_values):
        """Create a new odoo record."""
        raise NotImplementedError(
            "This method is not used when importing standard CSV files.")

    def odoo_write(self, values, orig_values):
        """Update an existing odoo record."""
        raise NotImplementedError(
            "This method is not used when importing standard CSV files.")
12 changes: 7 additions & 5 deletions connector_importer/components/tracker.py
@@ -90,22 +90,24 @@ def _log(self, msg, line=None, level='info'):
         )
         handler(msg)
 
-    def log_updated(self, values, line, odoo_record, message=''):
-        self._log('UPDATED [id: {}]'.format(odoo_record.id), line=line)
+    def log_updated(self, values, line, odoo_record=None, message=''):
+        if odoo_record:
+            self._log('UPDATED [id: {}]'.format(odoo_record.id), line=line)
         self.chunk_report.track_updated(self.chunk_report_item(
             line, odoo_record=odoo_record, message=message
         ))
 
-    def log_error(self, values, line, odoo_record, message=''):
+    def log_error(self, values, line, odoo_record=None, message=''):
         if isinstance(message, Exception):
             message = str(message)
         self._log(message, line=line, level='error')
         self.chunk_report.track_error(self.chunk_report_item(
             line, odoo_record=odoo_record, message=message
         ))
 
-    def log_created(self, values, line, odoo_record, message=''):
-        self._log('CREATED [id: {}]'.format(odoo_record.id), line=line)
+    def log_created(self, values, line, odoo_record=None, message=''):
+        if odoo_record:
+            self._log('CREATED [id: {}]'.format(odoo_record.id), line=line)
         self.chunk_report.track_created(self.chunk_report_item(
             line, odoo_record=odoo_record, message=message
         ))
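Making `odoo_record` optional matters for the `load()`-based flow above: `load()` does not hand back the records it created, so the importer can only track a created/updated line without one. A hedged call sketch (`tracker`, `values`, `line` and `partner` stand in for the objects used in the importer):

# classic importer: the record is known, so its id gets logged
tracker.log_created(values, line, odoo_record=partner)
# standard CSV importer: no record in hand, the line is still tracked
tracker.log_created(values, line)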
10 changes: 6 additions & 4 deletions connector_importer/models/reporter.py
@@ -100,7 +100,7 @@ def report_add_line(self, writer, item):
         writer.writerow(item)
 
     def report_get_columns(self, recordset, orig_content,
-                           extra_keys=None, delimiter=';'):
+                           extra_keys=None, delimiter=';', quotechar='"'):
         """Retrieve columns by recordset.
 
         :param recordset: instance of recordset.
@@ -110,8 +110,10 @@ def report_get_columns(self, recordset, orig_content,
         extra_keys = extra_keys or []
         # read only the 1st line of the original file
         if orig_content:
-            line1 = orig_content[0].split(delimiter)
-            return line1 + extra_keys
+            reader = csv.reader(
+                orig_content[:1], delimiter=delimiter, quotechar=quotechar)
+            columns = next(reader)
+            return columns + extra_keys
         return extra_keys
 
     def report_do(self, recordset, fileout, **options):
@@ -140,7 +142,7 @@ def report_do(self, recordset, fileout, **options):
 
         columns = self.report_get_columns(
             recordset, orig_content,
-            extra_keys=extra_keys, delimiter=delimiter)
+            extra_keys=extra_keys, delimiter=delimiter, quotechar=quotechar)
 
         writer = self.report_get_writer(
             fileout, columns, delimiter=delimiter, quotechar=quotechar)
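Splitting the header on the raw delimiter breaks as soon as a column label contains a quoted delimiter; parsing it with `csv.reader` honours the quoting. A minimal sketch of the difference:

import csv

header = 'id;name;"address;city"'
print(header.split(';'))
# ['id', 'name', '"address', 'city"']  -> quoted field wrongly split
print(next(csv.reader([header], delimiter=';', quotechar='"')))
# ['id', 'name', 'address;city']       -> parsed correctly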
5 changes: 5 additions & 0 deletions connector_importer/models/sources/source_csv.py
@@ -32,6 +32,9 @@ class CSVSource(models.Model):
         string='CSV quotechar',
         default='"',
     )
+    csv_encoding = fields.Char(
+        string='CSV Encoding',
+    )
 
     _csv_reader_klass = CSVReader
 
@@ -41,6 +44,7 @@ def _config_summary_fields(self):
         return _fields + [
             'csv_filename', 'csv_filesize',
             'csv_delimiter', 'csv_quotechar',
+            'csv_encoding'
         ]
 
     def _binary_csv_content(self):
@@ -66,6 +70,7 @@ def _get_lines(self):
         # read CSV
         reader_args = {
             'delimiter': self.csv_delimiter,
+            'encoding': self.csv_encoding
         }
         if self.csv_path:
             # TODO: join w/ filename
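The new `csv_encoding` value is handed to the CSV reader so that non-UTF-8 files decode correctly. A hedged sketch of why the setting matters (bytes and encoding are illustrative):

raw = b'id;name\n1;Caf\xe9'  # latin-1 encoded content
try:
    raw.decode('utf-8')  # a UTF-8 default would fail on this file
except UnicodeDecodeError:
    pass
assert raw.decode('latin-1') == 'id;name\n1;Café'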
4 changes: 4 additions & 0 deletions connector_importer/models/sources/source_mixin.py
@@ -51,6 +51,7 @@ def _compute_source_ref_id(self):
     def _selection_source_ref_id(self):
         return [
             ('import.source.csv', 'CSV'),
+            ('import.source.csv.std', 'Odoo CSV'),
         ]
 
     @api.multi
@@ -176,6 +177,10 @@ def get_lines(self):
         # sort them
         lines_sorted = self._sort_lines(lines)
 
+        # no chunk size means all the lines in a single chunk
+        if not self.chunk_size:
+            yield list(lines_sorted)
+            return
         for i, chunk in enumerate(gen_chunks(lines_sorted,
                                              chunksize=self.chunk_size)):
             # get out of chunk iterator
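For context, `gen_chunks` (a helper from this module's utils) slices the sorted lines into fixed-size chunks, one import job per chunk; the new guard yields everything as a single chunk when no chunk size is configured. A hedged sketch of equivalent chunking logic (not the actual helper):

def chunks(items, size):
    """Yield successive lists of at most `size` items."""
    buf = []
    for item in items:
        buf.append(item)
        if len(buf) == size:
            yield buf
            buf = []
    if buf:
        yield buf

assert list(chunks(range(5), 2)) == [[0, 1], [2, 3], [4]]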
6 changes: 6 additions & 0 deletions connector_importer/readme/ROADMAP.rst
@@ -0,0 +1,6 @@
* with the import of standard Odoo CSV files, a concurrency error occurs
  when updating the ``report_data`` field of the ``import_recordset`` table
  (from the importer: ``self._do_report()`` -> ``self.recordset.set_report(...)``).
  The job is automatically retried a second time (without concurrency errors).
  For small files this is not a big issue, but for files with a huge number
  of lines it takes time to process them twice.
