[fix] Fix race condition in update_production_status #34

0oshowero0 merged 1 commit into Ascend:main from update_production_status#34

Conversation
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
CLA Signature Pass
0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍
Pull request overview
This PR attempts to fix a race condition in the `update_production_status` method of `TransferQueueController` by protecting the production status update with `data_status_lock`. The fix addresses a specific issue where concurrent calls to `ensure_samples_capacity` or `ensure_fields_capacity` could cause the `production_status` tensor to be reassigned while another thread was updating it, leading to lost updates.
Changes:
- Wrapped the production status tensor update (line 422) in a `with self.data_status_lock:` block to prevent concurrent modifications
- Moved the comment "# Ensure we have enough rows" inside the lock block for clarity
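For context, here is a minimal, self-contained sketch of the locked-update pattern the PR adopts. The `ControllerSketch` class, its constructor, and the method signature are illustrative; only the `production_status` / `data_status_lock` names and the tensor-indexing expression come from the PR:

```python
import threading

import torch


class ControllerSketch:
    """Hypothetical stand-in for TransferQueueController (illustration only)."""

    def __init__(self, num_samples: int, num_fields: int) -> None:
        self.data_status_lock = threading.Lock()
        # 0 = not yet produced, 1 = produced
        self.production_status = torch.zeros(num_samples, num_fields, dtype=torch.int8)

    def update_production_status(self, global_indices: list[int], field_indices: list[int]) -> None:
        # Holding the lock prevents a concurrent capacity expansion from
        # reassigning self.production_status while this in-place write runs;
        # without it, the write could land on a tensor that is about to be
        # discarded, and the update would be lost.
        with self.data_status_lock:
            rows = torch.tensor(global_indices)[:, None]
            cols = torch.tensor(field_indices)
            self.production_status[rows, cols] = 1
```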
Comments suppressed due to low confidence (1)
transfer_queue/controller.py:428
The call to `_update_field_metadata` (line 425) and the update to `global_indexes` (line 428) are not protected by `data_status_lock`. These shared data structures (`field_dtypes`, `field_shapes`, `field_custom_backend_meta`, and `global_indexes`) are accessed and modified by multiple threads.
For example:
- `clear_data` modifies `global_indexes`, `field_dtypes`, `field_shapes`, and `field_custom_backend_meta` (lines 831-836) without acquiring the lock
- `activate_pre_allocated_indexes` modifies `global_indexes` (line 318) without acquiring the lock
- Other methods (e.g., at lines 1184 and 1506) also update `global_indexes` without lock protection
These concurrent modifications can lead to data corruption and lost updates. Either these operations should be wrapped in the lock, or a separate locking mechanism should be introduced for these metadata structures.
```python
self._update_field_metadata(global_indices, dtypes, shapes, custom_backend_meta)
# Save these global_indexes
self.global_indexes.update(global_indices)
```
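One way to apply the first option, sketched with the names from the review (this assumes `_update_field_metadata` does not itself try to acquire `data_status_lock`, which would deadlock a non-reentrant `threading.Lock`):

```python
with self.data_status_lock:
    self._update_field_metadata(global_indices, dtypes, shapes, custom_backend_meta)
    # Update global_indexes while still holding the lock, so methods such as
    # clear_data cannot interleave between the two modifications.
    self.global_indexes.update(global_indices)
```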
```python
new_fields = [field for field in field_names if field not in self.field_name_mapping]
if new_fields:
    # Add new fields to mapping
    for field in new_fields:
        self.field_name_mapping[field] = len(self.field_name_mapping)

required_fields = len(self.field_name_mapping)
with self.data_status_lock:
    self.ensure_fields_capacity(required_fields)
```
The update to `field_name_mapping` (lines 408-412) is not protected by `data_status_lock`, which creates a race condition. Multiple threads could:
- Simultaneously check if a field exists in the mapping (line 408)
- Both conclude it doesn't exist and needs to be added
- Both try to add the field with potentially different indices (line 412)

This leads to inconsistent field indices and can cause fields to be assigned the same index or to overwrite each other. The subsequent lock acquisition at line 415 only protects the `ensure_fields_capacity` call, not the dictionary modification.
The entire block from line 408 to line 416 should be wrapped in `with self.data_status_lock:` to ensure an atomic read-check-update of `field_name_mapping` and consistent expansion of `production_status`.
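A hedged sketch of that suggestion, reusing the names from the diff (the surrounding method body is assumed):

```python
with self.data_status_lock:
    # Atomic read-check-update: no other thread can extend field_name_mapping
    # between the membership check and the insert, so indices stay unique.
    new_fields = [field for field in field_names if field not in self.field_name_mapping]
    for field in new_fields:
        self.field_name_mapping[field] = len(self.field_name_mapping)
    # Expand the production_status columns inside the same critical section.
    self.ensure_fields_capacity(len(self.field_name_mapping))
```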
```python
with self.data_status_lock:
    # Ensure we have enough rows
    self.ensure_samples_capacity(required_samples)

# Register new fields if needed
new_fields = [field for field in field_names if field not in self.field_name_mapping]
if new_fields:
    # Add new fields to mapping
    for field in new_fields:
        self.field_name_mapping[field] = len(self.field_name_mapping)

required_fields = len(self.field_name_mapping)
with self.data_status_lock:
    self.ensure_fields_capacity(required_fields)

# Update production status
if self.production_status is not None and global_indices and field_names:
    field_indices = [self.field_name_mapping.get(field) for field in field_names]
    with self.data_status_lock:
        self.production_status[torch.tensor(global_indices)[:, None], torch.tensor(field_indices)] = 1
```
The lock is released between ensuring samples capacity (line 405) and ensuring fields capacity (line 416), creating a potential race condition window. If another thread calls `ensure_samples_capacity` or `ensure_fields_capacity` during this window, it could reassign `self.production_status` to a new tensor object. The subsequent field indexing at line 422 would then operate on this new tensor, which might not have been properly expanded for the new fields registered in lines 411-412.
Consider holding the lock continuously from line 403 through the production status update at line 422, especially since the field registration (lines 408-412) also modifies shared state that should be protected.
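A sketch of that continuous critical section, with names taken from the diff (this assumes `ensure_samples_capacity` / `ensure_fields_capacity` do not themselves acquire `data_status_lock`, consistent with the diff taking the lock around them):

```python
with self.data_status_lock:
    # Ensure we have enough rows
    self.ensure_samples_capacity(required_samples)

    # Register new fields inside the same critical section
    new_fields = [field for field in field_names if field not in self.field_name_mapping]
    for field in new_fields:
        self.field_name_mapping[field] = len(self.field_name_mapping)
    self.ensure_fields_capacity(len(self.field_name_mapping))

    # The tensor object is now stable for the in-place status write
    if self.production_status is not None and global_indices and field_names:
        field_indices = [self.field_name_mapping[field] for field in field_names]
        self.production_status[
            torch.tensor(global_indices)[:, None], torch.tensor(field_indices)
        ] = 1
```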
Background
This PR fixes a race condition in `TransferQueueController`. Previously, the value update of `self.production_status` in `update_production_status` was not protected. If other threads call `ensure_samples_capacity` or `ensure_fields_capacity` concurrently, the update to the production status may be lost. This occurs because the dynamic expansion of capacity is not an atomic in-place operation: expansion reassigns `self.production_status` to a new, larger tensor and copies the old contents over, so a write made to the old tensor during resizing is silently dropped.
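To make the failure mode concrete, here is a self-contained sketch of how a non-atomic expansion drops a concurrent write (plain tensors stand in for the controller state; the copy-then-write interleaving shown is one possible schedule):

```python
import torch

status = torch.zeros(2, 2, dtype=torch.int8)
stale = status                                # reference still held by another thread

# "Expansion" is reassignment plus copy, not an in-place resize:
status = torch.zeros(4, 2, dtype=torch.int8)  # allocate a new, larger tensor
status[:2] = stale                            # copy over the old contents

stale[0, 0] = 1                               # unprotected concurrent write
assert status[0, 0] == 0                      # the write hit the old tensor: lost
```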