Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions transfer_queue/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ backend:

# For Yuanrong:
Yuanrong:
# Port of local yuanrong datasystem worker
# Whether to let TQ automatically start etcd and datasystem services
auto_init: True
# etcd service address (used to start etcd when auto_init=true)
etcd_address: "127.0.0.1:2379"
# datasystem worker host and port (used to start dscli when auto_init=true)
host: "127.0.0.1"
port: 31501
# Whether to enable NPU transport
enable_yr_npu_transport: false
176 changes: 174 additions & 2 deletions transfer_queue/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import logging
import math
import os
import shutil
import socket
import subprocess
import tempfile
import time
from importlib import resources
from typing import Any, Optional
Expand Down Expand Up @@ -103,11 +106,11 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
check = subprocess.run(["pgrep", "-f", "mooncake_master"], stdout=subprocess.PIPE, text=True)
if check.returncode == 0:
pids = check.stdout.strip().replace("\n", ", ")
logging.info(f"Find existing mooncake_master (PID: {pids}), try to kill first...")
logger.info(f"Find existing mooncake_master (PID: {pids}), try to kill first...")

result = os.system('pkill -f "[m]ooncake_master"')
if result == 0:
logging.info("Successfully killed existing mooncake_master processes.")
logger.info("Successfully killed existing mooncake_master processes.")
else:
raise RuntimeError(f"Failed to kill existing mooncake_master processes (exit code: {result}).")

Expand Down Expand Up @@ -185,6 +188,129 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
f"Output:\n{error_msg}"
)
_TRANSFER_QUEUE_STORAGE["MooncakeStore"] = process
if conf.backend.storage_backend == "Yuanrong":
if conf.backend.Yuanrong.auto_init:
etcd_process = None
etcd_data_dir = None
worker_address = None
if not shutil.which("etcd"):
raise RuntimeError(
"etcd executable not found in PATH. Please install etcd and make sure it's in the PATH."
)
if not shutil.which("dscli"):
raise RuntimeError(
"dscli executable not found in PATH. Please run `pip install openyuanrong-datasystem`."
)
try:
Comment on lines +191 to +204
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new Yuanrong auto-init path (etcd + dscli) introduces substantial side effects during init() but currently has no targeted test coverage. Consider adding a unit test that monkeypatches shutil.which/subprocess.Popen/subprocess.run to validate the happy path and failure cleanup behavior (without requiring real etcd/dscli binaries).

Copilot uses AI. Check for mistakes.
# ========== Start etcd ==========
etcd_address = "127.0.0.1:2379"
try:
etcd_address = conf.backend.Yuanrong.etcd_address
except Exception:
pass

# Assume host:port format
parts = etcd_address.split(":")
if len(parts) != 2:
raise ValueError(f"Invalid etcd_address format: {etcd_address}. Expected host:port")
host = parts[0]
port = int(parts[1])

# Create temporary data directory
etcd_data_dir = tempfile.mkdtemp(prefix="tq_etcd_")
logger.info(f"Starting etcd with data directory: {etcd_data_dir}")

cmd = [
"etcd",
f"--data-dir={etcd_data_dir}",
f"--listen-client-urls=http://{host}:{port}",
f"--advertise-client-urls=http://{host}:{port}",
]

etcd_process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
text=True,
bufsize=1,
universal_newlines=True,
start_new_session=True,
)
time.sleep(3) # Wait for etcd to start

if etcd_process.poll() is None:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
result = sock.connect_ex((host, port))
sock.close()
if result != 0:
raise RuntimeError(f"etcd process started but not listening on {host}:{port}")
else:
raise RuntimeError(f"etcd exited immediately with return code {etcd_process.returncode}")

logger.info(f"etcd started, PID: {etcd_process.pid}")
time.sleep(2)

# ========== Start datasystem worker ==========
# Build the worker address from the separately configured host and port
worker_host = conf.backend.Yuanrong.host
worker_port = conf.backend.Yuanrong.port
worker_address = worker_host + ":" + str(worker_port)

cmd = [
"dscli",
"start",
"-w",
"--worker_address",
worker_address,
"--etcd_address",
etcd_address,
]

try:
ds_result = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
timeout=90,
)
except subprocess.TimeoutExpired as err:
raise RuntimeError(f"dscli start timed out: {err}") from err
# dscli starts the worker and then exits, so check its return code and output
if ds_result.returncode == 0 and "[ OK ]" in ds_result.stdout:
logger.info(f"dscli started Yuanrong datasystem worker at {worker_address} successfully.")

else:
raise RuntimeError(
f"Failed to start datasystem worker at {worker_address}. "
f"Return code: {ds_result.returncode}, Output: {ds_result.stdout}"
)

