[FLINK-10774][tests] Refactor FlinkKafkaConsumerBaseTest#testConsumerLifeCycle

Split #testConsumerLifeCycle into two methods which represent the two if-else
branches.
tillrohrmann committed Jan 30, 2019
1 parent 3067dcb · commit afea47e
Showing 1 changed file with 21 additions and 25 deletions.
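As a quick orientation before the diff: the commit applies a common refactoring, splitting one helper whose behaviour forks on a nullable parameter into two explicitly named helpers. The sketch below is a hypothetical, self-contained illustration of that pattern only; the names (LifecycleSketch, ThrowingTask, checkNormalLifecycle, checkFailingLifecycle) are made up for illustration, while the real helpers, testNormalConsumerLifecycle and testFailingConsumerLifecycle, appear in the diff that follows.

// Hypothetical sketch of the refactoring pattern (not Flink code).
public class LifecycleSketch {

	@FunctionalInterface
	interface ThrowingTask {
		void run() throws Exception;
	}

	// Before: one helper, branching on a nullable expectedException.
	static void checkLifecycle(ThrowingTask task, Exception expectedException) throws Exception {
		if (expectedException == null) {
			// normal path: the task must complete without throwing
			task.run();
		} else {
			// failing path: the task must throw exactly the expected exception
			try {
				task.run();
				throw new AssertionError("expected an exception, but the task completed normally");
			} catch (Exception e) {
				if (!expectedException.equals(e)) {
					throw new AssertionError("unexpected exception", e);
				}
			}
		}
	}

	// After: one helper per branch; call sites state their intent in the method name.
	static void checkNormalLifecycle(ThrowingTask task) throws Exception {
		task.run();
	}

	static void checkFailingLifecycle(ThrowingTask task, Exception expectedException) {
		try {
			task.run();
			throw new AssertionError("expected an exception, but the task completed normally");
		} catch (Exception e) {
			if (!expectedException.equals(e)) {
				throw new AssertionError("unexpected exception", e);
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Exception boom = new IllegalStateException("boom");
		checkNormalLifecycle(() -> { });                     // passes: no exception thrown
		checkFailingLifecycle(() -> { throw boom; }, boom);  // passes: the expected exception surfaced
		System.out.println("both lifecycle checks passed");
	}
}

The diff below makes the same move in FlinkKafkaConsumerBaseTest: the @Nullable parameter (and its import) go away, and each call site picks the helper that matches its intent.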
@@ -64,7 +64,6 @@
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.ArrayDeque;
@@ -469,7 +468,7 @@ public void testClosePartitionDiscovererWhenOpenThrowException() throws Exception
 
 		final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(failingPartitionDiscoverer);
 
-		testConsumerLifeCycle(consumer, failureCause);
+		testFailingConsumerLifecycle(consumer, failureCause);
 		assertTrue("partitionDiscoverer should be closed when consumer is closed", failingPartitionDiscoverer.isClosed());
 	}

@@ -485,7 +484,7 @@ public void testClosePartitionDiscovererWhenCreateKafkaFetcherFails() throws Exception
 			testPartitionDiscoverer,
 			100L);
 
-		testConsumerLifeCycle(consumer, failureCause);
+		testFailingConsumerLifecycle(consumer, failureCause);
 		assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
 	}

@@ -503,40 +502,37 @@ public void testClosePartitionDiscovererWhenKafkaFetcherFails() throws Exception
 
 		final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(() -> mock, testPartitionDiscoverer, 100L);
 
-		testConsumerLifeCycle(consumer, failureCause);
+		testFailingConsumerLifecycle(consumer, failureCause);
 		assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
 	}
 
+	private void testFailingConsumerLifecycle(FlinkKafkaConsumerBase<String> testKafkaConsumer, @Nonnull Exception expectedException) throws Exception {
+		try {
+			setupConsumer(testKafkaConsumer);
+			testKafkaConsumer.run(new TestSourceContext<>());
+
+			fail("Exception should have been thrown from open / run method of FlinkKafkaConsumerBase.");
+		} catch (Exception e) {
+			assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(expectedException)).isPresent(), is(true));
+		}
+		testKafkaConsumer.close();
+	}
+
 	@Test
 	public void testClosePartitionDiscovererWithCancellation() throws Exception {
 		final DummyPartitionDiscoverer testPartitionDiscoverer = new DummyPartitionDiscoverer();
 
 		final TestingFlinkKafkaConsumer<String> consumer = new TestingFlinkKafkaConsumer<>(testPartitionDiscoverer, 100L);
 
-		testConsumerLifeCycle(consumer, null);
+		testNormalConsumerLifecycle(consumer);
 		assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
 	}
 
-	private void testConsumerLifeCycle(
-			FlinkKafkaConsumerBase<String> testKafkaConsumer,
-			@Nullable Exception expectedException) throws Exception {
-
-		if (expectedException == null) {
-			setupConsumer(testKafkaConsumer);
-			final CompletableFuture<Void> runFuture = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> testKafkaConsumer.run(new TestSourceContext<>())));
-			testKafkaConsumer.close();
-			runFuture.get();
-		} else {
-			try {
-				setupConsumer(testKafkaConsumer);
-				testKafkaConsumer.run(new TestSourceContext<>());
-
-				fail("Exception should have been thrown from open / run method of FlinkKafkaConsumerBase.");
-			} catch (Exception e) {
-				assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(expectedException)).isPresent(), is(true));
-			}
-			testKafkaConsumer.close();
-		}
+	private void testNormalConsumerLifecycle(FlinkKafkaConsumerBase<String> testKafkaConsumer) throws Exception {
+		setupConsumer(testKafkaConsumer);
+		final CompletableFuture<Void> runFuture = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> testKafkaConsumer.run(new TestSourceContext<>())));
+		testKafkaConsumer.close();
+		runFuture.get();
 	}
 
 	private void setupConsumer(FlinkKafkaConsumerBase<String> consumer) throws Exception {
