Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Commit

Permalink
fix [EXM]: keep extractor errors (#897)
Browse files Browse the repository at this point in the history
* fix: we were accidentally discarding the errors section and setting is_extracted -> true when all entities failed to be created. (#896)

* feat: expose version+revision in producer and exm (#898)

* feat: expose version+revision in producer and exm

* fix: missing quote

Co-authored-by: Obdulia Losantos <obdulia.losantos@ehealthnigeria.org>
  • Loading branch information
shawnsarwar and obdulia-losantos committed Oct 22, 2020
1 parent c3381e3 commit fe9d28d
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 5 deletions.
7 changes: 6 additions & 1 deletion aether-entity-extraction-module/aether/extractor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ def start(self):
if not self.stopped:
raise RuntimeError('Manager already started!')

_logger.info('--------------------------------------------------------')
_logger.info(f'Version: {settings.VERSION}')
_logger.info(f'Revision: {settings.REVISION}')
_logger.info('--------------------------------------------------------')

_logger.info('starting')

signal.signal(signal.SIGINT, self.stop)
Expand Down Expand Up @@ -241,7 +246,7 @@ def entity_extraction(task, submission_queue, redis=None):
schemas[sd['name']] = schema_definition

# perform entity extraction
_, extracted_entities = extract_create_entities(
payload, extracted_entities = extract_create_entities(
submission_payload=payload,
mapping_definition=mapping['definition'],
schemas=schemas,
Expand Down
16 changes: 16 additions & 0 deletions aether-entity-extraction-module/aether/extractor/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,19 @@ def get_logger(name):
logger.addHandler(handler)
logger.setLevel(logging.getLevelName(LOG_LEVEL))
return logger


def _get_file_content(path, on_error): # pragma: no cover
try:
with open(path) as fp:
value = fp.read().strip()
except Exception:
value = on_error
return value


# Version and revision
# ------------------------------------------------------------------------------

VERSION = _get_file_content('/var/tmp/VERSION', '#.#.#')
REVISION = _get_file_content('/var/tmp/REVISION', '---')
6 changes: 4 additions & 2 deletions aether-entity-extraction-module/aether/extractor/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,10 @@ def _get_from_kernel():
finally:
try:
redis_instance.add(task=value, type=model_type, tenant=tenant)
except Exception:
pass # problems with redis or `value` is None
except Exception as uer:
# problems with redis or `value` is None
if value:
_logger.warning(f'Could not update {tenant}:{model_type}:{id} in Redis: {uer}')
return value


Expand Down
27 changes: 25 additions & 2 deletions aether-producer/aether/producer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@
from gevent.pywsgi import WSGIServer

from aether.producer.db import init as init_offset_db
from aether.producer.settings import KAFKA_SETTINGS, SETTINGS, LOG_LEVEL, get_logger
from aether.producer.settings import (
KAFKA_SETTINGS,
LOG_LEVEL,
REVISION,
SETTINGS,
VERSION,
get_logger,
)
from aether.producer.topic import KafkaStatus, TopicStatus, RealmManager


Expand Down Expand Up @@ -218,9 +225,13 @@ def get_thread_idle(self):
# Flask Functions

def add_endpoints(self):
# URLS configured here
# public
self.register('health', self.request_health)
self.register('healthcheck', self.request_healthcheck)
self.register('kernelcheck', self.request_kernelcheck)
self.register('check-app', self.request_check_app)
self.register('check-app/aether-kernel', self.request_kernelcheck)
# protected
self.register('status', self.request_status)
self.register('topics', self.request_topics)
self.register('pause', self.request_pause)
Expand Down Expand Up @@ -267,6 +278,10 @@ def decorated(self, *args, **kwargs):

# Exposed Request Endpoints

def request_health(self):
with self.app.app_context():
return Response({'healthy': True})

def request_healthcheck(self):
with self.app.app_context():
try:
Expand All @@ -287,6 +302,14 @@ def request_kernelcheck(self):
status=200 if healthy else 424 # Failed dependency
)

def request_check_app(self):
with self.app.app_context():
return Response({
'app_name': 'aether-producer',
'app_version': VERSION,
'app_revision': REVISION,
})

@requires_auth
def request_status(self):
status = {
Expand Down
16 changes: 16 additions & 0 deletions aether-producer/aether/producer/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,19 @@ def _get_kafka_settings():
SETTINGS = Settings(_file_path)
KAFKA_SETTINGS = _get_kafka_settings()
LOG_LEVEL = logging.getLevelName(SETTINGS.get('log_level', 'INFO'))


##################################################
# Version and revision

def _get_file_content(path, on_error): # pragma: no cover
try:
with open(path) as fp:
value = fp.read().strip()
except Exception:
value = on_error
return value


VERSION = _get_file_content('/var/tmp/VERSION', '#.#.#')
REVISION = _get_file_content('/var/tmp/REVISION', '---')
11 changes: 11 additions & 0 deletions aether-producer/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,20 @@ def test_manager_http_endpoint_service():
sleep(1)

url = 'http://localhost:%s' % SETTINGS.get('server_port', 9005)

r = requests.head(f'{url}/health')
assert(r.status_code == 200), r.text

r = requests.head(f'{url}/healthcheck')
assert(r.status_code == 200), r.text

r = requests.head(f'{url}/check-app')
assert(r.status_code == 200), r.text

r = requests.head(f'{url}/kernelcheck')
assert(r.status_code == 424), r.text
r = requests.head(f'{url}/check-app/aether-kernel')
assert(r.status_code == 424), r.text

protected_endpoints = ['status', 'topics']
for e in protected_endpoints:
Expand All @@ -58,6 +67,8 @@ def test_manager_http_endpoint_service():
man.realm_managers[_realm] = {}
man.thread_checkin(_realm)
sleep(2)
r = requests.get(f'{url}/health')
assert(r.status_code == 200)
r = requests.get(f'{url}/healthcheck')
assert(r.status_code == 500)
assert(_realm in r.json().keys())
Expand Down

0 comments on commit fe9d28d

Please sign in to comment.