Skip to content

Commit

Permalink
BIG refactor cleaning up indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
caseylitton committed Oct 28, 2020
1 parent 2d6cbfd commit e924ccd
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 85 deletions.
33 changes: 32 additions & 1 deletion src/snovault/elasticsearch/esstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
)


SEARCH_MAX = (2 ** 31) - 1
SEARCH_MAX = 99999 # OutOfMemoryError if too high
MAX_CLAUSES_FOR_ES = 8192


def includeme(config):
Expand All @@ -21,6 +22,36 @@ def includeme(config):
registry[STORAGE] = PickStorage(ElasticSearchStorage(es, es_index), wrapped_storage)


def get_related_uuids(es, updated, renamed, search_max=SEARCH_MAX):
    '''Return resources related to the given uuids from elasticsearch.

    Searches the resources index for documents that embed any uuid in
    ``updated`` or link to any uuid in ``renamed``.

    :param es: elasticsearch client
    :param updated: uuids whose embedded data changed
    :param renamed: uuids whose links changed
    :param search_max: maximum number of hits returned by the search
    :returns: tuple ``(related_set, total_hits)``; ``total_hits`` may
        exceed ``len(related_set)`` when more than ``search_max``
        documents match
    '''
    # Refresh so recently indexed documents are visible to the search below.
    es.indices.refresh(RESOURCES_INDEX)
    query = {
        'query': {
            'bool': {
                # 'should' acts as an OR: match on either terms clause.
                'should': [
                    {
                        'terms': {
                            'embedded_uuids': updated,
                            '_cache': False,
                        },
                    },
                    {
                        'terms': {
                            'linked_uuids': renamed,
                            '_cache': False,
                        },
                    },
                ],
            },
        },
        '_source': False,  # only document ids are needed, skip bodies
    }
    res = es.search(index=RESOURCES_INDEX, size=search_max, request_timeout=60, body=query)
    total_hits = res['hits']['total']
    related_set = {hit['_id'] for hit in res['hits']['hits']}
    return related_set, total_hits


def force_database_for_request():
request = get_current_request()
if request:
Expand Down
112 changes: 35 additions & 77 deletions src/snovault/elasticsearch/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
DBSESSION,
STORAGE
)
from snovault.storage import (
TransactionRecord,
)
from snovault.storage import get_transactions
from snovault.elasticsearch.local_indexer_store import IndexerStore
from snovault.elasticsearch.esstorage import (
MAX_CLAUSES_FOR_ES,
SEARCH_MAX,
get_related_uuids,
)

