decentralized queue scheduling #964
Conversation
Code Review
This pull request introduces a disaggregated service architecture for LightX2V, separating the pipeline into controller, encoder, transformer, and decoder components. It implements a custom RDMA-backed ring buffer for inter-service communication and a ZMQ-based monitoring system for GPU metrics. Feedback points out a critical race condition in the RDMA Fetch-and-Add shim, redundant configuration mapping and service initialization, and issues with dead or commented-out monitoring code in the controller.
```python
def rdma_faa(self, remote_addr, add_value, rkey=None):
    """Best-effort FAA semantics via read-modify-write.

    NOTE: This is not a true remote atomic verb; it is a compatibility shim
    until atomic WR support is implemented.
    """
    with self._io_lock:
        old = self.rdma_read_from(int(remote_addr), 8, rkey=rkey)
        old_v = int.from_bytes(old, byteorder="little", signed=False)
        new_v = (old_v + int(add_value)) & ((1 << 64) - 1)
        self.rdma_write_to(int(remote_addr), new_v.to_bytes(8, byteorder="little", signed=False), rkey=rkey)
        return old_v
```
The implementation of `rdma_faa` using a read-modify-write sequence is not atomic across different client processes. The `_io_lock` only provides thread safety within a single client process. If multiple processes (e.g., multiple consumer services) call this function concurrently on the same remote address, it will lead to race conditions where updates can be lost. For example, two consumers could read the same head value, and both would process the same item from the queue, violating queue semantics.

A true atomic RDMA operation like Fetch-and-Add (`IBV_WR_ATOMIC_FETCH_AND_ADD`) should be used to ensure correctness in a multi-consumer/multi-producer scenario. The note in the docstring acknowledges this is a shim, but this is a critical correctness issue for a distributed queue.
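For reference, a true remote fetch-and-add is posted as an atomic work request and executed by the target HCA. Below is a minimal, hypothetical sketch using the `pyverbs` bindings; it assumes the class already owns a connected RC QP (`self.qp`), its completion queue (`self.cq`), and a registered 8-byte-aligned local MR (`self.atomic_mr`). Those attribute names are illustrative, not taken from this PR.

```python
# Hypothetical sketch, not the PR's implementation: remote Fetch-and-Add via
# pyverbs. Assumes a connected RC QP (self.qp), its CQ (self.cq), and a
# registered 8-byte-aligned local MR (self.atomic_mr); names are illustrative.
import pyverbs.enums as e
from pyverbs.wr import SGE, SendWR


def rdma_faa_atomic(self, remote_addr, add_value, rkey):
    """Fetch-and-Add executed atomically by the remote HCA."""
    # The NIC deposits the pre-add (old) 64-bit value into this local buffer.
    sge = SGE(self.atomic_mr.buf, 8, self.atomic_mr.lkey)
    wr = SendWR(opcode=e.IBV_WR_ATOMIC_FETCH_AND_ADD, num_sge=1, sg=[sge],
                send_flags=e.IBV_SEND_SIGNALED)
    wr.set_wr_atomic(rkey=int(rkey), addr=int(remote_addr),
                     compare_add=int(add_value))
    self.qp.post_send(wr)
    # Busy-poll one completion; production code would bound this loop.
    npolled, wcs = self.cq.poll()
    while npolled == 0:
        npolled, wcs = self.cq.poll()
    if wcs[0].status != e.IBV_WC_SUCCESS:
        raise RuntimeError(f"atomic FAA failed: wc status {wcs[0].status}")
    return int.from_bytes(self.atomic_mr.read(8, 0), "little")
```

Note that atomics also require the remote MR to be registered with `IBV_ACCESS_REMOTE_ATOMIC` and the QP's `max_rd_atomic`/`max_dest_rd_atomic` attributes to be negotiated during connection setup.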
```python
mapping = {
    "bootstrap_addr": "data_bootstrap_addr",
    "bootstrap_room": "data_bootstrap_room",
    "encoder_engine_rank": "encoder_engine_rank",
    "transformer_engine_rank": "transformer_engine_rank",
    "decoder_engine_rank": "decoder_engine_rank",
    "protocol": "protocol",
    "local_hostname": "local_hostname",
    "metadata_server": "metadata_server",
}
for src_key, dst_key in mapping.items():
    if src_key in disagg_cfg:
        config[dst_key] = disagg_cfg[src_key]
```
The `mapping` dictionary contains several keys that map to themselves. This is redundant and makes the code more verbose than necessary. You can simplify this by separating the keys that need renaming from those that are copied directly. This will improve readability and maintainability.
```python
# Rename keys from disagg_config to top-level config
rename_map = {
    "bootstrap_addr": "data_bootstrap_addr",
    "bootstrap_room": "data_bootstrap_room",
}
for old_key, new_key in rename_map.items():
    if old_key in disagg_cfg:
        config[new_key] = disagg_cfg[old_key]

# Copy keys as-is from disagg_config to top-level config
copy_keys = [
    "encoder_engine_rank",
    "transformer_engine_rank",
    "decoder_engine_rank",
    "protocol",
    "local_hostname",
    "metadata_server",
]
for key in copy_keys:
    if key in disagg_cfg:
        config[key] = disagg_cfg[key]
```

```python
padded = data_bytes.ljust(self.buffer_size, b"\x00")
self.local_mr.write(padded, len(padded), 0)
```
The `data_bytes` are padded to `self.buffer_size` before being written to the local memory region. However, the subsequent RDMA write operation only transfers `len(data_bytes)`, making the padding operation ineffective for the remote write and potentially confusing. If padding is not needed, writing `data_bytes` directly to the memory region would be clearer.
```diff
- padded = data_bytes.ljust(self.buffer_size, b"\x00")
- self.local_mr.write(padded, len(padded), 0)
+ self.local_mr.write(data_bytes, len(data_bytes), 0)
```
| name="controller-monitor", | ||
| daemon=True, | ||
| ) | ||
| # monitor_thread.start() |
The monitoring thread is created but not started. If monitoring is intended to be part of the controller's functionality, this line should be uncommented. Otherwise, the related code for creating the monitor and the thread should be removed to avoid dead code.
```diff
- # monitor_thread.start()
+ monitor_thread.start()
```
```python
pass
# monitor_stop_event.set()
# monitor_thread.join(timeout=1.0)
```
The cleanup logic for the monitoring thread is commented out. This should be enabled to ensure graceful shutdown of the monitor thread when the controller exits. If monitoring is not intended, this and the related monitoring code should be removed.
```diff
- pass
- # monitor_stop_event.set()
- # monitor_thread.join(timeout=1.0)
+ monitor_stop_event.set()
+ monitor_thread.join(timeout=1.0)
```
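Taken together, the two suggestions above form a standard start/stop lifecycle for the monitor thread. A minimal self-contained sketch of that lifecycle, with the polling body assumed since it is not shown in this diff:

```python
import threading

monitor_stop_event = threading.Event()


def monitor_loop(stop_event: threading.Event) -> None:
    # Assumed body: the real loop would poll GPU metrics each iteration.
    while not stop_event.is_set():
        # wait() instead of sleep() keeps shutdown responsive.
        stop_event.wait(timeout=1.0)


monitor_thread = threading.Thread(
    target=monitor_loop,
    args=(monitor_stop_event,),
    name="controller-monitor",
    daemon=True,
)
monitor_thread.start()
try:
    pass  # controller main loop runs here
finally:
    # Signal the loop to exit and give it up to one second to finish.
    monitor_stop_event.set()
    monitor_thread.join(timeout=1.0)
```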
```python
def init(self, config):
    self.config = config
    self.text_encoder = None
    self.image_encoder = None
    self.vae_encoder = None
    shared_slots = int(self.config.get("rdma_buffer_slots", self._request_slots))
    shared_slot_size = int(self.config.get("rdma_buffer_slot_size", 4096))
    self._request_server_ip = str(self.config.get("rdma_request_host", self._request_server_ip))
    self._request_handshake_port = int(self.config.get("rdma_request_handshake_port", self._request_handshake_port))
    self._request_slots = shared_slots
    self._request_slot_size = shared_slot_size
    self._phase1_server_ip = str(self.config.get("rdma_phase1_host", self._phase1_server_ip))
    self._phase1_handshake_port = int(self.config.get("rdma_phase1_handshake_port", self._phase1_handshake_port))
    self._phase1_slots = shared_slots
    self._phase1_slot_size = shared_slot_size
    self.encoder_engine_rank = int(self.config.get("encoder_engine_rank", 0))
    self.transformer_engine_rank = int(self.config.get("transformer_engine_rank", 1))
    self.decoder_engine_rank = int(self.config.get("decoder_engine_rank", 2))
```
The `init` method is called for every request, but it seems to be re-initializing service-level configuration parameters (like RDMA buffer settings and engine ranks) that are already set in `__init__`. This is redundant and can be confusing. The `__init__` method should handle one-time service setup, while `init` should only perform per-request initializations. Consider refactoring to move the one-time setup from `init` to `__init__` to avoid this redundancy and clarify the logic. This pattern is also present in `DecoderService` and `TransformerService` and should be addressed there as well.
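A sketch of the suggested split (field names abbreviated from the diff above; the constructor signature, defaults, and the `request` parameter are illustrative assumptions):

```python
class EncoderService:
    def __init__(self, config):
        # One-time, service-level setup: read RDMA and rank settings once.
        self.config = config
        self._request_slots = int(config.get("rdma_buffer_slots", 8))
        self._request_slot_size = int(config.get("rdma_buffer_slot_size", 4096))
        self.encoder_engine_rank = int(config.get("encoder_engine_rank", 0))
        self.transformer_engine_rank = int(config.get("transformer_engine_rank", 1))
        self.decoder_engine_rank = int(config.get("decoder_engine_rank", 2))

    def init(self, request):
        # Per-request initialization only: reset the encoder handles that
        # are rebuilt for each job.
        self.text_encoder = None
        self.image_encoder = None
        self.vae_encoder = None
```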
This pull request introduces several new configuration files and implements a monitoring and reporting system for the disaggregated LightX2V services. It also improves service initialization in example scripts to ensure proper configuration is passed to each service instance.
Key changes:
Configuration for disaggregated services:
Monitoring and reporting system:
- A new `monitor.py` module that provides a `Reporter` class for reporting GPU metrics (utilization, memory usage) and a `Monitor` class for polling these metrics from multiple service nodes using ZeroMQ (a minimal sketch of this pattern follows the list).
- Added `MONITOR_POLLING_PORT` to `conn.py` for standardized monitoring communication.
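A hypothetical sketch of that `Reporter`/`Monitor` split using `pyzmq`; the class names follow the PR description, but the wire format (JSON over REQ/REP) and the internals shown here are assumed:

```python
import json

import zmq

MONITOR_POLLING_PORT = 5555  # stand-in for the constant added to conn.py


class Reporter:
    """Runs on each service node and answers metric polls over a REP socket."""

    def __init__(self, port=MONITOR_POLLING_PORT):
        self._sock = zmq.Context.instance().socket(zmq.REP)
        self._sock.bind(f"tcp://*:{port}")

    def serve_once(self):
        self._sock.recv()  # the poll request carries no payload we need
        # Placeholder metrics; a real implementation would query the GPU,
        # e.g. via pynvml.
        self._sock.send_string(json.dumps({"gpu_util": 0.0, "mem_used_mb": 0.0}))


class Monitor:
    """Polls every registered node and gathers one metrics dict per node."""

    def __init__(self, endpoints):
        ctx = zmq.Context.instance()
        self._socks = [(ep, ctx.socket(zmq.REQ)) for ep in endpoints]
        for ep, sock in self._socks:
            sock.connect(ep)

    def poll(self):
        results = {}
        for ep, sock in self._socks:
            sock.send(b"poll")
            results[ep] = json.loads(sock.recv())
        return results
```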
Service initialization improvements:

- Updated the example services (`wan_t2v_service.py`, `wan_i2v_service.py`) to ensure that the `EncoderService`, `TransformerService`, and `DecoderService` are instantiated with the required configuration object. [1] [2]
- Added a `run_service.py` script that provides a unified entry point for running any of the disaggregated services based on command-line arguments and configuration files. This script normalizes configuration, resolves service roles, and initializes the appropriate service.

Decentralized queue scheduling: