Skip to content

Commit

Permalink
ES Upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
Bek committed Oct 26, 2020
1 parent d4e9c77 commit f1e5cae
Show file tree
Hide file tree
Showing 19 changed files with 99 additions and 164 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ executors:
- JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
- ES_JAVA_OPTS: -Xms2g -Xmx2g
- ES_BIN: /usr/share/elasticsearch/bin
- ES_MAJOR_VERSION: 5
- ES_MAJOR_VERSION: 7
- PG_VERSION: 10
- NODE_VERSION: 10

Expand Down Expand Up @@ -48,7 +48,7 @@ commands:
postgresql-${PG_VERSION} \
ruby2.3 \
ruby2.3-dev
sudo chown -R circleci /etc/elasticsearch
sudo chown -R circleci /var/log/elasticsearch/ /var/lib/elasticsearch /etc/default/elasticsearch /etc/elasticsearch
sed -i "1s;^;export PATH=${ES_BIN}:${PG_BIN}:$PATH\n;" $BASH_ENV
sudo apt-get install -y python3.7-dev python3-pip
sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.7 0
Expand Down
8 changes: 0 additions & 8 deletions base.ini
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,6 @@ set queue_worker_processes = 16
set queue_worker_chunk_size = 1024
set queue_worker_batch_size = 2000000

[composite:regionindexer]
use = egg:snovault#indexer
app = app
path = /index_file
timeout = 60
set embed_cache.capacity = 5000
set regionindexer = true

[filter:memlimit]
use = egg:snovault#memlimit
rss_limit = 500MB
2 changes: 1 addition & 1 deletion buildout.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pytest-bdd = git https://github.com/lrowe/pytest-bdd.git branch=allow-any-step-o

[versions]
# Hand set versions
elasticsearch = 5.4.0
elasticsearch = 7.9.1

[snovault]
recipe = zc.recipe.egg
Expand Down
3 changes: 0 additions & 3 deletions development.ini
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ use = egg:rutter#urlmap
[composite:indexer]
use = config:base.ini#indexer

[composite:regionindexer]
use = config:base.ini#regionindexer

###
# wsgi server configuration
###
Expand Down
3 changes: 0 additions & 3 deletions production.ini.in
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ indexer.processes = ${indexer_processes}
[composite:indexer]
use = config:base.ini#indexer

[composite:regionindexer]
use = config:base.ini#regionindexer

[pipeline:main]
pipeline =
config:base.ini#memlimit
Expand Down
2 changes: 1 addition & 1 deletion scripts/embeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def embeds_uuid(es, uuid, item_type):
return {
'uuid': uuid,
'item_type': item_type,
'embeds': res['hits']['total'],
'embeds': res['hits']['total']['value'],
'buckets': res['aggregations']['item_type']['buckets'],
}

Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
'botocore',
'jmespath',
'boto3',
'elasticsearch>=5.2',
'elasticsearch-dsl==5.4.0',
'elasticsearch==7.9.1',
'elasticsearch-dsl==7.2.1',
'lucenequery',
'future',
'humanfriendly',
Expand Down
90 changes: 40 additions & 50 deletions src/snovault/elasticsearch/create_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@

