Skip to content

Commit

Permalink
Merge pull request #101 from exalearn/proxystore-v0.5.0
Browse files Browse the repository at this point in the history
Update to Proxystore v0.5.0
  • Loading branch information
gpauloski committed May 14, 2023
2 parents e310cf8 + 7f0b372 commit f0e2464
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 163 deletions.
6 changes: 2 additions & 4 deletions colmena/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ class Result(BaseModel):
keep_inputs: bool = Field(True, description="Whether to keep the inputs with the result object or delete "
"them after the method has completed")
proxystore_name: Optional[str] = Field(None, description="Name of ProxyStore backend you use for transferring large objects")
proxystore_type: Optional[str] = Field(None, description="Type of ProxyStore backend being used")
proxystore_kwargs: Optional[Dict] = Field(None, description="Kwargs to reinitialize ProxyStore backend")
proxystore_config: Optional[Dict] = Field(None, description="ProxyStore backend configuration")
proxystore_threshold: Optional[int] = Field(None,
description="Proxy all input/output objects larger than this threshold in bytes")

Expand Down Expand Up @@ -307,8 +306,7 @@ def _serialize_and_proxy(value, evict=False) -> Tuple[str, int]:
# but the value in ProxyStore will still be updated.
store = get_store(
name=self.proxystore_name,
kind=self.proxystore_type,
**self.proxystore_kwargs,
config=self.proxystore_config,
)
value_proxy = store.proxy(value, evict=evict)
logger.debug(f'Proxied object of type {type(value)} with id={id(value)}')
Expand Down
45 changes: 10 additions & 35 deletions colmena/proxy.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
"""Utilities for interacting with ProxyStore"""
import logging
import importlib
import warnings
from dataclasses import asdict

import proxystore
from proxystore.proxy import extract
Expand All @@ -11,7 +9,7 @@
from proxystore.store.base import Store
from proxystore.store.utils import resolve_async, get_key

from typing import Any, Union, List, Optional, Type
from typing import Any, Dict, Union, List, Optional

logger = logging.getLogger(__name__)

Expand All @@ -20,47 +18,24 @@ class ProxyJSONSerializationWarning(Warning):
pass


def get_class_path(cls: Type[Any]) -> str:
"""Get the fully qualified pass of a type."""
return f'{cls.__module__}.{cls.__qualname__}'


def import_class(path: str) -> Type[Any]:
"""Import class via its fully qualified pass."""
module_path, _, name = path.rpartition('.')
if len(module_path) == 0:
raise ImportError(f'Class path must contain at least one module. Got {path}')
module = importlib.import_module(module_path)
return getattr(module, name)


def get_store(
name: str,
kind: Optional[Union[Type[Store], str]] = None,
**kwargs: Any,
config: Optional[Dict[str, Any]] = None,
) -> Optional[Store]:
"""Get a Store by name or create one if it does not already exist.
Args:
name (str): name of the store.
kind (type[Store], str): if ``None``, (the default) this function will
lookup the store by `name` returning either the found store or
``None``. If not ``None`` and a store by `name` does not exist,
`kind` will be used to initialize and register a new store. The
type of `kind` can be a string with the fully qualified class path
or the class type itself.
kwargs: keyword arguments to initialize the store with. Only used if
a store does not already exist and `kind` is not ``None``.
config: ``Store`` configuration that can be used to reinitialize the
``Store`` if provided and a store with `name` is not found.
Returns:
The store registered as `name` or a newly intialized and registered
store if `kind` is not ``None``.
"""
store = proxystore.store.get_store(name)
if store is None and kind is not None:
if isinstance(kind, str):
kind = import_class(kind)
store = kind(name=name, **kwargs)
if store is None and config is not None:
store = Store.from_config(config)
proxystore.store.register_store(store)
return store

Expand Down Expand Up @@ -163,11 +138,11 @@ def store_proxy_stats(proxy: Proxy, proxy_timing: dict):
key = str(key)

# Get the store associated with this proxy
store = get_store(proxy)
if store.has_stats:
store = proxystore.store.get_store(proxy)
if store.metrics is not None:
# Get the stats and convert them to a JSON-serializable form
stats = store.stats(proxy)
stats = dict((k, asdict(v)) for k, v in stats.items())
metrics = store.metrics.get_metrics(proxy)
stats = metrics.as_dict() if metrics is not None else {}
else:
stats = {}

