Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync writes/deletes on sub case search indices. #34673

Merged
merged 10 commits into from
May 26, 2024
52 changes: 51 additions & 1 deletion corehq/apps/es/case_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from copy import deepcopy
from datetime import datetime

from django.conf import settings

from memoized import memoized

from dimagi.utils.parsing import json_format_datetime
Expand All @@ -29,11 +31,12 @@
VALUE,
)
from corehq.apps.es.cases import CaseES, owner
from corehq.apps.es.transient_util import doc_adapter_from_cname
from corehq.util.dates import iso_string_to_datetime

from . import filters, queries
from .cases import case_adapter
from .client import ElasticDocumentAdapter, create_document_adapter
from .client import BulkActionItem, ElasticDocumentAdapter, create_document_adapter
from .const import (
HQ_CASE_SEARCH_INDEX_CANONICAL_NAME,
HQ_CASE_SEARCH_INDEX_NAME,
Expand Down Expand Up @@ -174,6 +177,41 @@ def _from_dict(self, case):
doc['_id'] = case_dict['_id']
return super()._from_dict(doc)

def _get_domain_from_doc(self, doc):
"""
`doc` can be CommcCareCase instance or dict. This util method extracts domain from doc.
This will fail hard if domain is not present in doc.
"""
if isinstance(doc, dict):
return doc["domain"]
if hasattr(doc, 'domain'):
return doc.domain
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this raise an exception if neither case is true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I stumbled upon a case where this is also used for deletes, where we don't have the doc but only have doc_ids.
So I have just handled that case, assuming all valid case search objects should have a domain. If they don't have a domain, they can be safely ignored as they don't belong to the BHA index.


def index(self, doc, refresh=False):
    """
    Selectively multiplexes writes to a sub index based on the domain of the doc.

    :param doc: a ``CommCareCase`` instance or dict to be indexed.
    :param refresh: when True, refresh the index after the write.
    """
    sub_index_adapter = multiplex_to_adapter(self._get_domain_from_doc(doc))
    if sub_index_adapter:
        # A sub index is configured for this domain: send one bulk request
        # that writes the doc to both the main and the sub index.
        doc_obj = BulkActionItem.index(doc)
        payload = [
            self._render_bulk_action(doc_obj),
            sub_index_adapter._render_bulk_action(doc_obj),
        ]
        return self._bulk(payload, refresh=refresh, raise_errors=True)
    # No sub index configured for this domain: plain single-index write.
    # Return the result for consistency with the multiplexed branch.
    return super().index(doc, refresh=refresh)

def bulk(self, actions, refresh=False, raise_errors=True):
    """
    Renders every action against this index and, when the doc's domain is
    configured for multiplexing, against the matching sub index as well,
    then submits everything as a single bulk request.
    """
    def _renderings(action):
        # Main-index rendering always happens; the sub-index rendering is
        # added only when a sub index adapter exists for the doc's domain.
        yield self._render_bulk_action(action)
        sub_adapter = multiplex_to_adapter(self._get_domain_from_doc(action.doc))
        if sub_adapter:
            yield sub_adapter._render_bulk_action(action)

    payload = [rendered for action in actions for rendered in _renderings(action)]
    return self._bulk(payload, refresh=refresh, raise_errors=raise_errors)


case_search_adapter = create_document_adapter(
ElasticCaseSearch,
Expand All @@ -183,6 +221,18 @@ def _from_dict(self, case):
)


def multiplex_to_adapter(domain):
    """
    Reads `CASE_SEARCH_SUB_INDICES` from settings to see if we should
    multiplex writes for the case_search index.

    Returns the appropriate sub-index adapter for ``domain``, or ``None``
    when no multiplexing is configured for it.
    """
    sub_index_config = settings.CASE_SEARCH_SUB_INDICES.get(domain)
    if not sub_index_config or not sub_index_config.get('multiplex_writes'):
        return None
    return doc_adapter_from_cname(sub_index_config['index_cname'])


def case_property_query(case_property_name, value, fuzzy=False, multivalue_mode=None,
fuzzy_prefix_length=None):
"""
Expand Down
11 changes: 11 additions & 0 deletions corehq/apps/es/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
)

from .const import (
HQ_CASE_SEARCH_INDEX_CANONICAL_NAME,
INDEX_CONF_REINDEX,
INDEX_CONF_STANDARD,
SCROLL_KEEPALIVE,
Expand Down Expand Up @@ -1155,6 +1156,12 @@ def scroll(self, *args, **kw):
def search(self, *args, **kw):
return self.primary.search(*args, **kw)

def _get_case_search_sub_index_docs(self, action):
    """Return the rendered bulk action for the case-search sub index that
    ``action``'s doc should be multiplexed to, or ``None`` when this
    adapter is not the case-search adapter or no sub index is configured
    for the doc's domain.

    NOTE(review): this leaks case-search-specific knowledge into the
    generic multiplex adapter; a cleaner design would have
    ``_render_bulk_action`` return a list of actions so the primary
    adapter could generate sub-index actions itself.
    """
    # Imported locally to avoid a circular import with case_search.
    from corehq.apps.es.case_search import multiplex_to_adapter
    if self.canonical_name != HQ_CASE_SEARCH_INDEX_CANONICAL_NAME:
        return None
    sub_index_adapter = multiplex_to_adapter(
        self.primary._get_domain_from_doc(action.doc))
    if sub_index_adapter is None:
        return None
    return sub_index_adapter._render_bulk_action(action)

# Elastic index write methods (multiplexed between both adapters)
def bulk(self, actions, refresh=False, raise_errors=True):
"""Apply bulk actions on the primary and secondary.
Expand Down Expand Up @@ -1189,6 +1196,10 @@ def bulk(self, actions, refresh=False, raise_errors=True):
)
else:
payload.append(self.secondary._render_bulk_action(action))
sub_index_docs = self._get_case_search_sub_index_docs(action)
if sub_index_docs:
payload.append(sub_index_docs)