# An index to store non-content metadata
META_MAPPING = {
'_all': {
'enabled': False,
'analyzer': 'snovault_search_analyzer'
},
'dynamic_templates': [
{
'store_generic': {
Expand All @@ -52,9 +48,6 @@
PATH_FIELDS = ['submitted_file_name']
NON_SUBSTRING_FIELDS = ['uuid', '@id', 'submitted_by', 'md5sum',
'references', 'submitted_file_name']
KEYWORD_FIELDS = ['schema_version', 'uuid', 'accession', 'alternate_accessions',
'aliases', 'status', 'date_created', 'submitted_by',
'internal_status', 'target', 'biosample_type']
TEXT_FIELDS = ['pipeline_error_detail', 'description', 'notes']


Expand All @@ -66,7 +59,7 @@ def sorted_dict(d):
return json.loads(json.dumps(d), object_pairs_hook=sorted_pairs_hook)


def schema_mapping(name, schema):
def schema_mapping(name, schema, depth=0, parent=None):
# If a mapping is explicitly defined, use it
if 'mapping' in schema:
return schema['mapping']
Expand All @@ -80,19 +73,27 @@ def schema_mapping(name, schema):

# Elasticsearch handles multiple values for a field
if type_ == 'array':
return schema_mapping(name, schema['items'])
return schema_mapping(name,
schema['items'],
depth=depth+1,
parent=type_)

if type_ == 'object':
properties = {}
for k, v in schema.get('properties', {}).items():
mapping = schema_mapping(k, v)
mapping = schema_mapping(k, v, depth=depth+1)
if mapping is not None:
properties[k] = mapping
return {
'type': 'object',
'include_in_all': False,
'properties': properties,
}
if properties:
return {
'type': 'object',
'properties': properties,
}
else:
return {
'type': 'object',
'enabled': False
}

if type_ == ["number", "string"]:
return {
Expand Down Expand Up @@ -122,9 +123,7 @@ def schema_mapping(name, schema):

if type_ == 'string':

if name in KEYWORD_FIELDS:
field_type = 'keyword'
elif name in TEXT_FIELDS:
if name in TEXT_FIELDS:
field_type = 'text'
else:
field_type = 'keyword'
Expand All @@ -133,10 +132,12 @@ def schema_mapping(name, schema):
'type': field_type
}

# these fields are unintentially partially matching some small search
# keywords because fields are analyzed by nGram analyzer
if name in NON_SUBSTRING_FIELDS:
sub_mapping['include_in_all'] = False
if name not in NON_SUBSTRING_FIELDS:
if depth == 1 or (depth == 2 and parent == 'array'):
sub_mapping.update({
'copy_to': '_all'
})

return sub_mapping

if type_ == 'number':
Expand Down Expand Up @@ -167,12 +168,13 @@ def index_settings():
'settings': {
'index.max_result_window': 99999,
'index.mapping.total_fields.limit': 5000,
'index.number_of_shards': 5,
'index.number_of_replicas': 2,
'index.number_of_shards': 1,
'index.number_of_replicas': 0,
'index.max_ngram_diff': 32,
'analysis': {
'filter': {
'substring': {
'type': 'nGram',
'type': 'ngram',
'min_gram': 1,
'max_gram': 33
},
Expand Down Expand Up @@ -273,10 +275,6 @@ def audit_mapping():

def es_mapping(mapping):
return {
'_all': {
'enabled': True,
'analyzer': 'snovault_search_analyzer'
},
'dynamic_templates': [
{
'template_principals_allowed': {
Expand Down Expand Up @@ -325,16 +323,19 @@ def es_mapping(mapping):
}
},
},
}
},
],
'properties': {
'_all': {
'type': 'text',
'store': False,
'analyzer': 'snovault_search_analyzer'
},
'uuid': {
'type': 'keyword',
'include_in_all': False,
},
'tid': {
'type': 'keyword',
'include_in_all': False,
},
'item_type': {
'type': 'keyword',
Expand All @@ -343,33 +344,26 @@ def es_mapping(mapping):
'object': {
'type': 'object',
'enabled': False,
'include_in_all': False,
},
'properties': {
'type': 'object',
'enabled': False,
'include_in_all': False,
},
'propsheets': {
'type': 'object',
'enabled': False,
'include_in_all': False,
},
'embedded_uuids': {
'type': 'keyword',
'include_in_all': False,
},
'linked_uuids': {
'type': 'keyword',
'include_in_all': False,
},
'paths': {
'type': 'keyword',
'include_in_all': False,
},
'audit': {
'type': 'object',
'include_in_all': False,
'properties': {
'ERROR': {
'type': 'object',
Expand Down Expand Up @@ -447,7 +441,7 @@ def type_mapping(types, item_type, embed=True):
# Check if mapping for property is already an object
# multiple subobjects may be embedded, so be careful here
if m['properties'][p]['type'] in ['keyword', 'text']:
m['properties'][p] = schema_mapping(p, s)
m['properties'][p] = schema_mapping(p, s, depth=1)

m = m['properties'][p]

Expand All @@ -465,19 +459,16 @@ def type_mapping(types, item_type, embed=True):
for prop in props:
new_mapping = new_mapping[prop]['properties']
new_mapping[last]['boost'] = boost
if last in NON_SUBSTRING_FIELDS:
new_mapping[last]['include_in_all'] = False
else:
new_mapping[last]['include_in_all'] = True
new_mapping[last]['copy_to'] = '_all'
return mapping


def create_elasticsearch_index(es, index, body):
    """Create *index* with the settings in *body*, tolerating pre-existing indices.

    ignore=[400, 404]: a 400 ("resource_already_exists") is expected when the
    index was created by a previous run; generous timeouts because index
    creation can be slow on a loaded cluster.
    """
    es.indices.create(
        index=index,
        body=body,
        wait_for_active_shards=1,
        ignore=[400, 404],
        master_timeout='5m',
        request_timeout=300,
    )


def set_index_mapping(es, index, doc_type, mapping):
es.indices.put_mapping(index=index, doc_type=doc_type, body=mapping, ignore=[400], request_timeout=300)
def set_index_mapping(es, index, mapping):
    """Apply *mapping* to the existing *index*.

    ES 7 removed mapping types, so no doc_type is passed; a 400 response
    (e.g. mapping conflict on re-run) is deliberately ignored.
    """
    es.indices.put_mapping(
        index=index,
        body=mapping,
        ignore=[400],
        request_timeout=300,
    )


def create_snovault_index_alias(es, indices):
Expand All @@ -497,20 +488,19 @@ def run(app, collections=None, dry_run=False):
indices = []
for collection_name in collections:
if collection_name == 'meta':
doc_type = 'meta'
mapping = META_MAPPING
else:
index = doc_type = collection_name
index = collection_name
collection = registry[COLLECTIONS].by_item_type[collection_name]
mapping = es_mapping(type_mapping(registry[TYPES], collection.type_info.item_type))

if mapping is None:
continue # Testing collections
if dry_run:
print(json.dumps(sorted_dict({index: {doc_type: mapping}}), indent=4))
print(json.dumps(sorted_dict({index: {collection_name: mapping}}), indent=4))
continue
create_elasticsearch_index(es, index, index_settings())
set_index_mapping(es, index, doc_type, {doc_type: mapping})
set_index_mapping(es, index, mapping)
if collection_name != 'meta':
indices.append(index)

Expand Down
2 changes: 1 addition & 1 deletion src/snovault/elasticsearch/esstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def get_by_uuid(self, uuid):
'version': True
}
result = self.es.search(index=self.index, body=query, _source=True, size=1)
if result['hits']['total'] == 0:
if result['hits']['total']['value'] == 0:
return None
hit = result['hits']['hits'][0]
return CachedModel(hit)
Expand Down
15 changes: 5 additions & 10 deletions src/snovault/elasticsearch/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,11 @@ def get_related_uuids(request, es, updated, renamed):
{
'terms': {
'embedded_uuids': updated,
'_cache': False,
},
},
{
'terms': {
'linked_uuids': renamed,
'_cache': False,
},
},
],
Expand All @@ -151,7 +149,7 @@ def get_related_uuids(request, es, updated, renamed):
}
res = es.search(index=RESOURCES_INDEX, size=SEARCH_MAX, request_timeout=60, body=query)

if res['hits']['total'] > SEARCH_MAX:
if res['hits']['total']['value'] > SEARCH_MAX:
return (list(all_uuids(request.registry)), True) # guaranteed unique

related_set = {hit['_id'] for hit in res['hits']['hits']}
Expand Down Expand Up @@ -387,7 +385,6 @@ def _load_indexing(request, session, connection, indexer_state):
else:
status = request.registry[ELASTIC_SEARCH].get(
index=request.registry.settings['snovault.elasticsearch.index'],
doc_type='meta',
id='indexing',
ignore=[400, 404]
)
Expand Down Expand Up @@ -512,7 +509,6 @@ def _run_indexing(
try:
request.registry[ELASTIC_SEARCH].index(
index=request.registry.settings['snovault.elasticsearch.index'],
doc_type='meta',
body=result,
id='indexing'
)
Expand All @@ -521,7 +517,6 @@ def _run_indexing(
del result['errors']
request.registry[ELASTIC_SEARCH].index(
index=request.registry.settings['snovault.elasticsearch.index'],
doc_type='meta',
body=result,
id='indexing'
)
Expand All @@ -535,7 +530,7 @@ def _run_indexing(
request.registry[ELASTIC_SEARCH].indices.refresh(RESOURCES_INDEX)
if flush:
try:
request.registry[ELASTIC_SEARCH].indices.flush_synced(index=RESOURCES_INDEX) # Faster recovery on ES restart
request.registry[ELASTIC_SEARCH].indices.flush(index=RESOURCES_INDEX) # Faster recovery on ES restart
except ConflictError:
pass
return result, indexing_update_infos
Expand Down Expand Up @@ -983,9 +978,9 @@ def update_object(encoded_es, request, uuid, xmin, restart=False):
}
try:
encoded_es.index(
index=doc['item_type'], doc_type=doc['item_type'], body=doc,
id=str(uuid), version=xmin, version_type='external_gte',
request_timeout=30,
index=doc['item_type'], body=doc,
id=str(uuid), version=xmin,
version_type='external_gte', request_timeout=30
)
except StatementError:
# Can't reconnect until invalid transaction is rolled back
Expand Down

0 comments on commit f1e5cae

Please sign in to comment.