Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

fix(producer): logs and endpoint responses #969

Merged
merged 6 commits into the base branch on Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions aether-entity-extraction-module/conf/pip/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ decorator==5.1.0
eha-jsonpath==0.6.0
fakeredis==1.6.1
flake8==4.0.1
flake8-quotes==3.3.0
flake8-quotes==3.3.1
gevent==21.8.0
greenlet==1.1.2
idna==3.3
iniconfig==1.1.1
jsonpath-ng==1.5.3
jsonschema==4.1.0
jsonschema==4.1.2
mccabe==0.6.1
packaging==21.0
pluggy==1.0.0
Expand Down
18 changes: 9 additions & 9 deletions aether-kernel/conf/pip/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ asgiref==3.4.1
attrs==21.2.0
autopep8==1.5.7
avro-python3==1.10.2
boto3==1.18.60
botocore==1.21.60
boto3==1.18.65
botocore==1.21.65
cachetools==4.2.4
certifi==2021.10.8
charset-normalizer==2.0.7
Expand All @@ -36,22 +36,22 @@ django-debug-toolbar==3.2.2
django-dynamic-fixture==3.1.2
django-filter==21.1
django-minio-storage==0.3.10
django-model-utils==4.1.1
django-model-utils==4.2.0
django-postgrespool2==2.0.1
django-prometheus==2.1.0
django-redis==5.0.0
django-silk==4.1.0
django-storages==1.12.1
django-storages==1.12.2
django-uwsgi==0.2.2
djangorestframework==3.12.4
drf-dynamic-fields==0.3.1
drf-yasg==1.20.0
eha-jsonpath==0.6.0
et-xmlfile==1.1.0
flake8==4.0.1
flake8-quotes==3.3.0
flake8-quotes==3.3.1
funcy==1.16
google-api-core==2.1.0
google-api-core==2.1.1
google-auth==2.3.0
google-cloud-core==2.1.0
google-cloud-storage==1.42.3
Expand All @@ -66,7 +66,7 @@ itypes==1.2.0
Jinja2==3.0.2
jmespath==0.10.0
jsonpath-ng==1.5.3
jsonschema==4.1.0
jsonschema==4.1.2
lxml==4.6.3
Markdown==3.3.4
MarkupSafe==2.0.1
Expand Down Expand Up @@ -97,10 +97,10 @@ s3transfer==0.5.0
sentry-sdk==1.4.3
six==1.16.0
spavro==1.1.24
SQLAlchemy==1.4.25
SQLAlchemy==1.4.26
sqlparse==0.4.2
tblib==1.7.0
toml==0.10.2
uritemplate==4.1.0
uritemplate==4.1.1
urllib3==1.26.7
uWSGI==2.0.20
12 changes: 6 additions & 6 deletions aether-odk-module/conf/pip/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
aether.sdk==1.4.0
asgiref==3.4.1
autopep8==1.5.7
boto3==1.18.60
botocore==1.21.60
boto3==1.18.65
botocore==1.21.65
cachetools==4.2.4
certifi==2021.10.8
charset-normalizer==2.0.7
Expand All @@ -32,15 +32,15 @@ django-postgrespool2==2.0.1
django-prometheus==2.1.0
django-redis==5.0.0
django-silk==4.1.0
django-storages==1.12.1
django-storages==1.12.2
django-uwsgi==0.2.2
djangorestframework==3.12.4
drf-dynamic-fields==0.3.1
flake8==4.0.1
flake8-quotes==3.3.0
flake8-quotes==3.3.1
FormEncode==1.3.1
funcy==1.16
google-api-core==2.1.0
google-api-core==2.1.1
google-auth==2.3.0
google-cloud-core==2.1.0
google-cloud-storage==1.42.3
Expand Down Expand Up @@ -77,7 +77,7 @@ s3transfer==0.5.0
sentry-sdk==1.4.3
six==1.16.0
spavro==1.1.24
SQLAlchemy==1.4.25
SQLAlchemy==1.4.26
sqlparse==0.4.2
tblib==1.7.0
toml==0.10.2
Expand Down
153 changes: 93 additions & 60 deletions aether-producer/aether/producer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import socket

from confluent_kafka.admin import AdminClient
from flask import (
Flask, jsonify, request, Response
)
from flask import Flask, request, Response

