Skip to content

Commit

Permalink
[Reactive] Verify flatMap doesn't reorder items from the same source (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Apr 28, 2020
1 parent 60abc97 commit 25ffc50
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ public void multiLoop() throws Throwable {
}

@Test
void multi() throws ExecutionException, InterruptedException {
public void multi() throws ExecutionException, InterruptedException {
assertEquals(EXPECTED_EMISSION_COUNT, Multi.from(TEST_DATA)
.flatMap(MultiFlatMapPublisherTest::asyncFlowPublisher, MAX_CONCURRENCY, false, PREFETCH)
.distinct()
Expand Down Expand Up @@ -338,4 +338,47 @@ private static void sleep(long millis) {
throw new RuntimeException(e);
}
}

@Test
public void innerSourceOrderPreserved() {
ExecutorService executor1 = Executors.newSingleThreadExecutor();
ExecutorService executor2 = Executors.newSingleThreadExecutor();
try {
for (int p = 1; p < 256; p *= 2) {
for (int i = 0; i < 1000; i++) {
TestSubscriber<Integer> ts = new TestSubscriber<>(Long.MAX_VALUE);

Multi.just(
Multi.range(1, 100).observeOn(executor1),
Multi.range(200, 100).observeOn(executor2)
)
.flatMap(v -> v, 3, false, p)
.subscribe(ts);

ts.awaitDone(5, TimeUnit.SECONDS)
.assertItemCount(200)
.assertComplete();

int last1 = 0;
int last2 = 199;
for (Integer v : ts.getItems()) {
if (v < 200) {
if (last1 + 1 != v) {
fail("Out of order items: " + last1 + " -> " + v + " (p: " + p + ")");
}
last1 = v;
} else {
if (last2 + 1 != v) {
fail("Out of order items: " + last2 + " -> " + v + " (p: " + p + ")");
}
last2 = v;
}
}
}
}
} finally {
executor1.shutdown();
executor2.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ public void onNext(T item) {
if (upstream.get() == null) {
errors.add(new IllegalStateException("onSubscribe not called before onNext!"));
}
if (!errors.isEmpty()) {
errors.add(new IllegalStateException("onNext called after onError!"));
}
if (completions != 0) {
errors.add(new IllegalStateException("onNext called after onComplete!"));
}
items.add(item);
}

Expand Down

0 comments on commit 25ffc50

Please sign in to comment.