diff --git a/aether-entity-extraction-module/conf/pip/requirements.txt b/aether-entity-extraction-module/conf/pip/requirements.txt
index 87364ead4..d6a73836f 100644
--- a/aether-entity-extraction-module/conf/pip/requirements.txt
+++ b/aether-entity-extraction-module/conf/pip/requirements.txt
@@ -22,13 +22,13 @@ decorator==5.1.0
 eha-jsonpath==0.6.0
 fakeredis==1.6.1
 flake8==4.0.1
-flake8-quotes==3.3.0
+flake8-quotes==3.3.1
 gevent==21.8.0
 greenlet==1.1.2
 idna==3.3
 iniconfig==1.1.1
 jsonpath-ng==1.5.3
-jsonschema==4.1.0
+jsonschema==4.1.2
 mccabe==0.6.1
 packaging==21.0
 pluggy==1.0.0
diff --git a/aether-kernel/conf/pip/requirements.txt b/aether-kernel/conf/pip/requirements.txt
index dd652632e..364493e4c 100644
--- a/aether-kernel/conf/pip/requirements.txt
+++ b/aether-kernel/conf/pip/requirements.txt
@@ -18,8 +18,8 @@ asgiref==3.4.1
 attrs==21.2.0
 autopep8==1.5.7
 avro-python3==1.10.2
-boto3==1.18.60
-botocore==1.21.60
+boto3==1.18.65
+botocore==1.21.65
 cachetools==4.2.4
 certifi==2021.10.8
 charset-normalizer==2.0.7
@@ -36,12 +36,12 @@ django-debug-toolbar==3.2.2
 django-dynamic-fixture==3.1.2
 django-filter==21.1
 django-minio-storage==0.3.10
-django-model-utils==4.1.1
+django-model-utils==4.2.0
 django-postgrespool2==2.0.1
 django-prometheus==2.1.0
 django-redis==5.0.0
 django-silk==4.1.0
-django-storages==1.12.1
+django-storages==1.12.2
 django-uwsgi==0.2.2
 djangorestframework==3.12.4
 drf-dynamic-fields==0.3.1
@@ -49,9 +49,9 @@ drf-yasg==1.20.0
 eha-jsonpath==0.6.0
 et-xmlfile==1.1.0
 flake8==4.0.1
-flake8-quotes==3.3.0
+flake8-quotes==3.3.1
 funcy==1.16
-google-api-core==2.1.0
+google-api-core==2.1.1
 google-auth==2.3.0
 google-cloud-core==2.1.0
 google-cloud-storage==1.42.3
@@ -66,7 +66,7 @@ itypes==1.2.0
 Jinja2==3.0.2
 jmespath==0.10.0
 jsonpath-ng==1.5.3
-jsonschema==4.1.0
+jsonschema==4.1.2
 lxml==4.6.3
 Markdown==3.3.4
 MarkupSafe==2.0.1
@@ -97,10 +97,10 @@ s3transfer==0.5.0
 sentry-sdk==1.4.3
 six==1.16.0
 spavro==1.1.24
-SQLAlchemy==1.4.25
+SQLAlchemy==1.4.26
 sqlparse==0.4.2
 tblib==1.7.0
 toml==0.10.2
-uritemplate==4.1.0
+uritemplate==4.1.1
 urllib3==1.26.7
 uWSGI==2.0.20
diff --git a/aether-odk-module/conf/pip/requirements.txt b/aether-odk-module/conf/pip/requirements.txt
index 07ff43a5e..71eb32cec 100644
--- a/aether-odk-module/conf/pip/requirements.txt
+++ b/aether-odk-module/conf/pip/requirements.txt
@@ -15,8 +15,8 @@
 aether.sdk==1.4.0
 asgiref==3.4.1
 autopep8==1.5.7
-boto3==1.18.60
-botocore==1.21.60
+boto3==1.18.65
+botocore==1.21.65
 cachetools==4.2.4
 certifi==2021.10.8
 charset-normalizer==2.0.7
@@ -32,15 +32,15 @@ django-postgrespool2==2.0.1
 django-prometheus==2.1.0
 django-redis==5.0.0
 django-silk==4.1.0
-django-storages==1.12.1
+django-storages==1.12.2
 django-uwsgi==0.2.2
 djangorestframework==3.12.4
 drf-dynamic-fields==0.3.1
 flake8==4.0.1
-flake8-quotes==3.3.0
+flake8-quotes==3.3.1
 FormEncode==1.3.1
 funcy==1.16
-google-api-core==2.1.0
+google-api-core==2.1.1
 google-auth==2.3.0
 google-cloud-core==2.1.0
 google-cloud-storage==1.42.3
@@ -77,7 +77,7 @@ s3transfer==0.5.0
 sentry-sdk==1.4.3
 six==1.16.0
 spavro==1.1.24
-SQLAlchemy==1.4.25
+SQLAlchemy==1.4.26
 sqlparse==0.4.2
 tblib==1.7.0
 toml==0.10.2
diff --git a/aether-producer/aether/producer/__init__.py b/aether-producer/aether/producer/__init__.py
index c95319ab6..3e641dcd2 100644
--- a/aether-producer/aether/producer/__init__.py
+++ b/aether-producer/aether/producer/__init__.py
@@ -23,9 +23,7 @@
 import socket

 from confluent_kafka.admin import AdminClient
-from flask import (
-    Flask, jsonify, request, Response
-)
+from flask import Flask, request, Response

 import gevent
 from gevent.pool import Pool
@@ -57,6 +55,18 @@ class ProducerManager(object):
     # Spawns a RealmManager for each schema type in Kernel
     # RealmManager registers own eventloop greenlet (update_kafka) with ProducerManager

+    killed = False
+
+    admin_name = None
+    admin_password = None
+
+    kernel_client = None
+    kafka_admin_client = None
+
+    kafka_status = KafkaStatus.SUBMISSION_PENDING
+    realm_managers = {}
+    thread_idle = {}
+
     def __init__(self):
         # Start Signal Handlers
         self.killed = False
@@ -82,6 +92,12 @@ def __init__(self):
         self.thread_idle = {}
         self.run()

+        self.logger.info('Started producer service')
+        self.logger.info(f'== Version : {VERSION}')
+        self.logger.info(f'== Revision : {REVISION}')
+        self.logger.info(f'== Client mode : {self.kernel_client.mode()}')
+        self.logger.info(f'== Kafka status : {self.kafka_status}')
+
     def keep_alive_loop(self):
         # Keeps the server up in case all other threads join at the same time.
         while not self.killed:
@@ -113,7 +129,7 @@ def safe_sleep(self, dur):
         dur = (dur - res) / 5
         unit = 5
         gevent.sleep(res)
-        for x in range(int(dur)):
+        for _x in range(int(dur)):
             if not self.killed:
                 gevent.sleep(unit)

@@ -127,8 +143,8 @@ def kafka_available(self):
             with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                 s.connect((kafka_ip, kafka_port))
         except (InterruptedError, ConnectionRefusedError, socket.gaierror) as rce:
-            self.logger.debug(f'Could not connect to Kafka on url: {kafka_ip}:{kafka_port}')
-            self.logger.debug(f'Connection problem: {rce}')
+            self.logger.critical(f'Could not connect to Kafka on url: {kafka_ip}:{kafka_port}')
+            self.logger.critical(f'Connection problem: {rce}')
             return False
         return True

@@ -197,7 +213,7 @@ def check_realms(self):
                 else:
                     gevent.sleep(30)
             except Exception as err:
-                self.logger.warning(f'No Kernel connection: {err}')
+                self.logger.critical(f'No Kernel connection: {err}')
                 gevent.sleep(1)
                 continue
         self.logger.debug('No longer checking for new Realms')
@@ -229,8 +245,10 @@ def add_endpoints(self):
         self.register('health', self.request_health)
         self.register('healthcheck', self.request_healthcheck)
         self.register('kernelcheck', self.request_kernelcheck)
+        self.register('kafkacheck', self.request_kafkacheck)
         self.register('check-app', self.request_check_app)
         self.register('check-app/aether-kernel', self.request_kernelcheck)
+        self.register('check-app/kafka', self.request_kafkacheck)
         # protected
         self.register('status', self.request_status)
         self.register('topics', self.request_topics)
@@ -264,8 +282,9 @@ def check_auth(self, username, password):
         return username == self.admin_name and password == self.admin_password

     def request_authentication(self):
