Skip to content

Commit

Permalink
Update synthetic-data demo app to ProxyStore v0.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
gpauloski committed May 14, 2023
1 parent 69bb03c commit 7f0b372
Showing 1 changed file with 21 additions and 22 deletions.
43 changes: 21 additions & 22 deletions demo_apps/synthetic-data/synthetic.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,16 @@
import sys
import time
from datetime import datetime
from typing import Any

import numpy as np
import proxystore as ps
from proxystore.connectors.connector import Connector
from proxystore.connectors.file import FileConnector
from proxystore.connectors.globus import GlobusEndpoints
from proxystore.connectors.globus import GlobusConnector
from proxystore.connectors.redis import RedisConnector
from proxystore.store import Store
from proxystore.store import register_store
from funcx import FuncXClient
from parsl import HighThroughputExecutor
from parsl.addresses import address_by_hostname
Expand Down Expand Up @@ -254,31 +261,23 @@ def producer(self):

logging.info(f'Args: {args}')

ps_name: str | None
connector: Connector[Any] | None = None
ps_name: str | None = None
if args.ps_file:
ps_name = 'file'
ps.store.init_store('file', name=ps_name, store_dir=args.ps_file_dir)
connector = FileConnector(args.ps_file_dir)
elif args.ps_globus:
ps_name = 'globus'
endpoints = ps.store.globus.GlobusEndpoints.from_json(
args.ps_globus_config,
)
ps.store.init_store(
'globus',
name=ps_name,
endpoints=endpoints,
timeout=60,
)
endpoints = GlobusEndpoints.from_json(args.ps_globus_config)
connector = GlobusConnector(endpoints, timeout=60)
elif args.ps_redis:
ps_name = 'redis'
ps.store.init_store(
'redis',
name=ps_name,
hostname=args.redis_host,
port=args.redis_port,
)
else:
ps_name = None
connector = RedisConnector(args.redis_host, args.redis_port)

store: Store[Any] | None = None
if connector is not None and ps_name is not None:
store = Store(ps_name, connector, metrics=True)
register_store(store)

# Make the queues
queues = PipeQueues(
Expand Down Expand Up @@ -354,8 +353,8 @@ def producer(self):
# Wait for the task server to complete
doer.join()

if ps_name is not None:
ps.store.get_store(ps_name).cleanup()
if store is not None:
store.close()

# Print the output result
logging.info(f'Finished. Runtime = {time.time() - start_time}s')

0 comments on commit 7f0b372

Please sign in to comment.