Skip to content

Commit

Permalink
Merge pull request #15 from brighthive/fixes
Browse files Browse the repository at this point in the history
Fixes
  • Loading branch information
reginafcompton committed Mar 3, 2020
2 parents 8b822e9 + 4bbe056 commit 199dd23
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 37 deletions.
2 changes: 1 addition & 1 deletion build_deployment.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ORGANIZATION=brighthive
IMAGE_NAME=data-trust-logger
VERSION=0.1.0-76e27
VERSION=1.0.1
AWS_ECR_REPO=396527728813.dkr.ecr.us-east-2.amazonaws.com

docker build -t $ORGANIZATION/$IMAGE_NAME:$VERSION -f Dockerfile .
Expand Down
15 changes: 6 additions & 9 deletions data_trust_logger/health_audit/data_resources_collector.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
import logging

from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError

from data_trust_logger.config import ConfigurationFactory
from data_trust_logger.health_audit.metrics_collector import \
HealthMetricsCollector
from data_trust_logger.utilities.basic_logger import basic_logger

config = ConfigurationFactory.from_env()
logger = logging.getLogger(__name__)


def instantiate_data_resources_collector():
def instantiate_data_resources_collector(data_resources_engine):
try:
# create_engine() itself does not establish a DB connection.
# We call `connect()` to assess the database health early on.
data_resources_engine = create_engine(config.dr_psql_uri)
data_resources_engine.connect()
connection = data_resources_engine.connect()
connection.close()
except (ValueError, OperationalError) as error:
logger.error("Data Resources HealthMetricsCollector cannot connect to database.")
logger.error(error)
basic_logger.error("Data Resources HealthMetricsCollector cannot connect to database.")
basic_logger.error(error)
data_resources_engine = None
table_names = []
else:
Expand Down
10 changes: 8 additions & 2 deletions data_trust_logger/health_audit/health_auditor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,29 @@
import os
from threading import Thread
from time import sleep
from sqlalchemy import create_engine

from data_trust_logger.config import ConfigurationFactory
from data_trust_logger.health_audit.data_resources_collector import \
instantiate_data_resources_collector
from data_trust_logger.health_audit.mci_collector import \
instantiate_mci_collector

config = ConfigurationFactory.from_env()

class HealthAuditor(Thread):
def __init__(self):
Thread.__init__(self)

def audit(self):
mci_engine = create_engine(config.mci_psql_uri)
data_resources_engine = create_engine(config.dr_psql_uri)

while True:
mci_collector = instantiate_mci_collector()
mci_collector = instantiate_mci_collector(mci_engine)
mci_metrics = mci_collector.collect_metrics()

data_resources_collector = instantiate_data_resources_collector()
data_resources_collector = instantiate_data_resources_collector(data_resources_engine)
data_resources_metrics = data_resources_collector.collect_metrics()