-        return Response('Bad Credentials', 401,
-                        {'WWW-Authenticate': 'Basic realm="Login Required"'})
+        with self.app.app_context():
+            return Response('Bad Credentials', 401,
+                            {'WWW-Authenticate': 'Basic realm="Login Required"'})

     def requires_auth(f):
         @wraps(f)
@@ -276,20 +295,28 @@ def decorated(self, *args, **kwargs):
             return f(self, *args, **kwargs)
         return decorated

+    def return_json(self, data, status=None):
+        return Response(
+            response=json.dumps(data),
+            status=status,
+            mimetype='application/json',
+            content_type='application/json',
+        )
+
     # Exposed Request Endpoints

     def request_health(self):
         with self.app.app_context():
-            return Response({'healthy': True})
+            return self.return_json({'healthy': True})

     def request_healthcheck(self):
         with self.app.app_context():
             try:
                 expired = self.check_thread_health()
                 if not expired:
-                    return Response({'healthy': True})
+                    return self.return_json({'healthy': True})
                 else:
-                    return Response(json.dumps(expired), 500, mimetype='application/json')
+                    return self.return_json(expired, 500)
             except Exception as err:
                 self.app.logger.error(f'Unexpected HC error: {err}')
                 return Response(f'Unexpected error: {err}', 500)
@@ -297,14 +324,22 @@ def request_healthcheck(self):
     def request_kernelcheck(self):
         with self.app.app_context():
             healthy = self.kernel_client.check_kernel()