_, chunk_errs = bulk(self._es, payload, chunk_size=len(payload),
refresh=refresh, raise_on_error=False,
raise_on_exception=raise_errors)
Expand Down
181 changes: 180 additions & 1 deletion corehq/apps/es/tests/test_case_search_adapter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
from unittest.mock import patch

from django.test import TestCase

from corehq.apps.es.case_search import case_search_adapter
from corehq.apps.es import const
from corehq.apps.es.case_search import (
BulkActionItem,
ElasticCaseSearch,
case_search_adapter,
)
from corehq.apps.es.case_search_bha import case_search_bha_adapter
from corehq.apps.es.client import (
ElasticMultiplexAdapter,
create_document_adapter,
)
from corehq.apps.es.migration_operations import CreateIndex
from corehq.apps.es.tests.utils import es_test
from corehq.form_processor.tests.utils import create_case

Expand Down Expand Up @@ -43,3 +56,169 @@ def test_index_can_handle_case_objects(self):
es_case = case_search_adapter.search({})['hits']['hits'][0]['_source']
es_case.pop('@indexed_on')
self.assertEqual(es_case, case)


@es_test(requires=[case_search_adapter, case_search_bha_adapter], setup_class=True)
class TestCaseSearchAdapterAlsoWritesToAnotherIndex(TestCase):
    """Verifies that case_search writes are mirrored to a configured sub
    index (e.g. the BHA index) and skipped when multiplexing is off.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.domain = 'casesearch-dual-writetests'
        cls.case = create_case(cls.domain, save=True)
        cls.bulk_cases = [create_case(cls.domain, save=True).to_json() for _ in range(2)]
        cls.bulk_index_actions = [BulkActionItem.index(doc) for doc in cls.bulk_cases]
        cls.bulk_delete_actions = [BulkActionItem.delete(doc) for doc in cls.bulk_cases]

    def _get_normalized_cases_from_hits(self, cases):
        return [self._normalize_case(hit['_source']) for hit in cases['hits']['hits']]

    def _normalize_case(self, case):
        # @indexed_on is set at write time, so drop it before comparing
        # documents across indices.
        case.pop('@indexed_on')
        return case

    def _create_multiplexed_adapter(self, secondary_index):
        # Create the secondary index and return a multiplexing adapter that
        # writes to both the primary and the secondary index.
        index_create_op = CreateIndex(
            secondary_index,
            case_search_adapter.type,
            case_search_adapter.mapping,
            case_search_adapter.analysis,
            case_search_adapter.settings_key,
        )
        index_create_op.run()
        self.addCleanup(index_create_op.reverse_run)
        return create_document_adapter(
            ElasticCaseSearch,
            const.HQ_CASE_SEARCH_INDEX_NAME,
            case_search_adapter.type,
            # create document adapter appends test_ to the index name
            secondary=secondary_index[5:],
        )

    def test_index_writes_to_both_adapters(self):
        with patch('corehq.apps.es.case_search.multiplex_to_adapter', return_value=case_search_bha_adapter):
            case_search_adapter.index(self.case, refresh=True)
        self.addCleanup(case_search_bha_adapter.delete, self.case.case_id)
        self.addCleanup(case_search_adapter.delete, self.case.case_id)

        docs_in_bha = self._get_normalized_cases_from_hits(case_search_bha_adapter.search({}))
        docs_in_case_search = self._get_normalized_cases_from_hits(case_search_adapter.search({}))

        self.assertEqual(docs_in_bha, docs_in_case_search)

    def test_index_not_writes_to_bha_adapter_if_not_required(self):
        with patch('corehq.apps.es.case_search.multiplex_to_adapter', return_value=None):
            case_search_adapter.index(self.case, refresh=True)
        self.addCleanup(case_search_adapter.delete, self.case.case_id)

        docs_in_bha = self._get_normalized_cases_from_hits(case_search_bha_adapter.search({}))
        docs_in_case_search = self._get_normalized_cases_from_hits(case_search_adapter.search({}))

        self.assertEqual(docs_in_bha, [])
        self.assertEqual(len(docs_in_case_search), 1)

    def test_bulk_with_bha_mutliplexing(self):
        with patch('corehq.apps.es.case_search.multiplex_to_adapter', return_value=case_search_bha_adapter):
            case_search_adapter.bulk(self.bulk_index_actions, refresh=True)

        # Cleanup
        self.addCleanup(case_search_adapter.bulk, self.bulk_delete_actions, refresh=True)
        self.addCleanup(case_search_bha_adapter.bulk, self.bulk_delete_actions, refresh=True)

        docs_in_bha = self._get_normalized_cases_from_hits(case_search_bha_adapter.search({}))
        docs_in_case_search = self._get_normalized_cases_from_hits(case_search_adapter.search({}))

        self.assertEqual(docs_in_bha, docs_in_case_search)
        self.assertEqual(len(docs_in_case_search), 2)

    def test_bulk_without_bha_mutliplexing(self):
        with patch('corehq.apps.es.case_search.multiplex_to_adapter', return_value=None):
            case_search_adapter.bulk(self.bulk_index_actions, refresh=True)

        # Cleanup
        self.addCleanup(case_search_adapter.bulk, self.bulk_delete_actions, refresh=True)

        docs_in_bha = self._get_normalized_cases_from_hits(case_search_bha_adapter.search({}))
        docs_in_case_search = self._get_normalized_cases_from_hits(case_search_adapter.search({}))

        self.assertEqual(docs_in_bha, [])
        self.assertEqual(
            {case["case_id"] for case in self.bulk_cases},
            {case["_id"] for case in docs_in_case_search},
        )

    @patch.object(const, 'ES_CASE_SEARCH_INDEX_MULTIPLEXED', True)
    @patch('corehq.apps.es.case_search.multiplex_to_adapter', return_value=case_search_bha_adapter)
    def test_index_with_multiplexed_adapter(self, _):
        cs_multiplex_adapter = self._create_multiplexed_adapter('test_secondary_case_search_index')

        # Test we are using the multiplex adapter
        self.assertTrue(isinstance(cs_multiplex_adapter, ElasticMultiplexAdapter))

        # Index the case in ES
        cs_multiplex_adapter.index(self.case, refresh=True)
        self.addCleanup(case_search_adapter.delete, self.case.case_id)
        self.addCleanup(case_search_bha_adapter.delete, self.case.case_id, refresh=True)

        # index() should write to all three indices, i.e. primary,
        # secondary and the sub index.
        docs_in_bha = self._get_normalized_cases_from_hits(case_search_bha_adapter.search({}))
        docs_in_case_search = self._get_normalized_cases_from_hits(case_search_adapter.search({}))
        docs_in_secondary_case_search = self._get_normalized_cases_from_hits(
            cs_multiplex_adapter.secondary.search({})
        )

        self.assertEqual(len(docs_in_case_search), 1)
        self.assertEqual(docs_in_bha, docs_in_case_search)
        self.assertEqual(docs_in_secondary_case_search, docs_in_case_search)

    @patch.object(const, 'ES_CASE_SEARCH_INDEX_MULTIPLEXED', True)
    @patch('corehq.apps.es.case_search.multiplex_to_adapter', return_value=None)
    def test_index_with_multiplexed_adapter_without_sub_index_settings(self, _):
        cs_multiplex_adapter = self._create_multiplexed_adapter('test_secondary_case_search_index')

        # Test we are using the multiplex adapter
        self.assertTrue(isinstance(cs_multiplex_adapter, ElasticMultiplexAdapter))

        # Index the case in ES
        cs_multiplex_adapter.index(self.case, refresh=True)
        self.addCleanup(case_search_adapter.delete, self.case.case_id)

        docs_in_bha = self._get_normalized_cases_from_hits(case_search_bha_adapter.search({}))
        docs_in_case_search = self._get_normalized_cases_from_hits(case_search_adapter.search({}))
        docs_in_secondary_case_search = self._get_normalized_cases_from_hits(
            cs_multiplex_adapter.secondary.search({})
        )

        # index() should write only to the primary and secondary indices,
        # not to the sub index.
        self.assertEqual(docs_in_bha, [])
        self.assertEqual(len(docs_in_case_search), 1)
        self.assertEqual(docs_in_secondary_case_search, docs_in_case_search)
27 changes: 23 additions & 4 deletions corehq/ex-submodules/pillowtop/processors/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

from django.conf import settings

from dimagi.utils.logging import notify_error

from corehq.apps.es.case_search import multiplex_to_adapter
from corehq.apps.es.const import HQ_CASE_SEARCH_INDEX_CANONICAL_NAME
from pillowtop.exceptions import BulkDocException, PillowtopIndexingError
from pillowtop.logger import pillow_logging
from pillowtop.utils import (
Expand Down Expand Up @@ -68,20 +72,24 @@ def process_change(self, change):

if change.deleted and change.id:
doc = change.get_document()
domain = doc.get('domain') if doc else None
if not domain:
meta = getattr(change, 'metadata', None)
domain = meta.domain if meta else None
if doc and doc.get('doc_type'):
logger.info(
f'[process_change] Attempting to delete doc {change.id}')
current_meta = get_doc_meta_object_from_document(doc)
if current_meta.is_deletion:
self._delete_doc_if_exists(change.id)
self._delete_doc_if_exists(change.id, domain=domain)
logger.info(
f"[process_change] Deleted doc {change.id}")
else:
logger.info(
f"[process_change] Not deleting doc {change.id} "
"because current_meta.is_deletion is false")
else:
self._delete_doc_if_exists(change.id)
self._delete_doc_if_exists(change.id, domain=domain)
logger.info(
f"[process_change] Deleted doc {change.id}")
return
Expand All @@ -97,7 +105,7 @@ def process_change(self, change):
return

if doc.get('doc_type') is not None and doc['doc_type'].endswith("-Deleted"):
self._delete_doc_if_exists(change.id)
self._delete_doc_if_exists(change.id, domain=doc.get('domain'))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this not need to check the change meta like the above did?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is because this isn't a deletion, and therefore there should still be a valid DB object when we call change.get_document().

return

# send it across
Expand All @@ -109,7 +117,18 @@ def process_change(self, change):
data=doc,
)

def _delete_doc_if_exists(self, doc_id):
def _delete_doc_if_exists(self, doc_id, domain=None):
if self.adapter.canonical_name == HQ_CASE_SEARCH_INDEX_CANONICAL_NAME:
if domain:
sub_index_adapter = multiplex_to_adapter(domain)
if sub_index_adapter:
send_to_elasticsearch(
doc_id=doc_id, adapter=sub_index_adapter,
name='ElasticProcessor', delete=True
)
else:
notify_error(f"Domain not specified when deleting case {doc_id} from case search index")

send_to_elasticsearch(
doc_id=doc_id,
adapter=self.adapter,
Expand Down
11 changes: 11 additions & 0 deletions settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,17 @@ def _pkce_required(client_id):
# used to override low-level index settings (number_of_replicas, number_of_shards, etc)
ES_SETTINGS = None

# CASE_SEARCH_SUB_INDICES maps a domain name to the multiplexing settings
# for that domain's dedicated case search sub-index:
#   index_cname: canonical name of the sub-index; resolved to an actual
#       index adapter via doc_adapter_from_cname.
#   multiplex_writes: when True, case_search writes for this domain are
#       also sent to the sub-index.
# Example:
#     {
#         'co-carecoordination-perf': {
#             'index_cname': 'case_search_bha',
#             'multiplex_writes': True,
#         }
#     }
CASE_SEARCH_SUB_INDICES = {}

PHI_API_KEY = None
PHI_PASSWORD = None

Expand Down
Loading