Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions transfer_queue/storage/clients/yuanrong_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ def clear(self, keys: list[str], custom_backend_meta=None):

strategy_tags = custom_backend_meta
routed_indexes = self._route_to_strategies(
strategy_tags, lambda strategy_, item_: strategy_.supports_clear(item_)
strategy_tags, lambda strategy_, item_: strategy_.supports_clear(item_), ignore_unmatched=True
)
Comment thread
dpj135 marked this conversation as resolved.

def clear_task(strategy, indexes):
Expand All @@ -598,6 +598,7 @@ def _route_to_strategies(
self,
items: list[Any],
selector: Callable[[StorageStrategy, Any], bool],
ignore_unmatched: bool = False,
) -> dict[StorageStrategy, list[int]]:
"""Groups item indices by the first strategy that supports them.

Expand All @@ -610,22 +611,31 @@ def _route_to_strategies(
The order must correspond to the original keys.
selector: A function that determines whether a strategy supports an item.
Signature: `(strategy: StorageStrategy, item: Any) -> bool`.
failback: If True, items that don't match any strategy will be ignored (not included in output).
If False, a ValueError will be raised for any unmatched item.

Returns:
A dictionary mapping each active strategy to a list of indexes in `items`
that it should handle. Every index appears exactly once.
"""
unmatched_count = 0
routed_indexes: dict[StorageStrategy, list[int]] = {s: [] for s in self._strategies}
for i, item in enumerate(items):
for strategy in self._strategies:
if selector(strategy, item):
routed_indexes[strategy].append(i)
break
else:
raise ValueError(
f"No strategy supports item of type {type(item).__name__}: {item}. "
f"Available strategies: {[type(s).__name__ for s in self._strategies]}"
)
if ignore_unmatched:
unmatched_count += 1
else:
raise ValueError(
f"No strategy supports item of type {type(item).__name__}: {item}. "
f"Available strategies: {[type(s).__name__ for s in self._strategies]}"
)
if unmatched_count > 0:
logger.warning(f"{unmatched_count} items were not matched to any strategy and will be ignored.")

return routed_indexes
Comment thread
dpj135 marked this conversation as resolved.

@staticmethod
Expand Down
Loading