Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions transfer_queue/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ def _maybe_create_transferqueue_client(
global _TRANSFER_QUEUE_CLIENT
if _TRANSFER_QUEUE_CLIENT is None:
if conf is None:
raise ValueError("Missing config for initializing TransferQueueClient!")
_init_from_existing()
assert _TRANSFER_QUEUE_CLIENT is not None, (
"TransferQueueController has not been initialized yet. Please call init() first."
)
return _TRANSFER_QUEUE_CLIENT

pid = os.getpid()
_TRANSFER_QUEUE_CLIENT = TransferQueueClient(
client_id=f"TransferQueueClient_{pid}", controller_info=conf.controller.zmq_info
Expand Down Expand Up @@ -88,23 +93,34 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
return conf


def _init_from_existing() -> None:
"""Initialize the TransferQueueClient from existing controller."""
def _init_from_existing() -> bool:
"""Initialize the TransferQueueClient from existing controller.

Returns:
True if successfully initialized from existing controller, False otherwise.
"""

try:
controller = ray.get_actor("TransferQueueController")
except ValueError:
logger.info("Called _init_from_existing() but TransferQueueController has not been initialized yet.")
return False

controller = ray.get_actor("TransferQueueController")
logger.info("Found existing TransferQueueController instance. Connecting...")

conf = None
while conf is None:
remote_conf = ray.get(controller.get_config.remote())
if remote_conf is not None:
_maybe_create_transferqueue_client(remote_conf)
conf = ray.get(controller.get_config.remote())
if conf is not None:
_maybe_create_transferqueue_client(conf)
logger.info("TransferQueueClient initialized.")
return
return True

logger.debug("Waiting for controller to initialize... Retrying in 1s")
time.sleep(1)

return False
Comment thread
MissFishY marked this conversation as resolved.
Comment thread
MissFishY marked this conversation as resolved.


# ==================== Initialization API ====================
def init(conf: Optional[DictConfig] = None) -> None:
Expand Down Expand Up @@ -138,14 +154,11 @@ def init(conf: Optional[DictConfig] = None) -> None:
>>> metadata = tq.get_meta(...)
>>> data = tq.get_data(metadata)
"""
try:
_init_from_existing()
except ValueError:
logger.info("No TransferQueueController found. Starting first-time initialization...")
else:
if _init_from_existing():
return

# First-time initialize TransferQueue
logger.info("No TransferQueueController found. Starting first-time initialization...")

# create config
final_conf = OmegaConf.create({}, flags={"allow_objects": True})
Expand Down