Feature blocking (#510)
* Extend upload endpoint to accept both types - clks and clknblocks

* Integrate feedback from Brian

* Remove get_uses_blocking and use NamedTempFile

* Added docstring

* Updated docstring, rearranged arguments

* Log the number of elements and number of blocks in json upload

* Glue code to make tests pass again

* Bugfix: JSON wasn't always fully written to disk

* Didn't want to commit that, sorry

* Trying to handle empty uploads

* Revived test for empty upload

* Tornado 6 is incompatible with other requirements, thus backtrack to the last known good version

* Fix test condition

* Raise ValueError when upload is not valid, to separate our code from nginx

Co-authored-by: Xinyue Wang <xwan8513@uni.sydney.edu.au>
Co-authored-by: wilko77 <wilko77@users.noreply.github.com>
3 people authored Feb 23, 2020
1 parent 7a927a5 commit b5a7705
Showing 5 changed files with 87 additions and 25 deletions.
2 changes: 1 addition & 1 deletion backend/entityservice/database/selections.py
@@ -155,7 +155,7 @@ def get_run(db, run_id):
 
 
 def get_project_column(db, project_id, column):
-    assert column in {'notes', 'schema', 'parties', 'result_type', 'deleted', 'encoding_size'}
+    assert column in {'notes', 'schema', 'parties', 'result_type', 'deleted', 'encoding_size', 'uses_blocking'}
     sql_query = """
         SELECT {}
         FROM projects
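Since the column name is interpolated straight into the SQL string, the whitelist assert is what keeps arbitrary input out of the query. A hedged usage sketch (the db handle and project_id are placeholders):

    # 'uses_blocking' is now whitelisted; anything else trips the assert
    # before reaching the SQL format() call.
    uses_blocking = get_project_column(db, project_id, 'uses_blocking')   # OK
    get_project_column(db, project_id, 'projects; DROP TABLE projects')   # AssertionError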
13 changes: 10 additions & 3 deletions backend/entityservice/tasks/encoding_uploading.py
@@ -1,4 +1,5 @@
 import io
+import json
 
 from entityservice.cache import encodings as encoding_cache
 
@@ -29,12 +30,18 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
     log.info(f"Expecting to handle {expected_count} encodings")
     mc = connect_to_object_store()
 
-    # Input file is line separated base64 record encodings.
+    #### GLUE CODE
     raw_file = Config.RAW_FILENAME_FMT.format(receipt_token)
-    raw_data_response = mc.get_object(Config.MINIO_BUCKET, raw_file)
+    raw_data = mc.get_object(Config.MINIO_BUCKET, raw_file)
+    data = json.loads(raw_data.data.decode('utf-8'))
+    if 'clks' not in data:
+        raise ValueError('can only handle CLKs at the moment.')
+    binary_data = b'\n'.join(''.join(clk.split('\n')).encode() for clk in data['clks']) + b'\n'
+    buffer = io.BytesIO(binary_data)
+    #### END GLUE
 
     # Set up streaming processing pipeline
-    buffered_stream = iterable_to_stream(raw_data_response.stream())
+    buffered_stream = iterable_to_stream(buffer)
     text_stream = io.TextIOWrapper(buffered_stream, newline='\n')
 
     first_hash_bytes = deserialize_bytes(next(text_stream))
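For context, a minimal self-contained sketch of what the glue code does, assuming nothing beyond the standard library (the base64 values are invented, and iterable_to_stream is skipped since BytesIO is already a stream):

    import io
    import json

    # A JSON upload {'clks': [...base64 strings...]} is flattened into a
    # newline-separated byte stream, the format the existing pipeline expects.
    raw_bytes = json.dumps({'clks': ['YWJjZA==', 'ZWZnaA==']}).encode('utf-8')

    data = json.loads(raw_bytes.decode('utf-8'))
    if 'clks' not in data:
        raise ValueError('can only handle CLKs at the moment.')

    # Strip stray newlines from each encoding, then write one encoding per line.
    binary_data = b'\n'.join(''.join(clk.split('\n')).encode() for clk in data['clks']) + b'\n'

    text_stream = io.TextIOWrapper(io.BytesIO(binary_data), newline='\n')
    assert next(text_stream).strip() == 'YWJjZA=='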
32 changes: 32 additions & 0 deletions backend/entityservice/views/auth_checks.py
@@ -59,6 +59,38 @@ def abort_if_invalid_results_token(resource_id, results_token):
         safe_fail_request(403, message=INVALID_ACCESS_MSG)
 
 
+def abort_if_inconsistent_upload(uses_blocking, clk_json):
+    """
+    Check that the upload is consistent with the project, i.e. that one of
+    the following combinations holds:
+    - uses_blocking is False AND the 'clks' element is in the upload JSON
+    - uses_blocking is True AND the 'clknblocks' element is in the upload JSON
+
+    :param uses_blocking: Boolean that indicates if the project uses blocking
+    :param clk_json: a JSON dict
+    :raises ValueError: if the conditions are not met
+    """
+    is_valid_clks = not uses_blocking and 'clks' in clk_json
+    is_valid_clknblocks = uses_blocking and 'clknblocks' in clk_json
+    if not (is_valid_clks or is_valid_clknblocks):
+        # fail condition 1 - uses_blocking is True but the uploaded element is "clks"
+        if uses_blocking and 'clks' in clk_json:
+            raise ValueError('Uploaded element is "clks" while expecting "clknblocks"')
+        # fail condition 2 - uses_blocking is False but the uploaded element is "clknblocks"
+        if not uses_blocking and 'clknblocks' in clk_json:
+            raise ValueError('Uploaded element is "clknblocks" while expecting "clks"')
+        # fail condition 3 - "clks" exists in the JSON but there is no data
+        if 'clks' in clk_json and len(clk_json['clks']) < 1:
+            raise ValueError('Missing CLKs information')
+        # fail condition 4 - "clknblocks" exists in the JSON but there is no data
+        if 'clknblocks' in clk_json and len(clk_json['clknblocks']) < 1:
+            raise ValueError('Missing CLK and Blocks information')
+        # fail condition 5 - unknown element in the JSON
+        if 'clks' not in clk_json and 'clknblocks' not in clk_json:
+            raise ValueError('Unknown upload element - expect "clks" or "clknblocks"')
+
+
 def dataprovider_id_if_authorize(resource_id, receipt_token):
     logger.debug("checking authorization token to fetch mask data")
     if not is_receipt_token_valid(resource_id, receipt_token):
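A hedged usage sketch of the new check (payloads invented), showing which combinations pass and which raise:

    from entityservice.views.auth_checks import abort_if_inconsistent_upload

    # Valid combinations return silently.
    abort_if_inconsistent_upload(False, {'clks': ['YWJjZA==']})
    abort_if_inconsistent_upload(True, {'clknblocks': [['YWJjZA==', '001']]})

    # Mismatches raise ValueError, which the upload view converts to a 403.
    for uses_blocking, payload in [
            (True, {'clks': ['YWJjZA==']}),           # fail condition 1
            (False, {'clknblocks': [['YWJjZA==']]}),  # fail condition 2
            (False, {'rubbish': []}),                 # fail condition 5
    ]:
        try:
            abort_if_inconsistent_upload(uses_blocking, payload)
        except ValueError as e:
            print(e)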
63 changes: 43 additions & 20 deletions backend/entityservice/views/project.py
@@ -1,4 +1,6 @@
+import json
 from io import BytesIO
+import tempfile
 
 import minio
 from flask import request
@@ -11,9 +13,9 @@
 from entityservice.tracing import serialize_span
 from entityservice.utils import safe_fail_request, get_json, generate_code, get_stream, \
     clks_uploaded_to_project, fmt_bytes, iterable_to_stream
-from entityservice.database import DBConn
+from entityservice.database import DBConn, get_project_column
 from entityservice.views.auth_checks import abort_if_project_doesnt_exist, abort_if_invalid_dataprovider_token, \
-    abort_if_invalid_results_token, get_authorization_token_type_or_abort
+    abort_if_invalid_results_token, get_authorization_token_type_or_abort, abort_if_inconsistent_upload
 from entityservice import models
 from entityservice.object_store import connect_to_object_store
 from entityservice.serialization import binary_format
@@ -206,6 +208,8 @@ def project_clks_post(project_id):
         dp_id = db.get_dataprovider_id(conn, token)
         project_encoding_size = db.get_project_schema_encoding_size(conn, project_id)
         upload_state_updated = db.is_dataprovider_allowed_to_upload_and_lock(conn, dp_id)
+        # get the uses_blocking flag from the projects table
+        uses_blocking = get_project_column(conn, project_id, 'uses_blocking')
 
     if not upload_state_updated:
         return safe_fail_request(403, "This token has already been used to upload clks.")
@@ -224,8 +228,7 @@
         # However, as connexion is very, very strict about input validation when it comes to json, it will always
         # consume the stream first to validate it against the spec. Thus the backflip to fully reading the CLKs as
         # json into memory. -> issue #184
-
-        receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), span)
+        receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), uses_blocking, parent_span=span)
         # Schedule a task to deserialize the hashes, and carry
         # out a pop count.
         handle_raw_upload.delay(project_id, dp_id, receipt_token, parent_span=serialize_span(span))
@@ -355,43 +358,63 @@ def upload_clk_data_binary(project_id, dp_id, raw_stream, count, size=128):
     return receipt_token
 
 
-def upload_json_clk_data(dp_id, clk_json, parent_span):
+def upload_json_clk_data(dp_id, clk_json, uses_blocking, parent_span):
     """
-    Convert user provided encodings from json array of base64 data into
-    a newline separated file of base64 data.
+    Take user provided encodings as a json dict and save them, as-is, to the object store.
     Note this implementation is non-streaming.
     """
-    if 'clks' not in clk_json or len(clk_json['clks']) < 1:
+    try:
+        abort_if_inconsistent_upload(uses_blocking, clk_json)
+    except ValueError as e:
+        safe_fail_request(403, e)
+
+    # now we need to know the element name - clks or clknblocks
+    is_valid_clks = not uses_blocking and 'clks' in clk_json
+    element = 'clks' if is_valid_clks else 'clknblocks'
+
+    if len(clk_json[element]) < 1:
         safe_fail_request(400, message="Missing CLKs information")
 
     receipt_token = generate_code()
 
     filename = Config.RAW_FILENAME_FMT.format(receipt_token)
-    logger.info("Storing user {} supplied clks from json".format(dp_id))
+    logger.info("Storing user {} supplied {} from json".format(dp_id, element))
 
     with opentracing.tracer.start_span('splitting-json-clks', child_of=parent_span) as span:
-        count = len(clk_json['clks'])
-        span.set_tag("clks", count)
-        data = b''.join(''.join(clk.split('\n')).encode() + b'\n' for clk in clk_json['clks'])
+        encoding_count = len(clk_json[element])
+        span.set_tag(element, encoding_count)
+        logger.debug(f"Received {encoding_count} {element}")
+
+        if element == 'clknblocks':
+            # Note the format of encoding + blocks:
+            # {'clknblocks': [['UG9vcA==', '001', '211'], [...]]}
+            blocks = set()
+            for _, *elements_blocks in clk_json[element]:
+                blocks.update(elements_blocks)
+            block_count = len(blocks)
+        else:
+            block_count = 1
 
-        num_bytes = len(data)
-        span.set_tag("num_bytes", num_bytes)
-        buffer = BytesIO(data)
+        logger.info(f"Received {encoding_count} encodings in {block_count} blocks")
 
-    logger.info(f"Received {count} encodings. Uploading {fmt_bytes(num_bytes)} to object store")
+    # write clk_json into a temp file; flush so the JSON is fully on disk before upload
+    tmp = tempfile.NamedTemporaryFile(mode='w')
+    json.dump(clk_json, tmp)
+    tmp.flush()
     with opentracing.tracer.start_span('save-clk-file-to-quarantine', child_of=parent_span) as span:
         span.set_tag('filename', filename)
         mc = connect_to_object_store()
-        mc.put_object(
+        mc.fput_object(
             Config.MINIO_BUCKET,
             filename,
-            data=buffer,
-            length=num_bytes
+            tmp.name,
+            content_type='application/json'
        )
+    logger.info('Saved uploaded {} JSON to file {} in object store.'.format(element.upper(), filename))
 
     with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span):
         with DBConn() as conn:
-            db.insert_encoding_metadata(conn, filename, dp_id, receipt_token, count)
+            db.insert_encoding_metadata(conn, filename, dp_id, receipt_token, encoding_count)
 
     return receipt_token, filename
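The block counting in upload_json_clk_data tallies distinct block ids across all entries, since one encoding can belong to several blocks. A small sketch with invented values:

    # Each clknblocks entry is [encoding, block_id, block_id, ...].
    clk_json = {'clknblocks': [['UG9vcA==', '001', '211'],
                               ['S2l0dHk=', '001'],
                               ['RG9nZQ==', '211', '042']]}

    blocks = set()
    for _, *elements_blocks in clk_json['clknblocks']:
        blocks.update(elements_blocks)

    print(len(clk_json['clknblocks']))  # 3 encodings
    print(len(blocks))                  # 3 distinct blocks: '001', '211', '042'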
2 changes: 1 addition & 1 deletion base/requirements.txt
@@ -23,4 +23,4 @@ requests==2.22.0
 setproctitle==1.1.10    # used by celery to change process name
 structlog==20.1.0
 tenacity==5.1.1
-tornado==6.0.3
+tornado==4.5.3
