Skip to content

Commit

Permalink
OpenConceptLab/ocl_issues#826 | parallel importer | batch index conce…
Browse files Browse the repository at this point in the history
…pts/mappings
  • Loading branch information
snyaggarwal committed Feb 17, 2022
1 parent 9ac8894 commit d6998ee
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 34 deletions.
11 changes: 9 additions & 2 deletions core/common/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class Meta:
extras_have_been_encoded = False
extras_have_been_decoded = False
is_being_saved = False
_index = True

@property
def model_name(self):
Expand All @@ -82,6 +83,12 @@ def index(self):
if not get(settings, 'TEST_MODE', False):
handle_save.delay(self.app_name, self.model_name, self.id)

@property
def should_index(self):
if getattr(self, '_index', None) is not None:
return self._index
return True

def soft_delete(self):
if self.is_active:
self.is_active = False
Expand Down Expand Up @@ -760,9 +767,9 @@ def can_view_all_content(self, user):

class CelerySignalProcessor(RealTimeSignalProcessor):
def handle_save(self, sender, instance, **kwargs):
if settings.ES_SYNC and instance.__class__ in registry.get_models():
if settings.ES_SYNC and instance.__class__ in registry.get_models() and instance.should_index:
handle_save.delay(instance.app_name, instance.model_name, instance.id)

def handle_m2m_changed(self, sender, instance, action, **kwargs):
if settings.ES_SYNC and instance.__class__ in registry.get_models():
if settings.ES_SYNC and instance.__class__ in registry.get_models() and instance.should_index:
handle_m2m_changed.delay(instance.app_name, instance.model_name, instance.id, action)
7 changes: 5 additions & 2 deletions core/common/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,13 @@ def delete_concept(concept_id): # pragma: no cover


@app.task
def batch_index_resources(resource, filters):
def batch_index_resources(resource, filters, update_indexed=False):
model = get_resource_class_from_resource_name(resource)
if model:
model.batch_index(model.objects.filter(**filters), model.get_search_document())
queryset = model.objects.filter(**filters)
model.batch_index(queryset, model.get_search_document())
if update_indexed:
queryset.update(_index=True)

return 1

Expand Down
18 changes: 18 additions & 0 deletions core/concepts/migrations/0029_concept__index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.8 on 2022-02-16 06:57

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('concepts', '0028_auto_20220203_0729'),
]

operations = [
migrations.AddField(
model_name='concept',
name='_index',
field=models.BooleanField(default=True),
),
]
15 changes: 10 additions & 5 deletions core/concepts/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ class Meta:
db_index=True
)
_counted = models.BooleanField(default=True, null=True, blank=True)
_index = models.BooleanField(default=True)
logo_path = None

