Skip to content

Commit

Permalink
Improve bulk import performance (#1539)
Browse files Browse the repository at this point in the history
* do not create savepoint for each row when bulk is enabled

* add test

* fix test for python3.7
add deprecation test

---------

Co-authored-by: Antonin <antonin.pevrol@machiq.com>
  • Loading branch information
Ptosiek and Antonin committed Feb 10, 2023
1 parent 490c737 commit 299f371
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 14 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,4 @@ The following is a list of much appreciated contributors:
* mpasternak (Michał Pasternak)
* nikatlas (Nikos Atlas)
* cocorocho (Erkan Çoban)
* Ptosiek (Antonin)
37 changes: 23 additions & 14 deletions import_export/resources.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import logging
import traceback
import warnings
from collections import OrderedDict
from copy import deepcopy

Expand Down Expand Up @@ -683,7 +684,7 @@ def after_import_instance(self, instance, new, row_number=None, **kwargs):
"""
pass

def import_row(self, row, instance_loader, using_transactions=True, dry_run=False, raise_errors=False, **kwargs):
def import_row(self, row, instance_loader, using_transactions=True, dry_run=False, raise_errors=None, **kwargs):
"""
Imports data from ``tablib.Dataset``. Refer to :doc:`import_workflow`
for a more complete description of the whole import process.
Expand All @@ -698,6 +699,12 @@ def import_row(self, row, instance_loader, using_transactions=True, dry_run=Fals
:param dry_run: If ``dry_run`` is set, or error occurs, transaction
will be rolled back.
"""
if raise_errors is not None:
warnings.warn(
"raise_errors argument is deprecated and will be removed in a future release.",
category=DeprecationWarning
)

skip_diff = self._meta.skip_diff
row_result = self.get_row_result_class()()
if self._meta.store_row_values:
Expand Down Expand Up @@ -762,17 +769,6 @@ def import_row(self, row, instance_loader, using_transactions=True, dry_run=Fals
tb_info = traceback.format_exc()
row_result.errors.append(self.get_error_result_class()(e, tb_info, row))

if self._meta.use_bulk:
# persist a batch of rows
# because this is a batch, any exceptions are logged and not associated
# with a specific row
if len(self.create_instances) == self._meta.batch_size:
self.bulk_create(using_transactions, dry_run, raise_errors, batch_size=self._meta.batch_size)
if len(self.update_instances) == self._meta.batch_size:
self.bulk_update(using_transactions, dry_run, raise_errors, batch_size=self._meta.batch_size)
if len(self.delete_instances) == self._meta.batch_size:
self.bulk_delete(using_transactions, dry_run, raise_errors)

return row_result

def import_data(self, dataset, dry_run=False, raise_errors=False,
Expand Down Expand Up @@ -852,16 +848,29 @@ def import_data_inner(
result.add_dataset_headers(dataset.headers)

for i, row in enumerate(dataset.dict, 1):
with atomic_if_using_transaction(using_transactions, using=db_connection):
with atomic_if_using_transaction(using_transactions and not self._meta.use_bulk, using=db_connection):
row_result = self.import_row(
row,
instance_loader,
using_transactions=using_transactions,
dry_run=dry_run,
row_number=i,
raise_errors=raise_errors,
**kwargs
)
if self._meta.use_bulk:
# persist a batch of rows
# because this is a batch, any exceptions are logged and not associated
# with a specific row
if len(self.create_instances) == self._meta.batch_size:
with atomic_if_using_transaction(using_transactions, using=db_connection):
self.bulk_create(using_transactions, dry_run, raise_errors, batch_size=self._meta.batch_size)
if len(self.update_instances) == self._meta.batch_size:
with atomic_if_using_transaction(using_transactions, using=db_connection):
self.bulk_update(using_transactions, dry_run, raise_errors, batch_size=self._meta.batch_size)
if len(self.delete_instances) == self._meta.batch_size:
with atomic_if_using_transaction(using_transactions, using=db_connection):
self.bulk_delete(using_transactions, dry_run, raise_errors)

result.increment_row_result_total(row_result)

if row_result.errors:
Expand Down
41 changes: 41 additions & 0 deletions tests/core/tests/test_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,36 @@ def test_validate_instance_called_with_import_validation_errors_as_None_creates_
target = dict()
full_clean_mock.assert_called_once_with(exclude=target.keys(), validate_unique=True)

def test_raise_errors_deprecation_import_row(self):
    """Passing ``raise_errors`` to ``import_row()`` must emit a DeprecationWarning
    with the documented message (the argument is scheduled for removal)."""
    target_msg = (
        "raise_errors argument is deprecated and will be removed in a future release."
    )
    dataset = tablib.Dataset(headers=['name', 'email', 'extra'])
    dataset.append(['Some book', 'test@example.com', "10.25"])

    class Loader:
        # Minimal stand-in for an instance loader; no DB access required.
        def __init__(self, *args, **kwargs):
            pass

    class A(MyResource):
        class Meta:
            instance_loader_class = Loader
            force_init_instance = True

        def init_instance(self, row=None):
            return row or {}

        def import_row(self, row, instance_loader, using_transactions=True, dry_run=False, raise_errors=False, **kwargs):
            # Forward raise_errors positionally so the deprecated code path
            # in the base implementation is exercised.
            return super().import_row(row, instance_loader, using_transactions, dry_run, raise_errors, **kwargs)

        def save_instance(self, instance, is_create, using_transactions=True, dry_run=False):
            # Persistence is irrelevant to this test; do nothing.
            pass

    resource = A()
    with self.assertWarns(DeprecationWarning) as w:
        resource.import_data(dataset, raise_errors=True)
    self.assertEqual(target_msg, str(w.warnings[0].message))


class AuthorResource(resources.ModelResource):

Expand Down Expand Up @@ -2080,6 +2110,17 @@ class Meta:
resource = _BookResource()
self.assertIsNotNone(resource.get_or_init_instance(ModelInstanceLoader(resource), self.dataset[0]))

@mock.patch('import_export.resources.atomic_if_using_transaction')
def test_no_sub_transaction_on_row_for_bulk(self, mock_atomic_if_using_transaction):
    """When ``use_bulk`` is enabled, at least one call to
    ``atomic_if_using_transaction`` receives a falsy first argument,
    i.e. no per-row savepoint is opened."""
    class _BookResource(resources.ModelResource):
        class Meta:
            model = Book
            use_bulk = True

    _BookResource().import_data(self.dataset)
    # Collect the first positional argument of every recorded call.
    first_args = [recorded[0][0] for recorded in mock_atomic_if_using_transaction.call_args_list]
    self.assertIn(False, first_args)


class BulkUpdateTest(BulkTest):
class _BookResource(resources.ModelResource):
Expand Down

0 comments on commit 299f371

Please sign in to comment.