Skip to content
Permalink
Browse files
IGNITE-14705 Fix handling collections with binary objects - Fixes #37.
  • Loading branch information
ivandasch committed May 13, 2021
1 parent 9f72781 commit 746dd1315a0a86d8f1c501d80f3a91dfafd0f648
Show file tree
Hide file tree
Showing 17 changed files with 260 additions and 174 deletions.
@@ -31,7 +31,7 @@
from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors
from .queries.query import CacheInfo
from .stream import AioBinaryStream, READ_BACKWARD
from .utils import cache_id, entity_id, status_to_exception, is_wrapped
from .utils import cache_id, entity_id, status_to_exception


__all__ = ['AioClient']
@@ -269,11 +269,24 @@ async def unwrap_binary(self, value: Any) -> Any:
:return: the result of the Binary Object unwrapping with all other data
left intact.
"""
if is_wrapped(value):
blob, offset = value
with AioBinaryStream(self, blob) as stream:
data_class = await BinaryObject.parse_async(stream)
return await BinaryObject.to_python_async(stream.read_ctype(data_class, direction=READ_BACKWARD), self)
if isinstance(value, tuple) and len(value) == 2:
if type(value[0]) is bytes and type(value[1]) is int:
blob, offset = value
with AioBinaryStream(self, blob) as stream:
data_class = await BinaryObject.parse_async(stream)
return await BinaryObject.to_python_async(stream.read_ctype(data_class, direction=READ_BACKWARD),
client=self)

if isinstance(value[0], int):
col_type, collection = value
if isinstance(collection, list):
coros = [self.unwrap_binary(v) for v in collection]
return col_type, await asyncio.gather(*coros)

if isinstance(collection, dict):
coros = [asyncio.gather(self.unwrap_binary(k), self.unwrap_binary(v))
for k, v in collection.items()]
return col_type, dict(await asyncio.gather(*coros))
return value

@status_to_exception(CacheError)
@@ -351,7 +364,7 @@ async def get_best_node(

key, key_hint = self._get_affinity_key(c_id, key, key_hint)

hashcode = await key_hint.hashcode_async(key, self)
hashcode = await key_hint.hashcode_async(key, client=self)

best_node = self._get_node_by_hashcode(c_id, hashcode, parts)
if best_node:
@@ -61,7 +61,7 @@
from .queries.query import CacheInfo
from .stream import BinaryStream, READ_BACKWARD
from .utils import (
cache_id, capitalize, entity_id, schema_id, process_delimiter, status_to_exception, is_iterable, is_wrapped,
cache_id, capitalize, entity_id, schema_id, process_delimiter, status_to_exception, is_iterable,
get_field_by_id, unsigned
)
from .binary import GenericObjectMeta
@@ -539,17 +539,26 @@ def query_binary_type(self, binary_type: Union[int, str], schema: Union[int, dic

def unwrap_binary(self, value: Any) -> Any:
"""
Detects and recursively unwraps Binary Object.
Detects and recursively unwraps Binary Object or collections of BinaryObject.
:param value: anything that could be a Binary Object,
:param value: anything that could be a Binary Object or collection of BinaryObject,
:return: the result of the Binary Object unwrapping with all other data
left intact.
"""
if is_wrapped(value):
blob, offset = value
with BinaryStream(self, blob) as stream:
data_class = BinaryObject.parse(stream)
return BinaryObject.to_python(stream.read_ctype(data_class, direction=READ_BACKWARD), self)
if isinstance(value, tuple) and len(value) == 2:
if type(value[0]) is bytes and type(value[1]) is int:
blob, offset = value
with BinaryStream(self, blob) as stream:
data_class = BinaryObject.parse(stream)
return BinaryObject.to_python(stream.read_ctype(data_class, direction=READ_BACKWARD), client=self)

if isinstance(value[0], int):
col_type, collection = value
if isinstance(collection, list):
return col_type, [self.unwrap_binary(v) for v in collection]

if isinstance(collection, dict):
return col_type, {self.unwrap_binary(k): self.unwrap_binary(v) for k, v in collection.items()}
return value

@status_to_exception(CacheError)
@@ -619,7 +628,7 @@ def get_best_node(
return conn

key, key_hint = self._get_affinity_key(c_id, key, key_hint)
hashcode = key_hint.hashcode(key, self)
hashcode = key_hint.hashcode(key, client=self)

best_node = self._get_node_by_hashcode(c_id, hashcode, parts)
if best_node:
@@ -48,11 +48,11 @@ class IgniteDataType(metaclass=IgniteDataTypeMeta):
classes, both object and payload varieties.
"""
@classmethod
async def hashcode_async(cls, value, *args, **kwargs):
return cls.hashcode(value, *args, **kwargs)
async def hashcode_async(cls, value, **kwargs):
return cls.hashcode(value, **kwargs)

@classmethod
def hashcode(cls, value, *args, **kwargs):
def hashcode(cls, value, **kwargs):
return 0

