Skip to content

Commit

Permalink
Filter out null pairs earlier in multicast processor
Browse files Browse the repository at this point in the history
  • Loading branch information
gnodet committed May 14, 2024
1 parent 0c120c9 commit 159af47
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.FilterIterator;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.concurrent.AsyncCompletionService;
Expand Down Expand Up @@ -407,7 +409,7 @@ protected abstract class MulticastTask implements Runnable {
this.original = original;
this.pairs = pairs;
this.callback = callback;
this.iterator = pairs.iterator();
this.iterator = new FilterIterator<>(pairs.iterator(), Objects::nonNull);
if (timeout > 0) {
timeoutTask = schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS);
} else {
Expand Down Expand Up @@ -544,13 +546,7 @@ public void run() {

ProcessorExchangePair pair = iterator.next();
boolean hasNext = iterator.hasNext();
// some iterators may return true for hasNext() but then null in next()
if (pair == null && !hasNext) {
doDone(result.get(), true);
return;
}

// TODO looks like pair can still be null as the if above has composite condition?
Exchange exchange = pair.getExchange();
int index = nbExchangeSent.getAndIncrement();
updateNewExchange(exchange, index, pairs, hasNext);
Expand Down Expand Up @@ -654,13 +650,7 @@ boolean doRun() throws Exception {

ProcessorExchangePair pair = iterator.next();
boolean hasNext = iterator.hasNext();
// some iterators may return true for hasNext() but then null in next()
if (pair == null && !hasNext) {
doDone(result.get(), true);
return false;
}

// TODO looks like pair can still be null as the if above has composite condition?
Exchange exchange = pair.getExchange();
int index = nbExchangeSent.getAndIncrement();
updateNewExchange(exchange, index, pairs, hasNext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.camel.issues;

import java.util.Iterator;
import java.util.function.Consumer;

import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
Expand Down Expand Up @@ -50,11 +49,27 @@ public void testIteratorThrowExceptionOnFirst() throws Exception {

@Test
public void testIteratorThrowExceptionOnSecond() throws Exception {
getMockEndpoint("mock:line").expectedMessageCount(0);
getMockEndpoint("mock:end").expectedMessageCount(0);

try {
template.sendBody("direct:start", new MyIterator(2));
fail("Should throw exception");
} catch (Exception e) {
IllegalArgumentException iae = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
assertEquals("Forced error", iae.getMessage());
}

assertMockEndpointsSatisfied();
}

@Test
public void testIteratorThrowExceptionOnThird() throws Exception {
getMockEndpoint("mock:line").expectedMessageCount(1);
getMockEndpoint("mock:end").expectedMessageCount(0);

try {
template.sendBody("direct:start", new MyIterator(0));
template.sendBody("direct:start", new MyIterator(3));
fail("Should throw exception");
} catch (Exception e) {
IllegalArgumentException iae = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
Expand Down Expand Up @@ -86,27 +101,16 @@ public MyIterator(int count) {

@Override
public boolean hasNext() {
return count < 2;
return true;
}

@Override
public String next() {
count++;
if (count == 1) {
if (--count > 0) {
return "Hello";
} else {
throw new IllegalArgumentException("Forced error");
}
}

@Override
public void remove() {
// noop
}

@Override
public void forEachRemaining(Consumer<? super String> action) {
// noop
}
}
}

0 comments on commit 159af47

Please sign in to comment.