From 0e940198076ea65387b408587e146bb08aa75e3a Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Thu, 6 Jul 2017 12:05:35 +0100 Subject: [PATCH] fix broken tests --- .../processor/internals/StreamThreadTest.java | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 2056954cab62..a7f1db13ce8e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -1817,7 +1817,6 @@ public void shouldReleaseStateDirLockIfFailureOnTaskSuspend() throws Exception { final TaskId taskId = new TaskId(0, 0); final StreamThread thread = setupTest(taskId); - final StateDirectory testStateDir = new StateDirectory( applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), @@ -1830,6 +1829,7 @@ public void shouldReleaseStateDirLockIfFailureOnTaskSuspend() throws Exception { } catch (final Exception e) { assertTrue(testStateDir.lock(taskId, 0)); } finally { + thread.close(); testStateDir.unlock(taskId); } } @@ -1839,7 +1839,6 @@ public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() thro final TaskId taskId = new TaskId(0, 0); final StreamThread thread = setupTest(taskId); - thread.start(); final StateDirectory testStateDir = new StateDirectory( applicationId, @@ -1856,7 +1855,7 @@ public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() thro } } - private StreamThread setupTest(final TaskId taskId) { + private StreamThread setupTest(final TaskId taskId) throws InterruptedException { final TopologyBuilder builder = new TopologyBuilder(); builder.setApplicationId(applicationId); builder.addSource("source", "topic"); @@ -1896,6 +1895,15 @@ protected StreamTask createStreamTask(final TaskId id, final Collection> activeTasks = new HashMap<>(); activeTasks.put(testStreamTask.id, testStreamTask.partitions); thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks)); + thread.start(); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return thread.state() == StreamThread.State.RUNNING; + } + }, "thread didn't transition to running"); + thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions); return thread; @@ -1906,7 +1914,7 @@ public void shouldReleaseStateDirLockIfFailureOnStandbyTaskSuspend() throws Exce final TaskId taskId = new TaskId(0, 0); final StreamThread thread = setupStandbyTest(taskId); - + startThreadAndRebalance(thread); final StateDirectory testStateDir = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime); @@ -1918,17 +1926,29 @@ public void shouldReleaseStateDirLockIfFailureOnStandbyTaskSuspend() throws Exce } catch (final Exception e) { assertTrue(testStateDir.lock(taskId, 0)); } finally { + thread.close(); testStateDir.unlock(taskId); } } + private void startThreadAndRebalance(final StreamThread thread) throws InterruptedException { + thread.start(); + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return thread.state() == StreamThread.State.RUNNING; + } + }, "thread didn't transition to running"); + thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener.onPartitionsAssigned(Collections.emptySet()); + } + @Test public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask() throws Exception { final TaskId taskId = new TaskId(0, 0); final StreamThread thread = setupStandbyTest(taskId); - thread.start(); - + startThreadAndRebalance(thread); final StateDirectory testStateDir = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime); @@ -1939,6 +1959,7 @@ public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSus thread.join(); assertTrue(testStateDir.lock(taskId, 0)); } finally { + thread.close(); testStateDir.unlock(taskId); } } @@ -1998,7 +2019,6 @@ public void commit() { final Map> standbyTasks = new HashMap<>(); standbyTasks.put(taskId, Collections.singleton(new TopicPartition("topic", 0))); thread.setPartitionAssignor(new MockStreamsPartitionAssignor(Collections.>emptyMap(), standbyTasks)); - thread.rebalanceListener.onPartitionsAssigned(Collections.emptySet()); return thread; }