from urllib3.exceptions import ReadTimeoutError
from .interfaces import (
Expand All @@ -33,7 +36,6 @@
IndexerState,
all_uuids,
all_types,
SEARCH_MAX,
setup_indexing_nodes,
HEAD_NODE_INDEX,
INDEXING_NODE_INDEX,
Expand All @@ -52,7 +54,6 @@
es_logger = logging.getLogger("elasticsearch")
es_logger.setLevel(logging.ERROR)
log = logging.getLogger('snovault.elasticsearch.es_index_listener')
MAX_CLAUSES_FOR_ES = 8192
DEFAULT_QUEUE = 'Simple'
# Allow three minutes for indexer to stop before head indexer process continues
_REMOTE_INDEXING_SHUTDOWN_SLEEP = 3*60
Expand Down Expand Up @@ -94,47 +95,13 @@ def includeme(config):
if not processes:
registry[INDEXER] = Indexer(registry)
if not registry.get(INDEXER_STORE):
registry[INDEXER_STORE] = IndexerStore(config.registry.settings)
registry[INDEXER_STORE] = IndexerStore(
registry[ELASTIC_SEARCH],
registry[COLLECTIONS],
config.registry.settings,
)



def get_related_uuids(request, es, updated, renamed, indexer_store=None):
    '''Return resources related to the given uuids from elasticsearch.

    Searches the resources index for documents that embed any uuid in
    ``updated`` or link to any uuid in ``renamed``.

    :param request: pyramid request; its registry supplies all uuids for
        the full-reindex fallback
    :param es: elasticsearch client
    :param updated: uuids whose embedded data changed
    :param renamed: uuids whose links changed
    :param indexer_store: optional state store; when given, a truncated
        preview of the updated uuids is recorded
    :returns: tuple ``(uuids, full_reindex)``; when more than SEARCH_MAX
        documents match, all registry uuids are returned with
        ``full_reindex`` set to True
    '''
    # Refresh so recently indexed documents are visible to the search below.
    es.indices.refresh(RESOURCES_INDEX)
    query = {
        'query': {
            'bool': {
                # 'should' acts as an OR: match on either terms clause.
                'should': [
                    {
                        'terms': {
                            'embedded_uuids': updated,
                            '_cache': False,
                        },
                    },
                    {
                        'terms': {
                            'linked_uuids': renamed,
                            '_cache': False,
                        },
                    },
                ],
            },
        },
        '_source': False,  # only document ids are needed, skip bodies
    }
    res = es.search(index=RESOURCES_INDEX, size=SEARCH_MAX, request_timeout=60, body=query)
    # Too many matches to enumerate: fall back to reindexing everything.
    if res['hits']['total'] > SEARCH_MAX:
        return (list(all_uuids(request.registry)), True)  # guaranteed unique

    related_set = {hit['_id'] for hit in res['hits']['hits']}
    if indexer_store:
        indexer_store.set_state(
            indexer_store.state_load_indexing,
            # First 100 chars only — presumably a human-readable preview,
            # not the full set; TODO confirm consumers expect truncation.
            related_set=f"[{','.join(updated)}]"[0:100],
        )
    return (related_set, False)


def _determine_indexing_protocol(request, uuid_count):
remote_indexing = asbool(request.registry.settings.get('remote_indexing', False))
if not remote_indexing:
Expand Down Expand Up @@ -348,17 +315,18 @@ def _load_indexing(request, session, connection, indexer_state):
snapshot_id = None
last_xmin = None
(xmin, priority_invalidated, restart) = indexer_state.priority_cycle(request)
# Initialize load state
indexer_store.set_state(
indexer_store.state_load_indexing,
priority_invalidated=priority_invalidated
)
indexer_state.log_reindex_init_state()
# OPTIONAL: restart support
if restart: # Currently not bothering with restart!!!
xmin = -1
priority_invalidated = []
# OPTIONAL: restart support

# Initialize load state
indexer_store.set_state(
indexer_store.state_load_indexing,
priority_invalidated=priority_invalidated
)

result = indexer_state.get_initial_state() # get after checking priority!

Expand Down Expand Up @@ -406,22 +374,8 @@ def _load_indexing(request, session, connection, indexer_state):
invalidated = list(all_uuids(request.registry, types))
flush = True
else:
txns = session.query(TransactionRecord).filter(
TransactionRecord.xid >= last_xmin,
)

invalidated = set(priority_invalidated) # not empty if API index request occurred
updated = set()
renamed = set()
for txn in txns.all():
txn_count += 1
max_xid = max(max_xid, txn.xid)
if first_txn is None:
first_txn = txn.timestamp
else:
first_txn = min(first_txn, txn.timestamp)
renamed.update(txn.data.get('renamed', ()))
updated.update(txn.data.get('updated', ()))
updated, renamed, first_txn, txn_count, max_xid = get_transactions(session, last_xmin)
raw_updated = copy.copy(renamed)
raw_renamed = copy.copy(updated)
if invalidated: # reindex requested, treat like updated
Expand All @@ -447,25 +401,27 @@ def _load_indexing(request, session, connection, indexer_state):

is_testing = asbool(request.registry.settings.get('testing', False))
is_testing_full = request.json.get('is_testing_full', False)
full_reindex = False
related_set = set()
if is_testing and is_testing_full:
full_reindex = False
related_set = set(all_uuids(request.registry))
else:
updated_count = len(updated)
renamed_count = len(renamed)
(related_set, full_reindex) = get_related_uuids(request, request.registry[ELASTIC_SEARCH], updated, renamed, indexer_store=indexer_store)
if (updated_count + renamed_count) > MAX_CLAUSES_FOR_ES:
related_set = list(all_uuids(request.registry))
full_reindex = True
elif (updated_count + renamed_count) == 0:
related_set = set()
full_reindex = True
elif res['hits']['total'] > SEARCH_MAX:
return (list(all_uuids(request.registry)), True) # guaranteed unique

raw_related_set = copy.copy(related_set)
related_set_total = copy.copy(related_set)

elif updated_count + renamed_count > 0:
related_set, related_set_total = get_related_uuids(
request.registry[ELASTIC_SEARCH],
updated,
renamed,
search_max=SEARCH_MAX,
)
raw_related_set = copy.copy(related_set)
if related_set_total > SEARCH_MAX:
related_set = list(all_uuids(request.registry))
full_reindex = True
if full_reindex:
invalidated = related_set
flush = True
Expand Down Expand Up @@ -499,6 +455,7 @@ def _load_indexing(request, session, connection, indexer_state):
updated=raw_updated,
renamed=raw_renamed,
related_set=raw_related_set,
related_set_total=related_set_total,
)
return result, invalidated, flush, first_txn, snapshot_id, restart, xmin, return_now

Expand All @@ -523,11 +480,12 @@ def _run_indexing(
# Do the work...
local_state, event_tag = indexer_store.set_state(
indexer_store.state_run_indexing,
# 'invalidated' indicates the start of indexing
invalidated_cnt=len(invalidated)
invalidated,

)
indexing_update_infos, errors, err_msg = request.registry[INDEXER].serve_objects(
request,
indexer_store,
invalidated,
xmin,
snapshot_id=snapshot_id,
Expand Down
3 changes: 2 additions & 1 deletion src/snovault/elasticsearch/indexer_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@
import boto3
import socket

from snovault.elasticsearch.esstorage import SEARCH_MAX


AWS_REGION = 'us-west-2'
_HOSTNAME = socket.gethostname()
SEARCH_MAX = 99999 # OutOfMemoryError if too high
HEAD_NODE_INDEX = 'head_node'
INDEXING_NODE_INDEX = 'indexing_node'

Expand Down
2 changes: 2 additions & 0 deletions src/snovault/elasticsearch/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
INDEXER_STATE_TAG = f"{INDEXER}_state:hash"
INDEXER_EVENTS_TAG = f"{INDEXER}_event"
INDEXER_EVENTS_LIST = f"{INDEXER_EVENTS_TAG}:list"
INDEXER_REINDEX_TAG = f"{INDEXER}_reindex:hash"
INDEXER_REINDEX_UUIDS_LIST = f"{INDEXER}_reindex_uuids:list"
RESOURCES_INDEX = 'snovault-resources'


Expand Down
Loading

0 comments on commit e924ccd

Please sign in to comment.