feat(spanner): add lazy decode to partitioned query #1411
Conversation
Force-pushed from ac31c75 to 0dff8f6 (compare)
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
lazy_decode=False,
nit: add documentation for this new argument
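For reference, a sketch of what the parameter documentation could look like, using the Sphinx field-list style this library generally follows. The method name and surrounding parameters are abbreviated and illustrative; only the `lazy_decode` wording is the point here, and the final text is up to the author.

```python
def process_query_batch(
    self,
    batch,
    *,
    lazy_decode=False,
):
    """Process a single, partitioned query (sketch; other parameters omitted).

    :type lazy_decode: bool
    :param lazy_decode:
        (Optional) If ``True``, rows are returned as raw protobuf values and
        the caller is responsible for decoding them with ``decode_row`` or
        ``decode_column``. Defaults to ``False``.
    """
    ...
```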
tests/unit/test_merged_result_set.py (Outdated)
@@ -0,0 +1,128 @@
# Copyright 2024 Google LLC All rights reserved.
nit: 2025
def decode_row(self, row: []) -> []:
    """Decodes a row from protobuf values to Python objects. This function
    should only be called for result sets that use ``lazy_decoding=True``.
    The array that is returned by this function is the same as the array
    that would have been returned by the rows iterator if ``lazy_decoding=False``.

    :returns: an array containing the decoded values of all the columns in the given row
    """
    if not isinstance(row, (list, tuple)):
        raise TypeError("row must be an array of protobuf values")
    decoders = self._decoders
    return [
        _parse_nullable(row[index], decoders[index]) for index in range(len(row))
    ]

def decode_column(self, row: [], column_index: int):
    """Decodes a column from a protobuf value to a Python object. This function
    should only be called for result sets that use ``lazy_decoding=True``.
    The object that is returned by this function is the same as the object
    that would have been returned by the rows iterator if ``lazy_decoding=False``.

    :returns: the decoded column value
    """
    if not isinstance(row, (list, tuple)):
        raise TypeError("row must be an array of protobuf values")
    decoders = self._decoders
    return _parse_nullable(row[column_index], decoders[column_index])

@property
def _decoders(self):
    if self.metadata is None:
        raise ValueError("iterator not started")
    return [
        _get_type_decoder(field.type_, field.name, None)
        for field in self.metadata.row_type.fields
    ]
Is this code really needed? A MergedResultSet just delegates all underlying logic to the 'normal' ResultSets. Those already support lazy decoding. So could we not just call the decode_row/decode_column function of one of the underlying ResultSets instead?
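A minimal sketch of the delegation the reviewer has in mind. The attribute name `_source_result_set` is hypothetical; the PR does not necessarily keep a reference to one of the underlying result sets this way.

```python
class MergedResultSet:
    """Sketch: delegate decoding to one of the underlying result sets,
    which already implements lazy decoding, instead of duplicating the
    decoder logic here."""

    def __init__(self, source_result_set):
        # Hypothetical: a reference to one of the per-partition result sets.
        self._source_result_set = source_result_set

    def decode_row(self, row):
        # Reuse the underlying ResultSet's decoding logic.
        return self._source_result_set.decode_row(row)

    def decode_column(self, row, column_index):
        return self._source_result_set.decode_column(row, column_index)
```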
""" | ||
if not isinstance(row, (list, tuple)): | ||
raise TypeError("row must be an array of protobuf values") | ||
decoders = self._decoders |
Can we cache this instead of reconstructing it for every row?
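One way to do that (a sketch, not the author's change): build the decoder list lazily the first time it is needed and reuse it on later calls.

```python
@property
def _decoders(self):
    # Sketch: cache the per-column decoders instead of rebuilding the list
    # for every decoded row. `_cached_decoders` is a hypothetical attribute.
    if self.metadata is None:
        raise ValueError("iterator not started")
    if getattr(self, "_cached_decoders", None) is None:
        self._cached_decoders = [
            _get_type_decoder(field.type_, field.name, None)
            for field in self.metadata.row_type.fields
        ]
    return self._cached_decoders
```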
Force-pushed from 0dff8f6 to 059cfff (compare)
Add lazy decode to partitioned query
This commit introduces a `lazy_decode` option to `BatchSnapshot.run_partitioned_query`. When set to `True`, the result set yields raw protobuf objects instead of decoded Python objects. This allows the CPU-intensive decoding work to be deferred and managed by the caller, which can significantly improve performance in multi-threaded applications by reducing GIL contention.

To support this, `MergedResultSet` now includes `decode_row()` and `decode_column()` methods for manual decoding of results.
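A usage sketch under stated assumptions: the instance and database IDs, the query, and the thread-pool setup are illustrative, and `run_partitioned_query` is called with the new `lazy_decode` flag as described above.

```python
from concurrent.futures import ThreadPoolExecutor

from google.cloud import spanner

# Illustrative setup; the IDs are placeholders.
client = spanner.Client()
database = client.instance("my-instance").database("my-database")

batch_snapshot = database.batch_snapshot()
results = batch_snapshot.run_partitioned_query(
    "SELECT SingerId, FirstName FROM Singers",
    lazy_decode=True,  # rows are yielded as raw protobuf values
)

def decode(row):
    # decode_row produces the same Python values the iterator would have
    # yielded with lazy_decode=False, but on the caller's own thread.
    return results.decode_row(row)

# Defer the CPU-heavy decoding to worker threads instead of doing it on the
# thread that consumes the merged result set.
with ThreadPoolExecutor(max_workers=4) as executor:
    decoded_rows = list(executor.map(decode, results))
```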