Merge pull request #34673 from dimagi/ap/sync-sub-indices
Sync writes/deletes on sub case search indices.
gherceg committed May 26, 2024
2 parents 37a01bf + f84af4a commit 63a9782
Showing 5 changed files with 276 additions and 6 deletions.
52 changes: 51 additions & 1 deletion corehq/apps/es/case_search.py
@@ -13,6 +13,8 @@
from copy import deepcopy
from datetime import datetime

from django.conf import settings

from memoized import memoized

from dimagi.utils.parsing import json_format_datetime
@@ -29,11 +31,12 @@
    VALUE,
)
from corehq.apps.es.cases import CaseES, owner
from corehq.apps.es.transient_util import doc_adapter_from_cname
from corehq.util.dates import iso_string_to_datetime

from . import filters, queries
from .cases import case_adapter
from .client import ElasticDocumentAdapter, create_document_adapter
from .client import BulkActionItem, ElasticDocumentAdapter, create_document_adapter
from .const import (
    HQ_CASE_SEARCH_INDEX_CANONICAL_NAME,
    HQ_CASE_SEARCH_INDEX_NAME,
@@ -174,6 +177,41 @@ def _from_dict(self, case):
        doc['_id'] = case_dict['_id']
        return super()._from_dict(doc)

    def _get_domain_from_doc(self, doc):
        """
        `doc` can be a CommCareCase instance or a dict. This utility method
        extracts the domain from the doc and will fail hard if the domain is
        not present.
        """
        if isinstance(doc, dict):
            return doc["domain"]
        if hasattr(doc, 'domain'):
            return doc.domain

    def index(self, doc, refresh=False):
        """
        Selectively multiplexes writes to a sub index based on the domain of the doc.
        """
        sub_index_adapter = multiplex_to_adapter(self._get_domain_from_doc(doc))
        if sub_index_adapter:
            # If we get a valid sub index adapter then we multiplex writes
            doc_obj = BulkActionItem.index(doc)
            payload = [self._render_bulk_action(doc_obj), sub_index_adapter._render_bulk_action(doc_obj)]
            return self._bulk(payload, refresh=refresh, raise_errors=True)
        # If adapter is None, simply index the doc
        return super().index(doc, refresh=refresh)

    def bulk(self, actions, refresh=False, raise_errors=True):
        """
        Iterates over the list of actions and multiplexes writes to a sub index based on the domain of the doc.
        """
        payload = []
        for action in actions:
            payload.append(self._render_bulk_action(action))
            adapter = multiplex_to_adapter(self._get_domain_from_doc(action.doc))
            if adapter:
                payload.append(adapter._render_bulk_action(action))
        return self._bulk(payload, refresh=refresh, raise_errors=raise_errors)


case_search_adapter = create_document_adapter(
    ElasticCaseSearch,
@@ -183,6 +221,18 @@ def _from_dict(self, case):
)


def multiplex_to_adapter(domain):
    """
    Reads `CASE_SEARCH_SUB_INDICES` from settings to decide whether writes to
    the case_search index should be multiplexed. Returns the sub-index adapter
    for the given domain, or None if the domain is not configured for
    multiplexing.
    """
    multiplex_info = settings.CASE_SEARCH_SUB_INDICES
    domain_multiplex_settings = multiplex_info.get(domain, None)
    if domain_multiplex_settings and domain_multiplex_settings.get('multiplex_writes'):
        return doc_adapter_from_cname(domain_multiplex_settings['index_cname'])
    return None


def case_property_query(case_property_name, value, fuzzy=False, multivalue_mode=None,
                        fuzzy_prefix_length=None):
    """
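Taken together, the case_search.py changes fan writes out per document: `multiplex_to_adapter()` consults `CASE_SEARCH_SUB_INDICES` and returns the sub-index adapter, or None when the domain is not configured. A minimal sketch of that lookup under a hypothetical setting — the domain names are illustrative, `override_settings` is Django's test helper, and `case_search_bha` is the sub-index cname used elsewhere in this PR:

```python
from django.test import override_settings

from corehq.apps.es.case_search import multiplex_to_adapter

# Hypothetical config for illustration only.
SUB_INDICES = {
    'my-domain': {'index_cname': 'case_search_bha', 'multiplex_writes': True},
}

with override_settings(CASE_SEARCH_SUB_INDICES=SUB_INDICES):
    # Configured domain with multiplex_writes=True -> the sub-index adapter,
    # so index()/bulk() render actions for both indices.
    assert multiplex_to_adapter('my-domain') is not None
    # Unconfigured domain -> None, so writes go to the main index only.
    assert multiplex_to_adapter('unconfigured-domain') is None
```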
11 changes: 11 additions & 0 deletions corehq/apps/es/client.py
@@ -30,6 +30,7 @@
)

from .const import (
    HQ_CASE_SEARCH_INDEX_CANONICAL_NAME,
    INDEX_CONF_REINDEX,
    INDEX_CONF_STANDARD,
    SCROLL_KEEPALIVE,
@@ -1160,6 +1161,12 @@ def scroll(self, *args, **kw):
    def search(self, *args, **kw):
        return self.primary.search(*args, **kw)

    def _get_case_search_sub_index_docs(self, action):
        from corehq.apps.es.case_search import multiplex_to_adapter
        if self.canonical_name == HQ_CASE_SEARCH_INDEX_CANONICAL_NAME:
            sub_index_adapter = multiplex_to_adapter(self.primary._get_domain_from_doc(action.doc))
            return sub_index_adapter._render_bulk_action(action) if sub_index_adapter else None

    # Elastic index write methods (multiplexed between both adapters)
    def bulk(self, actions, refresh=False, raise_errors=True):
        """Apply bulk actions on the primary and secondary.
@@ -1194,6 +1201,10 @@ def bulk(self, actions, refresh=False, raise_errors=True):
                )
            else:
                payload.append(self.secondary._render_bulk_action(action))
            sub_index_docs = self._get_case_search_sub_index_docs(action)
            if sub_index_docs:
                payload.append(sub_index_docs)

        _, chunk_errs = bulk(self._es, payload, chunk_size=len(payload),
                             refresh=refresh, raise_on_error=False,
                             raise_on_exception=raise_errors)
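With this hook in place, the multiplexed `bulk()` renders up to three actions per document. A simplified sketch of the payload assembly, assuming index actions only (the real method also special-cases deletes against the secondary index):

```python
# Simplified sketch, not the actual ElasticMultiplexAdapter.bulk() body.
def build_payload(adapter, actions):
    payload = []
    for action in actions:
        # Every action is written to the primary and secondary indices.
        payload.append(adapter.primary._render_bulk_action(action))
        payload.append(adapter.secondary._render_bulk_action(action))
        # For the case_search index, also render the action against the
        # domain's sub index when one is configured; otherwise None.
        sub_index_doc = adapter._get_case_search_sub_index_docs(action)
        if sub_index_doc:
            payload.append(sub_index_doc)
    return payload
```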
181 changes: 180 additions & 1 deletion corehq/apps/es/tests/test_case_search_adapter.py
@@ -1,6 +1,19 @@
from unittest.mock import patch

from django.test import TestCase

from corehq.apps.es.case_search import case_search_adapter
from corehq.apps.es import const
from corehq.apps.es.case_search import (
    BulkActionItem,
    ElasticCaseSearch,
    case_search_adapter,
)
from corehq.apps.es.case_search_bha import case_search_bha_adapter
from corehq.apps.es.client import (
    ElasticMultiplexAdapter,
    create_document_adapter,
)
from corehq.apps.es.migration_operations import CreateIndex
from corehq.apps.es.tests.utils import es_test
from corehq.form_processor.tests.utils import create_case

@@ -43,3 +56,169 @@ def test_index_can_handle_case_objects(self):
        es_case = case_search_adapter.search({})['hits']['hits'][0]['_source']
        es_case.pop('@indexed_on')
        self.assertEqual(es_case, case)


@es_test(requires=[case_search_adapter, case_search_bha_adapter], setup_class=True)
class TestCaseSearchAdapterAlsoWritesToAnotherIndex(TestCase):

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.domain = 'casesearch-dual-writetests'
        cls.case = create_case(cls.domain, save=True)
        cls.bulk_cases = [
            create_case(cls.domain, save=True).to_json()
            for i in range(2)
        ]
        cls.bulk_index_actions = [
            BulkActionItem.index(case)
            for case in cls.bulk_cases
        ]
        cls.bulk_delete_actions = [
            BulkActionItem.delete(case)
            for case in cls.bulk_cases
        ]

    def _get_normalized_cases_from_hits(self, cases):
        normalized_cases = []
        for hit in cases['hits']['hits']:
            normalized_case = self._normalize_case(hit['_source'])
            normalized_cases.append(normalized_case)
        return normalized_cases

    def _normalize_case(self, case):
        case.pop('@indexed_on')
        return case

    def test_index_writes_to_both_adapters(self):
        with patch('corehq.apps.es.case_search.multiplex_to_adapter', return_value=case_search_bha_adapter):
            case_search_adapter.index(self.case, refresh=True)
        self.addCleanup(case_search_bha_adapter.delete, self.case.case_id)
        self.addCleanup(case_search_adapter.delete, self.case.case_id)

        docs_in_bha = self._get_normalized_cases_from_hits(case_search_bha_adapter.search({}))
        docs_in_case_search = self._get_normalized_cases_from_hits(case_search_adapter.search({}))

        self.assertEqual(docs_in_bha, docs_in_case_search)

    def test_index_does_not_write_to_bha_adapter_if_not_required(self):
        with patch('corehq.apps.es.case_search.multiplex_to_adapter', return_value=None):
            case_search_adapter.index(self.case, refresh=True)
        self.addCleanup(case_search_adapter.delete, self.case.case_id)

        docs_in_bha = self._get_normalized_cases_from_hits(case_search_bha_adapter.search({}))
        docs_in_case_search = self._get_normalized_cases_from_hits(case_search_adapter.search({}))

        self.assertEqual(docs_in_bha, [])
        self.assertEqual(len(docs_in_case_search), 1)

    def test_bulk_with_bha_multiplexing(self):
        with patch('corehq.apps.es.case_search.multiplex_to_adapter', return_value=case_search_bha_adapter):
            case_search_adapter.bulk(self.bulk_index_actions, refresh=True)

        # Cleanup
        self.addCleanup(case_search_adapter.bulk, self.bulk_delete_actions, refresh=True)
        self.addCleanup(case_search_bha_adapter.bulk, self.bulk_delete_actions, refresh=True)

        docs_in_bha = self._get_normalized_cases_from_hits(case_search_bha_adapter.search({}))
        docs_in_case_search = self._get_normalized_cases_from_hits(case_search_adapter.search({}))

        self.assertEqual(docs_in_bha, docs_in_case_search)
        self.assertEqual(len(docs_in_case_search), 2)

    def test_bulk_without_bha_multiplexing(self):
        with patch('corehq.apps.es.case_search.multiplex_to_adapter', return_value=None):
            case_search_adapter.bulk(self.bulk_index_actions, refresh=True)

        # Cleanup
        self.addCleanup(case_search_adapter.bulk, self.bulk_delete_actions, refresh=True)

        docs_in_bha = self._get_normalized_cases_from_hits(case_search_bha_adapter.search({}))
        docs_in_case_search = self._get_normalized_cases_from_hits(case_search_adapter.search({}))

        self.assertEqual(docs_in_bha, [])
        self.assertEqual(
            {case["case_id"] for case in self.bulk_cases},
            {case["_id"] for case in docs_in_case_search}
        )

    @patch.object(const, 'ES_CASE_SEARCH_INDEX_MULTIPLEXED', True)
    @patch('corehq.apps.es.case_search.multiplex_to_adapter', return_value=case_search_bha_adapter)
    def test_index_with_multiplexed_adapter(self, _):
        SECONDARY_INDEX = 'test_secondary_case_search_index'
        index_create_op = CreateIndex(
            SECONDARY_INDEX,
            case_search_adapter.type,
            case_search_adapter.mapping,
            case_search_adapter.analysis,
            case_search_adapter.settings_key,
        )
        index_create_op.run()
        self.addCleanup(index_create_op.reverse_run)
        cs_multiplex_adapter = create_document_adapter(
            ElasticCaseSearch,
            const.HQ_CASE_SEARCH_INDEX_NAME,
            case_search_adapter.type,
            # create_document_adapter prepends test_ to the index name
            secondary=SECONDARY_INDEX[5:]
        )

        # Test we are using the multiplex adapter
        self.assertTrue(isinstance(cs_multiplex_adapter, ElasticMultiplexAdapter))

        # Index the case in ES
        cs_multiplex_adapter.index(self.case, refresh=True)
        self.addCleanup(case_search_adapter.delete, self.case.case_id)
        self.addCleanup(case_search_bha_adapter.delete, self.case.case_id, refresh=True)

        # Test the index call writes to all three indices, i.e.
        # primary, secondary and sub index
        docs_in_bha = self._get_normalized_cases_from_hits(case_search_bha_adapter.search({}))
        docs_in_case_search = self._get_normalized_cases_from_hits(case_search_adapter.search({}))
        docs_in_secondary_case_search = self._get_normalized_cases_from_hits(
            cs_multiplex_adapter.secondary.search({})
        )

        self.assertEqual(len(docs_in_case_search), 1)
        self.assertEqual(docs_in_bha, docs_in_case_search)
        self.assertEqual(docs_in_secondary_case_search, docs_in_case_search)

    @patch.object(const, 'ES_CASE_SEARCH_INDEX_MULTIPLEXED', True)
    @patch('corehq.apps.es.case_search.multiplex_to_adapter', return_value=None)
    def test_index_with_multiplexed_adapter_without_sub_index_settings(self, _):
        SECONDARY_INDEX = 'test_secondary_case_search_index'
        index_create_op = CreateIndex(
            SECONDARY_INDEX,
            case_search_adapter.type,
            case_search_adapter.mapping,
            case_search_adapter.analysis,
            case_search_adapter.settings_key,
        )
        index_create_op.run()
        self.addCleanup(index_create_op.reverse_run)
        cs_multiplex_adapter = create_document_adapter(
            ElasticCaseSearch,
            const.HQ_CASE_SEARCH_INDEX_NAME,
            case_search_adapter.type,
            # create_document_adapter prepends test_ to the index name
            secondary=SECONDARY_INDEX[5:]
        )

        # Test we are using the multiplex adapter
        self.assertTrue(isinstance(cs_multiplex_adapter, ElasticMultiplexAdapter))

        # Index the case in ES
        cs_multiplex_adapter.index(self.case, refresh=True)
        self.addCleanup(case_search_adapter.delete, self.case.case_id)

        docs_in_bha = self._get_normalized_cases_from_hits(case_search_bha_adapter.search({}))
        docs_in_case_search = self._get_normalized_cases_from_hits(case_search_adapter.search({}))
        docs_in_secondary_case_search = self._get_normalized_cases_from_hits(
            cs_multiplex_adapter.secondary.search({})
        )

        # Test the index call writes to only two indices, i.e.
        # primary and secondary
        self.assertEqual(docs_in_bha, [])
        self.assertEqual(len(docs_in_case_search), 1)
        self.assertEqual(docs_in_secondary_case_search, docs_in_case_search)
27 changes: 23 additions & 4 deletions corehq/ex-submodules/pillowtop/processors/elastic.py
@@ -4,6 +4,10 @@

from django.conf import settings

from dimagi.utils.logging import notify_error

from corehq.apps.es.case_search import multiplex_to_adapter
from corehq.apps.es.const import HQ_CASE_SEARCH_INDEX_CANONICAL_NAME
from pillowtop.exceptions import BulkDocException, PillowtopIndexingError
from pillowtop.logger import pillow_logging
from pillowtop.utils import (
@@ -68,20 +72,24 @@ def process_change(self, change):

        if change.deleted and change.id:
            doc = change.get_document()
            domain = doc.get('domain') if doc else None
            if not domain:
                meta = getattr(change, 'metadata', None)
                domain = meta.domain if meta else None
            if doc and doc.get('doc_type'):
                logger.info(
                    f'[process_change] Attempting to delete doc {change.id}')
                current_meta = get_doc_meta_object_from_document(doc)
                if current_meta.is_deletion:
                    self._delete_doc_if_exists(change.id)
                    self._delete_doc_if_exists(change.id, domain=domain)
                    logger.info(
                        f"[process_change] Deleted doc {change.id}")
                else:
                    logger.info(
                        f"[process_change] Not deleting doc {change.id} "
                        "because current_meta.is_deletion is false")
            else:
                self._delete_doc_if_exists(change.id)
                self._delete_doc_if_exists(change.id, domain=domain)
                logger.info(
                    f"[process_change] Deleted doc {change.id}")
            return
Expand All @@ -97,7 +105,7 @@ def process_change(self, change):
            return

        if doc.get('doc_type') is not None and doc['doc_type'].endswith("-Deleted"):
            self._delete_doc_if_exists(change.id)
            self._delete_doc_if_exists(change.id, domain=doc.get('domain'))
            return

        # send it across
@@ -109,7 +117,18 @@ def process_change(self, change):
            data=doc,
        )

    def _delete_doc_if_exists(self, doc_id):
    def _delete_doc_if_exists(self, doc_id, domain=None):
        if self.adapter.canonical_name == HQ_CASE_SEARCH_INDEX_CANONICAL_NAME:
            if domain:
                sub_index_adapter = multiplex_to_adapter(domain)
                if sub_index_adapter:
                    send_to_elasticsearch(
                        doc_id=doc_id, adapter=sub_index_adapter,
                        name='ElasticProcessor', delete=True
                    )
            else:
                notify_error(f"Domain not specified when deleting case {doc_id} from case search index")

        send_to_elasticsearch(
            doc_id=doc_id,
            adapter=self.adapter,
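The pillow's delete path mirrors the write path: when the processor's adapter is the case_search adapter and a domain is known, the delete is forwarded to the domain's sub index before the main-index delete. A hedged sketch of the same fan-out using the adapters' plain `delete()` method — the pillow itself goes through `send_to_elasticsearch` for retry and error handling, and the helper name here is invented for illustration:

```python
from corehq.apps.es.case_search import multiplex_to_adapter
from corehq.apps.es.const import HQ_CASE_SEARCH_INDEX_CANONICAL_NAME


def delete_case_search_doc(adapter, doc_id, domain=None):
    """Sketch: delete doc_id from the main index and any configured sub index."""
    if adapter.canonical_name == HQ_CASE_SEARCH_INDEX_CANONICAL_NAME and domain:
        sub_index_adapter = multiplex_to_adapter(domain)
        if sub_index_adapter:
            # Remove the copy kept in the domain's sub index first.
            sub_index_adapter.delete(doc_id)
    # Then remove the doc from the main index.
    adapter.delete(doc_id)
```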
11 changes: 11 additions & 0 deletions settings.py
@@ -1076,6 +1076,17 @@ def _pkce_required(client_id):
# used to override low-level index settings (number_of_replicas, number_of_shards, etc)
ES_SETTINGS = None

"""
The CASE_SEARCH_SUB_INDICES should look like this:
{
'co-carecoordination-perf': {
'index_cname': 'case_search_bha',
'multiplex_writes': True,
}
}
"""
CASE_SEARCH_SUB_INDICES = {}

PHI_API_KEY = None
PHI_PASSWORD = None

Expand Down