@classmethod
@@ -72,9 +72,9 @@ async def from_python_async(cls, stream, value, **kwargs):
cls.from_python(stream, value, **kwargs)

@classmethod
def to_python(cls, ctypes_object, *args, **kwargs):
def to_python(cls, ctypes_object, **kwargs):
raise NotImplementedError

@classmethod
async def to_python_async(cls, ctypes_object, *args, **kwargs):
return cls.to_python(ctypes_object, *args, **kwargs)
async def to_python_async(cls, ctypes_object, **kwargs):
return cls.to_python(ctypes_object, **kwargs)
@@ -117,12 +117,12 @@ async def parse_async(cls, stream):
return cls.parse(stream)

@classmethod
def to_python(cls, ctypes_object, *args, **kwargs):
return cls.prop_data_class.to_python(ctypes_object.data, *args, **kwargs)
def to_python(cls, ctypes_object, **kwargs):
return cls.prop_data_class.to_python(ctypes_object.data, **kwargs)

@classmethod
async def to_python_async(cls, ctypes_object, *args, **kwargs):
return cls.to_python(ctypes_object, *args, **kwargs)
async def to_python_async(cls, ctypes_object, **kwargs):
return cls.to_python(ctypes_object, **kwargs)

@classmethod
def from_python(cls, stream, value):
@@ -302,6 +302,6 @@ def from_python(cls, stream, value):
)

@classmethod
def to_python(cls, ctypes_object, *args, **kwargs):
def to_python(cls, ctypes_object, **kwargs):
prop_data_class = prop_map(ctypes_object.prop_code)
return prop_data_class.to_python(ctypes_object.data, *args, **kwargs)
return prop_data_class.to_python(ctypes_object.data, **kwargs)
@@ -90,22 +90,21 @@ def __build_final_class(cls, fields):
)

@classmethod
def to_python_not_null(cls, ctypes_object, *args, **kwargs):
def to_python_not_null(cls, ctypes_object, **kwargs):
result = []
for i in range(ctypes_object.length):
result.append(
AnyDataObject.to_python(
getattr(ctypes_object, f'element_{i}'),
*args, **kwargs
getattr(ctypes_object, f'element_{i}'), **kwargs
)
)
return ctypes_object.type_id, result

@classmethod
async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs):
async def to_python_not_null_async(cls, ctypes_object, **kwargs):
result = [
await AnyDataObject.to_python_async(
getattr(ctypes_object, f'element_{i}'), *args, **kwargs
getattr(ctypes_object, f'element_{i}'), **kwargs
)
for i in range(ctypes_object.length)]
return ctypes_object.type_id, result
@@ -223,8 +222,6 @@ class CollectionObject(Nullable):
_type_id = TYPE_COL
_header_class = None
type_code = TC_COLLECTION
pythonic = list
default = []

@classmethod
def parse_not_null(cls, stream):
@@ -271,15 +268,15 @@ def __build_final_class(cls, fields):
@classmethod
def to_python_not_null(cls, ctypes_object, *args, **kwargs):
result = [
AnyDataObject.to_python(getattr(ctypes_object, f'element_{i}'), *args, **kwargs)
AnyDataObject.to_python(getattr(ctypes_object, f'element_{i}'), **kwargs)
for i in range(ctypes_object.length)
]
return ctypes_object.type, result

@classmethod
async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs):
result_coro = [
AnyDataObject.to_python_async(getattr(ctypes_object, f'element_{i}'), *args, **kwargs)
AnyDataObject.to_python_async(getattr(ctypes_object, f'element_{i}'), **kwargs)
for i in range(ctypes_object.length)
]

@@ -361,35 +358,27 @@ def __build_final_class(cls, fields):
)

@classmethod
def _to_python(cls, ctypes_object, *args, **kwargs):
def _to_python(cls, ctypes_object, **kwargs):
map_cls = cls.__get_map_class(ctypes_object)

result = map_cls()
for i in range(0, ctypes_object.length << 1, 2):
k = AnyDataObject.to_python(
getattr(ctypes_object, f'element_{i}'),
*args, **kwargs
)
v = AnyDataObject.to_python(
getattr(ctypes_object, f'element_{i + 1}'),
*args, **kwargs
)
k = AnyDataObject.to_python(getattr(ctypes_object, f'element_{i}'), **kwargs)
v = AnyDataObject.to_python(getattr(ctypes_object, f'element_{i + 1}'), **kwargs)
result[k] = v
return result

@classmethod
async def _to_python_async(cls, ctypes_object, *args, **kwargs):
async def _to_python_async(cls, ctypes_object, **kwargs):
map_cls = cls.__get_map_class(ctypes_object)

kv_pairs_coro = [
asyncio.gather(
AnyDataObject.to_python_async(
getattr(ctypes_object, f'element_{i}'),
*args, **kwargs
getattr(ctypes_object, f'element_{i}'), **kwargs
),
AnyDataObject.to_python_async(
getattr(ctypes_object, f'element_{i + 1}'),
*args, **kwargs
getattr(ctypes_object, f'element_{i + 1}'), **kwargs
)
) for i in range(0, ctypes_object.length << 1, 2)
]
@@ -449,12 +438,12 @@ def _parse_header(cls, stream):
return [('length', ctypes.c_int)], length

