Skip to content

Commit

Permalink
Projections added to python client (#451)
Browse files Browse the repository at this point in the history
* Projections added to python client

* Fixed projections example

* Identity unit tests, fixed changes requested on pr

* Fixed print statements in doc example

* Fix again print statements in python docs

* Fixed requested changes

* Python 2 test fix for HazelcastJsonValue
  • Loading branch information
RobHam99 committed Aug 19, 2021
1 parent c6149ab commit f05112e
Show file tree
Hide file tree
Showing 10 changed files with 338 additions and 1 deletion.
56 changes: 56 additions & 0 deletions docs/using_python_client_with_hazelcast_imdg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2137,6 +2137,62 @@ See the following example.
# Average age is 30
print("Average age is %f" % average_age)
Projections
~~~~~~~~~~~

There are cases where instead of sending all the data returned by a query
from the server, you want to transform (strip down) each result object in order
to avoid redundant network traffic.

For example, you select all employees based on some criteria, but you only
want their names returned instead of the whole objects. This is easy to do
with projections.

The ``projection`` module provides three projection functions:

- ``single_attribute``: Extracts a single attribute from an object and returns
it.
- ``multi_attribute``: Extracts multiple attributes from an object and returns
them as a ``list``.
- ``identity``: Returns the object as it is.

These projections are used with the ``map.project`` method, which also accepts
an optional predicate argument to filter the entries.

See the following example.

.. code:: python
import hazelcast
from hazelcast.core import HazelcastJsonValue
from hazelcast.predicate import greater_or_equal
from hazelcast.projection import single_attribute, multi_attribute
client = hazelcast.HazelcastClient()
employees = client.get_map("employees").blocking()
employees.put(1, HazelcastJsonValue('{"Age": 23, "Height": 180, "Weight": 60}'))
employees.put(2, HazelcastJsonValue('{"Age": 21, "Height": 170, "Weight": 70}'))
employee_ages = employees.project(single_attribute("Age"))
# Prints:
# The ages of employees are [21, 23]
print("The ages of employees are %s" % employee_ages)
# Run Single Attribute With Predicate
employee_ages = employees.project(single_attribute("Age"), greater_or_equal("Age", 23))
# Prints:
# The employee age is: 23
print("The employee age is: %s" % employee_ages[0])
# Run Multi Attribute Projection
employee_multi_attribute = employees.project(multi_attribute("Age", "Height"))
# Prints:
# Employee 1 age and height: [21, 170] Employee 2 age and height: [23, 180]
print("Employee 1 age and height: %s Employee 2 age and height: %s" % (employee_multi_attribute[0], employee_multi_attribute[1]))
Performance
-----------

Expand Down
6 changes: 6 additions & 0 deletions hazelcast/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,12 @@ def __eq__(self, other):
def __ne__(self, other):
return not self.__eq__(other)

def __hash__(self):
return hash(self._json_string)

def __repr__(self):
return self._json_string


class MemberVersion(object):
"""
Expand Down
110 changes: 110 additions & 0 deletions hazelcast/projection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from hazelcast.serialization.api import IdentifiedDataSerializable

# Factory id returned by get_factory_id() of every projection in this module.
_PROJECTIONS_FACTORY_ID = -30


class Projection(object):
    """Marker base class shared by every projection.

    A projection lets the client transform (strip down) each query result
    object so that redundant data is not sent over the network.
    """


class _AbstractProjection(Projection, IdentifiedDataSerializable):
    """Shared ``IdentifiedDataSerializable`` plumbing for the projections.

    Subclasses are expected to override ``write_data`` and ``get_class_id``;
    the factory id is the same for all of them.
    """

    def get_factory_id(self):
        # One factory id is used for every projection defined in this module.
        return _PROJECTIONS_FACTORY_ID

    def get_class_id(self):
        # Concrete projections must supply their own class id.
        raise NotImplementedError("get_class_id")

    def write_data(self, object_data_output):
        # Concrete projections must supply their own serialized form.
        raise NotImplementedError("write_data")

    def read_data(self, object_data_input):
        # No-op: nothing is read from the input.
        return None


def _validate_attribute_path(attribute_path):
if not attribute_path:
raise ValueError("attribute_path must not be None or empty")

if "[any]" in attribute_path:
raise ValueError("attribute_path must not contain [any] operators")


class _SingleAttributeProjection(_AbstractProjection):
    """Projection carrying a single attribute path (class id ``0``)."""

    def __init__(self, attribute_path):
        # Reject empty paths and paths using the ``[any]`` operator up front.
        _validate_attribute_path(attribute_path)
        self._attribute_path = attribute_path

    def get_class_id(self):
        return 0

    def write_data(self, object_data_output):
        # The serialized form is just the attribute path string.
        path = self._attribute_path
        object_data_output.write_string(path)


class _MultiAttributeProjection(_AbstractProjection):
    """Projection carrying one or more attribute paths (class id ``1``)."""

    def __init__(self, *attribute_paths):
        if not attribute_paths:
            raise ValueError("Specify at least one attribute path")

        # Every path has to pass the same checks as a single-attribute path.
        for path in attribute_paths:
            _validate_attribute_path(path)

        self.attribute_paths = attribute_paths

    def get_class_id(self):
        return 1

    def write_data(self, object_data_output):
        # The serialized form is the array of attribute path strings.
        paths = self.attribute_paths
        object_data_output.write_string_array(paths)


class _IdentityProjection(_AbstractProjection):
    """Projection that carries no state (class id ``2``)."""

    def get_class_id(self):
        return 2

    def write_data(self, object_data_output):
        # There are no fields to serialize.
        return None


def single_attribute(attribute_path):
    """Create a projection that extracts the value of one attribute path.

    Args:
        attribute_path (str): Path to extract the attribute from.

    Returns:
        Projection[any]: A projection that extracts the value of the given
        attribute path.

    Raises:
        ValueError: If the path is ``None``, empty, or contains the
            ``[any]`` operator.
    """
    projection = _SingleAttributeProjection(attribute_path)
    return projection


def multi_attribute(*attribute_paths):
    """Create a projection that extracts the values of several attribute paths.

    Args:
        *attribute_paths (str): Paths to extract the attributes from.

    Returns:
        Projection[list]: A projection that extracts the values of the given
        attribute paths.

    Raises:
        ValueError: If no path is given, or if any path is ``None``, empty,
            or contains the ``[any]`` operator.
    """
    projection = _MultiAttributeProjection(*attribute_paths)
    return projection


def identity():
    """Create a projection that performs no transformation.

    Returns:
        Projection[hazelcast.core.MapEntry]: A projection that does no
        transformation.
    """
    projection = _IdentityProjection()
    return projection
23 changes: 23 additions & 0 deletions hazelcast/protocol/codec/map_project_codec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from hazelcast.protocol.client_message import OutboundMessage, REQUEST_HEADER_SIZE, create_initial_buffer
from hazelcast.protocol.builtin import StringCodec
from hazelcast.protocol.builtin import DataCodec
from hazelcast.protocol.builtin import ListMultiFrameCodec

# Message type passed to create_initial_buffer() by encode_request().
# hex: 0x013B00
_REQUEST_MESSAGE_TYPE = 80640
# Presumably the message type of the matching response; it is not referenced
# by the functions visible in this module.
# hex: 0x013B01
_RESPONSE_MESSAGE_TYPE = 80641

# The initial frame is exactly the request header size; presumably the request
# carries no fixed-size fields of its own - TODO confirm.
_REQUEST_INITIAL_FRAME_SIZE = REQUEST_HEADER_SIZE


def encode_request(name, projection):
    """Build the outbound request message for a map projection.

    Args:
        name: Value encoded with ``StringCodec`` right after the initial frame.
        projection: Value encoded with ``DataCodec`` after the name.

    Returns:
        OutboundMessage: The message wrapping the encoded buffer.
    """
    request_buffer = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
    StringCodec.encode(request_buffer, name)
    # NOTE(review): the trailing True presumably marks this as the final
    # frame - confirm against DataCodec.encode.
    DataCodec.encode(request_buffer, projection, True)
    # NOTE(review): the second argument is presumably the retryable flag -
    # confirm against OutboundMessage.
    return OutboundMessage(request_buffer, True)


def decode_response(msg):
    """Decode the response of a map projection request.

    Args:
        msg: The inbound message to read frames from.

    Returns:
        The list produced by ``ListMultiFrameCodec.decode_contains_nullable``
        with ``DataCodec.decode`` as the element decoder.
    """
    # Advance past one frame before reading the payload (presumably the
    # initial frame - TODO confirm).
    msg.next_frame()
    projected_items = ListMultiFrameCodec.decode_contains_nullable(msg, DataCodec.decode)
    return projected_items
24 changes: 24 additions & 0 deletions hazelcast/protocol/codec/map_project_with_predicate_codec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from hazelcast.protocol.client_message import OutboundMessage, REQUEST_HEADER_SIZE, create_initial_buffer
from hazelcast.protocol.builtin import StringCodec
from hazelcast.protocol.builtin import DataCodec
from hazelcast.protocol.builtin import ListMultiFrameCodec

# Message type passed to create_initial_buffer() by encode_request().
# hex: 0x013C00
_REQUEST_MESSAGE_TYPE = 80896
# Presumably the message type of the matching response; it is not referenced
# by the functions visible in this module.
# hex: 0x013C01
_RESPONSE_MESSAGE_TYPE = 80897

# The initial frame is exactly the request header size; presumably the request
# carries no fixed-size fields of its own - TODO confirm.
_REQUEST_INITIAL_FRAME_SIZE = REQUEST_HEADER_SIZE


def encode_request(name, projection, predicate):
    """Build the outbound request message for a map projection with a predicate.

    Args:
        name: Value encoded with ``StringCodec`` right after the initial frame.
        projection: Value encoded with ``DataCodec`` after the name.
        predicate: Value encoded with ``DataCodec`` after the projection.

    Returns:
        OutboundMessage: The message wrapping the encoded buffer.
    """
    request_buffer = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
    StringCodec.encode(request_buffer, name)
    DataCodec.encode(request_buffer, projection)
    # NOTE(review): the trailing True presumably marks this as the final
    # frame - confirm against DataCodec.encode.
    DataCodec.encode(request_buffer, predicate, True)
    # NOTE(review): the second argument is presumably the retryable flag -
    # confirm against OutboundMessage.
    return OutboundMessage(request_buffer, True)


def decode_response(msg):
    """Decode the response of a map projection-with-predicate request.

    Args:
        msg: The inbound message to read frames from.

    Returns:
        The list produced by ``ListMultiFrameCodec.decode_contains_nullable``
        with ``DataCodec.decode`` as the element decoder.
    """
    # Advance past one frame before reading the payload (presumably the
    # initial frame - TODO confirm).
    msg.next_frame()
    projected_items = ListMultiFrameCodec.decode_contains_nullable(msg, DataCodec.decode)
    return projected_items
40 changes: 40 additions & 0 deletions hazelcast/proxy/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
map_add_interceptor_codec,
map_aggregate_codec,
map_aggregate_with_predicate_codec,
map_project_codec,
map_project_with_predicate_codec,
map_execute_on_all_keys_codec,
map_execute_on_key_codec,
map_execute_on_keys_codec,
Expand Down Expand Up @@ -345,6 +347,44 @@ def handler(message):
request = map_aggregate_codec.encode_request(self.name, aggregator_data)
return self._invoke(request, handler)

def project(self, projection, predicate=None):
    """Applies the projection logic on map entries and filters the result
    with the predicate, if given.

    Args:
        projection (hazelcast.projection.Projection): Projection to project the
            entries with.
        predicate (hazelcast.predicate.Predicate): Predicate to filter the entries
            with. When ``None`` (the default), no filtering is requested.

    Returns:
        hazelcast.future.Future: The result of the projection.

    Raises:
        AssertionError: If ``predicate`` is a ``PagingPredicate``.
    """
    check_not_none(projection, "Projection can't be none")
    projection_data = self._to_data(projection)

    # Compare against None explicitly: a truthiness test would silently drop
    # a valid predicate object that happens to be falsy (for example one
    # defining __len__ or __bool__) and run an unfiltered projection.
    if predicate is None:
        codec = map_project_codec
        request = codec.encode_request(self.name, projection_data)
    else:
        if isinstance(predicate, PagingPredicate):
            raise AssertionError("Paging predicate is not supported.")

        codec = map_project_with_predicate_codec
        predicate_data = self._to_data(predicate)
        request = codec.encode_request(self.name, projection_data, predicate_data)

    def handler(message):
        # Both codecs decode to a list of data items; items are converted
        # with self._to_object by the lazy list wrapper.
        return ImmutableLazyDataList(codec.decode_response(message), self._to_object)

    return self._invoke(request, handler)

def clear(self):
"""Clears the map.
Expand Down
1 change: 1 addition & 0 deletions hazelcast/serialization/serialization_const.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
JAVA_DEFAULT_TYPE_CLASS = -24
JAVA_DEFAULT_TYPE_DATE = -25
JAVA_DEFAULT_TYPE_BIG_INTEGER = -26
JAVA_DEFAULT_TYPE_ARRAY = -28
JAVA_DEFAULT_TYPE_ARRAY_LIST = -29
JAVA_DEFAULT_TYPE_LINKED_LIST = -30
JAVASCRIPT_JSON_SERIALIZATION_TYPE = -130
Expand Down
11 changes: 11 additions & 0 deletions hazelcast/serialization/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,17 @@ def get_type_id(self):
return JAVA_DEFAULT_TYPE_CLASS


class ArraySerializer(BaseSerializer):
    """Read-only serializer registered under ``JAVA_DEFAULT_TYPE_ARRAY``."""

    def get_type_id(self):
        return JAVA_DEFAULT_TYPE_ARRAY

    # "write(self, out, obj)" is never called so not implemented here

    def read(self, inp):
        # Layout: an int element count followed by that many objects.
        count = inp.read_int()
        items = []
        for _ in range(count):
            items.append(inp.read_object())
        return items


class ArrayListSerializer(BaseSerializer):
def read(self, inp):
size = inp.read_int()
Expand Down
1 change: 1 addition & 0 deletions hazelcast/serialization/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def _register_constant_serializers(self):
self._registry.register_constant_serializer(DateTimeSerializer(), datetime)
self._registry.register_constant_serializer(BigIntegerSerializer())
self._registry.register_constant_serializer(JavaClassSerializer())
self._registry.register_constant_serializer(ArraySerializer())
self._registry.register_constant_serializer(ArrayListSerializer(), list)
self._registry.register_constant_serializer(LinkedListSerializer())
self._registry.register_constant_serializer(
Expand Down

0 comments on commit f05112e

Please sign in to comment.