Dynamic batch #14

Merged
merged 3 commits into from Nov 23, 2021

Conversation

artemcpp
Contributor

@artemcpp artemcpp commented Nov 9, 2021

When batch_timeout is set to 0, use an alternative batching strategy.
Instead of waiting for a fixed timeout until the batch is ready, grab all tasks that are currently in the input queue and process them right away. We are effectively using the queue as the batch buffer.
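
A minimal sketch of the strategy described above, assuming a standard-library `queue.Queue` stands in for the input queue. The function name `collect_dynamic_batch` and the `batch_size` cap are hypothetical, chosen for illustration; this is not the code from this PR.

```python
import queue
from typing import Any, List


def collect_dynamic_batch(in_queue: "queue.Queue[Any]", batch_size: int) -> List[Any]:
    """Block for the first task, then take whatever else is already queued.

    Hypothetical helper: the name and signature are assumptions.
    """
    # Wait for at least one task so an empty batch is never returned.
    batch = [in_queue.get()]
    # Drain without waiting: the queue itself acts as the batch buffer.
    while len(batch) < batch_size:
        try:
            batch.append(in_queue.get_nowait())
        except queue.Empty:
            break
    return batch
```

Under light load this tends to yield small batches with low latency, and under heavy load the batches fill up to `batch_size` because tasks accumulate in the queue while the previous batch is being processed.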

Collaborator

@bugrimov bugrimov left a comment

Need:

  • Readme
  • bump version

"""Provides suitable for processing tasks."""
while True:
try:
def _get_task(self, timeout: float):
Contributor

Suggested change
- def _get_task(self, timeout: float):
+ def _get_task(self, timeout: float) -> Optional[BaseTask]:

Contributor Author

done

if self._stop_task or timeout <= 0:
return batch

timeout = max(timeout, 0.1)
Contributor

Suggested change
- timeout = max(timeout, 0.1)
+ timeout = max(timeout, 0.001)

Contributor Author

done


# wait first task
while True:
task = self._get_task(10.)
Contributor

Please add a comment explaining why we use exactly 10 seconds.

Contributor Author

added comment

Artem Prikhodko and others added 2 commits November 23, 2021 18:44
aqueduct/flow.py Outdated
@@ -243,18 +244,34 @@ def step_names(handlers):
return {queue_: f'from_{from_}_to_{to}'
for queue_, (from_, to) in zip(self._queues, step_names(self._contexts))}

async def _fetch_processed(self):
Collaborator

In line 52 you need to change the default timeout to None: batch_timeout: Optional[int] = None

try:
def _get_task(self, timeout: float):
try:
if timeout == 0:
Collaborator

Because 0 changes the batching behavior, it is better to use None, to show users that the behavior differs from that of an integer value.
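
The suggestion above could look roughly like the following sketch. It illustrates the reviewer's proposal only and makes no claim about what was merged; the helper name `uses_dynamic_batching` is hypothetical, and rejecting non-positive values is an added assumption.

```python
from typing import Optional


def uses_dynamic_batching(batch_timeout: Optional[float] = None) -> bool:
    """Return True when no timeout is configured (hypothetical helper)."""
    # None is an explicit "no timeout" marker, so dynamic mode cannot be
    # confused with a very small numeric timeout.
    if batch_timeout is None:
        return True
    # Assumption for this sketch: a non-positive timeout is a configuration error.
    if batch_timeout <= 0:
        raise ValueError("batch_timeout must be positive or None")
    return False
```

With this shape, a caller who omits batch_timeout gets dynamic batching, and any numeric value keeps the fixed-timeout behavior.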

@bugrimov bugrimov merged commit 21c60c0 into avito-tech:main Nov 23, 2021

3 participants