Feature extend binary format #505
Conversation
Added my 2c
```python
else:
    return result[columns[0]]
```
I'm not 100% happy about returning a different type for convenience when there is only one column. I get it, and I've done it myself; I just don't like it.
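For context, a minimal sketch of the pattern being discussed (the function and variable names here are illustrative, not the actual helper):

```python
def query(result, columns):
    # Returns a dict of columns in the general case...
    if len(columns) > 1:
        return {col: result[col] for col in columns}
    else:
        # ...but a bare value when only one column was requested.
        # Convenient for callers, but the return type now depends on
        # the arguments, which is the concern raised above.
        return result[columns[0]]
```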
```diff
     :return:
         An iterable of bytes.
     """
     bit_packing_struct = binary_format(encoding_size)

     for hash_bytes in filters:
-        yield bit_packing_struct.pack(hash_bytes)
+        yield bit_packing_struct.pack(*hash_bytes)
```
⭐️
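As a reading aid, a minimal sketch of why the splat is needed, assuming each element of `filters` is now an `(entity_id, filter_bytes)` tuple to match the two-field format introduced below:

```python
import struct

encoding_size = 128
bit_packing_struct = struct.Struct(f"!I{encoding_size}s")

item = (42, b"\x00" * encoding_size)     # (entity_id, filter_bytes)
packed = bit_packing_struct.pack(*item)  # splat: pack(42, b"\x00...")
assert len(packed) == 4 + encoding_size

# bit_packing_struct.pack(item) would raise struct.error:
# the two-field format needs two arguments, not one tuple.
```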
- "<encoding size>s" Store the n (e.g. 128) raw bytes of the bitarray | ||
|
||
https://docs.python.org/3/library/struct.html | ||
""" | ||
bit_packing_fmt = f"!{encoding_size}s" | ||
bit_packing_fmt = f"!I{encoding_size}s" |
We're just going to pretend the old format doesn't exist, aren't we? Should we take the opportunity to add a version byte to the format?
It's an internal format, so yes, I pretend that it never existed. I'm old, I forget quickly.
A version byte for each filter would mean a lot of version bytes in the object store; that would be better solved with a header. But again, since this format is only used internally in the service, we will never have to differentiate between versions, so the versioning would be redundant.
I meant a header for the file rather than one for each encoding. But sure, not worth it right now.
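If a file-level header ever does become worth it, it could look something like the sketch below. This is purely hypothetical; the magic constant and field layout are invented for illustration and are not part of this PR:

```python
import struct

MAGIC = b"CLKB"                  # hypothetical file signature
HEADER = struct.Struct("!4sBI")  # magic, format version, encoding size

def write_header(stream, version: int, encoding_size: int) -> None:
    # One header per file instead of a version byte per filter.
    stream.write(HEADER.pack(MAGIC, version, encoding_size))
```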
```python
    :param data_iterable: an iterable of binary packed filters.
    :param max_bytes: if present, only read up to 'max_bytes' bytes.
    :param encoding_size: the encoding size of one filter, excluding the entity ID info
```
Since we bounce between bytes and bits, I'd try to be explicit about the size.
good point
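One way to be explicit, in line with the suggestion: state that `encoding_size` is in bytes and that each packed record carries 4 extra bytes for the entity ID (a sketch, not the exact code in this PR):

```python
import struct

def record_size(encoding_size: int) -> int:
    """Size in bytes of one packed filter.

    encoding_size is the filter length in BYTES (e.g. 128 bytes
    = 1024 bits); the entity ID prefix adds another 4 bytes.
    """
    return struct.Struct(f"!I{encoding_size}s").size  # 4 + encoding_size
```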
```python
    # TODO: use the entity ids!
    entity_ids_dp1, chunk_dp1 = zip(*chunk_dp1)
    t1 = time.time()
    log.debug("Fetching and deserializing chunk of filters for dataprovider 2")
    chunk_dp2, chunk_dp2_size = get_chunk_from_object_store(chunk_info_dp2, encoding_size)
    # TODO: use the entity ids!
    entity_ids_dp2, chunk_dp2 = zip(*chunk_dp2)
```
(what you said)
Yes, but this is another task; I want this PR to be single purpose.
Integrating the use of the entity IDs will most likely also require changes to the helper functions in the anonlink library. I didn't want to open that can of worms just yet.
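For readers unfamiliar with the idiom, the `zip(*chunk_dp1)` above just splits the deserialized `(entity_id, filter)` pairs into two parallel tuples:

```python
chunk = [(0, b"aa"), (1, b"bb"), (2, b"cc")]  # (entity_id, filter) pairs
entity_ids, filters = zip(*chunk)
assert entity_ids == (0, 1, 2)
assert filters == (b"aa", b"bb", b"cc")
```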
```diff
@@ -43,12 +43,12 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
     def filter_generator():
         log.debug("Deserializing json filters")
```
This log message looks like a copy/paste error of mine, no doubt. Could you update it?
```diff
@@ -151,20 +151,26 @@ def project_binaryclks_post(project_id):
     # connexion has already read the data before our handler is called!
     # https://github.com/zalando/connexion/issues/592
     # stream = get_stream()
-    stream = BytesIO(request.data)
+    expected_bytes = binary_format(size).size * count
+    stream = iterable_to_stream(BytesIO(request.data))
```
I'm confused. Isn't BytesIO already a stream? https://docs.python.org/3/library/io.html#binary-i-o
Yes, it is. I got confused by the read function: the docs say it reads up to 'size' bytes, so I thought it might be safer to wrap it in a BufferedReader, but I guess that's unnecessary.
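For reference, `BytesIO.read(size)` already behaves like a stream read over an in-memory buffer, so the extra wrapping is indeed unnecessary:

```python
from io import BytesIO

stream = BytesIO(b"abcdef")
assert stream.read(4) == b"abcd"  # up to 4 bytes
assert stream.read(4) == b"ef"    # fewer bytes remain: returns what is left
assert stream.read(4) == b""      # exhausted: empty bytes, not None
```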
```python
def entity_id_injector(filter_stream):
    for entity_id in range(count):
        yield binary_formatter.pack(entity_id, filter_stream.read(size))

data_with_ids = b''.join(entity_id_injector(stream))
```
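A self-contained sketch of what this does, with `count`, `size`, and the request stream stubbed out:

```python
import struct
from io import BytesIO

size, count = 128, 3                    # from the user-supplied upload header
binary_formatter = struct.Struct(f"!I{size}s")
stream = BytesIO(bytes(size * count))   # stand-in for request.data

def entity_id_injector(filter_stream):
    # Prefix each raw filter with a sequential entity ID.
    for entity_id in range(count):
        yield binary_formatter.pack(entity_id, filter_stream.read(size))

data_with_ids = b''.join(entity_id_injector(stream))
assert len(data_with_ids) == count * (4 + size)
```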
I want to check my understanding here. This takes the binary stream and converts it into a byte-producing generator. The generator uses the user-supplied `count` and reads `size` bytes from the stream (where `size` is also user supplied in the header), packing the bytes using the binary format and adding an entity ID as it goes.
- How do we handle the user supplying their own entity IDs? I assume that is future work? Will they be optional or required?
- If this code path is here for the long haul, can we do this without making a full copy of the data in memory?
- Consider how this can fail with bad data from the user.
I just wrote a quick fix to make this endpoint work with the new format. Until connexion fixes the stream issue, we cannot do much here.
There is also the problem that the binary format, as of now, does not consider blocking information.
I imagine that in the far future, one day, someone will revisit the binary upload. They will define a new binary format, implement it in the anonlink-client, and finally make this endpoint stunningly beautiful. For now, though, this is not a priority.
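On the full-copy question: if the endpoint can ever consume a real stream, the `b''.join()` could be replaced by lazy, chunked writes. A sketch under that assumption; the chunk-accepting store call at the end is hypothetical:

```python
import struct

def iter_packed(filter_stream, count: int, size: int):
    # Yield packed records one at a time instead of joining them in memory.
    fmt = struct.Struct(f"!I{size}s")
    for entity_id in range(count):
        yield fmt.pack(entity_id, filter_stream.read(size))

# Hypothetical: an object-store client that accepts an iterable of chunks
# (e.g. a multipart upload) would avoid materialising the whole payload:
# store.put_object_stream(bucket, key, iter_packed(stream, count, size))
```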
Thanks for making those changes. Good to merge.
This PR changes the binary format for storing CLKs in the object store.
Previously, we implicitly inferred the entity ID from the index of the CLK in the list. That was fine, as the CLKs were delivered in order. With blocking, however, we no longer have this order, so we have to keep track of which CLK belongs to which entity ID in order to produce an interpretable result.
I chose to encode the entity ID as an unsigned int. I know there are more people on earth than that, but hey, storage costs money...
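For scale: a 4-byte unsigned int allows entity IDs up to 2**32 - 1 (about 4.3 billion) at a fixed cost of 4 bytes per stored filter:

```python
max_entity_id = 2**32 - 1            # 4,294,967,295 distinct entity IDs
overhead = 4                         # bytes added per filter
print(overhead / (overhead + 128))   # ~0.03, i.e. ~3% for 128-byte encodings
```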