metrics_blob = {
Expand Down
14 changes: 6 additions & 8 deletions data_trust_logger/health_audit/mci_collector.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
import logging

from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError

from data_trust_logger.config import ConfigurationFactory
from data_trust_logger.health_audit.metrics_collector import \
HealthMetricsCollector
from data_trust_logger.utilities.basic_logger import basic_logger

logger = logging.getLogger(__name__)
config = ConfigurationFactory.from_env()


def instantiate_mci_collector():
def instantiate_mci_collector(mci_engine):
try:
# create_engine() itself does not establish a DB connection.
# We call `connect()` to assess the database health early on.
mci_engine = create_engine(config.mci_psql_uri)
mci_engine.connect()
connection = mci_engine.connect()
connection.close()
except (ValueError, OperationalError) as error:
logger.error("MCI HealthMetricsCollector cannot connect to database.")
logger.error(error)
basic_logger.error("MCI HealthMetricsCollector cannot connect to database.")
basic_logger.error(error)
mci_engine = None

mci_tablenames = ['individual', 'source', 'gender', 'address', 'disposition', 'ethnicity_race', 'employment_status', 'education_level']
Expand Down
16 changes: 12 additions & 4 deletions data_trust_logger/health_audit/metrics_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import requests

from data_trust_logger.utilities import get_access_token, secure_requests
from data_trust_logger.utilities import get_access_token, read_token, secure_requests


class HealthMetricsCollector(object):
Expand All @@ -25,26 +25,34 @@ def _get_endpoint_status(self, api_ep: str, token: str):

def _get_endpoint_record_count(self, engine: object, endpoint: str):
try:
result = engine.execute(f"SELECT COUNT(*) from {endpoint}")
connection = engine.connect()
result = connection.execute(f"SELECT COUNT(*) from {endpoint}")
count, = result.fetchone()
connection.close()
except Exception:
count = -1

return count

def collect_metrics(self):
metrics_list = []
token = get_access_token()
token = read_token()

for table in self.tablenames:
try:
endpoint = self.table_to_ep_mappings[table]
except KeyError:
endpoint = table

status, last_accessed = self._get_endpoint_status(f"{self.api_url}/{endpoint}", token)
count = self._get_endpoint_record_count(self.engine, table)

# Request a new access token if the token has expired, i.e., the endpoint raises a 401.
status, last_accessed = self._get_endpoint_status(f"{self.api_url}/{endpoint}", token)
if status == 401:
get_access_token()
token = read_token()
status, last_accessed = self._get_endpoint_status(f"{self.api_url}/{endpoint}", token)

metrics_list.append({
'endpoint': endpoint,
'record_count': count,
Expand Down
3 changes: 2 additions & 1 deletion data_trust_logger/utilities/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from data_trust_logger.utilities.secure_requests import get_access_token, secure_get
from data_trust_logger.utilities.secure_requests import get_access_token, read_token, secure_get
from data_trust_logger.utilities.basic_logger import basic_logger
11 changes: 11 additions & 0 deletions data_trust_logger/utilities/basic_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import logging


basic_logger = logging.getLogger("basic_logger")

handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(fmt='[%(asctime)s] [%(levelname)s] %(message)s', datefmt="%a, %d %b %Y %H:%M:%S")
handler.setFormatter(formatter)

basic_logger.addHandler(handler)
45 changes: 36 additions & 9 deletions data_trust_logger/utilities/secure_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@
import requests

from data_trust_logger.config import ConfigurationFactory
from data_trust_logger.utilities.basic_logger import basic_logger

config = ConfigurationFactory.from_env()
location_of_token = '/tmp/token.txt'


def get_access_token():
""" Retrieves an OAuth 2.0 access token from the OAuth 2.0 provider.
Note:
At present, we use Auth0 as our OAuth 2.0 provider.
Returns:
str: OAuth 2.0 access token.
"""
Retrieves an OAuth 2.0 access token from the OAuth 2.0 provider, and
write the access token to a `tmp` file.
It is possible that OAuth cannot return a token, e.g., due to a service outage.
In this case, we set the token value to an empty string, which eventually gets passed to
`_get_endpoint_status` and, ultimately, returns a 401 status to the Logger API health endpoint.
"""
headers = {'content-type': 'application/json'}
data = {
Expand All @@ -30,13 +34,36 @@ def get_access_token():

try:
response = requests.post(config.oauth2_url, headers=headers, data=json.dumps(data))
except requests.exceptions.ConnectionError:
return None
else:
token = response.json()['access_token']
except Exception as e:
basic_logger.error(e)
token = "invalid token"

with open(location_of_token, 'w+') as f:
f.write(token)


def read_token():
"""
Reads the access token from a temporary file, and returns the value.
If the file does not exist, then we call `get_access_token`, and try again.
"""
token = "invalid token"

for _ in range(2):
try:
with open(location_of_token) as f:
basic_logger.info(f"Token read from {location_of_token}")
token = f.read()
break
except FileNotFoundError as e:
basic_logger.error(e)
get_access_token()
pass

return token


def secure_get(url: str, token: str):
"""Convenience method for GET requests against API resources.
Expand Down
6 changes: 3 additions & 3 deletions tests/test_health_metrics_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def test_get_data_resources_count(mocker):
for count in record_counts:
assert count == seeded_count

def test_get_mci_count_fail():
def test_get_data_resources_count_fail():
collector = instantiate_data_resources_collector()
metrics = collector.collect_metrics()

Expand All @@ -79,7 +79,7 @@ def test_get_mci_count_fail():
for count in record_counts:
assert count == -1

def test_get_mci_statuses(client):
def test_get_data_resources_statuses(client):
with requests_mock.Mocker() as m:
matcher = re.compile('/programs')
m.get(matcher, json={}, status_code=200)
Expand All @@ -92,4 +92,4 @@ def test_get_mci_statuses(client):
if entry['endpoint'] == 'programs':
expect(entry['endpoint_health']).to(equal(200))
else:
expect(entry['endpoint_health']).to(equal(503))
expect(entry['endpoint_health']).to(equal(503))
19 changes: 19 additions & 0 deletions tests/test_secure_requests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import re
from unittest.mock import mock_open

from expects import be, equal, expect, have_key, have_len

from data_trust_logger.health_audit.mci_collector import \
instantiate_mci_collector


def test_get_statuses_with_invalid_token(client, mocker):
"""
Test that endpoints return a 401 status code when `/tmp/token.txt` does not contain a valid token.
"""
mocker.patch('data_trust_logger.utilities.secure_requests.open', mock_open(read_data='invalid token'))
collector = instantiate_mci_collector()
metrics = collector.collect_metrics()

for entry in metrics:
expect(entry['endpoint_health']).to(equal(401))

0 comments on commit 199dd23

Please sign in to comment.