import gevent
from gevent.pool import Pool
Expand Down Expand Up @@ -57,6 +55,18 @@ class ProducerManager(object):
# Spawns a RealmManager for each schema type in Kernel
# RealmManager registers own eventloop greenlet (update_kafka) with ProducerManager

killed = False

admin_name = None
admin_password = None

kernel_client = None
kafka_admin_client = None

kafka_status = KafkaStatus.SUBMISSION_PENDING
realm_managers = {}
thread_idle = {}

def __init__(self):
# Start Signal Handlers
self.killed = False
Expand All @@ -82,6 +92,12 @@ def __init__(self):
self.thread_idle = {}
self.run()

self.logger.info('Started producer service')
self.logger.info(f'== Version : {VERSION}')
self.logger.info(f'== Revision : {REVISION}')
self.logger.info(f'== Client mode : {self.kernel_client.mode()}')
self.logger.info(f'== Kafka status : {self.kafka_status}')

def keep_alive_loop(self):
# Keeps the server up in case all other threads join at the same time.
while not self.killed:
Expand Down Expand Up @@ -113,7 +129,7 @@ def safe_sleep(self, dur):
dur = (dur - res) / 5
unit = 5
gevent.sleep(res)
for x in range(int(dur)):
for _x in range(int(dur)):
if not self.killed:
gevent.sleep(unit)

