From 69bb03c1bb974c480037453190e710d75af44c45 Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Wed, 26 Apr 2023 20:55:20 -0500 Subject: [PATCH 1/2] Update to ProxyStore v0.5.0 --- colmena/models.py | 6 +-- colmena/proxy.py | 45 ++++------------- colmena/queue/base.py | 42 ++++++++-------- colmena/task_server/tests/test_base.py | 16 ++++--- colmena/task_server/tests/test_parsl.py | 17 ++++--- colmena/tests/test_proxy.py | 64 ++++--------------------- docs/quickstart.rst | 18 +++---- pyproject.toml | 2 +- 8 files changed, 69 insertions(+), 141 deletions(-) diff --git a/colmena/models.py b/colmena/models.py index 7972929..fbb4942 100644 --- a/colmena/models.py +++ b/colmena/models.py @@ -161,8 +161,7 @@ class Result(BaseModel): keep_inputs: bool = Field(True, description="Whether to keep the inputs with the result object or delete " "them after the method has completed") proxystore_name: Optional[str] = Field(None, description="Name of ProxyStore backend you use for transferring large objects") - proxystore_type: Optional[str] = Field(None, description="Type of ProxyStore backend being used") - proxystore_kwargs: Optional[Dict] = Field(None, description="Kwargs to reinitialize ProxyStore backend") + proxystore_config: Optional[Dict] = Field(None, description="ProxyStore backend configuration") proxystore_threshold: Optional[int] = Field(None, description="Proxy all input/output objects larger than this threshold in bytes") @@ -307,8 +306,7 @@ def _serialize_and_proxy(value, evict=False) -> Tuple[str, int]: # but the value in ProxyStore will still be updated. store = get_store( name=self.proxystore_name, - kind=self.proxystore_type, - **self.proxystore_kwargs, + config=self.proxystore_config, ) value_proxy = store.proxy(value, evict=evict) logger.debug(f'Proxied object of type {type(value)} with id={id(value)}') diff --git a/colmena/proxy.py b/colmena/proxy.py index 5b2af89..f22e33d 100644 --- a/colmena/proxy.py +++ b/colmena/proxy.py @@ -1,8 +1,6 @@ """Utilities for interacting with ProxyStore""" import logging -import importlib import warnings -from dataclasses import asdict import proxystore from proxystore.proxy import extract @@ -11,7 +9,7 @@ from proxystore.store.base import Store from proxystore.store.utils import resolve_async, get_key -from typing import Any, Union, List, Optional, Type +from typing import Any, Dict, Union, List, Optional logger = logging.getLogger(__name__) @@ -20,47 +18,24 @@ class ProxyJSONSerializationWarning(Warning): pass -def get_class_path(cls: Type[Any]) -> str: - """Get the fully qualified pass of a type.""" - return f'{cls.__module__}.{cls.__qualname__}' - - -def import_class(path: str) -> Type[Any]: - """Import class via its fully qualified pass.""" - module_path, _, name = path.rpartition('.') - if len(module_path) == 0: - raise ImportError(f'Class path must contain at least one module. Got {path}') - module = importlib.import_module(module_path) - return getattr(module, name) - - def get_store( name: str, - kind: Optional[Union[Type[Store], str]] = None, - **kwargs: Any, + config: Optional[Dict[str, Any]] = None, ) -> Optional[Store]: """Get a Store by name or create one if it does not already exist. Args: name (str): name of the store. - kind (type[Store], str): if ``None``, (the default) this function will - lookup the store by `name` returning either the found store or - ``None``. If not ``None`` and a store by `name` does not exist, - `kind` will be used to initialize and register a new store. The - type of `kind` can be a string with the fully qualified class path - or the class type itself. - kwargs: keyword arguments to initialize the store with. Only used if - a store does not already exist and `kind` is not ``None``. + config: ``Store`` configuration that can be used to reinitialize the + ``Store`` if provided and a store with `name` is not found. Returns: The store registered as `name` or a newly intialized and registered store if `kind` is not ``None``. """ store = proxystore.store.get_store(name) - if store is None and kind is not None: - if isinstance(kind, str): - kind = import_class(kind) - store = kind(name=name, **kwargs) + if store is None and config is not None: + store = Store.from_config(config) proxystore.store.register_store(store) return store @@ -163,11 +138,11 @@ def store_proxy_stats(proxy: Proxy, proxy_timing: dict): key = str(key) # Get the store associated with this proxy - store = get_store(proxy) - if store.has_stats: + store = proxystore.store.get_store(proxy) + if store.metrics is not None: # Get the stats and convert them to a JSON-serializable form - stats = store.stats(proxy) - stats = dict((k, asdict(v)) for k, v in stats.items()) + metrics = store.metrics.get_metrics(proxy) + stats = metrics.as_dict() if metrics is not None else {} else: stats = {} diff --git a/colmena/queue/base.py b/colmena/queue/base.py index 52b9300..384eece 100644 --- a/colmena/queue/base.py +++ b/colmena/queue/base.py @@ -6,10 +6,9 @@ from typing import Optional, Tuple, Any, Collection, Union, Dict, Set import logging -import proxystore as ps +import proxystore.store from colmena.models import Result, SerializationMethod, ResourceRequirements -from colmena.proxy import get_class_path logger = logging.getLogger(__name__) @@ -40,10 +39,11 @@ def __init__(self, topics: Names of topics that are known for this queue serialization_method: Method used to serialize task inputs and results keep_inputs: Whether to return task inputs with the result object - proxystore_name (str, dict): Name of ProxyStore backend to use for all - topics or a mapping of topic to ProxyStore backend for specifying - backends for certain tasks. If a mapping is provided but a topic is - not in the mapping, ProxyStore will not be used. + proxystore_name (str, dict): Name of a registered ProxyStore + `Store` instance. This can be a single name such that the + corresponding `Store` is used for all topics or a mapping of + topics to registered `Store` names. If a mapping is provided + but a topic is not in the mapping, ProxyStore will not be used. proxystore_threshold (int, dict): Threshold in bytes for using ProxyStore to transfer objects. Optionally can pass a dict mapping topics to threshold to use different threshold values @@ -80,11 +80,11 @@ def __init__(self, for ps_name in set(self.proxystore_name.values()): if ps_name is None: continue - store = ps.store.get_store(ps_name) + store = proxystore.store.get_store(ps_name) if store is None: raise ValueError( - f'ProxyStore backend with name "{ps_name}" was not ' - 'found. This is likely because the store needs to be ' + f'A Store with name "{ps_name}" has not been registered. ' + 'This is likely because the store needs to be ' 'initialized prior to initializing the Colmena queue.' ) @@ -213,21 +213,17 @@ def send_inputs(self, _keep_inputs = keep_inputs # Gather ProxyStore info if we are using it with this topic - proxystore_kwargs = {} - if ( - self.proxystore_name[topic] is not None and - self.proxystore_threshold[topic] is not None - ): - store = ps.store.get_store(self.proxystore_name[topic]) + ps_name = self.proxystore_name[topic] + ps_threshold = self.proxystore_threshold[topic] + ps_kwargs = {} + if ps_name is not None and ps_threshold is not None: + store = proxystore.store.get_store(ps_name) # proxystore_kwargs contains all the information we would need to # reconnect to the ProxyStore backend on any worker - proxystore_kwargs.update({ - 'proxystore_name': self.proxystore_name[topic], - 'proxystore_threshold': self.proxystore_threshold[topic], - # Pydantic prefers to not have types as attributes, so we - # get the string corresponding to the type of the store we use - 'proxystore_type': get_class_path(type(store)), - 'proxystore_kwargs': store.kwargs + ps_kwargs.update({ + 'proxystore_name': ps_name, + 'proxystore_threshold': ps_threshold, + 'proxystore_config': store.config(), }) # Create a new Result object @@ -238,7 +234,7 @@ def send_inputs(self, serialization_method=self.serialization_method, task_info=task_info, resources=resources or ResourceRequirements(), # Takes either the user specified or a default - **proxystore_kwargs + **ps_kwargs ) # Push the serialized value to the task server diff --git a/colmena/task_server/tests/test_base.py b/colmena/task_server/tests/test_base.py index 10239eb..d85b641 100644 --- a/colmena/task_server/tests/test_base.py +++ b/colmena/task_server/tests/test_base.py @@ -1,8 +1,10 @@ from typing import Any, Dict, Tuple, List, Optional from pathlib import Path +from proxystore.connectors.file import FileConnector +from proxystore.store import Store +from proxystore.store import register_store from proxystore.store import unregister_store -from proxystore.store.file import FileStore from pytest import fixture from colmena.models import Result, ExecutableTask, SerializationMethod @@ -44,9 +46,10 @@ def test_run_with_executable(): @fixture def store(tmpdir): - store = FileStore(name='store', store_dir=tmpdir, stats=True) - yield store - unregister_store('store') + with Store('store', FileConnector(tmpdir), metrics=True) as store: + register_store(store) + yield store + unregister_store(store) def test_run_function(store): @@ -59,9 +62,8 @@ def test_run_function(store): # Make the result and configure it to use the store result = Result(inputs=(('a' * 1024,), {})) result.proxystore_name = store.name - result.proxystore_type = f'{store.__class__.__module__}.{store.__class__.__name__}' result.proxystore_threshold = 128 - result.proxystore_kwargs = store.kwargs + result.proxystore_config = store.config() # Serialize it result.serialization_method = SerializationMethod.PICKLE @@ -79,4 +81,4 @@ def test_run_function(store): # Make sure we have stats for both proxies assert len(result.proxy_timing) == 2 - assert all('set_bytes' in v for v in result.proxy_timing.values()) + assert all('store.proxy' in v['times'] for v in result.proxy_timing.values()) diff --git a/colmena/task_server/tests/test_parsl.py b/colmena/task_server/tests/test_parsl.py index 876887a..ab24c3f 100644 --- a/colmena/task_server/tests/test_parsl.py +++ b/colmena/task_server/tests/test_parsl.py @@ -4,8 +4,11 @@ from parsl import HighThroughputExecutor from parsl.config import Config from pytest import fixture, mark -import proxystore -from proxystore.store.redis import RedisStore + +from proxystore.connectors.redis import RedisConnector +from proxystore.store import Store +from proxystore.store import register_store +from proxystore.store import unregister_store from colmena.queue.base import ColmenaQueues @@ -46,11 +49,11 @@ def config(tmpdir): # Make a proxy store for larger objects @fixture(scope='module') def store(): - store = RedisStore('store', hostname='localhost', port=6379, stats=True) - proxystore.store.register_store(store) - yield store - proxystore.store.unregister_store(store.name) - store.close() + connector = RedisConnector(hostname='localhost', port=6379) + with Store('store', connector=connector, metrics=True) as store: + register_store(store) + yield store + unregister_store(store.name) @fixture(autouse=True) diff --git a/colmena/tests/test_proxy.py b/colmena/tests/test_proxy.py index d568862..ef14b01 100644 --- a/colmena/tests/test_proxy.py +++ b/colmena/tests/test_proxy.py @@ -1,7 +1,5 @@ """Test for ProxyStore utilities.""" import json -from typing import Any -from typing import Type import pytest @@ -9,11 +7,8 @@ from proxystore.store import register_store from proxystore.store import unregister_store from proxystore.store.base import Store -from proxystore.store.file import FileStore -from proxystore.store.local import LocalStore +from proxystore.connectors.local import LocalConnector -from colmena.proxy import get_class_path -from colmena.proxy import import_class from colmena.proxy import get_store from colmena.proxy import proxy_json_encoder from colmena.proxy import resolve_proxies_async @@ -25,69 +20,28 @@ class ExampleStore(Store): @pytest.fixture def proxy() -> Proxy: - with LocalStore('proxy-fixture-store') as store: + with Store('proxy-fixture-store', LocalConnector()) as store: yield store.proxy('test-value') -@pytest.mark.parametrize( - 'cls,expected', - ( - (FileStore, 'proxystore.store.file.FileStore'), - (LocalStore, 'proxystore.store.local.LocalStore'), - # This directory has no __init__.py so the module is test_proxy - # rather than colmena.tests.test_proxy. - (ExampleStore, 'test_proxy.ExampleStore'), - ), -) -def test_get_class_path(cls: Type[Any], expected: str) -> None: - assert get_class_path(cls) == expected - - -@pytest.mark.parametrize( - 'path,expected', - ( - ('proxystore.store.file.FileStore', FileStore), - ('proxystore.store.local.LocalStore', LocalStore), - ('test_proxy.ExampleStore', ExampleStore), - ('typing.Any', Any), - ), -) -def test_import_class(path: str, expected: Type[Any]) -> None: - assert import_class(path) == expected - - -def test_import_class_missing_path() -> None: - with pytest.raises(ImportError): - import_class('FileStore') - - def test_get_store_already_registered() -> None: - store = LocalStore('test-store') + store = Store('test-store', LocalConnector()) register_store(store) assert get_store('test-store') is store - unregister_store(store.name) + unregister_store(store) def test_get_store_missing() -> None: assert get_store('test-store') is None -def test_get_store_initialize_by_type() -> None: - store = get_store('test-store', LocalStore, cache_size=0) - # Verify get_store registered the store globally by getting it again - assert get_store('test-store') is store - unregister_store(store.name) - - -def test_get_store_initialize_by_str() -> None: - store = get_store( - 'test-store', - 'proxystore.store.local.LocalStore', - cache_size=0, - ) +def test_get_store_initialize_from_config() -> None: + # Create a temp store just to get the config + config = Store('test-store', LocalConnector()).config() + store = get_store('test-store', config) # Verify get_store registered the store globally by getting it again assert get_store('test-store') is store - unregister_store(store.name) + unregister_store(store) def test_proxy_json_encoder() -> None: diff --git a/docs/quickstart.rst b/docs/quickstart.rst index d3a6b1b..dd29fbc 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -47,29 +47,29 @@ Using ProxyStore ++++++++++++++++ Colmena can use `ProxyStore `_ to efficiently transfer large objects, typically on the order of 100KB or larger, between the thinker and workers directly. -To enable the use of ProxyStore, a ProxyStore backend must be initialized. -The name of the ProxyStore backend and a threshold value (bytes) can be passed via the parameters :code:`proxystore_name` and :code:`proxystore_threshold` to :code:`make_queue_pairs`. +Enable ProxyStore by initializing a `Store `_ then passing the name (:code:`proxystore_name`) and threshold size (:code:`proxystore_threshold`) for the store to :code:`make_queue_pairs`. Any input/output object of a target function larger than :code:`proxystore_threshold` will be automatically passed via ProxyStore. For example, a common use case is to initialize ProxyStore to use a Redis server to communicate data directly to workers .. code-block:: python - import proxystore as ps + from proxystore.connectors.redis import RedisConnector + from proxystore.store import Store + from proxystore.store import register_store from colmena.queue import PipeQueues - ps.store.init_store( - 'redis', name='default-store' - ) + store = Store('redis', RedisConnector('localhost', 6379)) + register_store(store) queue = PipeQueues( - proxystore_name='default-store', + proxystore_name='redis', proxystore_threshold=100000 ) - Any object larger than 100kB will get sent via Redis, reducing the communication costs of your application. +Any object larger than 100kB will get sent via Redis, reducing the communication costs of your application. -More details on initializing ProxyStore backends can be found in the `docs `_. +Learn more about ProxyStore and find the "Getting Started" guides at `docs.proxystore.dev `_. 2. Build a task server ---------------------- diff --git a/pyproject.toml b/pyproject.toml index b197811..9d90f8d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,7 @@ dependencies = [ "parsl>=2022", "pydantic==1.*", "redis>=4.3", - "proxystore==0.4.*" + "proxystore==0.5.*" ] [tool.setuptools.packages.find] From 7f0b37279d7ff2bc0b7f99001fe54929cd5bfee5 Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Fri, 12 May 2023 20:18:55 -0500 Subject: [PATCH 2/2] Update synthetic-data demo app to ProxyStore v0.5.0 --- demo_apps/synthetic-data/synthetic.py | 43 +++++++++++++-------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/demo_apps/synthetic-data/synthetic.py b/demo_apps/synthetic-data/synthetic.py index 38933ef..ff8c95e 100644 --- a/demo_apps/synthetic-data/synthetic.py +++ b/demo_apps/synthetic-data/synthetic.py @@ -6,9 +6,16 @@ import sys import time from datetime import datetime +from typing import Any import numpy as np -import proxystore as ps +from proxystore.connectors.connector import Connector +from proxystore.connectors.file import FileConnector +from proxystore.connectors.globus import GlobusEndpoints +from proxystore.connectors.globus import GlobusConnector +from proxystore.connectors.redis import RedisConnector +from proxystore.store import Store +from proxystore.store import register_store from funcx import FuncXClient from parsl import HighThroughputExecutor from parsl.addresses import address_by_hostname @@ -254,31 +261,23 @@ def producer(self): logging.info(f'Args: {args}') - ps_name: str | None + connector: Connector[Any] | None = None + ps_name: str | None = None if args.ps_file: ps_name = 'file' - ps.store.init_store('file', name=ps_name, store_dir=args.ps_file_dir) + connector = FileConnector(args.ps_file_dir) elif args.ps_globus: ps_name = 'globus' - endpoints = ps.store.globus.GlobusEndpoints.from_json( - args.ps_globus_config, - ) - ps.store.init_store( - 'globus', - name=ps_name, - endpoints=endpoints, - timeout=60, - ) + endpoints = GlobusEndpoints.from_json(args.ps_globus_config) + connector = GlobusConnector(endpoints, timeout=60) elif args.ps_redis: ps_name = 'redis' - ps.store.init_store( - 'redis', - name=ps_name, - hostname=args.redis_host, - port=args.redis_port, - ) - else: - ps_name = None + connector = RedisConnector(args.redis_host, args.redis_port) + + store: Store[Any] | None = None + if connector is not None and ps_name is not None: + store = Store(ps_name, connector, metrics=True) + register_store(store) # Make the queues queues = PipeQueues( @@ -354,8 +353,8 @@ def producer(self): # Wait for the task server to complete doer.join() - if ps_name is not None: - ps.store.get_store(ps_name).cleanup() + if store is not None: + store.close() # Print the output result logging.info(f'Finished. Runtime = {time.time() - start_time}s')