This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

fix(producer): logs and endpoint responses (#969)
* fix(producer): logs and endpoint responses

* fix: last tweaks

* fix: more logs and checks

* fix: log level

* chore: upgrade dependencies

* fix(producer): response mimetype
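
The last two bullets concern the HTTP endpoint responses and their mimetype. A quick, hedged way to see the effect against a running producer — the base URL and port below are assumptions for illustration, not values defined by this commit:

import requests

# Assumed address; adjust to wherever the aether-producer HTTP server is exposed.
PRODUCER_URL = 'http://localhost:5005'

for path in ('health', 'healthcheck', 'check-app', 'check-app/kafka'):
    res = requests.get(f'{PRODUCER_URL}/{path}')
    # After this change the body is serialised JSON and the Content-Type header
    # is application/json for these endpoints (see return_json() in the diff below).
    print(path, res.status_code, res.headers.get('Content-Type'))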
obdulia-losantos committed Oct 20, 2021
1 parent 066ebee commit 9148ec2
Showing 9 changed files with 152 additions and 105 deletions.
4 changes: 2 additions & 2 deletions aether-entity-extraction-module/conf/pip/requirements.txt
@@ -22,13 +22,13 @@ decorator==5.1.0
eha-jsonpath==0.6.0
fakeredis==1.6.1
flake8==4.0.1
-flake8-quotes==3.3.0
+flake8-quotes==3.3.1
gevent==21.8.0
greenlet==1.1.2
idna==3.3
iniconfig==1.1.1
jsonpath-ng==1.5.3
-jsonschema==4.1.0
+jsonschema==4.1.2
mccabe==0.6.1
packaging==21.0
pluggy==1.0.0
18 changes: 9 additions & 9 deletions aether-kernel/conf/pip/requirements.txt
@@ -18,8 +18,8 @@ asgiref==3.4.1
attrs==21.2.0
autopep8==1.5.7
avro-python3==1.10.2
-boto3==1.18.60
-botocore==1.21.60
+boto3==1.18.65
+botocore==1.21.65
cachetools==4.2.4
certifi==2021.10.8
charset-normalizer==2.0.7
@@ -36,22 +36,22 @@ django-debug-toolbar==3.2.2
django-dynamic-fixture==3.1.2
django-filter==21.1
django-minio-storage==0.3.10
-django-model-utils==4.1.1
+django-model-utils==4.2.0
django-postgrespool2==2.0.1
django-prometheus==2.1.0
django-redis==5.0.0
django-silk==4.1.0
-django-storages==1.12.1
+django-storages==1.12.2
django-uwsgi==0.2.2
djangorestframework==3.12.4
drf-dynamic-fields==0.3.1
drf-yasg==1.20.0
eha-jsonpath==0.6.0
et-xmlfile==1.1.0
flake8==4.0.1
-flake8-quotes==3.3.0
+flake8-quotes==3.3.1
funcy==1.16
-google-api-core==2.1.0
+google-api-core==2.1.1
google-auth==2.3.0
google-cloud-core==2.1.0
google-cloud-storage==1.42.3
@@ -66,7 +66,7 @@ itypes==1.2.0
Jinja2==3.0.2
jmespath==0.10.0
jsonpath-ng==1.5.3
-jsonschema==4.1.0
+jsonschema==4.1.2
lxml==4.6.3
Markdown==3.3.4
MarkupSafe==2.0.1
@@ -97,10 +97,10 @@ s3transfer==0.5.0
sentry-sdk==1.4.3
six==1.16.0
spavro==1.1.24
-SQLAlchemy==1.4.25
+SQLAlchemy==1.4.26
sqlparse==0.4.2
tblib==1.7.0
toml==0.10.2
-uritemplate==4.1.0
+uritemplate==4.1.1
urllib3==1.26.7
uWSGI==2.0.20
12 changes: 6 additions & 6 deletions aether-odk-module/conf/pip/requirements.txt
@@ -15,8 +15,8 @@
aether.sdk==1.4.0
asgiref==3.4.1
autopep8==1.5.7
-boto3==1.18.60
-botocore==1.21.60
+boto3==1.18.65
+botocore==1.21.65
cachetools==4.2.4
certifi==2021.10.8
charset-normalizer==2.0.7
@@ -32,15 +32,15 @@ django-postgrespool2==2.0.1
django-prometheus==2.1.0
django-redis==5.0.0
django-silk==4.1.0
-django-storages==1.12.1
+django-storages==1.12.2
django-uwsgi==0.2.2
djangorestframework==3.12.4
drf-dynamic-fields==0.3.1
flake8==4.0.1
-flake8-quotes==3.3.0
+flake8-quotes==3.3.1
FormEncode==1.3.1
funcy==1.16
-google-api-core==2.1.0
+google-api-core==2.1.1
google-auth==2.3.0
google-cloud-core==2.1.0
google-cloud-storage==1.42.3
@@ -77,7 +77,7 @@ s3transfer==0.5.0
sentry-sdk==1.4.3
six==1.16.0
spavro==1.1.24
-SQLAlchemy==1.4.25
+SQLAlchemy==1.4.26
sqlparse==0.4.2
tblib==1.7.0
toml==0.10.2
153 changes: 93 additions & 60 deletions aether-producer/aether/producer/__init__.py
@@ -23,9 +23,7 @@
import socket

from confluent_kafka.admin import AdminClient
-from flask import (
-    Flask, jsonify, request, Response
-)
+from flask import Flask, request, Response

import gevent
from gevent.pool import Pool
@@ -57,6 +55,18 @@ class ProducerManager(object):
    # Spawns a RealmManager for each schema type in Kernel
    # RealmManager registers own eventloop greenlet (update_kafka) with ProducerManager

+    killed = False
+
+    admin_name = None
+    admin_password = None
+
+    kernel_client = None
+    kafka_admin_client = None
+
+    kafka_status = KafkaStatus.SUBMISSION_PENDING
+    realm_managers = {}
+    thread_idle = {}
+
    def __init__(self):
        # Start Signal Handlers
        self.killed = False
@@ -82,6 +92,12 @@ def __init__(self):
        self.thread_idle = {}
        self.run()

+        self.logger.info('Started producer service')
+        self.logger.info(f'== Version : {VERSION}')
+        self.logger.info(f'== Revision : {REVISION}')
+        self.logger.info(f'== Client mode : {self.kernel_client.mode()}')
+        self.logger.info(f'== Kafka status : {self.kafka_status}')
+
    def keep_alive_loop(self):
        # Keeps the server up in case all other threads join at the same time.
        while not self.killed:
@@ -113,7 +129,7 @@ def safe_sleep(self, dur):
            dur = (dur - res) / 5
            unit = 5
        gevent.sleep(res)
-        for x in range(int(dur)):
+        for _x in range(int(dur)):
            if not self.killed:
                gevent.sleep(unit)

@@ -127,8 +143,8 @@ def kafka_available(self):
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect((kafka_ip, kafka_port))
        except (InterruptedError, ConnectionRefusedError, socket.gaierror) as rce:
-            self.logger.debug(f'Could not connect to Kafka on url: {kafka_ip}:{kafka_port}')
-            self.logger.debug(f'Connection problem: {rce}')
+            self.logger.critical(f'Could not connect to Kafka on url: {kafka_ip}:{kafka_port}')
+            self.logger.critical(f'Connection problem: {rce}')
            return False
        return True

@@ -197,7 +213,7 @@ def check_realms(self):
                else:
                    gevent.sleep(30)
            except Exception as err:
-                self.logger.warning(f'No Kernel connection: {err}')
+                self.logger.critical(f'No Kernel connection: {err}')
                gevent.sleep(1)
                continue
        self.logger.debug('No longer checking for new Realms')
@@ -229,8 +245,10 @@ def add_endpoints(self):
        self.register('health', self.request_health)
        self.register('healthcheck', self.request_healthcheck)
        self.register('kernelcheck', self.request_kernelcheck)
+        self.register('kafkacheck', self.request_kafkacheck)
        self.register('check-app', self.request_check_app)
        self.register('check-app/aether-kernel', self.request_kernelcheck)
+        self.register('check-app/kafka', self.request_kafkacheck)
        # protected
        self.register('status', self.request_status)
        self.register('topics', self.request_topics)
@@ -264,8 +282,9 @@ def check_auth(self, username, password):
        return username == self.admin_name and password == self.admin_password

    def request_authentication(self):
-        return Response('Bad Credentials', 401,
-                        {'WWW-Authenticate': 'Basic realm="Login Required"'})
+        with self.app.app_context():
+            return Response('Bad Credentials', 401,
+                            {'WWW-Authenticate': 'Basic realm="Login Required"'})

    def requires_auth(f):
        @wraps(f)
@@ -276,66 +295,79 @@ def decorated(self, *args, **kwargs):
            return f(self, *args, **kwargs)
        return decorated

+    def return_json(self, data, status=None):
+        return Response(
+            response=json.dumps(data),
+            status=status,
+            mimetype='application/json',
+            content_type='application/json',
+        )
+
    # Exposed Request Endpoints

    def request_health(self):
        with self.app.app_context():
-            return Response({'healthy': True})
+            return self.return_json({'healthy': True})

    def request_healthcheck(self):
        with self.app.app_context():
            try:
                expired = self.check_thread_health()
                if not expired:
-                    return Response({'healthy': True})
+                    return self.return_json({'healthy': True})
                else:
-                    return Response(json.dumps(expired), 500, mimetype='application/json')
+                    return self.return_json(expired, 500)
            except Exception as err:
                self.app.logger.error(f'Unexpected HC error: {err}')
                return Response(f'Unexpected error: {err}', 500)

    def request_kernelcheck(self):
        with self.app.app_context():
            healthy = self.kernel_client.check_kernel()
-            return Response(
+            return self.return_json(
                {'healthy': healthy},
-                status=200 if healthy else 424  # Failed dependency
+                status=200 if healthy else 424,  # Failed dependency
            )

+    def request_kafkacheck(self):
+        with self.app.app_context():
+            healthy = self.kafka_available()
+            return self.return_json(
+                {'healthy': healthy},
+                status=200 if healthy else 424,  # Failed dependency
+            )
+
    def request_check_app(self):
        with self.app.app_context():
-            return Response({
+            return self.return_json({
                'app_name': 'aether-producer',
                'app_version': VERSION,
                'app_revision': REVISION,
            })

    @requires_auth
    def request_status(self):
-        status = {
-            'kernel_mode': self.kernel_client.mode(),
-            'kernel_last_check': self.kernel_client.last_check,
-            'kernel_last_check_error': self.kernel_client.last_check_error,
-            'kafka_container_accessible': self.kafka_available(),
-            'kafka_broker_information': self.broker_info(),
-            'kafka_submission_status': str(self.kafka_status),  # This is just a status flag
-            'topics': {k: v.get_status() for k, v in self.realm_managers.items()},
-        }
        with self.app.app_context():
-            return jsonify(**status)
+            status = {
+                'kernel_mode': self.kernel_client.mode(),
+                'kernel_last_check': self.kernel_client.last_check,
+                'kernel_last_check_error': self.kernel_client.last_check_error,
+                'kafka_container_accessible': self.kafka_available(),
+                'kafka_broker_information': self.broker_info(),
+                'kafka_submission_status': str(self.kafka_status),  # This is just a status flag
+                'topics': {k: v.get_status() for k, v in self.realm_managers.items()},
+            }
+            return self.return_json(status)

    @requires_auth
    def request_topics(self):
        if not self.realm_managers:
            return Response({})

-        status = {}
-        for topic, manager in self.realm_managers.items():
-            status[topic] = {}
-            for name, sw in manager.schemas.items():
-                status[topic][name] = manager.get_topic_size(sw)
        with self.app.app_context():
-            return jsonify(**status)
+            status = {}
+            for topic, manager in self.realm_managers.items():
+                status[topic] = {}
+                for name, sw in manager.schemas.items():
+                    status[topic][name] = manager.get_topic_size(sw)
+            return self.return_json(status)

    # Topic Command API

@@ -353,34 +385,35 @@ def request_rebuild(self):

    @requires_auth
    def handle_topic_command(self, request, status):
-        topic = request.args.get('topic')
-        realm = request.args.get('realm')
-        if not realm:
-            return Response('A realm must be specified', 422)
-        if not topic:
-            return Response('A topic must be specified', 422)
-        if not self.realm_managers.get(realm):
-            return Response(f'Bad realm name: {realm}', 422)
-
-        manager = self.realm_managers[realm]
-        schema_wrapper = manager.schemas.get(topic)
-        if not schema_wrapper:
-            return Response(f'realm {realm} has no topic {topic}', 422)
-        if status is TopicStatus.PAUSED:
-            fn = manager.pause
-        if status is TopicStatus.NORMAL:
-            fn = manager.resume
-        if status is TopicStatus.REBUILDING:
-            fn = manager.rebuild
+        with self.app.app_context():
+            topic = request.args.get('topic')
+            realm = request.args.get('realm')
+            if not realm:
+                return Response('A realm must be specified', 422)
+            if not topic:
+                return Response('A topic must be specified', 422)
+            if not self.realm_managers.get(realm):
+                return Response(f'Bad realm name: {realm}', 422)
+
+            manager = self.realm_managers[realm]
+            schema_wrapper = manager.schemas.get(topic)
+            if not schema_wrapper:
+                return Response(f'realm {realm} has no topic {topic}', 422)
+            if status is TopicStatus.PAUSED:
+                fn = manager.pause
+            if status is TopicStatus.NORMAL:
+                fn = manager.resume
+            if status is TopicStatus.REBUILDING:
+                fn = manager.rebuild

-        try:
-            res = fn(schema_wrapper)
-            if not res:
-                return Response(f'Operation failed on {topic}', 500)
+            try:
+                res = fn(schema_wrapper)
+                if not res:
+                    return Response(f'Operation failed on {topic}', 500)

-            return Response(f'Success for status {status} on {topic}', 200)
-        except Exception as err:
-            return Response(f'Operation failed on {topic} with: {err}', 500)
+                return Response(f'Success for status {status} on {topic}', 200)
+            except Exception as err:
+                return Response(f'Operation failed on {topic} with: {err}', 500)


def main():
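With the changes above, the protected admin endpoints still use HTTP basic auth (check_auth against admin_name/admin_password) but now return plain JSON built by return_json(). A minimal sketch of polling the status endpoint — the address, port, and credentials below are assumptions for illustration, not values defined by this commit:

import json

import requests

# Assumed address and credentials; use your deployment's admin settings.
PRODUCER_URL = 'http://localhost:5005'
ADMIN_AUTH = ('admin', 'secret')

res = requests.get(f'{PRODUCER_URL}/status', auth=ADMIN_AUTH)
res.raise_for_status()

status = res.json()  # plain JSON body built by return_json()
print('Kafka reachable :', status['kafka_container_accessible'])
print('Kernel mode     :', status['kernel_mode'])
print(json.dumps(status['topics'], indent=2))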
2 changes: 2 additions & 0 deletions aether-producer/aether/producer/kernel_api.py
@@ -75,8 +75,10 @@ def check_kernel(self):
        # check that Kernel connection is possible
        try:
            self._fetch(url=_REALMS_URL)
+            return True
        except Exception as e:
            logger.exception(e)
+            logger.critical(f'Cannot connect to Kernel: {_KERNEL_URL}')
            return False

    def get_realms(self):
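The added `return True` matters because the old check_kernel() only returned False in the except branch, so a successful fetch fell through and implicitly returned None, which is falsy. A toy illustration of that behaviour (not project code):

def check_without_return(fetch):
    try:
        fetch()
    except Exception:
        return False
    # no return here -> implicitly returns None (falsy)

def check_with_return(fetch):
    try:
        fetch()
        return True
    except Exception:
        return False

ok = lambda: None  # a fetch that always succeeds
print(bool(check_without_return(ok)))  # False, even though the fetch worked
print(bool(check_with_return(ok)))     # True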
