-
-
Notifications
You must be signed in to change notification settings - Fork 217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sync writes/deletes on sub case search indices. #34673
Changes from all commits
8b38231
a636ebc
b8797e2
3417fef
7e82f79
5febaf1
399c215
2a81942
02b8735
f84af4a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,8 @@ | |
from copy import deepcopy | ||
from datetime import datetime | ||
|
||
from django.conf import settings | ||
|
||
from memoized import memoized | ||
|
||
from dimagi.utils.parsing import json_format_datetime | ||
|
@@ -29,11 +31,12 @@ | |
VALUE, | ||
) | ||
from corehq.apps.es.cases import CaseES, owner | ||
from corehq.apps.es.transient_util import doc_adapter_from_cname | ||
from corehq.util.dates import iso_string_to_datetime | ||
|
||
from . import filters, queries | ||
from .cases import case_adapter | ||
from .client import ElasticDocumentAdapter, create_document_adapter | ||
from .client import BulkActionItem, ElasticDocumentAdapter, create_document_adapter | ||
from .const import ( | ||
HQ_CASE_SEARCH_INDEX_CANONICAL_NAME, | ||
HQ_CASE_SEARCH_INDEX_NAME, | ||
|
@@ -174,6 +177,41 @@ def _from_dict(self, case): | |
doc['_id'] = case_dict['_id'] | ||
return super()._from_dict(doc) | ||
|
||
def _get_domain_from_doc(self, doc): | ||
""" | ||
`doc` can be CommcCareCase instance or dict. This util method extracts domain from doc. | ||
This will fail hard if domain is not present in doc. | ||
""" | ||
if isinstance(doc, dict): | ||
return doc["domain"] | ||
if hasattr(doc, 'domain'): | ||
return doc.domain | ||
|
||
def index(self, doc, refresh=False):
    """
    Index ``doc``, selectively multiplexing the write to a sub index.

    The sub index is chosen by the domain of the doc. When the domain has
    a sub index adapter, the doc is written to both this adapter's index
    and the sub index in a single bulk request and the result of that
    request is returned. Otherwise the doc is indexed normally and
    nothing is returned.

    :param doc: the document to index
    :param refresh: passed through to the underlying write
    """
    # Review note: ``update`` and ``delete`` do not get the same
    # treatment -- deletes are handled a level up in the pillows code and
    # updates are not used with the case-search index.
    sub_adapter = multiplex_to_adapter(self._get_domain_from_doc(doc))
    if not sub_adapter:
        # No sub index for this domain: simply index the doc.
        super().index(doc, refresh=refresh)
        return None
    # A valid sub index adapter means the write goes to both indices.
    item = BulkActionItem.index(doc)
    payload = [
        adapter._render_bulk_action(item)
        for adapter in (self, sub_adapter)
    ]
    return self._bulk(payload, refresh=refresh, raise_errors=True)
|
||
def bulk(self, actions, refresh=False, raise_errors=True):
    """
    Apply bulk ``actions``, multiplexing each one to a sub index when the
    domain of its doc has a sub index adapter.

    :param actions: iterable of bulk action items (each exposing ``.doc``)
    :param refresh: passed through to ``_bulk``
    :param raise_errors: passed through to ``_bulk``
    :returns: whatever ``_bulk`` returns for the combined payload
    """
    def _rendered(action):
        # The action for this adapter's index always comes first,
        # followed by the sub index copy (if any) for the same action.
        yield self._render_bulk_action(action)
        sub_adapter = multiplex_to_adapter(self._get_domain_from_doc(action.doc))
        if sub_adapter:
            yield sub_adapter._render_bulk_action(action)

    payload = [entry for action in actions for entry in _rendered(action)]
    return self._bulk(payload, refresh=refresh, raise_errors=raise_errors)
|
||
|
||
case_search_adapter = create_document_adapter( | ||
ElasticCaseSearch, | ||
|
@@ -183,6 +221,18 @@ def _from_dict(self, case): | |
) | ||
|
||
|
||
def multiplex_to_adapter(domain):
    """
    Return the sub index adapter that case search writes for ``domain``
    should be multiplexed to, or ``None`` when there is none.

    Looks ``domain`` up in ``settings.CASE_SEARCH_SUB_INDICES``. An adapter
    is returned only when the domain has an entry whose
    ``'multiplex_writes'`` value is truthy; the adapter is resolved from
    the entry's ``'index_cname'``.
    """
    domain_config = settings.CASE_SEARCH_SUB_INDICES.get(domain)
    if not domain_config or not domain_config.get('multiplex_writes'):
        return None
    return doc_adapter_from_cname(domain_config['index_cname'])
|
||
|
||
def case_property_query(case_property_name, value, fuzzy=False, multivalue_mode=None, | ||
fuzzy_prefix_length=None): | ||
""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ | |
) | ||
|
||
from .const import ( | ||
HQ_CASE_SEARCH_INDEX_CANONICAL_NAME, | ||
INDEX_CONF_REINDEX, | ||
INDEX_CONF_STANDARD, | ||
SCROLL_KEEPALIVE, | ||
|
@@ -1155,6 +1156,12 @@ def scroll(self, *args, **kw): | |
def search(self, *args, **kw): | ||
return self.primary.search(*args, **kw) | ||
|
||
def _get_case_search_sub_index_docs(self, action):
    """
    Render ``action`` for the case search sub index of its doc's domain.

    Returns the rendered bulk action when this adapter is for the case
    search index and the domain has a sub index adapter; returns ``None``
    in every other case.
    """
    # NOTE(review): reviewers flagged this helper as being at the wrong
    # abstraction level (the multiplex adapter ideally would not know
    # about case search sub indices); it was accepted as a short-term
    # implementation.
    from corehq.apps.es.case_search import multiplex_to_adapter

    if self.canonical_name != HQ_CASE_SEARCH_INDEX_CANONICAL_NAME:
        return None
    domain = self.primary._get_domain_from_doc(action.doc)
    sub_adapter = multiplex_to_adapter(domain)
    if not sub_adapter:
        return None
    return sub_adapter._render_bulk_action(action)
|
||
# Elastic index write methods (multiplexed between both adapters) | ||
def bulk(self, actions, refresh=False, raise_errors=True): | ||
"""Apply bulk actions on the primary and secondary. | ||
|
@@ -1189,6 +1196,10 @@ def bulk(self, actions, refresh=False, raise_errors=True): | |
) | ||
else: | ||
payload.append(self.secondary._render_bulk_action(action)) | ||
sub_index_docs = self._get_case_search_sub_index_docs(action) | ||
if sub_index_docs: | ||
payload.append(sub_index_docs) | ||
|
||
_, chunk_errs = bulk(self._es, payload, chunk_size=len(payload), | ||
refresh=refresh, raise_on_error=False, | ||
raise_on_exception=raise_errors) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,10 @@ | |
|
||
from django.conf import settings | ||
|
||
from dimagi.utils.logging import notify_error | ||
|
||
from corehq.apps.es.case_search import multiplex_to_adapter | ||
from corehq.apps.es.const import HQ_CASE_SEARCH_INDEX_CANONICAL_NAME | ||
from pillowtop.exceptions import BulkDocException, PillowtopIndexingError | ||
from pillowtop.logger import pillow_logging | ||
from pillowtop.utils import ( | ||
|
@@ -68,20 +72,24 @@ def process_change(self, change): | |
|
||
if change.deleted and change.id: | ||
doc = change.get_document() | ||
domain = doc.get('domain') if doc else None | ||
if not domain: | ||
meta = getattr(change, 'metadata', None) | ||
domain = meta.domain if meta else None | ||
if doc and doc.get('doc_type'): | ||
logger.info( | ||
f'[process_change] Attempting to delete doc {change.id}') | ||
current_meta = get_doc_meta_object_from_document(doc) | ||
if current_meta.is_deletion: | ||
self._delete_doc_if_exists(change.id) | ||
self._delete_doc_if_exists(change.id, domain=domain) | ||
logger.info( | ||
f"[process_change] Deleted doc {change.id}") | ||
else: | ||
logger.info( | ||
f"[process_change] Not deleting doc {change.id} " | ||
"because current_meta.is_deletion is false") | ||
else: | ||
self._delete_doc_if_exists(change.id) | ||
self._delete_doc_if_exists(change.id, domain=domain) | ||
logger.info( | ||
f"[process_change] Deleted doc {change.id}") | ||
return | ||
|
@@ -97,7 +105,7 @@ def process_change(self, change): | |
return | ||
|
||
if doc.get('doc_type') is not None and doc['doc_type'].endswith("-Deleted"): | ||
self._delete_doc_if_exists(change.id) | ||
self._delete_doc_if_exists(change.id, domain=doc.get('domain')) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why does this not need to check the change meta like the above did? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is because this isn't a deletion, and therefore there should still be a valid DB object when we call |
||
return | ||
|
||
# send it across | ||
|
@@ -109,7 +117,18 @@ def process_change(self, change): | |
data=doc, | ||
) | ||
|
||
def _delete_doc_if_exists(self, doc_id): | ||
def _delete_doc_if_exists(self, doc_id, domain=None): | ||
if self.adapter.canonical_name == HQ_CASE_SEARCH_INDEX_CANONICAL_NAME: | ||
if domain: | ||
sub_index_adapter = multiplex_to_adapter(domain) | ||
if sub_index_adapter: | ||
send_to_elasticsearch( | ||
doc_id=doc_id, adapter=sub_index_adapter, | ||
name='ElasticProcessor', delete=True | ||
) | ||
else: | ||
notify_error(f"Domain not specified when deleting case {doc_id} from case search index") | ||
|
||
send_to_elasticsearch( | ||
doc_id=doc_id, | ||
adapter=self.adapter, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1076,6 +1076,17 @@ def _pkce_required(client_id): | |
# used to override low-level index settings (number_of_replicas, number_of_shards, etc) | ||
ES_SETTINGS = None | ||
|
||
""" | ||
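CASE_SEARCH_SUB_INDICES maps a domain name to the settings for that domain's case search sub index: | ||
- 'index_cname': canonical name of the sub index, used to look up the sub index's document adapter. | ||
- 'multiplex_writes': when truthy, case search writes for the domain are also sent to the sub index. | ||
The setting should look like this: | ||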
{ | ||
'co-carecoordination-perf': { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: rather than give an example this would be more helpful if it explained what the fields do/mean. for example, i think the first is a domain, but only because I recognize that domain, and I am not sure what the other two fields control looking here. |
||
'index_cname': 'case_search_bha', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. where is the cname mapped to an actual index name? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The case_search_bha_adapter effectively maps cname to the actual index name. So using the cname defined in settings, we can grab the adapter as done here: https://github.com/dimagi/commcare-hq/pull/34673/files#diff-7ba2ac2cd9c42581b497ab0c7cb5ea77288d3a8d7e8583cfc2c35a4da9e063b6R232 |
||
'multiplex_writes': True, | ||
} | ||
} | ||
""" | ||
CASE_SEARCH_SUB_INDICES = {} | ||
|
||
PHI_API_KEY = None | ||
PHI_PASSWORD = None | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this raise an exception if neither case is true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I stumbled upon a case where this is also used by deletes where we don't have doc but have doc_ids.
So I have just checked for the case assuming all valid case search objects should have a domain. If they don't have a domain they can be safely ignored as they don't belong to the BHA index.