Skip to content

Commit

Permalink
Merge pull request #1378 from benjchristensen/pivot
Browse files Browse the repository at this point in the history
BugFix: Pivot Concurrency
  • Loading branch information
benjchristensen committed Jun 24, 2014
2 parents 69227ff + e2c5bfc commit e598966
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 13 deletions.
16 changes: 10 additions & 6 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorPivot.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@

import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.observables.GroupedObservable;
import rx.observers.SerializedObserver;
import rx.subscriptions.Subscriptions;

public final class OperatorPivot<K1, K2, T> implements Operator<GroupedObservable<K2, GroupedObservable<K1, T>>, GroupedObservable<K1, GroupedObservable<K2, T>>> {
Expand Down Expand Up @@ -268,7 +270,7 @@ private Outer<K1, K2, T> getOrCreateOuter(final AtomicReference<State> state, fi
return null;
}

Outer<K1, K2, T> newOuter = Outer.<K1, K2, T> create(this, state, key2);
Outer<K1, K2, T> newOuter = Outer.<K1, K2, T> create(key2);
Outer<K1, K2, T> existing = outerSubjects.putIfAbsent(key2, newOuter);
if (existing != null) {
// we lost the race to create so return the one that did
Expand All @@ -283,11 +285,12 @@ private Outer<K1, K2, T> getOrCreateOuter(final AtomicReference<State> state, fi

private static final class Inner<K1, K2, T> {

private final BufferUntilSubscriber<T> subscriber;
private final Observer<T> subscriber;
private final GroupedObservable<K1, T> group;

private Inner(BufferUntilSubscriber<T> subscriber, GroupedObservable<K1, T> group) {
this.subscriber = subscriber;
// since multiple threads are being pivoted we need to make sure this is serialized
this.subscriber = new SerializedObserver<T>(subscriber);
this.group = group;
}

Expand Down Expand Up @@ -335,15 +338,16 @@ public void onNext(T t) {

private static final class Outer<K1, K2, T> {

private final BufferUntilSubscriber<GroupedObservable<K1, T>> subscriber;
private final Observer<GroupedObservable<K1, T>> subscriber;
private final GroupedObservable<K2, GroupedObservable<K1, T>> group;

private Outer(BufferUntilSubscriber<GroupedObservable<K1, T>> subscriber, GroupedObservable<K2, GroupedObservable<K1, T>> group) {
this.subscriber = subscriber;
// since multiple threads are being pivoted we need to make sure this is serialized
this.subscriber = new SerializedObserver<GroupedObservable<K1, T>>(subscriber);
this.group = group;
}

public static <K1, K2, T> Outer<K1, K2, T> create(final GroupState<K1, K2, T> groups, final AtomicReference<State> state, final K2 key2) {
public static <K1, K2, T> Outer<K1, K2, T> create(final K2 key2) {
final BufferUntilSubscriber<GroupedObservable<K1, T>> subject = BufferUntilSubscriber.create();
GroupedObservable<K2, GroupedObservable<K1, T>> group = new GroupedObservable<K2, GroupedObservable<K1, T>>(key2, new OnSubscribe<GroupedObservable<K1, T>>() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.internal.operators;

import java.util.Random;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Ignore;
import org.junit.Test;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
Expand All @@ -38,10 +41,12 @@

public class OperatorPivotTest {

@Test
@Test(timeout=10000)
public void testPivotEvenAndOdd() throws InterruptedException {
Observable<GroupedObservable<Boolean, Integer>> o1 = Observable.range(1, 10).groupBy(modKeySelector).subscribeOn(Schedulers.newThread());
Observable<GroupedObservable<Boolean, Integer>> o2 = Observable.range(11, 10).groupBy(modKeySelector).subscribeOn(Schedulers.newThread());
for(int i=0; i<1000; i++) {
System.out.println("------------------------------------------ testPivotEvenAndOdd -------------------------------------------");
Observable<GroupedObservable<Boolean, Integer>> o1 = Observable.range(1, 10).groupBy(modKeySelector).subscribeOn(Schedulers.computation());
Observable<GroupedObservable<Boolean, Integer>> o2 = Observable.range(11, 10).groupBy(modKeySelector).subscribeOn(Schedulers.computation());
Observable<GroupedObservable<String, GroupedObservable<Boolean, Integer>>> groups = Observable.from(GroupedObservable.from("o1", o1), GroupedObservable.from("o2", o2));
Observable<GroupedObservable<Boolean, GroupedObservable<String, Integer>>> pivoted = Observable.pivot(groups);

Expand All @@ -53,10 +58,12 @@ public void testPivotEvenAndOdd() throws InterruptedException {

@Override
public Observable<String> call(final GroupedObservable<Boolean, GroupedObservable<String, Integer>> outerGroup) {
System.out.println("Outer Group: " + outerGroup.getKey());
return outerGroup.flatMap(new Func1<GroupedObservable<String, Integer>, Observable<String>>() {

@Override
public Observable<String> call(final GroupedObservable<String, Integer> innerGroup) {
System.out.println("Inner Group: " + innerGroup.getKey());
return innerGroup.map(new Func1<Integer, String>() {

@Override
Expand Down Expand Up @@ -94,14 +101,16 @@ public void onNext(String t) {

});

if (!latch.await(800, TimeUnit.MILLISECONDS)) {
if (!latch.await(20000000, TimeUnit.MILLISECONDS)) {
System.out.println("xxxxxxxxxxxxxxxxxx> TIMED OUT <xxxxxxxxxxxxxxxxxxxx");
System.out.println("Received count: " + count.get());
fail("Timed Out");
}

System.out.println("Received count: " + count.get());
// TODO sometimes this test fails and gets 15 instead of 20 so there is a bug somewhere
assertEquals(20, count.get());
}
}

/**
Expand All @@ -112,7 +121,7 @@ public void onNext(String t) {
* It's NOT easy to understand though, and easy to end up with far more data consumed than expected, because pivot by definition
* is inverting the data so we can not unsubscribe from the parent until all children are done since the top key becomes the leaf once pivoted.
*/
@Test
@Test(timeout=10000)
public void testUnsubscribeFromGroups() throws InterruptedException {
AtomicInteger counter1 = new AtomicInteger();
AtomicInteger counter2 = new AtomicInteger();
Expand Down Expand Up @@ -221,7 +230,7 @@ public String call(Integer i) {
*
* Then a subsequent step can merge them if desired and add serialization, such as merge(even.o1, even.o2) to become a serialized "even"
*/
@Test
@Test(timeout=10000)
public void testConcurrencyAndSerialization() throws InterruptedException {
final AtomicInteger maxOuterConcurrency = new AtomicInteger();
final AtomicInteger maxGroupConcurrency = new AtomicInteger();
Expand Down

0 comments on commit e598966

Please sign in to comment.