Background
The current TransferQueue (TQ) architecture utilizes a row-oriented approach for metadata management, where a Batch is composed of a list of Sample objects, and each Sample contains a dictionary of Field objects. While this structure is logical for individual sample processing, our analysis has identified it as a primary bottleneck in high-throughput scenarios, particularly within the Controller and Storage Manager.
The hierarchical structure (BatchMeta -> SampleMeta -> FieldMeta) introduces O(B×F) complexity (where B is Batch Size and F is Field Count) across critical paths. This complexity manifests as significant CPU overhead during serialization, status notifications, and storage indexing.
Problem Analysis
Deep analysis of the codebase reveals three critical scaling issues:
- Complexity Accumulation & Blocking:
  Critical operations such as `build_storage_meta_groups`, `_filter_storage_data`, and `add_fields` involve nested loops iterating over every sample and every field. In the PUT path alone, this O(B×F) cost is incurred multiple times (a schematic sketch follows this list).
- Small Object Explosion & GC Pressure:
  For a typical batch size of 1024 with 10 fields, a single PUT request creates over 10,000 small Python objects (`SampleMeta` and `FieldMeta`). This puts massive pressure on the Python garbage collector, causing frequent GC pauses and unpredictable tail latency.
- Redundant Data Transmission & ZMQ Fragmentation:
  - Redundancy: Schema information (dtype, shape) is duplicated for every sample, wasting bandwidth.
  - Fragmentation: The current row-oriented serialization recursively packs data, often resulting in a large number of disjoint memory chunks or ZMQ frames. This increases syscall overhead and prevents zero-copy optimization opportunities.
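For intuition, a schematic sketch of the row-oriented pattern described above. The class and function names follow the issue text, but the body is illustrative rather than the actual TQ code:

```python
# Row-oriented: every PUT touches every sample and every field.
def build_storage_meta_groups(batch) -> dict:
    groups: dict = {}
    for sample in batch.samples:                      # O(B)
        for name, field in sample.fields.items():     # O(F) per sample
            # Each iteration inspects a small FieldMeta object, so a single
            # request walks (and earlier allocated) roughly B x F tiny objects.
            groups.setdefault(name, []).append(
                (sample.global_index, field.dtype, field.shape)
            )
    return groups
```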
Proposed Solution
We propose refactoring BatchMeta to a column-oriented (structure of arrays) design. This change would strictly separate batch-level indices from field-level schema, aiming to reduce metadata storage and processing complexity from a multiplicative O(B×F) to an additive O(B) + O(F).
1. Proposed Data Structure Specification
The refactored BatchMeta is designed to be a flat, columnar structure using native Python collections and NumPy arrays. This specification details the fields to ensure implementation consistency.
```python
from dataclasses import dataclass
from typing import Any

import numpy as np


@dataclass
class BatchMeta:
    # --- Batch-Level Indices (O(B)) ---
    # Global indices identifying each sample in the dataset
    global_indexes: list[int]
    # Partition IDs corresponding to each sample
    partition_ids: list[str]

    # --- Vectorized Status (O(B)) ---
    # NumPy array for efficient status checks and updates
    # shape: (B,), dtype: int8 (1=Ready, 0=Not Ready)
    production_status: np.ndarray

    # --- Field-Level Schema (O(F)) ---
    # Stores metadata ONCE per field, rather than once per sample.
    # Key: field_name (str)
    # Value: dict containing schema details
    field_schema: dict[str, dict[str, Any]]

    # --- Extra Payload ---
    # Task-specific metadata attached to the batch
    extra_info: dict[str, Any]

    # --- Internal Storage Metadata ---
    # Per-sample custom metadata required by specific storage backends
    # Key: global_index (int) -> Value: dict
    _custom_meta: dict[int, dict[str, Any]]
```
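For concreteness, a minimal hand-built instance for a batch of three samples with two fields; all values are made up for illustration and do not reflect real TQ data:

```python
import numpy as np
import torch

batch = BatchMeta(
    global_indexes=[100, 101, 102],
    partition_ids=["p0", "p0", "p1"],
    production_status=np.array([1, 1, 0], dtype=np.int8),  # third sample not ready yet
    field_schema={
        "input_ids": {
            "dtype": torch.int64,
            "shape": torch.Size([10]),
            "is_nested": False,
            "is_non_tensor": False,
        },
        "image_paths": {
            "dtype": str,
            "shape": None,
            "is_nested": False,
            "is_non_tensor": True,
        },
    },
    extra_info={"task": "rollout"},
    _custom_meta={},
)
```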
Field Schema Definition (field_schema)
The `field_schema` dictionary is the core of the optimization, holding shared attributes for each column. We categorize usage into three types:
1. Regular Tensor (Dense)
Standard tensors where all samples share the same shape (excluding batch dim) and dtype.
"input_ids": {
"dtype": torch.int64,
"shape": torch.Size([10]), # Fixed shape [Sequence Length]
"is_nested": False,
"is_non_tensor": False
}2. Nested Tensor (Jagged/Variable Length)
Tensors with varying shapes per sample. Explicit shape tracking is required for reconstruction.
"rag_context": {
"dtype": torch.float32,
"shape": None, # Shape is not fixed
"is_nested": True,
"is_non_tensor": False,
"per_sample_shapes": [(3, 128), (5, 128), (2, 128)]
}3. Non-Tensor Data (Arbitrary Objects)
For storing non-tensor data like strings, dictionaries, or lists (handled via NonTensorStack).
"image_paths": {
"dtype": str, # Python type
"shape": None,
"is_nested": False,
"is_non_tensor": True # Flags this as non-tensor data
}- Usage: Clients extract this schema once per batch during
add_fieldsby inspecting the head item of a column (orunbindfor NestedTensor). This avoids iterating through all items for standard Tensors.
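A minimal sketch of how a client could derive this schema from batch columns. The helper name `infer_field_schema` and the plain-dict input are assumptions for illustration; the real code would inspect a `TensorDict`:

```python
import torch


def infer_field_schema(columns: dict) -> dict[str, dict]:
    """Illustrative sketch: derive per-field schema once per batch from column data."""
    schema = {}
    for name, col in columns.items():
        if isinstance(col, torch.Tensor) and col.is_nested:
            # Nested tensors need per-sample shapes for reconstruction (O(B), but only for this field).
            schema[name] = {
                "dtype": col.dtype,
                "shape": None,
                "is_nested": True,
                "is_non_tensor": False,
                "per_sample_shapes": [tuple(t.shape) for t in col.unbind()],
            }
        elif isinstance(col, torch.Tensor):
            # Dense tensors: dtype and non-batch shape are shared by all samples.
            schema[name] = {
                "dtype": col.dtype,
                "shape": col.shape[1:],
                "is_nested": False,
                "is_non_tensor": False,
            }
        else:
            # Non-tensor columns (e.g. a list of strings): record only the element type.
            schema[name] = {
                "dtype": type(col[0]),
                "shape": None,
                "is_nested": False,
                "is_non_tensor": True,
            }
    return schema
```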
2. Key Improvements
- Vectorized Status Management: Storing `production_status` as a NumPy array lets `is_ready` be answered with a single vectorized `np.all` call instead of a Python loop over samples (see the sketch after this list).
- Reduced ZMQ Frames: By organizing transmission by columns, ZMQ interaction becomes significantly more efficient. Instead of interleaving metadata and data for each sample, we transmit a single metadata header followed by F payload frames, reducing the frame count from a batch-size-dependent number to a fixed F + 1.
- Deduplicated Schema: Field metadata is stored once per batch, decoupling basic metadata size from batch size.
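A minimal sketch of the vectorized status path under this design; the class and method names are illustrative, not the final API:

```python
import numpy as np


class BatchStatusView:
    """Illustrative wrapper around the proposed production_status array."""

    def __init__(self, batch_size: int):
        # 1 = Ready, 0 = Not Ready, one entry per sample
        self.production_status = np.zeros(batch_size, dtype=np.int8)

    def mark_ready(self, local_indexes: list[int]) -> None:
        # Vectorized update: one NumPy call instead of a Python loop over samples
        self.production_status[local_indexes] = 1

    def is_ready(self) -> bool:
        # Single vectorized check over the whole batch
        return bool(np.all(self.production_status == 1))
```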
Impact Assessment (Based on Prototype)
We have conducted preliminary prototyping, which suggests the following impacts:
1. Performance Impact (Positive)
- Client Side (`add_fields`): Complexity drops from O(B×F) to O(F). The client infers schema from the `TensorDict` columns directly without full iteration (except for NestedTensor shape collection).
- Manager Side (`build_storage_meta_groups`):
  - Current: Iterates over all samples. Complexity: O(B) loops.
  - Proposed: Uses block-allocation slicing and iterates only over storage units (S).
- Network (ZMQ):
  - Frame Reduction: Transmission uses a stable F + 1 frames regardless of batch size (see the sketch after this list).
  - Throughput: Expected to better saturate network bandwidth as CPU serialization bottlenecks are removed.
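A minimal sketch of the F + 1 framing over pyzmq multipart messages; the function name and header layout are assumptions for illustration, not the actual TQ wire format:

```python
import pickle

import numpy as np
import zmq


def send_batch_columns(sock: zmq.Socket, schema: dict, columns: dict) -> None:
    """Send one metadata header frame followed by F payload frames (F + 1 total)."""
    header = {"schema": schema, "field_order": list(columns.keys())}
    frames = [pickle.dumps(header)]
    for name in header["field_order"]:
        # One contiguous buffer per column; pyzmq can transmit it without an extra copy.
        frames.append(memoryview(np.ascontiguousarray(columns[name])))
    sock.send_multipart(frames, copy=False)
```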
2. API Breaking Changes (Migration Required)
- BatchMeta interface:
  - REMOVED: the `batch.samples` list, the `SampleMeta` class, and the `FieldMeta` class.
  - CHANGED: constructor signature.
- Application Impact:
  - Code iterating over `batch.samples` will break.
  - Access patterns like `sample.fields['name'].dtype` must change to `batch.field_schema['name']['dtype']` (a before/after sketch follows).
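A rough before/after sketch of the access-pattern migration; the function and variable names are illustrative only:

```python
def consume(batch) -> None:
    # Before: dtype = sample.fields["input_ids"].dtype  (looked up per sample)
    # After: the schema is read once per batch.
    dtype = batch.field_schema["input_ids"]["dtype"]
    print(f"input_ids dtype: {dtype}")

    # Per-sample state now lives in flat, index-aligned arrays instead of SampleMeta objects.
    for i, global_index in enumerate(batch.global_indexes):
        if batch.production_status[i]:
            ...  # handle the ready sample identified by global_index
```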
3. Edge Case: NestedTensor Support
- Limitation: NestedTensor on CPU lacks native slicing support.
- Cost: We must `unbind()` the nested tensor into a list in order to slice it. This operation remains O(B).
- Mitigation: While the metadata overhead is removed, data handling for NestedTensors remains linear in batch size until further optimized (see the sketch below).
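A minimal sketch of the unbind-based slicing workaround; `slice_nested` is a hypothetical helper, not existing TQ code:

```python
import torch


def slice_nested(nt: torch.Tensor, indexes: list[int]) -> torch.Tensor:
    """Slice a nested tensor by unbinding into per-sample tensors (O(B)), then re-nesting."""
    parts = nt.unbind()  # materializes one view per sample
    return torch.nested.nested_tensor([parts[i] for i in indexes])
```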
Open Questions & Discussion
1. Compatibility Layer
Should we provide a compatibility layer?
- Option A: Direct replacement (Breaking).
- Option B: Provide a `LegacyBatchMeta` wrapper emulating the old `samples` interface (marked `@deprecated`) for gradual migration.
2. NestedTensor & PyTorch Version
Column-oriented storage creates a challenge for NestedTensor. The current `unbind()` approach is compatible but inefficient, causing ZMQ to transmit 1 + N frames (one per sample) instead of a single frame per column, negating some zero-copy benefits.
- Question: Should we increase the minimum PyTorch requirement to > 2.5?
- Ecosystem Context: Key upstream projects like Verl are already moving to newer versions (e.g., PyTorch 2.9.1 for SGLang/vLLM, 2.7.1 for Ascend CANN 8.3, and 2.5.1 for Ascend CANN 8.2).
- Benefit: Higher versions may allow us to use optimized APIs (like `nested_tensor_from_jagged` or similar layout-aware serialization) to handle jagged arrays as a single memory block, restoring the one-frame-per-column efficiency (see the sketch below).
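For reference, a minimal sketch of the jagged layout available in newer PyTorch releases, which keeps a nested tensor in one contiguous values buffer plus an offsets tensor; this is illustrative, not a committed design:

```python
import torch

# Three variable-length rows packed into one contiguous values buffer.
values = torch.randn(10, 128)          # 3 + 5 + 2 rows concatenated along dim 0
offsets = torch.tensor([0, 3, 8, 10])  # per-sample row boundaries

# Jagged-layout nested tensor built directly from the packed buffer.
nt = torch.nested.nested_tensor_from_jagged(values, offsets)

# The payload stays in one contiguous buffer (nt.values()), so it could be
# shipped as a single ZMQ frame (values + offsets) instead of one frame per sample.
print(nt.size(0), nt.values().shape)   # 3 samples, values buffer of shape (10, 128)
```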