Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync writes/deletes on sub case search indices. #34673

Merged
merged 10 commits into from
May 26, 2024
20 changes: 16 additions & 4 deletions corehq/ex-submodules/pillowtop/processors/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from django.conf import settings

from corehq.apps.es.case_search import multiplex_to_adapter
from corehq.apps.es.const import HQ_CASE_SEARCH_INDEX_CANONICAL_NAME
from pillowtop.exceptions import BulkDocException, PillowtopIndexingError
from pillowtop.logger import pillow_logging
from pillowtop.utils import (
Expand Down Expand Up @@ -68,20 +70,23 @@ def process_change(self, change):

if change.deleted and change.id:
doc = change.get_document()
domain = doc.get('domain') if doc else None
if not domain:
domain = change.metadata.domain
gherceg marked this conversation as resolved.
Show resolved Hide resolved
if doc and doc.get('doc_type'):
logger.info(
f'[process_change] Attempting to delete doc {change.id}')
current_meta = get_doc_meta_object_from_document(doc)
if current_meta.is_deletion:
self._delete_doc_if_exists(change.id)
self._delete_doc_if_exists(change.id, domain=domain)
logger.info(
f"[process_change] Deleted doc {change.id}")
else:
logger.info(
f"[process_change] Not deleting doc {change.id} "
"because current_meta.is_deletion is false")
else:
self._delete_doc_if_exists(change.id)
self._delete_doc_if_exists(change.id, domain=domain)
logger.info(
f"[process_change] Deleted doc {change.id}")
return
Expand All @@ -97,7 +102,7 @@ def process_change(self, change):
return

if doc.get('doc_type') is not None and doc['doc_type'].endswith("-Deleted"):
self._delete_doc_if_exists(change.id)
self._delete_doc_if_exists(change.id, domain=doc.get('domain'))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this not need to check the change meta like the above did?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is because this isn't a deletion, and therefore there should still be a valid DB object when we call change.get_document().

return

# send it across
Expand All @@ -109,7 +114,14 @@ def process_change(self, change):
data=doc,
)

def _delete_doc_if_exists(self, doc_id):
def _delete_doc_if_exists(self, doc_id, domain=None):
if self.adapter.canonical_name == HQ_CASE_SEARCH_INDEX_CANONICAL_NAME:
sub_index_adapter = multiplex_to_adapter(domain)
if sub_index_adapter:
send_to_elasticsearch(
doc_id=doc_id, adapter=sub_index_adapter,
name='ElasticProcessor', delete=True
)
send_to_elasticsearch(
doc_id=doc_id,
adapter=self.adapter,
Expand Down
Loading