Expand All @@ -127,8 +143,8 @@ def kafka_available(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((kafka_ip, kafka_port))
except (InterruptedError, ConnectionRefusedError, socket.gaierror) as rce:
self.logger.debug(f'Could not connect to Kafka on url: {kafka_ip}:{kafka_port}')
self.logger.debug(f'Connection problem: {rce}')
self.logger.critical(f'Could not connect to Kafka on url: {kafka_ip}:{kafka_port}')
self.logger.critical(f'Connection problem: {rce}')
return False
return True

Expand Down Expand Up @@ -197,7 +213,7 @@ def check_realms(self):
else:
gevent.sleep(30)
except Exception as err:
self.logger.warning(f'No Kernel connection: {err}')
self.logger.critical(f'No Kernel connection: {err}')
gevent.sleep(1)
continue
self.logger.debug('No longer checking for new Realms')
Expand Down Expand Up @@ -229,8 +245,10 @@ def add_endpoints(self):
self.register('health', self.request_health)
self.register('healthcheck', self.request_healthcheck)
self.register('kernelcheck', self.request_kernelcheck)
self.register('kafkacheck', self.request_kafkacheck)
self.register('check-app', self.request_check_app)
self.register('check-app/aether-kernel', self.request_kernelcheck)
self.register('check-app/kafka', self.request_kafkacheck)
# protected
self.register('status', self.request_status)
self.register('topics', self.request_topics)
Expand Down Expand Up @@ -264,8 +282,9 @@ def check_auth(self, username, password):
return username == self.admin_name and password == self.admin_password

def request_authentication(self):
return Response('Bad Credentials', 401,
{'WWW-Authenticate': 'Basic realm="Login Required"'})
with self.app.app_context():
return Response('Bad Credentials', 401,
{'WWW-Authenticate': 'Basic realm="Login Required"'})

def requires_auth(f):
@wraps(f)
Expand All @@ -276,66 +295,79 @@ def decorated(self, *args, **kwargs):
return f(self, *args, **kwargs)
return decorated

def return_json(self, data, status=None):
return Response(
response=json.dumps(data),
status=status,
mimetype='application/json',
content_type='application/json',
)

# Exposed Request Endpoints

def request_health(self):
with self.app.app_context():
return Response({'healthy': True})
return self.return_json({'healthy': True})

def request_healthcheck(self):
with self.app.app_context():
try:
expired = self.check_thread_health()
if not expired:
return Response({'healthy': True})
return self.return_json({'healthy': True})
else:
return Response(json.dumps(expired), 500, mimetype='application/json')
return self.return_json(expired, 500)
except Exception as err:
self.app.logger.error(f'Unexpected HC error: {err}')
return Response(f'Unexpected error: {err}', 500)

def request_kernelcheck(self):
with self.app.app_context():
healthy = self.kernel_client.check_kernel()
return Response(
return self.return_json(
{'healthy': healthy},
status=200 if healthy else 424 # Failed dependency
status=200 if healthy else 424, # Failed dependency
)

def request_kafkacheck(self):
with self.app.app_context():
healthy = self.kafka_available()
return self.return_json(
{'healthy': healthy},
status=200 if healthy else 424, # Failed dependency
)

def request_check_app(self):
with self.app.app_context():
return Response({
return self.return_json({
'app_name': 'aether-producer',
'app_version': VERSION,
'app_revision': REVISION,
})

@requires_auth
def request_status(self):
status = {
'kernel_mode': self.kernel_client.mode(),
'kernel_last_check': self.kernel_client.last_check,
'kernel_last_check_error': self.kernel_client.last_check_error,
'kafka_container_accessible': self.kafka_available(),
'kafka_broker_information': self.broker_info(),
'kafka_submission_status': str(self.kafka_status), # This is just a status flag
'topics': {k: v.get_status() for k, v in self.realm_managers.items()},
}
with self.app.app_context():
return jsonify(**status)
status = {
'kernel_mode': self.kernel_client.mode(),
'kernel_last_check': self.kernel_client.last_check,
'kernel_last_check_error': self.kernel_client.last_check_error,
'kafka_container_accessible': self.kafka_available(),
'kafka_broker_information': self.broker_info(),
'kafka_submission_status': str(self.kafka_status), # This is just a status flag
'topics': {k: v.get_status() for k, v in self.realm_managers.items()},
}
return self.return_json(status)

@requires_auth
def request_topics(self):
if not self.realm_managers:
return Response({})

status = {}
for topic, manager in self.realm_managers.items():
status[topic] = {}
for name, sw in manager.schemas.items():
status[topic][name] = manager.get_topic_size(sw)
with self.app.app_context():
return jsonify(**status)
status = {}
for topic, manager in self.realm_managers.items():
status[topic] = {}
for name, sw in manager.schemas.items():
status[topic][name] = manager.get_topic_size(sw)
return self.return_json(status)

# Topic Command API

Expand All @@ -353,34 +385,35 @@ def request_rebuild(self):

@requires_auth
def handle_topic_command(self, request, status):
topic = request.args.get('topic')
realm = request.args.get('realm')
if not realm:
return Response('A realm must be specified', 422)
if not topic:
return Response('A topic must be specified', 422)
if not self.realm_managers.get(realm):
return Response(f'Bad realm name: {realm}', 422)

manager = self.realm_managers[realm]
schema_wrapper = manager.schemas.get(topic)
if not schema_wrapper:
return Response(f'realm {realm} has no topic {topic}', 422)
if status is TopicStatus.PAUSED:
fn = manager.pause
if status is TopicStatus.NORMAL:
fn = manager.resume
if status is TopicStatus.REBUILDING:
fn = manager.rebuild
with self.app.app_context():
topic = request.args.get('topic')
realm = request.args.get('realm')
if not realm:
return Response('A realm must be specified', 422)
if not topic:
return Response('A topic must be specified', 422)
if not self.realm_managers.get(realm):
return Response(f'Bad realm name: {realm}', 422)

manager = self.realm_managers[realm]
schema_wrapper = manager.schemas.get(topic)
if not schema_wrapper:
return Response(f'realm {realm} has no topic {topic}', 422)
if status is TopicStatus.PAUSED:
fn = manager.pause
if status is TopicStatus.NORMAL:
fn = manager.resume
if status is TopicStatus.REBUILDING:
fn = manager.rebuild

try:
res = fn(schema_wrapper)
if not res:
return Response(f'Operation failed on {topic}', 500)
try:
res = fn(schema_wrapper)
if not res:
return Response(f'Operation failed on {topic}', 500)

return Response(f'Success for status {status} on {topic}', 200)
except Exception as err:
return Response(f'Operation failed on {topic} with: {err}', 500)
return Response(f'Success for status {status} on {topic}', 200)
except Exception as err:
return Response(f'Operation failed on {topic} with: {err}', 500)


def main():
Expand Down
2 changes: 2 additions & 0 deletions aether-producer/aether/producer/kernel_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ def check_kernel(self):
# check that Kernel connection is possible
try:
self._fetch(url=_REALMS_URL)
return True
except Exception as e:
logger.exception(e)
logger.critical(f'Cannot connect to Kernel: {_KERNEL_URL}')
return False

def get_realms(self):
Expand Down
Loading