Expand Down
42 changes: 19 additions & 23 deletions colmena/queue/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
from typing import Optional, Tuple, Any, Collection, Union, Dict, Set
import logging

import proxystore as ps
import proxystore.store

from colmena.models import Result, SerializationMethod, ResourceRequirements
from colmena.proxy import get_class_path

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,10 +39,11 @@ def __init__(self,
topics: Names of topics that are known for this queue
serialization_method: Method used to serialize task inputs and results
keep_inputs: Whether to return task inputs with the result object
proxystore_name (str, dict): Name of ProxyStore backend to use for all
topics or a mapping of topic to ProxyStore backend for specifying
backends for certain tasks. If a mapping is provided but a topic is
not in the mapping, ProxyStore will not be used.
proxystore_name (str, dict): Name of a registered ProxyStore
`Store` instance. This can be a single name such that the
corresponding `Store` is used for all topics or a mapping of
topics to registered `Store` names. If a mapping is provided
but a topic is not in the mapping, ProxyStore will not be used.
proxystore_threshold (int, dict): Threshold in bytes for using
ProxyStore to transfer objects. Optionally can pass a dict
mapping topics to threshold to use different threshold values
Expand Down Expand Up @@ -80,11 +80,11 @@ def __init__(self,
for ps_name in set(self.proxystore_name.values()):
if ps_name is None:
continue
store = ps.store.get_store(ps_name)
store = proxystore.store.get_store(ps_name)
if store is None:
raise ValueError(
f'ProxyStore backend with name "{ps_name}" was not '
'found. This is likely because the store needs to be '
f'A Store with name "{ps_name}" has not been registered. '
'This is likely because the store needs to be '
'initialized prior to initializing the Colmena queue.'
)

Expand Down Expand Up @@ -213,21 +213,17 @@ def send_inputs(self,
_keep_inputs = keep_inputs

# Gather ProxyStore info if we are using it with this topic
proxystore_kwargs = {}
if (
self.proxystore_name[topic] is not None and
self.proxystore_threshold[topic] is not None
):
store = ps.store.get_store(self.proxystore_name[topic])
ps_name = self.proxystore_name[topic]
ps_threshold = self.proxystore_threshold[topic]
ps_kwargs = {}
if ps_name is not None and ps_threshold is not None:
store = proxystore.store.get_store(ps_name)
# proxystore_kwargs contains all the information we would need to
# reconnect to the ProxyStore backend on any worker
proxystore_kwargs.update({
'proxystore_name': self.proxystore_name[topic],
'proxystore_threshold': self.proxystore_threshold[topic],
# Pydantic prefers to not have types as attributes, so we
# get the string corresponding to the type of the store we use
'proxystore_type': get_class_path(type(store)),
'proxystore_kwargs': store.kwargs
ps_kwargs.update({
'proxystore_name': ps_name,
'proxystore_threshold': ps_threshold,
'proxystore_config': store.config(),
})

# Create a new Result object
Expand All @@ -238,7 +234,7 @@ def send_inputs(self,
serialization_method=self.serialization_method,
task_info=task_info,
resources=resources or ResourceRequirements(), # Takes either the user specified or a default
**proxystore_kwargs
**ps_kwargs
)

# Push the serialized value to the task server
Expand Down
16 changes: 9 additions & 7 deletions colmena/task_server/tests/test_base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import Any, Dict, Tuple, List, Optional
from pathlib import Path

from proxystore.connectors.file import FileConnector
from proxystore.store import Store
from proxystore.store import register_store
from proxystore.store import unregister_store
from proxystore.store.file import FileStore
from pytest import fixture

from colmena.models import Result, ExecutableTask, SerializationMethod
Expand Down Expand Up @@ -44,9 +46,10 @@ def test_run_with_executable():

@fixture
def store(tmpdir):
store = FileStore(name='store', store_dir=tmpdir, stats=True)
yield store
unregister_store('store')
with Store('store', FileConnector(tmpdir), metrics=True) as store:
register_store(store)
yield store
unregister_store(store)


def test_run_function(store):
Expand All @@ -59,9 +62,8 @@ def test_run_function(store):
# Make the result and configure it to use the store
result = Result(inputs=(('a' * 1024,), {}))
result.proxystore_name = store.name
result.proxystore_type = f'{store.__class__.__module__}.{store.__class__.__name__}'
result.proxystore_threshold = 128
result.proxystore_kwargs = store.kwargs
result.proxystore_config = store.config()

# Serialize it
result.serialization_method = SerializationMethod.PICKLE
Expand All @@ -79,4 +81,4 @@ def test_run_function(store):

# Make sure we have stats for both proxies
assert len(result.proxy_timing) == 2
assert all('set_bytes' in v for v in result.proxy_timing.values())
assert all('store.proxy' in v['times'] for v in result.proxy_timing.values())
17 changes: 10 additions & 7 deletions colmena/task_server/tests/test_parsl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
from parsl import HighThroughputExecutor
from parsl.config import Config
from pytest import fixture, mark
import proxystore
from proxystore.store.redis import RedisStore

from proxystore.connectors.redis import RedisConnector
from proxystore.store import Store
from proxystore.store import register_store
from proxystore.store import unregister_store

from colmena.queue.base import ColmenaQueues

Expand Down Expand Up @@ -46,11 +49,11 @@ def config(tmpdir):
# Make a proxy store for larger objects
@fixture(scope='module')
def store():
store = RedisStore('store', hostname='localhost', port=6379, stats=True)
proxystore.store.register_store(store)
yield store
proxystore.store.unregister_store(store.name)
store.close()
connector = RedisConnector(hostname='localhost', port=6379)
with Store('store', connector=connector, metrics=True) as store:
register_store(store)
yield store
unregister_store(store.name)


@fixture(autouse=True)
Expand Down
64 changes: 9 additions & 55 deletions colmena/tests/test_proxy.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
"""Test for ProxyStore utilities."""
import json
from typing import Any
from typing import Type

import pytest

from proxystore.proxy import Proxy
from proxystore.store import register_store
from proxystore.store import unregister_store
from proxystore.store.base import Store
from proxystore.store.file import FileStore
from proxystore.store.local import LocalStore
from proxystore.connectors.local import LocalConnector

from colmena.proxy import get_class_path
from colmena.proxy import import_class
from colmena.proxy import get_store
from colmena.proxy import proxy_json_encoder
from colmena.proxy import resolve_proxies_async
Expand All @@ -25,69 +20,28 @@ class ExampleStore(Store):

@pytest.fixture
def proxy() -> Proxy:
with LocalStore('proxy-fixture-store') as store:
with Store('proxy-fixture-store', LocalConnector()) as store:
yield store.proxy('test-value')


@pytest.mark.parametrize(
'cls,expected',
(
(FileStore, 'proxystore.store.file.FileStore'),
(LocalStore, 'proxystore.store.local.LocalStore'),
# This directory has no __init__.py so the module is test_proxy
# rather than colmena.tests.test_proxy.
(ExampleStore, 'test_proxy.ExampleStore'),
),
)
def test_get_class_path(cls: Type[Any], expected: str) -> None:
assert get_class_path(cls) == expected


@pytest.mark.parametrize(
'path,expected',
(
('proxystore.store.file.FileStore', FileStore),
('proxystore.store.local.LocalStore', LocalStore),
('test_proxy.ExampleStore', ExampleStore),
('typing.Any', Any),
),
)
def test_import_class(path: str, expected: Type[Any]) -> None:
assert import_class(path) == expected


def test_import_class_missing_path() -> None:
with pytest.raises(ImportError):
import_class('FileStore')


def test_get_store_already_registered() -> None:
store = LocalStore('test-store')
store = Store('test-store', LocalConnector())
register_store(store)
assert get_store('test-store') is store
unregister_store(store.name)
unregister_store(store)


def test_get_store_missing() -> None:
assert get_store('test-store') is None


def test_get_store_initialize_by_type() -> None:
store = get_store('test-store', LocalStore, cache_size=0)
# Verify get_store registered the store globally by getting it again
assert get_store('test-store') is store
unregister_store(store.name)


def test_get_store_initialize_by_str() -> None:
store = get_store(
'test-store',
'proxystore.store.local.LocalStore',
cache_size=0,
)
def test_get_store_initialize_from_config() -> None:
# Create a temp store just to get the config
config = Store('test-store', LocalConnector()).config()
store = get_store('test-store', config)
# Verify get_store registered the store globally by getting it again
assert get_store('test-store') is store
unregister_store(store.name)
unregister_store(store)


def test_proxy_json_encoder() -> None:
Expand Down
Loading

0 comments on commit f0e2464

Please sign in to comment.