
Update anonlink to v0.12.5 #423

Merged: 5 commits, Oct 2, 2019
5 changes: 3 additions & 2 deletions backend/entityservice/async_worker.py
@@ -28,20 +28,21 @@
 # Set up our logging
 setup_structlog()
 logger = structlog.wrap_logger(logging.getLogger('entityservice.tasks'))
-logger.info("Setting up entityservice worker")
-logger.debug("Debug logging enabled")


 @worker_process_init.connect()
 def init_worker(**kwargs):
     db_min_connections = Config.CELERY_DB_MIN_CONNECTIONS
     db_max_connections = Config.CELERY_DB_MAX_CONNECTIONS
     init_db_pool(db_min_connections, db_max_connections)
+    logger.info("Setting up worker process")
+    logger.debug("Debug logging enabled")


 @worker_process_shutdown.connect()
 def shutdown_worker(**kwargs):
     close_db_pool()
+    logger.info("Shutting down a worker process")
Comment on lines -31 to +45
Collaborator (author): Unrelated change: I moved these logs into the worker signal handlers, since I saw the Flask application was also logging them at import time.

Contributor: 👍



@task_prerun.connect()
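The comment above describes a Celery signal-handler pattern: per-process setup and teardown are hooked to `worker_process_init`/`worker_process_shutdown`, so the log lines (and the DB pool) only materialise when a worker process actually starts, rather than whenever the module is imported (e.g. by the Flask app). A minimal standalone sketch of that pattern, with illustrative app and logger names:

```python
import logging

from celery import Celery
from celery.signals import worker_process_init, worker_process_shutdown

logger = logging.getLogger('example.tasks')
app = Celery('example')


@worker_process_init.connect()
def init_worker(**kwargs):
    # Runs once in each worker process after it is forked; a safe place
    # to create per-process resources such as DB connection pools.
    logger.info("Setting up worker process")


@worker_process_shutdown.connect()
def shutdown_worker(**kwargs):
    # Runs as each worker process shuts down; release pooled resources here.
    logger.info("Shutting down a worker process")
```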
20 changes: 8 additions & 12 deletions backend/entityservice/serialization.py
@@ -1,5 +1,6 @@
 import io

+import typing
 import urllib3
 from bitarray import bitarray
 import base64
@@ -120,15 +121,14 @@ def deserialize_filters_concurrent(filters):
     return res


-def generate_scores(sims_iter):
+def generate_scores(candidate_pair_stream: typing.BinaryIO):
     """
-    Processes a TextIO stream of csv similarity scores into
+    Processes a binary stream of candidate pair similarity scores into
     a json generator.

     """
-    cs_sims_iter = (
-        f'{rec_i0}, {rec_i1}, {sim}'
-        for sim, _, _, rec_i0, rec_i1 in sims_iter)
+    sims, (dset_is0, dset_is1), (rec_is0, rec_is1) = anonlink.serialization.load_candidate_pairs(candidate_pair_stream)
+
+    cs_sims_iter = (f'{rec_i0}, {rec_i1}, {sim}' for sim, rec_i0, rec_i1 in zip(sims, rec_is0, rec_is1))
     yield '{"similarity_scores": ['
     line_iter = iter(cs_sims_iter)
@@ -163,13 +163,9 @@ def get_similarity_scores(filename):
     logger.info("Starting download stream of similarity scores.", filename=filename, filesize=details.size)

     try:
-        sims_data_stream = mc.get_object(config.MINIO_BUCKET, filename)
-        # TODO: Below is an Anonlink 'private' API. It should be made
-        # public.
-        sims_iter, *_ = anonlink.serialization._load_to_iterable(
-            sims_data_stream)
+        candidate_pair_binary_stream = mc.get_object(config.MINIO_BUCKET, filename)

-        return Response(generate_scores(sims_iter), mimetype='application/json')
+        return Response(generate_scores(candidate_pair_binary_stream), mimetype='application/json')

     except urllib3.exceptions.ResponseError:
         logger.warning("Attempt to read the similarity scores file failed with an error response.", filename=filename)
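For context on the calls above: the candidate pairs object that `anonlink.serialization.dump_candidate_pairs` writes and `load_candidate_pairs` reads back is a triple of a similarity-score array plus parallel pairs of dataset-index and record-index arrays, the same shape used in the tests below. A minimal round-trip sketch with made-up data:

```python
import io
from array import array

import anonlink

candidate_pairs = (
    array('d', [0.1, 0.01, 1.0]),                    # similarity scores
    (array('I', [0, 0, 0]), array('I', [1, 1, 1])),  # dataset index pairs
    (array('I', [1, 2, 5]), array('I', [2, 2, 5])),  # record index pairs
)

buffer = io.BytesIO()
anonlink.serialization.dump_candidate_pairs(candidate_pairs, buffer)
buffer.seek(0)

sims, _dset_indices, (rec_is0, rec_is1) = \
    anonlink.serialization.load_candidate_pairs(buffer)
assert list(sims) == [0.1, 0.01, 1.0]
assert list(rec_is0) == [1, 2, 5]
```

`generate_scores` consumes such a stream and yields the JSON document piecewise, which is what lets Flask's `Response` stream it to the client.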
3 changes: 1 addition & 2 deletions backend/entityservice/tasks/mark_run_complete.py
@@ -1,7 +1,7 @@
 from entityservice.async_worker import celery, logger
 from entityservice.cache.active_runs import set_run_state_complete
 from entityservice.database import DBConn, update_run_mark_complete
-from entityservice.tasks import TracedTask, calculate_comparison_rate
+from entityservice.tasks import TracedTask


 @celery.task(base=TracedTask, ignore_results=True, args_as_tags=('run_id',))
@@ -10,6 +10,5 @@ def mark_run_complete(run_id, parent_span=None):
     log.debug("Marking run complete")
     with DBConn() as db:
         update_run_mark_complete(db, run_id)
-    calculate_comparison_rate.delay()
     set_run_state_complete(run_id)
     log.info("Run marked as complete")
4 changes: 4 additions & 0 deletions backend/entityservice/tests/test_project_run_results.py
@@ -14,6 +14,10 @@ def test_run_similarity_score_results(requests, similarity_scores_project, threshold):
     run_id = post_run(requests, similarity_scores_project, threshold)
     result = get_run_result(requests, similarity_scores_project, run_id, timeout=120)
     assert 'similarity_scores' in result
+    for index1, index2, score in result['similarity_scores']:
+        assert 0.0 <= score <= 1.0
+        assert 0 <= index1
+        assert 0 <= index2


 def test_run_permutations_results(requests, permutations_project, threshold):
35 changes: 26 additions & 9 deletions backend/entityservice/tests/test_serialization.py
@@ -3,6 +3,10 @@
 import random

 import json
+from array import array
+
+import anonlink
+
 from entityservice.serialization import deserialize_bytes, generate_scores
 from entityservice.tests.util import serialize_bytes

@@ -28,30 +32,43 @@ def test_random_bytes(self):
         dsrb = deserialize_bytes(srb)
         self.assertEqual(dsrb, rb)

-    def test_sims_to_json(self):
-        sims_iter = ((0.1, 0, 1, 1, 2),
-                     (0.01, 0, 1, 1, 2),
-                     (1.0, 0, 1, 5, 5))
-        json_iterator = generate_scores(sims_iter)
+    def test_generate_scores_produces_json(self):
+        sims_iter = (
+            array('d', [0.1, 0.01, 1.0]),
+            (array('I', [0, 0, 0]), array('I', [1, 1, 1])),
+            (array('I', [1, 2, 5]), array('I', [2, 2, 5]))
+        )
+        buffer = io.BytesIO()
+        anonlink.serialization.dump_candidate_pairs(sims_iter, buffer)
+        buffer.seek(0)
+        json_iterator = generate_scores(buffer)

         # Consume the whole iterator and ensure it is valid json
         json_str = ''.join(json_iterator)
         json_obj = json.loads(json_str)
         self.assertIn('similarity_scores', json_obj)
+        assert len(json_obj["similarity_scores"]) == 3
         for score in json_obj["similarity_scores"]:
             self.assertEqual(len(score), 3)

     def test_sims_to_json_empty(self):
-        sims_iter = ()
-        json_iterator = generate_scores(sims_iter)
+        sims_iter = (
+            array('d', []),
+            (array('I', []), array('I', [])),
+            (array('I', []), array('I', []))
+        )
+
+        buffer = io.BytesIO()
+        anonlink.serialization.dump_candidate_pairs(sims_iter, buffer)
+        buffer.seek(0)
+        json_iterator = generate_scores(buffer)

         # Consume the whole iterator and ensure it is valid json
         json_str = ''.join(json_iterator)
         json_obj = json.loads(json_str)
         self.assertIn('similarity_scores', json_obj)
-        for score in json_obj["similarity_scores"]:
-            self.assertEqual(len(score), 3)
+        assert len(json_obj["similarity_scores"]) == 0


 if __name__ == "__main__":
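For reference, with the three candidate pairs in the first test above, the JSON assembled by `generate_scores` should be equivalent to the following (each entry is `[rec_i0, rec_i1, sim]`, which is what the length-3 assertion checks); the exact formatting is illustrative:

```python
expected = {"similarity_scores": [[1, 2, 0.1], [2, 2, 0.01], [5, 5, 1.0]]}
```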
2 changes: 1 addition & 1 deletion backend/entityservice/tests/util.py
@@ -324,7 +324,7 @@ def get_run(requests, project, run_id, expected_status = 200):
     return req.json()


-def get_run_result(requests, project, run_id, result_token = None, expected_status = 200, wait=True, timeout=60):
+def get_run_result(requests, project, run_id, result_token=None, expected_status=200, wait=True, timeout=60):
     result_token = project['result_token'] if result_token is None else result_token
     if wait:
         final_status = wait_for_run_completion(requests, project, run_id, result_token, timeout)
2 changes: 1 addition & 1 deletion backend/requirements.txt
@@ -1,4 +1,4 @@
-anonlink==0.11.2
+anonlink==0.12.5
 bitmath==1.3.1.2
 celery==4.3.0
 clkhash==0.14.0