-            return Response(
+            return self.return_json(
                 {'healthy': healthy},
-                status=200 if healthy else 424  # Failed dependency
+                status=200 if healthy else 424,  # Failed dependency
+            )
+
+    def request_kafkacheck(self):
+        with self.app.app_context():
+            healthy = self.kafka_available()
+            return self.return_json(
+                {'healthy': healthy},
+                status=200 if healthy else 424,  # Failed dependency
             )

     def request_check_app(self):
         with self.app.app_context():
-            return Response({
+            return self.return_json({
                 'app_name': 'aether-producer',
                 'app_version': VERSION,
                 'app_revision': REVISION,
@@ -312,30 +347,27 @@ def request_check_app(self):

     @requires_auth
     def request_status(self):
-        status = {
-            'kernel_mode': self.kernel_client.mode(),
-            'kernel_last_check': self.kernel_client.last_check,
-            'kernel_last_check_error': self.kernel_client.last_check_error,
-            'kafka_container_accessible': self.kafka_available(),
-            'kafka_broker_information': self.broker_info(),
-            'kafka_submission_status': str(self.kafka_status),  # This is just a status flag
-            'topics': {k: v.get_status() for k, v in self.realm_managers.items()},
-        }
         with self.app.app_context():
-            return jsonify(**status)
+            status = {
+                'kernel_mode': self.kernel_client.mode(),
+                'kernel_last_check': self.kernel_client.last_check,
+                'kernel_last_check_error': self.kernel_client.last_check_error,
+                'kafka_container_accessible': self.kafka_available(),
+                'kafka_broker_information': self.broker_info(),
+                'kafka_submission_status': str(self.kafka_status),  # This is just a status flag
+                'topics': {k: v.get_status() for k, v in self.realm_managers.items()},
+            }
+            return self.return_json(status)

     @requires_auth
     def request_topics(self):
-        if not self.realm_managers:
-            return Response({})
-
-        status = {}
-        for topic, manager in self.realm_managers.items():
-            status[topic] = {}
-            for name, sw in manager.schemas.items():
-                status[topic][name] = manager.get_topic_size(sw)
         with self.app.app_context():
-            return jsonify(**status)
+            status = {}
+            for topic, manager in self.realm_managers.items():
+                status[topic] = {}
+                for name, sw in manager.schemas.items():
+                    status[topic][name] = manager.get_topic_size(sw)
+            return self.return_json(status)

     # Topic Command API

@@ -353,34 +385,35 @@ def request_rebuild(self):

     @requires_auth
     def handle_topic_command(self, request, status):
-        topic = request.args.get('topic')
-        realm = request.args.get('realm')
-        if not realm:
-            return Response('A realm must be specified', 422)
-        if not topic:
-            return Response('A topic must be specified', 422)
-        if not self.realm_managers.get(realm):
-            return Response(f'Bad realm name: {realm}', 422)
-
-        manager = self.realm_managers[realm]
-        schema_wrapper = manager.schemas.get(topic)
-        if not schema_wrapper:
-            return Response(f'realm {realm} has no topic {topic}', 422)
-        if status is TopicStatus.PAUSED:
-            fn = manager.pause
-        if status is TopicStatus.NORMAL:
-            fn = manager.resume
-        if status is TopicStatus.REBUILDING:
-            fn = manager.rebuild
+        with self.app.app_context():
+            topic = request.args.get('topic')
+            realm = request.args.get('realm')
+            if not realm:
+                return Response('A realm must be specified', 422)
+            if not topic:
+                return Response('A topic must be specified', 422)
+            if not self.realm_managers.get(realm):
+                return Response(f'Bad realm name: {realm}', 422)
+
+            manager = self.realm_managers[realm]
+            schema_wrapper = manager.schemas.get(topic)
+            if not schema_wrapper:
+                return Response(f'realm {realm} has no topic {topic}', 422)
+            if status is TopicStatus.PAUSED:
+                fn = manager.pause
+            if status is TopicStatus.NORMAL:
+                fn = manager.resume
+            if status is TopicStatus.REBUILDING:
+                fn = manager.rebuild

-        try:
-            res = fn(schema_wrapper)
-            if not res:
-                return Response(f'Operation failed on {topic}', 500)
+            try:
+                res = fn(schema_wrapper)
+                if not res:
+                    return Response(f'Operation failed on {topic}', 500)

-            return Response(f'Success for status {status} on {topic}', 200)
-        except Exception as err:
-            return Response(f'Operation failed on {topic} with: {err}', 500)
+                return Response(f'Success for status {status} on {topic}', 200)
+            except Exception as err:
+                return Response(f'Operation failed on {topic} with: {err}', 500)


 def main():
diff --git a/aether-producer/aether/producer/kernel_api.py b/aether-producer/aether/producer/kernel_api.py
index dc6808338..0fe3eeed5 100644
--- a/aether-producer/aether/producer/kernel_api.py
+++ b/aether-producer/aether/producer/kernel_api.py
@@ -75,8 +75,10 @@ def check_kernel(self):
         # check that Kernel connection is possible
         try:
             self._fetch(url=_REALMS_URL)
+            return True
         except Exception as e:
             logger.exception(e)
+            logger.critical(f'Cannot connect to Kernel: {_KERNEL_URL}')
             return False

     def get_realms(self):
diff --git a/aether-producer/conf/pip/requirements.txt b/aether-producer/conf/pip/requirements.txt
index 1a8f4e17a..8616e65dc 100644
--- a/aether-producer/conf/pip/requirements.txt
+++ b/aether-producer/conf/pip/requirements.txt
@@ -18,7 +18,7 @@ charset-normalizer==2.0.7
 click==8.0.3
 confluent-kafka==1.7.0
 flake8==4.0.1
-flake8-quotes==3.3.0
+flake8-quotes==3.3.1
 Flask==2.0.2
 gevent==21.8.0
 greenlet==1.1.2
@@ -40,7 +40,7 @@ pytest==6.2.5
 requests==2.26.0
 six==1.16.0
 spavro==1.1.24
-SQLAlchemy==1.4.25
+SQLAlchemy==1.4.26
 toml==0.10.2
 urllib3==1.26.7
 Werkzeug==2.0.2
diff --git a/aether-producer/tests/test_integration.py b/aether-producer/tests/test_integration.py
index 0b8dbe878..7135b8e8d 100644
--- a/aether-producer/tests/test_integration.py
+++ b/aether-producer/tests/test_integration.py
@@ -55,6 +55,11 @@ def test_manager_http_endpoint_service():
         r = requests.head(f'{url}/check-app/aether-kernel')
         assert(r.status_code == 424), r.text

+        r = requests.head(f'{url}/kafkacheck')
+        assert(r.status_code == 424), r.text
+        r = requests.head(f'{url}/check-app/kafka')
+        assert(r.status_code == 424), r.text
+
         protected_endpoints = ['status', 'topics']
         for e in protected_endpoints:
             r = requests.head(f'{url}/{e}')
@@ -70,7 +75,7 @@ def test_manager_http_endpoint_service():
         r = requests.get(f'{url}/health')
         assert(r.status_code == 200)
         r = requests.get(f'{url}/healthcheck')
-        assert(r.status_code == 500)
+        assert(r.status_code == 500), r.text
         assert(_realm in r.json().keys())

     finally:
diff --git a/aether-ui/conf/pip/requirements.txt b/aether-ui/conf/pip/requirements.txt
index 0b424c7f0..fe5c8c9be 100644
--- a/aether-ui/conf/pip/requirements.txt
+++ b/aether-ui/conf/pip/requirements.txt
@@ -15,8 +15,8 @@
 aether.sdk==1.4.0
 asgiref==3.4.1
 autopep8==1.5.7
-boto3==1.18.60
-botocore==1.21.60
+boto3==1.18.65
+botocore==1.21.65
 cachetools==4.2.4
 certifi==2021.10.8
 charset-normalizer==2.0.7
@@ -28,20 +28,20 @@ django-cleanup==5.2.0
 django-cors-headers==3.10.0
 django-debug-toolbar==3.2.2
 django-minio-storage==0.3.10
-django-model-utils==4.1.1
+django-model-utils==4.2.0
 django-postgrespool2==2.0.1
 django-prometheus==2.1.0
 django-redis==5.0.0
 django-silk==4.1.0
-django-storages==1.12.1
+django-storages==1.12.2
 django-uwsgi==0.2.2
 django-webpack-loader==1.4.1
 djangorestframework==3.12.4
 drf-dynamic-fields==0.3.1
 flake8==4.0.1
-flake8-quotes==3.3.0
+flake8-quotes==3.3.1
 funcy==1.16
-google-api-core==2.1.0
+google-api-core==2.1.1
 google-auth==2.3.0
 google-cloud-core==2.1.0
 google-cloud-storage==1.42.3
@@ -74,7 +74,7 @@ rsa==4.7.2
 s3transfer==0.5.0
 sentry-sdk==1.4.3
 six==1.16.0
-SQLAlchemy==1.4.25
+SQLAlchemy==1.4.26
 sqlparse==0.4.2
 tblib==1.7.0
 toml==0.10.2
diff --git a/test-aether-integration-module/test/__init__.py b/test-aether-integration-module/test/__init__.py
index cdd0097b9..fe63fbbd3 100644
--- a/test-aether-integration-module/test/__init__.py
+++ b/test-aether-integration-module/test/__init__.py
@@ -59,32 +59,20 @@

 @pytest.fixture(scope='function')
 def producer_topics():
-    max_retry = 10
-    for x in range(max_retry):
-        try:
-            status = producer_request('status')
-            kafka = status.get('kafka_container_accessible')
-            if not kafka:
-                raise ValueError('Kafka not connected yet')
-            topics = producer_request('topics')
-            return topics
-        except Exception:
-            sleep(1)
+    wait_for_kafka()
+    topics = producer_request('topics')
+    return topics


 @pytest.fixture(scope='function')
 def wait_for_producer_status():
+    wait_for_kafka()
+
     max_retry = 30
     failure_mode = None
-    for x in range(max_retry):
+    for _x in range(max_retry):
         try:
             status = producer_request('status')
-            if not status:
-                raise ValueError('No status response from producer')
-            kafka = status.get('kafka_container_accessible')
-            if not kafka:
-                raise ValueError('Kafka not connected yet')
-
             person = status.get('topics', {}).get(REALM, {}).get(SEED_TYPE, {})
             ok_count = person.get('last_changeset_status', {}).get('succeeded')
             if ok_count:
@@ -156,6 +144,25 @@ def producer_request(endpoint, expect_json=True):
             sleep(1)


+def wait_for_kafka():
+    max_retry = 30
+    failure_mode = None
+    for _x in range(max_retry):
+        try:
+            status = producer_request('kafkacheck')
+            if not status:
+                raise ValueError('No status response from producer')
+            kafka = status.get('healthy')
+            if not kafka:
+                raise ValueError('Kafka not connected yet')
+            return
+        except Exception as err:
+            failure_mode = str(err)
+            sleep(1)
+
+    raise TimeoutError(f'Producer not ready before {max_retry}s timeout. Reason: {failure_mode}')
+
+
 def topic_status(realm, topic):
     status = producer_request('status')
     return status['topics'][realm][topic]