OBJECT_TYPE = CONCEPT_TYPE
Expand Down Expand Up @@ -468,6 +469,7 @@ def clone(self):
is_latest_version=self.is_latest_version,
parent_id=self.parent_id,
versioned_object_id=self.versioned_object_id,
_index=self._index
)
concept_version.cloned_names = self.__clone_name_locales()
concept_version.cloned_descriptions = self.__clone_description_locales()
Expand Down Expand Up @@ -713,8 +715,9 @@ def persist_clone(
obj.clean() # clean here to validate locales that can only be saved after obj is saved
obj.update_versioned_object()
if prev_latest_version:
prev_latest_version._index = obj._index # pylint: disable=protected-access
prev_latest_version.is_latest_version = False
prev_latest_version.save(update_fields=['is_latest_version'])
prev_latest_version.save(update_fields=['is_latest_version', '_index'])
if add_prev_version_children:
if get(settings, 'TEST_MODE', False):
process_hierarchy_for_new_parent_concept_version(prev_latest_version.id, obj.id)
Expand All @@ -737,9 +740,10 @@ def persist_clone(
)

def index_all():
if prev_latest_version:
prev_latest_version.index()
obj.index()
if obj._index: # pylint: disable=protected-access
if prev_latest_version:
prev_latest_version.index()
obj.index()

transaction.on_commit(index_all)
except ValidationError as err:
Expand All @@ -749,8 +753,9 @@ def index_all():
cls.resume_indexing()
if not persisted:
if prev_latest_version:
prev_latest_version._index = True # pylint: disable=protected-access
prev_latest_version.is_latest_version = True
prev_latest_version.save(update_fields=['is_latest_version'])
prev_latest_version.save(update_fields=['is_latest_version', '_index'])
if obj.id:
obj.remove_locales()
obj.sources.remove(parent_head)
Expand Down
55 changes: 35 additions & 20 deletions core/importers/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ class ConceptImporter(BaseResourceImporter):
def __init__(self, data, user, update_if_exists):
super().__init__(data, user, update_if_exists)
self.version = False
self.instance = None

def exists(self):
return self.get_queryset().exists()
Expand Down Expand Up @@ -414,19 +415,20 @@ def process(self):
return FAILED
if parent.has_edit_access(self.user):
if self.version:
instance = self.get_queryset().first().clone()
instance._counted = None # pylint: disable=protected-access
self.instance = self.get_queryset().first().clone()
self.instance._counted = None # pylint: disable=protected-access
self.instance._index = False # pylint: disable=protected-access
errors = Concept.create_new_version_for(
instance=instance, data=self.data, user=self.user, create_parent_version=False,
instance=self.instance, data=self.data, user=self.user, create_parent_version=False,
add_prev_version_children=False
)
return errors or UPDATED

instance = Concept.persist_new(
data={**self.data, '_counted': None}, user=self.user, create_parent_version=False)
if instance.id:
self.instance = Concept.persist_new(
data={**self.data, '_counted': None, '_index': False}, user=self.user, create_parent_version=False)
if self.instance.id:
return CREATED
return instance.errors or FAILED
return self.instance.errors or FAILED

return PERMISSION_DENIED

Expand All @@ -441,6 +443,7 @@ class MappingImporter(BaseResourceImporter):
def __init__(self, data, user, update_if_exists):
super().__init__(data, user, update_if_exists)
self.version = False
self.instance = None

def exists(self):
return self.get_queryset().exists()
Expand Down Expand Up @@ -526,14 +529,15 @@ def process(self):
return FAILED
if parent.has_edit_access(self.user):
if self.version:
instance = self.get_queryset().first().clone()
instance._counted = None # pylint: disable=protected-access
errors = Mapping.create_new_version_for(instance, self.data, self.user)
self.instance = self.get_queryset().first().clone()
self.instance._counted = None # pylint: disable=protected-access
self.instance._index = False # pylint: disable=protected-access
errors = Mapping.create_new_version_for(self.instance, self.data, self.user)
return errors or UPDATED
instance = Mapping.persist_new({**self.data, '_counted': None}, self.user)
if instance.id:
self.instance = Mapping.persist_new({**self.data, '_counted': None, '_index': False}, self.user)
if self.instance.id:
return CREATED
return instance.errors or FAILED
return self.instance.errors or FAILED

return PERMISSION_DENIED

Expand Down Expand Up @@ -647,11 +651,13 @@ def notify_progress(self):
service = RedisService()
service.set(self.self_task_id, self.processed)

def run(self):
def run(self): # pylint: disable=too-many-branches,too-many-statements
if self.self_task_id:
print("****STARTED SUBPROCESS****")
print(f"TASK ID: {self.self_task_id}")
print("***************")
new_concept_ids = []
new_mapping_ids = []
for original_item in self.input_list:
self.processed += 1
logger.info('Processing %s of %s', str(self.processed), str(self.total))
Expand Down Expand Up @@ -690,21 +696,30 @@ def run(self):
)
continue
if item_type == 'concept':
self.handle_item_import_result(
ConceptImporter(item, self.user, self.update_if_exists).run(), original_item
)
concept_importer = ConceptImporter(item, self.user, self.update_if_exists)
_result = concept_importer.run()
if get(concept_importer.instance, 'id'):
new_concept_ids.append(concept_importer.instance.mnemonic)
self.handle_item_import_result(_result, original_item)
continue
if item_type == 'mapping':
self.handle_item_import_result(
MappingImporter(item, self.user, self.update_if_exists).run(), original_item
)
mapping_importer = MappingImporter(item, self.user, self.update_if_exists)
_result = mapping_importer.run()
if get(mapping_importer.instance, 'id'):
new_mapping_ids.append(mapping_importer.instance.mnemonic)
self.handle_item_import_result(_result, original_item)
continue
if item_type == 'reference':
self.handle_item_import_result(
ReferenceImporter(item, self.user, self.update_if_exists).run(), original_item
)
continue

if new_concept_ids:
batch_index_resources.apply_async(('concept', dict(mnemonic__in=new_concept_ids), True), queue='indexing')
if new_mapping_ids:
batch_index_resources.apply_async(('mapping', dict(mnemonic__in=new_mapping_ids), True), queue='indexing')

self.elapsed_seconds = time.time() - self.start_time

self.make_result()
Expand Down
18 changes: 18 additions & 0 deletions core/mappings/migrations/0029_mapping__index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.8 on 2022-02-16 06:57

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('mappings', '0028_auto_20220203_0733'),
]

operations = [
migrations.AddField(
model_name='mapping',
name='_index',
field=models.BooleanField(default=True),
),
]
15 changes: 10 additions & 5 deletions core/mappings/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Meta:
to_source_url = models.TextField(null=True, blank=True, db_index=True)
to_source_version = models.TextField(null=True, blank=True)
_counted = models.BooleanField(default=True, null=True, blank=True)
_index = models.BooleanField(default=True)

logo_path = None
name = None
Expand Down Expand Up @@ -239,6 +240,7 @@ def clone(self, user=None):
from_source_id=self.from_source_id,
from_source_url=self.from_source_url,
from_source_version=self.from_source_version,
_index=self._index
)
if user:
mapping.created_by = mapping.updated_by = user
Expand Down Expand Up @@ -435,16 +437,18 @@ def persist_clone(cls, obj, user=None, **kwargs):
obj.update_versioned_object()
if prev_latest_version:
prev_latest_version.is_latest_version = False
prev_latest_version.save(update_fields=['is_latest_version'])
prev_latest_version._index = obj._index # pylint: disable=protected-access
prev_latest_version.save(update_fields=['is_latest_version', '_index'])

obj.sources.set(compact([parent, parent_head]))
persisted = True
cls.resume_indexing()

def index_all():
if prev_latest_version:
prev_latest_version.index()
obj.index()
if obj._index: # pylint: disable=protected-access
if prev_latest_version:
prev_latest_version.index()
obj.index()

transaction.on_commit(index_all)
except ValidationError as err:
Expand All @@ -455,8 +459,9 @@ def index_all():
if obj.id:
obj.sources.remove(parent_head)
if prev_latest_version:
prev_latest_version._index = True # pylint: disable=protected-access
prev_latest_version.is_latest_version = True
prev_latest_version.save(update_fields=['is_latest_version'])
prev_latest_version.save(update_fields=['is_latest_version', '_index'])
obj.delete()
errors['non_field_errors'] = [PERSIST_CLONE_ERROR]

Expand Down

0 comments on commit d6998ee

Please sign in to comment.