@classmethod
def to_python(cls, ctypes_object, *args, **kwargs):
return cls._to_python(ctypes_object, *args, **kwargs)
def to_python(cls, ctypes_object, **kwargs):
return cls._to_python(ctypes_object, **kwargs)

@classmethod
async def to_python_async(cls, ctypes_object, *args, **kwargs):
return await cls._to_python_async(ctypes_object, *args, **kwargs)
async def to_python_async(cls, ctypes_object, **kwargs):
return await cls._to_python_async(ctypes_object, **kwargs)

@classmethod
def from_python(cls, stream, value, type_id=None):
@@ -484,8 +473,6 @@ class MapObject(Nullable, _MapBase):
_type_name = NAME_MAP
_type_id = TYPE_MAP
type_code = TC_MAP
pythonic = dict
default = {}

@classmethod
def parse_not_null(cls, stream):
@@ -507,12 +494,12 @@ def _parse_header(cls, stream):
return fields, length

@classmethod
def to_python_not_null(cls, ctypes_object, *args, **kwargs):
return ctypes_object.type, cls._to_python(ctypes_object, *args, **kwargs)
def to_python_not_null(cls, ctypes_object, **kwargs):
return ctypes_object.type, cls._to_python(ctypes_object, **kwargs)

@classmethod
async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs):
return ctypes_object.type, await cls._to_python_async(ctypes_object, *args, **kwargs)
async def to_python_not_null_async(cls, ctypes_object, **kwargs):
return ctypes_object.type, await cls._to_python_async(ctypes_object, **kwargs)

@classmethod
def from_python_not_null(cls, stream, value, **kwargs):
@@ -557,7 +544,7 @@ class BinaryObject(Nullable):
COMPACT_FOOTER = 0x0020

@classmethod
def hashcode(cls, value: object, client: Optional['Client']) -> int:
def hashcode(cls, value: object, client: Optional['Client'] = None) -> int:
# binary objects's hashcode implementation is special in the sense
# that you need to fully serialize the object to calculate
# its hashcode
@@ -568,7 +555,7 @@ def hashcode(cls, value: object, client: Optional['Client']) -> int:
return value._hashcode

@classmethod
async def hashcode_async(cls, value: object, client: Optional['AioClient']) -> int:
async def hashcode_async(cls, value: object, client: Optional['AioClient'] = None) -> int:
if not value._hashcode and client:
with AioBinaryStream(client) as stream:
await value._from_python_async(stream, save_to_buf=True)
@@ -680,7 +667,7 @@ def __build_final_class(cls, stream, header, header_class, object_fields, fields
return final_class

@classmethod
def to_python_not_null(cls, ctypes_object, client: 'Client' = None, *args, **kwargs):
def to_python_not_null(cls, ctypes_object, client: 'Client' = None, **kwargs):
type_id = ctypes_object.type_id
if not client:
raise ParseError(f'Can not query binary type {type_id}')
@@ -692,14 +679,13 @@ def to_python_not_null(cls, ctypes_object, client: 'Client' = None, *args, **kwa
for field_name, field_type in data_class.schema.items():
setattr(
result, field_name, field_type.to_python(
getattr(ctypes_object.object_fields, field_name),
client, *args, **kwargs
getattr(ctypes_object.object_fields, field_name), client=client, **kwargs
)
)
return result

@classmethod
async def to_python_not_null_async(cls, ctypes_object, client: 'AioClient' = None, *args, **kwargs):
async def to_python_not_null_async(cls, ctypes_object, client: 'AioClient' = None, **kwargs):
type_id = ctypes_object.type_id
if not client:
raise ParseError(f'Can not query binary type {type_id}')
@@ -711,7 +697,7 @@ async def to_python_not_null_async(cls, ctypes_object, client: 'AioClient' = Non
field_values = await asyncio.gather(
*[
field_type.to_python_async(
getattr(ctypes_object.object_fields, field_name), client, *args, **kwargs
getattr(ctypes_object.object_fields, field_name), client=client, **kwargs
)
for field_name, field_type in data_class.schema.items()
]
@@ -80,14 +80,14 @@ async def parse_async(cls, stream):
return cls.parse(stream)

@classmethod
def to_python(cls, ctypes_object):
def to_python(cls, ctypes_object, **kwargs):
if ctypes_object == 0:
return None

return ExpiryPolicy(create=ctypes_object.create, update=ctypes_object.update, access=ctypes_object.access)

@classmethod
async def to_python_async(cls, ctypes_object):
async def to_python_async(cls, ctypes_object, **kwargs):
return cls.to_python(ctypes_object)

@classmethod

0 comments on commit 746dd13

Please sign in to comment.