From 722157b59e73b9e3090764551d9b4d502ed434c6 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 27 Jun 2017 18:48:42 -0700 Subject: [PATCH 1/2] KAFKA-5167: Release state locks in case of failure --- .../processor/internals/StreamThread.java | 29 ++- .../processor/internals/StreamThreadTest.java | 209 +++++++++++++++++- 2 files changed, 229 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 3fd78326713f3..24ee078eb60b6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1081,8 +1081,9 @@ private void shutdown(final boolean cleanRun) { streamsMetrics.removeAllSensors(); } + // visible for testing @SuppressWarnings("ThrowableNotThrown") - private void shutdownTasksAndState(final boolean cleanRun) { + void shutdownTasksAndState(final boolean cleanRun) { log.debug("{} Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", logPrefix, activeTasks.keySet(), standbyTasks.keySet(), suspendedTasks.keySet(), suspendedStandbyTasks.keySet()); @@ -1121,13 +1122,31 @@ public String name() { @Override public void apply(final StreamTask task) { - task.suspend(); + try { + task.suspend(); + } catch (final Exception e) { + try { + task.close(false); + } catch (final Exception f) { + log.error("{} Closing task {} failed: ", logPrefix, task.id, f); + } + throw e; + } } })); for (final StandbyTask task : standbyTasks.values()) { try { - task.suspend(); + try { + task.suspend(); + } catch (final Exception e) { + try { + task.close(false); + } catch (final Exception f) { + log.error("{} Closing standby task {} failed: ", logPrefix, task.id, f); + } + throw e; + } } catch (final RuntimeException e) { firstException.compareAndSet(null, e); } @@ -1231,6 +1250,7 @@ private void closeNonAssignedSuspendedStandbyTasks() { } } + // visible for testing protected StreamTask createStreamTask(final TaskId id, final Collection partitions) { streamsMetrics.taskCreatedSensor.record(); @@ -1317,7 +1337,8 @@ private void addStreamTasks(final Collection assignment, final l taskCreator.retryWithBackoff(newTasks, start); } - private StandbyTask createStandbyTask(final TaskId id, final Collection partitions) { + // visible for testing + protected StandbyTask createStandbyTask(final TaskId id, final Collection partitions) { streamsMetrics.taskCreatedSensor.record(); final ProcessorTopology topology = builder.build(id.topicGroupId); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 8205c27f6db6a..6007b6c109e09 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -467,9 +467,16 @@ public void run() { private class MockStreamsPartitionAssignor extends StreamPartitionAssignor { private final Map> activeTaskAssignment; + private final Map> standbyTaskAssignment; MockStreamsPartitionAssignor(final Map> activeTaskAssignment) { + this(activeTaskAssignment, Collections.>emptyMap()); + } + + MockStreamsPartitionAssignor(final Map> activeTaskAssignment, + final Map> standbyTaskAssignment) { this.activeTaskAssignment = activeTaskAssignment; + this.standbyTaskAssignment = standbyTaskAssignment; } @Override @@ -477,6 +484,11 @@ Map> activeTasks() { return activeTaskAssignment; } + @Override + Map> standbyTasks() { + return standbyTaskAssignment; + } + @Override public void close() {} } @@ -1113,10 +1125,10 @@ Map> activeTasks() { } }); - StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates(); - Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); + final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates(); + final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); updatedTopicsField.setAccessible(true); - Set updatedTopics = (Set) updatedTopicsField.get(subscriptionUpdates); + final Set updatedTopics = (Set) updatedTopicsField.get(subscriptionUpdates); updatedTopics.add(t1.topic()); builder.updateSubscriptions(subscriptionUpdates, null); @@ -1549,13 +1561,13 @@ public void shouldAlwaysUpdateWithLatestTopicsFromStreamPartitionAssignor() thro final TaskId taskId2 = new TaskId(0, 0); final TaskId taskId3 = new TaskId(0, 0); - List activeTasks = Arrays.asList(taskId1); + List activeTasks = Utils.mkList(taskId1); final Map> standbyTasks = new HashMap<>(); AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap>()); - topicPartitions.addAll(Arrays.asList(topicPartition1)); + topicPartitions.addAll(Utils.mkList(topicPartition1)); PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode()); partitionAssignor.onAssignment(assignment); @@ -1590,6 +1602,193 @@ public void shouldAlwaysUpdateWithLatestTopicsFromStreamPartitionAssignor() thro } + @Test + public void shouldReleaseStateDirLockIfFailureOnTaskSuspend() throws Exception { + final TaskId taskId = new TaskId(0, 0); + + final StreamThread thread = setupTest(taskId); + + final StateDirectory testStateDir = new StateDirectory( + applicationId, + config.getString(StreamsConfig.STATE_DIR_CONFIG), + mockTime); + + assertFalse(testStateDir.lock(taskId, 0)); + try { + thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + fail("Should have thrown exception"); + } catch (final Exception e) { + assertTrue(testStateDir.lock(taskId, 0)); + } finally { + testStateDir.unlock(taskId); + } + } + + @Test + public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() throws Exception { + final TaskId taskId = new TaskId(0, 0); + + final StreamThread thread = setupTest(taskId); + + final StateDirectory testStateDir = new StateDirectory( + applicationId, + config.getString(StreamsConfig.STATE_DIR_CONFIG), + mockTime); + + assertFalse(testStateDir.lock(taskId, 0)); + try { + thread.shutdownTasksAndState(true); + assertTrue(testStateDir.lock(taskId, 0)); + } finally { + testStateDir.unlock(taskId); + } + } + + private StreamThread setupTest(final TaskId taskId) { + final TopologyBuilder builder = new TopologyBuilder(); + builder.setApplicationId(applicationId); + builder.addSource("source", "topic"); + + final MockClientSupplier clientSupplier = new MockClientSupplier(); + final StateDirectory stateDirectory = new StateDirectory( + applicationId, + config.getString(StreamsConfig.STATE_DIR_CONFIG), + mockTime); + + final TestStreamTask testStreamTask = new TestStreamTask(taskId, + applicationId, + Utils.mkSet(new TopicPartition("topic", 0)), + builder.build(0), + clientSupplier.consumer, + clientSupplier.getProducer(new HashMap()), + clientSupplier.restoreConsumer, + config, + new MockStreamsMetrics(new Metrics()), + stateDirectory) { + + @Override + public void suspend() { + throw new RuntimeException("KABOOM!!!"); + } + }; + + final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, + clientId, processId, new Metrics(), new MockTime(), + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + @Override + protected StreamTask createStreamTask(final TaskId id, final Collection partitions) { + return testStreamTask; + } + }; + + final Map> activeTasks = new HashMap<>(); + activeTasks.put(testStreamTask.id, testStreamTask.partitions); + thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks)); + thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions); + + return thread; + } + + @Test + public void shouldReleaseStateDirLockIfFailureOnStandbyTaskSuspend() throws Exception { + final TaskId taskId = new TaskId(0, 0); + + final StreamThread thread = setupStandbyTest(taskId); + + final StateDirectory testStateDir = new StateDirectory(applicationId, + config.getString(StreamsConfig.STATE_DIR_CONFIG), + mockTime); + + assertFalse(testStateDir.lock(taskId, 0)); + try { + thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + fail("Should have thrown exception"); + } catch (final Exception e) { + assertTrue(testStateDir.lock(taskId, 0)); + } finally { + testStateDir.unlock(taskId); + } + } + + @Test + public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask() throws Exception { + final TaskId taskId = new TaskId(0, 0); + + final StreamThread thread = setupStandbyTest(taskId); + + final StateDirectory testStateDir = new StateDirectory(applicationId, + config.getString(StreamsConfig.STATE_DIR_CONFIG), + mockTime); + + assertFalse(testStateDir.lock(taskId, 0)); + try { + thread.shutdownTasksAndState(true); + assertTrue(testStateDir.lock(taskId, 0)); + } finally { + testStateDir.unlock(taskId); + } + } + + private StreamThread setupStandbyTest(final TaskId taskId) { + final String storeName = "store"; + final String changelogTopic = applicationId + "-" + storeName + "-changelog"; + + final KStreamBuilder builder = new KStreamBuilder(); + builder.setApplicationId(applicationId); + builder.stream("topic1").groupByKey().count(storeName); + + final MockClientSupplier clientSupplier = new MockClientSupplier(); + clientSupplier.restoreConsumer.updatePartitions(changelogTopic, + Collections.singletonList(new PartitionInfo(changelogTopic, 0, null, null, null))); + clientSupplier.restoreConsumer.updateBeginningOffsets(new HashMap() { + { + put(new TopicPartition(changelogTopic, 0), 0L); + } + }); + clientSupplier.restoreConsumer.updateEndOffsets(new HashMap() { + { + put(new TopicPartition(changelogTopic, 0), 0L); + } + }); + + final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, + clientId, processId, new Metrics(), new MockTime(), + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + + @Override + protected StandbyTask createStandbyTask(final TaskId id, final Collection partitions) { + return new StandbyTask( + taskId, + applicationId, + partitions, + builder.build(0), + clientSupplier.consumer, + new StoreChangelogReader(getName(), clientSupplier.restoreConsumer, mockTime, 1000), + StreamThreadTest.this.config, + new StreamsMetricsImpl(new Metrics(), "groupName", Collections.emptyMap()), + stateDirectory) { + + @Override + public void suspend() { + throw new RuntimeException("KABOOM!!!"); + } + + @Override + public void commit() { + throw new RuntimeException("KABOOM!!!"); + } + }; + } + }; + + final Map> standbyTasks = new HashMap<>(); + standbyTasks.put(taskId, Collections.singleton(new TopicPartition("topic", 0))); + thread.setPartitionAssignor(new MockStreamsPartitionAssignor(Collections.>emptyMap(), standbyTasks)); + thread.rebalanceListener.onPartitionsAssigned(Collections.emptySet()); + + return thread; + } + private void initPartitionGrouper(final StreamsConfig config, final StreamThread thread, final MockClientSupplier clientSupplier) { From 9d5d03a328811c0017199678c1169c921a3939cd Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sat, 1 Jul 2017 14:13:42 -0700 Subject: [PATCH 2/2] Github comments --- .../kafka/streams/processor/internals/StreamThread.java | 3 +-- .../streams/processor/internals/StreamThreadTest.java | 8 ++++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 24ee078eb60b6..a2f87d5e9fa39 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1081,9 +1081,8 @@ private void shutdown(final boolean cleanRun) { streamsMetrics.removeAllSensors(); } - // visible for testing @SuppressWarnings("ThrowableNotThrown") - void shutdownTasksAndState(final boolean cleanRun) { + private void shutdownTasksAndState(final boolean cleanRun) { log.debug("{} Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", logPrefix, activeTasks.keySet(), standbyTasks.keySet(), suspendedTasks.keySet(), suspendedStandbyTasks.keySet()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 6007b6c109e09..d3fffc2690661 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -1629,6 +1629,7 @@ public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() thro final TaskId taskId = new TaskId(0, 0); final StreamThread thread = setupTest(taskId); + thread.start(); final StateDirectory testStateDir = new StateDirectory( applicationId, @@ -1637,7 +1638,8 @@ public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() thro assertFalse(testStateDir.lock(taskId, 0)); try { - thread.shutdownTasksAndState(true); + thread.close(); + thread.join(); assertTrue(testStateDir.lock(taskId, 0)); } finally { testStateDir.unlock(taskId); @@ -1715,6 +1717,7 @@ public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSus final TaskId taskId = new TaskId(0, 0); final StreamThread thread = setupStandbyTest(taskId); + thread.start(); final StateDirectory testStateDir = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), @@ -1722,7 +1725,8 @@ public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSus assertFalse(testStateDir.lock(taskId, 0)); try { - thread.shutdownTasksAndState(true); + thread.close(); + thread.join(); assertTrue(testStateDir.lock(taskId, 0)); } finally { testStateDir.unlock(taskId);