Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition with disposal in MemberSingle.tryEmit #59

Merged
merged 2 commits into from
Oct 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.davidmoten.rx.jdbc;

import org.davidmoten.rx.jdbc.Select;
import org.junit.Test;

import com.github.davidmoten.junit.Asserts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,21 +259,25 @@ private void scheduleReleasesNoDelay() {
}

private boolean tryEmit(Observers<T> obs, DecoratingMember<T> m) {
// get a fresh worker each time so we jump threads to
// break the stack-trace (a long-enough chain of
// checkout-checkins could otherwise provoke stack
// overflow)

// note that tryEmit is protected by the drain method so will
// not be run concurrently. We do have to be careful with
// concurrent disposal of observers though.

// advance counter to the next and choose an Observer to emit to (round robin)

int index = obs.index;
// a precondition of this method is that obs.activeCount > 0 (enforced by drain method)
MemberSingleObserver<T> o = obs.observers[index];
MemberSingleObserver<T> oNext = o;
// atomically bump up the index (if that entry has not been deleted in
// the meantime by disposal)

// atomically bump up the index to select the next Observer by round-robin
// (if that entry has not been deleted in the meantime by disposal). Need
// to be careful too that ALL observers have not been deleted via a race
// with disposal.
while (true) {
Observers<T> x = observers.get();
if (x.index == index && x.observers[index] == o) {
if (x.index == index && x.activeCount > 0 && x.observers[index] == o) {
davidmoten marked this conversation as resolved.
Show resolved Hide resolved
boolean[] active = new boolean[x.active.length];
System.arraycopy(x.active, 0, active, 0, active.length);
int nextIndex = (index + 1) % active.length;
Expand All @@ -292,6 +296,10 @@ private boolean tryEmit(Observers<T> obs, DecoratingMember<T> m) {
return false;
}
}
// get a fresh worker each time so we jump threads to
// break the stack-trace (a long-enough chain of
// checkout-checkins could otherwise provoke stack
// overflow)
Worker worker = scheduler.createWorker();
worker.schedule(new Emitter<T>(worker, oNext, m));
return true;
Expand Down Expand Up @@ -479,7 +487,7 @@ private static final class Observers<T> {
final int requested;

Observers(MemberSingleObserver<T>[] observers, boolean[] active, int activeCount, int index, int requested) {
Preconditions.checkArgument(observers.length > 0 || index == 0, "index must be -1 for zero length array");
Preconditions.checkArgument(observers.length > 0 || index == 0, "index must be 0 for zero length array");
Preconditions.checkArgument(observers.length == active.length);
this.observers = observers;
this.index = index;
Expand Down