# Store processes and data directory
_TRANSFER_QUEUE_STORAGE["Yuanrong"] = {
"etcd": etcd_process,
"etcd_data_dir": etcd_data_dir,
"worker_address": worker_address,
"etcd_address": etcd_address,
}
logger.info("Yuanrong backend (etcd + datasystem) started successfully.")

except Exception as e:
# Clean up on failure
if etcd_process is not None and etcd_process.poll() is None:
etcd_process.terminate()
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On Yuanrong startup failure, the cleanup path terminates the etcd process but doesn't wait() (or kill() on timeout) to ensure the process is actually stopped/reaped. This can leave a stray/zombie etcd process around after init() raises. Consider mirroring the shutdown logic used in close() (terminate -> wait with timeout -> kill -> wait).

Suggested change
etcd_process.terminate()
try:
etcd_process.terminate()
try:
etcd_process.wait(timeout=5)
except subprocess.TimeoutExpired:
etcd_process.kill()
etcd_process.wait()
except Exception:
# Best-effort cleanup; ignore failures here
pass

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

try:
etcd_process.wait(timeout=5)
except subprocess.TimeoutExpired:
etcd_process.kill()
etcd_process.wait()
if etcd_data_dir is not None:
try:
shutil.rmtree(etcd_data_dir, ignore_errors=True)
except Exception:
pass
raise RuntimeError(f"Failed to start Yuanrong backend: {e}") from e
return conf


Expand Down Expand Up @@ -346,6 +472,52 @@ def close():
logger.info("Successfully removed all existing keys in mooncake_master.")
except Exception:
pass
elif key == "Yuanrong":
# Stop etcd process and clean up data directory, stop datasystem worker via dscli
if isinstance(value, dict):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should etcd only be closed when auto_init is enabled?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. If users start etcd manually, they should close etcd manually.

etcd_process = value.get("etcd")
etcd_data_dir = value.get("etcd_data_dir")
worker_address = value.get("worker_address")

# Stop etcd if running
if etcd_process is not None and etcd_process.poll() is None:
etcd_process.terminate()
try:
etcd_process.wait(timeout=5)
except subprocess.TimeoutExpired:
etcd_process.kill()
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If etcd_process.kill() is used after a timeout, the code should still call wait() to reap the child process; otherwise a zombie process can be left behind until interpreter exit. After kill(), call etcd_process.wait() (possibly in a nested try/except) to ensure the process is fully collected.

Suggested change
etcd_process.kill()
etcd_process.kill()
try:
# Ensure the killed process is fully reaped to avoid zombies
etcd_process.wait(timeout=5)
except subprocess.TimeoutExpired:
logger.warning("Timed out while waiting for etcd process to exit after kill.")
except Exception as e:
logger.warning(f"Error while waiting for etcd process to exit after kill: {e}")

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add etcd_process.wait()

etcd_process.wait()

# Clean up etcd data directory
if etcd_data_dir is not None and os.path.exists(etcd_data_dir):
try:
shutil.rmtree(etcd_data_dir, ignore_errors=True)
logger.info(f"Cleaned up etcd data directory: {etcd_data_dir}")
except Exception as e:
logger.warning(f"Failed to clean up etcd data directory {etcd_data_dir}: {e}")

# Stop datasystem worker via dscli command
if worker_address:
try:
result = subprocess.run(
["dscli", "stop", "--worker_address", worker_address],
timeout=90,
capture_output=True,
)
if result.returncode == 0:
logger.info(f"Stopped datasystem worker at {worker_address} via dscli stop")
else:
error_msg = (result.stderr or result.stdout or b"").decode()
logger.warning(
f"Failed to stop datasystem worker at {worker_address}. "
f"Return code: {result.returncode}, Error: {error_msg}"
)
except subprocess.TimeoutExpired as err:
logger.warning(f"dscli stop timed out for {worker_address}: {err}")
except Exception as e:
logger.warning(f"Failed to stop datasystem worker via dscli: {e}")
else:
logger.warning(f"Unexpected Yuanrong storage value: {value}")
else:
logger.warning(f"close for _TRANSFER_QUEUE_STORAGE with key {key} is not supported for now.")

Expand Down
2 changes: 1 addition & 1 deletion transfer_queue/storage/clients/yuanrong_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ class GeneralKVClientAdapter(StorageStrategy):
The serialization method uses '_decoder' and '_encoder' from 'transfer_queue.utils.serial_utils'.
"""

PUT_KEYS_LIMIT: int = 2_000
PUT_KEYS_LIMIT: int = 10_000
GET_CLEAR_KEYS_LIMIT: int = 10_000

# Header: number of entries (uint32, little-endian)
Expand Down
Loading