KAFKA-8412: Fix NullPointerException thrown on flushing before closing producers (#7207)

Prior to this change, an NPE was raised when calling AssignedTasks.close
under the following conditions:

1. EOS is enabled
2. The task is in a suspended state

The cause of the NPE is that when a clean close is requested for a
StreamTask, the StreamTask tries to commit. However, in the suspended
state there is no producer, so the contained RecordCollector ultimately
throws an NPE from flush.
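
To make the failure path concrete, here is a minimal, hypothetical sketch of
the pre-fix behavior. The class and member names below are illustrative
stand-ins, not the real Kafka Streams types, which spread this logic across
StreamTask, its RecordCollector, and AssignedTasks:

    import org.apache.kafka.clients.producer.Producer;

    // Hypothetical, heavily simplified model of the pre-fix close path.
    class SuspendedTaskSketch {

        // Under EOS the producer is closed and discarded when the task is
        // suspended, so this reference is null in the suspended state.
        private Producer<byte[], byte[]> producer = null;

        void close(final boolean clean) {
            if (clean) {
                commit(); // a clean close always commits, even when suspended
            }
        }

        private void commit() {
            // Pre-fix, control reached here for a suspended task, and
            // producer.flush() threw the NullPointerException.
            producer.flush();
        }
    }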

The fix put forth in this commit is to have AssignedTasks call
closeSuspended when it knows the underlying StreamTask is suspended.

Note also that this test is quite involved. I could have just tested
that AssignedTasks calls closeSuspended when appropriate, but that would,
in my opinion, test an implementation detail without actually verifying
that we reproduced the original problem as it was described. I feel much
more confident that we are reproducing the behavior, and that we can test
exactly the conditions that lead to it, when testing across AssignedTasks
and StreamTask. I believe this lends additional support to the argument
for eventually consolidating the state that is currently split across
these classes.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
cpettitt-confluent authored and guozhangwang committed Aug 26, 2019
1 parent d7f8ec8 commit 7334222
Showing 4 changed files with 136 additions and 22 deletions.
checkstyle/import-control.xml (2 additions, 0 deletions)
@@ -265,8 +265,10 @@
 <subpackage name="processor">
   <subpackage name="internals">
     <allow pkg="com.fasterxml.jackson" />
+    <allow pkg="kafka.utils" />
     <allow pkg="org.apache.zookeeper" />
+    <allow pkg="org.apache.log4j" />
     <subpackage name="testutil">
       <allow pkg="org.apache.log4j" />
     </subpackage>
streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -331,18 +331,23 @@ void closeNonAssignedSuspendedTasks(final Map<TaskId, Set<TopicPartition>> newAs

     void close(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
-        for (final T task : allTasks()) {
+
+        for (final T task: allTasks()) {
             try {
-                task.close(clean, false);
+                if (suspended.containsKey(task.id())) {
+                    task.closeSuspended(clean, false, null);
+                } else {
+                    task.close(clean, false);
+                }
             } catch (final TaskMigratedException e) {
                 log.info("Failed to close {} {} since it got migrated to another thread already. " +
-                    "Closing it as zombie and move on.", taskTypeName, task.id());
+                         "Closing it as zombie and move on.", taskTypeName, task.id());
                 firstException.compareAndSet(null, closeZombieTask(task));
             } catch (final RuntimeException t) {
                 log.error("Failed while closing {} {} due to the following error:",
-                    task.getClass().getSimpleName(),
-                    task.id(),
-                    t);
+                          task.getClass().getSimpleName(),
+                          task.id(),
+                          t);
                 if (clean) {
                     if (!closeUnclean(task)) {
                         firstException.compareAndSet(null, t);
streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -17,27 +17,42 @@

package org.apache.kafka.streams.processor.internals;

+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import kafka.utils.LogCaptureAppender;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.test.MockSourceNode;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
 import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;

public class AssignedStreamsTasksTest {

private final StreamTask t1 = EasyMock.createMock(StreamTask.class);
@@ -451,6 +466,96 @@ public void shouldReturnNumberOfPunctuations() {
         EasyMock.verify(t1);
     }
 
+    @Test
+    public void shouldCloseCleanlyWithSuspendedTaskAndEOS() {
+        final String topic = "topic";
+
+        final Deserializer<byte[]> deserializer = Serdes.ByteArray().deserializer();
+        final Serializer<byte[]> serializer = Serdes.ByteArray().serializer();
+
+        final MockConsumer<byte[], byte[]> consumer =
+            new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        final MockProducer<byte[], byte[]> producer =
+            new MockProducer<>(false, serializer, serializer);
+
+        final MockSourceNode<byte[], byte[]> source = new MockSourceNode<>(
+            new String[] {"topic"},
+            deserializer,
+            deserializer);
+
+        final ChangelogReader changelogReader = new MockChangelogReader();
+
+        final ProcessorTopology topology = new ProcessorTopology(
+            Collections.singletonList(source),
+            Collections.singletonMap(topic, source),
+            Collections.emptyMap(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyMap(),
+            Collections.emptySet());
+
+        final Set<TopicPartition> partitions = Collections.singleton(
+            new TopicPartition(topic, 1));
+
+        final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG));
+
+        final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(metrics);
+
+        final MockTime time = new MockTime();
+
+        final StateDirectory stateDirectory = new StateDirectory(
+            StreamTaskTest.createConfig(true),
+            time,
+            true);
+
+        final StreamTask task = new StreamTask(
+            new TaskId(0, 0),
+            partitions,
+            topology,
+            consumer,
+            changelogReader,
+            StreamTaskTest.createConfig(true),
+            streamsMetrics,
+            stateDirectory,
+            null,
+            time,
+            () -> producer);
+
+        assignedTasks.addNewTask(task);
+        assignedTasks.initializeNewTasks();
+        assertNull(assignedTasks.suspend());
+
+        // We have to test for close failure by looking at the logs because the current close
+        // logic suppresses the raised exception in AssignedTasks.close. It's not clear if this
+        // is the intended behavior.
+        //
+        // Also note that capturing the failure through this side effect is very brittle.
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        final Level previousLevel =
+            LogCaptureAppender.setClassLoggerLevel(AssignedStreamsTasks.class, Level.ERROR);
+        try {
+            assignedTasks.close(true);
+        } finally {
+            LogCaptureAppender.setClassLoggerLevel(AssignedStreamsTasks.class, previousLevel);
+            LogCaptureAppender.unregister(appender);
+        }
+        if (!appender.getMessages().isEmpty()) {
+            final LoggingEvent firstError = appender.getMessages().head();
+            final String firstErrorCause =
+                firstError.getThrowableStrRep() != null
+                    ? String.join("\n", firstError.getThrowableStrRep())
+                    : "N/A";
+
+            final String failMsg =
+                String.format("Expected no ERROR message while closing assignedTasks, but got %d. " +
+                              "First error: %s. Cause: %s",
+                    appender.getMessages().size(),
+                    firstError.getMessage(),
+                    firstErrorCause);
+            fail(failMsg);
+        }
+    }
+
     private void addAndInitTask() {
         assignedTasks.addNewTask(t1);
         assignedTasks.initializeNewTasks();
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -89,6 +89,8 @@

 public class StreamTaskTest {
 
+    private static final File BASE_DIR = TestUtils.tempDirectory();
+
     private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
     private final Serializer<byte[]> bytesSerializer = Serdes.ByteArray().serializer();
     private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
@@ -140,7 +142,6 @@ public Map<TopicPartition, Long> restoredOffsets() {
     private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(metrics);
     private final TaskId taskId00 = new TaskId(0, 0);
     private final MockTime time = new MockTime();
-    private final File baseDir = TestUtils.tempDirectory();
     private StateDirectory stateDirectory;
     private StreamTask task;
     private long punctuatedAt;
@@ -175,10 +176,11 @@ static ProcessorTopology withSources(final List<ProcessorNode> processorNodes,
             Collections.emptySet());
     }
 
-    private StreamsConfig createConfig(final boolean enableEoS) {
+    // Exposed to make it easier to create StreamTask config from other tests.
+    static StreamsConfig createConfig(final boolean enableEoS) {
         final String canonicalPath;
         try {
-            canonicalPath = baseDir.getCanonicalPath();
+            canonicalPath = BASE_DIR.getCanonicalPath();
         } catch (final IOException e) {
             throw new RuntimeException(e);
         }
@@ -210,7 +212,7 @@ public void cleanup() throws IOException {
                 }
             }
         } finally {
-            Utils.delete(baseDir);
+            Utils.delete(BASE_DIR);
         }
     }

