[BUG] buffer_with_time_or_count lost some data #702

Open
fanck0605 opened this issue Sep 24, 2023 · 1 comment

fanck0605 commented Sep 24, 2023

Describe the bug
buffer_with_time_or_count loses some data: in the reproduction below, only 99,986 of the 100,000 emitted items reach the subscriber.

To Reproduce
Steps to reproduce the behavior:

import reactivex
from reactivex import operators

result = []
(
    reactivex.range(100_000)
    .pipe(
        operators.buffer_with_time_or_count(timespan=0.001, count=10_000),
    )
    .subscribe(on_next=lambda x: result.extend(x))
)
print(f"len(result) = {len(result)}")
assert result == [*range(100_000)]

Result:

len(result) = 99986
Traceback (most recent call last):
  File "D:\Projects\rxdemo\test_buffer_with_time_or_count.py", line 16, in <module>
    assert result == [*range(100_000)]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError

Expected behavior
All 100,000 items should be delivered to the subscriber, so the assertion should pass:

assert result == [*range(100_000)]

Additional context

  • OS: Windows 10
  • RxPY version: 4.0.4
  • Python version: 3.11

matiboy (Collaborator) commented Sep 24, 2023

Hi @fanck0605, thank you for opening this issue.
This seems quite similar to #694 and appears to be a threading issue. Using an event loop scheduler should solve the problem:

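import threading
import reactivex
from reactivex import operators
from reactivex.scheduler import EventLoopScheduler
event_loop = EventLoopScheduler()
result = []
done = threading.Event()  # set once the stream completes
reactivex.range(100_000).pipe(
    operators.buffer_with_time_or_count(timespan=0.001, count=10_000),
).subscribe(on_next=lambda x: result.extend(x), on_completed=done.set, scheduler=event_loop)
done.wait()  # the event loop runs on its own thread, so wait before asserting
assert result == list(range(100_000))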

Hope this helps
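
For readers wondering why moving everything onto a single event loop would help, here is a minimal stdlib-only sketch of the idea. It is not RxPY's implementation, and nothing in this thread confirms where exactly the items are lost. The names buffer_time_or_count, inbox, and the "item"/"tick"/"done" message tags are hypothetical and exist only for this illustration. The assumption is that the loss comes from a timer thread and the producer touching the same buffer concurrently; the sketch avoids that by letting only one worker thread mutate the buffer, while the timer merely posts a message. Unlike the real operator, it skips empty buffers.

```python
import queue
import threading


def buffer_time_or_count(items, timespan, count):
    """Group `items` into lists, flushing on a timer tick or once `count` is reached.

    Hypothetical helper: every buffer mutation happens on one worker thread,
    which mirrors the idea of running all work on a single event loop.
    """
    inbox = queue.Queue()
    batches = []
    stop_timer = threading.Event()

    def timer():
        # Post a tick every `timespan` seconds instead of touching the buffer.
        while not stop_timer.wait(timespan):
            inbox.put(("tick", None))

    def worker():
        buffer = []
        while True:
            kind, value = inbox.get()
            if kind == "item":
                buffer.append(value)
                if len(buffer) >= count:  # flush by count
                    batches.append(buffer)
                    buffer = []
            elif kind == "tick":
                if buffer:  # flush by time, skipping empty buffers
                    batches.append(buffer)
                    buffer = []
            else:  # "done": flush whatever is left and stop
                if buffer:
                    batches.append(buffer)
                return

    timer_thread = threading.Thread(target=timer, daemon=True)
    worker_thread = threading.Thread(target=worker)
    timer_thread.start()
    worker_thread.start()

    for item in items:
        inbox.put(("item", item))
    inbox.put(("done", None))

    worker_thread.join()
    stop_timer.set()
    timer_thread.join()
    return batches


batches = buffer_time_or_count(range(100_000), timespan=0.001, count=10_000)
flattened = [x for batch in batches for x in batch]
print(f"len(flattened) = {len(flattened)}")
```

Because items and ticks travel through the same FIFO queue and only the worker appends to or swaps the buffer, a tick can split a batch early but cannot drop an item. The number and size of the batches will vary from run to run with timing; only the flattened contents are stable.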
