
Race condition window_with_time operator #604

Closed
thoelzl opened this issue Feb 23, 2022 · 2 comments

Comments

thoelzl commented Feb 23, 2022

I'm using RxPY 3.1.0 and observed a race condition in the window_with_time operator.

Specifically, I use the buffer_with_time operator with a non-overlapping window and observed duplicated data when running a heavy-load job inside the subscriber. The buffer_with_time operator internally uses the window_with_time operator, which is, in my opinion, not thread-safe. The insufficient job scheduling was an unintended mistake in the first place, but it exposed the potential race condition.

Looking into the implementation of the window_with_time operator shows that the queue manipulation is not thread-safe.

# Excerpt from RxPY's window_with_time implementation; the enclosing
# operator function (which supplies timespan, timeshift and scheduler)
# and the imports are omitted here.
def window_with_time(source: Observable) -> Observable:
    def subscribe(observer, scheduler_=None):
        _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()

        timer_d = SerialDisposable()
        next_shift = [timeshift]
        next_span = [timespan]
        total_time = [DELTA_ZERO]
        q = []

        group_disposable = CompositeDisposable(timer_d)
        ref_count_disposable = RefCountDisposable(group_disposable)

        def create_timer():
            m = SingleAssignmentDisposable()
            timer_d.disposable = m
            is_span = False
            is_shift = False

            if next_span[0] == next_shift[0]:
                is_span = True
                is_shift = True
            elif next_span[0] < next_shift[0]:
                is_span = True
            else:
                is_shift = True

            new_total_time = next_span[0] if is_span else next_shift[0]

            ts = new_total_time - total_time[0]
            total_time[0] = new_total_time
            if is_span:
                next_span[0] += timeshift

            if is_shift:
                next_shift[0] += timeshift

            def action(scheduler, state=None):
                s = None

                if is_shift:
                    s = Subject()
                    q.append(s)  # <-- new window appended; race window opens here
                    observer.on_next(add_ref(s, ref_count_disposable))

                if is_span:
                    s = q.pop(0)  # <-- old window popped; race window closes here
                    s.on_completed()

                create_timer()
            m.disposable = _scheduler.schedule_relative(ts, action)

        q.append(Subject())
        observer.on_next(add_ref(q[0], ref_count_disposable))
        create_timer()

        def on_next(x):
            for s in q:  # <-- may run concurrently with action's append/pop
                s.on_next(x)

        def on_error(e):
            for s in q:
                s.on_error(e)

            observer.on_error(e)

        def on_completed():
            for s in q:
                s.on_completed()

            observer.on_completed()

        group_disposable.add(source.subscribe_(on_next, on_error, on_completed, scheduler_))
        return ref_count_disposable
    return Observable(subscribe)
return window_with_time  # returned by the omitted enclosing operator function

Queue manipulation is not an atomic action and can result in duplicate subscriptions even if a non-overlapping window is used.

[Screenshot: the implementation above, with the q.append(s) and q.pop(0) lines in action marked in red]

The problem may occur when the observable's on_next function is called between the marked lines, i.e. after the new window has been appended to q but before the old one has been popped: an item arriving in that gap is delivered to both windows. The error occurs rather infrequently and is difficult to reproduce.
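The interleaving can be replayed deterministically without RxPY at all. The following is a minimal sketch of my understanding of the bug; FakeSubject, deliver and action are simplified stand-ins I made up for Subject, the operator's on_next and the scheduler callback:

```python
class FakeSubject:
    """Stand-in for rx.subject.Subject: just records what it receives."""
    def __init__(self, name):
        self.name = name
        self.items = []
    def on_next(self, x):
        self.items.append(x)

q = [FakeSubject("window-0")]   # the operator's shared window queue
closed = []                     # windows that have been completed

def deliver(x):
    # mirrors the operator's on_next: every subject currently in q gets x
    for s in q:
        s.on_next(x)

def action():
    # mirrors the is_shift/is_span branch for a non-overlapping window:
    # the new window is appended BEFORE the old one is popped
    q.append(FakeSubject("window-1"))
    deliver(42)                 # simulate on_next arriving in the unsafe gap
    closed.append(q.pop(0))

action()
print(closed[0].items, q[0].items)  # [42] [42] -> duplicated data
```

Because q briefly holds both the closing window and the new one, the single item 42 ends up in two non-overlapping windows, which matches the duplication I observed with buffer_with_time.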

dbrattli added a commit that referenced this issue Mar 6, 2022
dbrattli (Collaborator) commented Mar 6, 2022

@thoelzl Thanks for the detailed description of the problem. I think your analysis is spot on. As you say, this is a hard issue to test, but I am attempting a fix by protecting the queue manipulation behind a lock.
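For illustration, here is a hedged sketch of what such a fix can look like (again using the made-up FakeSubject stand-in, not the actual RxPY code): every read and mutation of the shared queue happens under one threading.Lock, so the append/pop in the scheduler callback is atomic with respect to delivery:

```python
import threading

class FakeSubject:
    """Stand-in for rx.subject.Subject: just records what it receives."""
    def __init__(self, name):
        self.name = name
        self.items = []
    def on_next(self, x):
        self.items.append(x)

lock = threading.Lock()
q = [FakeSubject("window-0")]   # shared window queue, now lock-protected
closed = []

def deliver(x):
    with lock:                  # snapshot the queue under the lock...
        windows = list(q)
    for s in windows:           # ...then notify outside the critical section
        s.on_next(x)

def action():
    with lock:                  # append and pop are now one atomic step,
        q.append(FakeSubject("window-1"))   # so deliver() can never observe
        closed.append(q.pop(0))             # both windows at once

action()
deliver(42)
print(closed[0].items, q[0].items)  # [] [42] -> each item lands in one window
```

Whether to notify subscribers inside or outside the critical section is a design choice; the snapshot-then-notify shape above avoids invoking user callbacks while holding the lock.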

thoelzl (Author) commented Mar 13, 2022

Thanks for fixing the issue. I have tested your solution and it works as expected.
