Skip to content

Commit

Permalink
Improve serialization efficiency when objects are proxied (#120)
Browse files Browse the repository at this point in the history
* Allow ProxyStore v0.6

* Get store once when serializing Result

* Avoid double serialize when proxying values (#117)

Proxied objects end up being serialized twice: first when the object is
serialized with Colmena to check the size of the serialized object and
then again by ProxyStore when the object is proxied.

ProxyStore supports passing custom serializer/deserializer functions so
this commit adds a shim serializer and a deserializer wrapper to
minimize serialization overheads when an object gets proxied.

Note that there is still one extra step when using Colmena's pickle
serialization method and ProxyStore. SerializationMethod.serialize
will pickle the object producing a byte string then convert those bytes
to a hex string then the shims convert that hex string back to bytes.
I.e., there's this intermediate string representation that's created
then discarded in this path. I don't really see a way of avoiding this
because its an artifact of the different serialization types between
Colmena and ProxyStore (str vs bytes).
  • Loading branch information
gpauloski committed Dec 13, 2023
1 parent 208b547 commit 6ad3625
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 9 deletions.
89 changes: 81 additions & 8 deletions colmena/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from functools import partial
from io import StringIO
from pathlib import Path
from subprocess import run
Expand Down Expand Up @@ -68,6 +69,62 @@ def deserialize(method: 'SerializationMethod', message: str) -> Any:
raise NotImplementedError(f'Method {method} not yet implemented')


def _serialized_str_to_bytes_shim(
s: str,
method: Union[str, SerializationMethod],
) -> bytes:
"""Shim between Colmena serialized objects and bytes.
Colmena's serialization mechanisms produce strings but ProxyStore
serializes to bytes, so this shim takes a an object serialized by Colmena
and converts it to bytes.
Args:
s: Serialized string object
method: Serialization method used to produce s
Returns:
bytes representation of s
"""
if method == "json":
return s.encode('utf-8')
elif method == "pickle":
# In this case the conversion goes from obj > bytes > str > bytes
# which results in an unecessary conversion to a string but this is
# an unavoidable side effect of converting between the Colmena
# and ProxyStore serialization formats.
return bytes.fromhex(s)
else:
raise NotImplementedError(f'Method {method} not yet implemented')


def _serialized_bytes_to_obj_wrapper(
b: str,
method: Union[str, SerializationMethod],
) -> Any:
"""Wrapper which converts bytes to strings before deserializing.
Args:
b: Byte string of serialized object
method: Serialization method used to produce b
Returns:
Deserialized object
"""
if method == "json":
s = b.decode('utf-8')
elif method == "pickle":
# In this case the conversion goes from bytes > str > bytes > obj
# which results in an unecessary conversion to a string but this is
# an unavoidable side effect of converting between the Colmena
# and ProxyStore serialization formats.
s = b.hex()
else:
raise NotImplementedError(f'Method {method} not yet implemented')

return SerializationMethod.serialize(method, s)


class FailureInformation(BaseModel):
"""Stores information about a task failure"""

Expand Down Expand Up @@ -276,6 +333,11 @@ def serialize(self) -> Tuple[float, List[Proxy]]:
_inputs = self.inputs
proxies = []

if self.proxystore_name is not None:
store = get_store(name=self.proxystore_name, config=self.proxystore_config)
else:
store = None

def _serialize_and_proxy(value, evict=False) -> Tuple[str, int]:
"""Helper function for serializing and proxying
Expand All @@ -296,19 +358,30 @@ def _serialize_and_proxy(value, evict=False) -> Tuple[str, int]:
value_size = sys.getsizeof(value_str)

if (
self.proxystore_name is not None and
store is not None and
self.proxystore_threshold is not None and
not isinstance(value, Proxy) and
value_size >= self.proxystore_threshold
):
# Proxy the value. We use the id of the object as the key
# so multiple copies of the object are not added to ProxyStore,
# but the value in ProxyStore will still be updated.
store = get_store(
name=self.proxystore_name,
config=self.proxystore_config,
# Override ProxyStore's default serialization with these shims
# to Colmena's serialization mechanisms. This avoids value
# being serialized twice: once to get the size of the
# serialized object and once by proxy().
deserializer = partial(
_serialized_bytes_to_obj_wrapper,
method=self.serialization_method,
)
serializer = partial(
_serialized_str_to_bytes_shim,
method=self.serialization_method,
)

value_proxy = store.proxy(
value_str,
evict=evict,
deserializer=deserializer,
serializer=serializer,
)
value_proxy = store.proxy(value, evict=evict)
logger.debug(f'Proxied object of type {type(value)} with id={id(value)}')
proxies.append(value_proxy)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dependencies = [
"parsl>=2022",
"pydantic==1.*",
"redis>=4.3",
"proxystore==0.5.*"
"proxystore>=0.5.0,<0.7.0"
]

[tool.setuptools.packages.find]
Expand Down

0 comments on commit 6ad3625

Please sign in to comment.