refactor: Simplify request queue implementation#653
Conversation
There was a problem hiding this comment.
Pull request is neither linked to an issue or epic nor labeled as adhoc!
There was a problem hiding this comment.
Pull request is neither linked to an issue or epic nor labeled as adhoc!
|
I was hoping we would first address this in the JS version where its actually costing us money, do we have some reported problems on the python side too? |
I get your angle, but I wanted to try it on code where the implementation isn't scattered between |
|
If that abstraction is problematic I am fine with removing it and leaving the base class acting like an interface mostly (we can't just make it one as that would be technically breaking), duplication is not a huge deal if we plan to drop the RQ v1 implementation anyway - but first we need to make sure v2 is working fine, we still get peeps from delivery moving back to v1 to deal with those weird issues... |
...and all of the above is why I chose to validate the points by Kuba in the python version 😄 |
|
My issue is that not many people use the python version, and those issues reported by delivery are quite random, so hard to confirm something helps here. Maybe it will resolve the issue we have a repro for here, that's great, but it might be a different one than those where we actually bleed money (we usually refund people with stuck runs completely). I am fine doing it first here, but let's not wait for some confirmation before trying to fix it in the JS version. |
vdusek
left a comment
There was a problem hiding this comment.
The commit type - maybe fix would be better? I believe this deserves to be in the changelog.
Other than that, it looks much better, although I cannot say I do understand everything.
|
|
||
| next_request_id, _ = self._queue_head_dict.popitem(last=False) # ~removeFirst() | ||
|
|
||
| # This should never happen, but... |
There was a problem hiding this comment.
haha, great we're getting rid of this 😄
| class BoundedSet(Generic[T]): | ||
| """A simple set datastructure that removes the least recently accessed item when it reaches `max_length`.""" | ||
|
|
||
| def __init__(self, max_length: int) -> None: | ||
| self._max_length = max_length | ||
| self._data = OrderedDict[T, object]() | ||
|
|
||
| def __contains__(self, item: T) -> bool: | ||
| found = item in self._data | ||
| if found: | ||
| self._data.move_to_end(item, last=True) | ||
| return found | ||
|
|
||
| def add(self, item: T) -> None: | ||
| self._data[item] = True | ||
| self._data.move_to_end(item) | ||
|
|
||
| if len(self._data) > self._max_length: | ||
| self._data.popitem(last=False) | ||
|
|
||
| def clear(self) -> None: | ||
| self._data.clear() | ||
|
|
| _RECENTLY_HANDLED_CACHE_SIZE = 1000 | ||
| """Cache size for recently handled requests.""" | ||
|
|
||
| _STORAGE_CONSISTENCY_DELAY = timedelta(seconds=3) | ||
| """Expected delay for storage to achieve consistency, guiding the timing of subsequent read operations.""" |
| self.file_operation_lock = asyncio.Lock() | ||
| self._last_used_timestamp = Decimal(0) | ||
|
|
||
| self._in_progress = set[str]() |
There was a problem hiding this comment.
I saw # noqa: SLF001 several times when accessing this. Maybe there should be public getter for this?
There was a problem hiding this comment.
This PR ports over the changes from apify/crawlee#2775.
Key changes:
storages.RequestQueueto request storage client implementationsRequestQueue.is_finishedfunction has been rewritten to avoid race conditions