From 093488a9f63a6409042f4d7c6496b4ca503cd70f Mon Sep 17 00:00:00 2001 From: Jacob Maes Date: Fri, 11 Aug 2017 15:09:58 -0700 Subject: [PATCH 1/2] SAMZA-1388: Flaky test - TestStatefulTask#testShouldStartAndRestore --- .../test/integration/TestStatefulTask.scala | 75 +++++++------------ 1 file changed, 27 insertions(+), 48 deletions(-) diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala index e5b6756f56..24c71b3dcf 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala @@ -30,8 +30,16 @@ import org.junit.{AfterClass, BeforeClass, Test} import scala.collection.JavaConverters._ object TestStatefulTask { - val STORE_NAME = "mystore" - val STATE_TOPIC_STREAM = "mystoreChangelog" + val STORE_NAME = "mystore" + val STATE_TOPIC_STREAM = "mystoreChangelog" + + // Messages with one dupe and one delete. A negative string means delete. See StateStoreTestTask.testProcess() + val MESSAGES_SEND_1 = List("1", "2", "3", "2", "99", "-99") + val MESSAGES_RECV_1 = List("1", "2", "3", "2", "99", null) + val STORE_CONTENTS_1 = List("1", "2", "3") + + val MESSAGES_SEND_2 = List("4", "5", "5") + val MESSAGES_RECV_2 = List("4", "5", "5") @BeforeClass def beforeSetupServers { @@ -47,9 +55,9 @@ object TestStatefulTask { /** * Test that does the following: * 1. Start a single partition of TestStateStoreTask using ThreadJobFactory. - * 2. Send four messages to input (1,2,3,2), which contain one dupe (2). + * 2. Send MESSAGES, which contain a dupe and a delete. * 3. Validate that all messages were received by TestStateStoreTask. - * 4. Validate that TestStateStoreTask called store.put() for all four messages, and that the messages ended up in the mystore topic. + * 4. Validate that TestStateStoreTask called store.put() for all messages, and that the messages ended up in the mystore topic. * 5. Kill the job. * 6. Start the job again. * 7. Validate that the job restored all messages (1,2,3) to the store. @@ -86,23 +94,12 @@ class TestStatefulTask extends StreamTaskTestUtil { assertEquals(0, task.received.size) // Send some messages to input stream. - send(task, "1") - send(task, "2") - send(task, "3") - send(task, "2") - send(task, "99") - send(task, "-99") + TestStatefulTask.MESSAGES_SEND_1.foreach(m => send(task, m)) // Validate that messages appear in store stream. val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, 5, "testShouldStartTaskForFirstTime") - assertEquals(6, messages.length) - assertEquals("1", messages(0)) - assertEquals("2", messages(1)) - assertEquals("3", messages(2)) - assertEquals("2", messages(3)) - assertEquals("99", messages(4)) - assertNull(messages(5)) + assertEquals(TestStatefulTask.MESSAGES_RECV_1, messages) stopJob(job) } @@ -111,52 +108,34 @@ class TestStatefulTask extends StreamTaskTestUtil { val (job, task) = startJob // Validate that restored has expected data. - assertEquals(3, task.asInstanceOf[StateStoreTestTask].restored.size) - assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains("1")) - assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains("2")) - assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains("3")) + assertEquals(TestStatefulTask.STORE_CONTENTS_1.length, task.asInstanceOf[StateStoreTestTask].restored.size) + TestStatefulTask.STORE_CONTENTS_1.foreach(m => assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains(m))) var count = 0 - // We should get the original four messages in the stream (1,2,3,2). + // We should get the original size messages in the stream (1,2,3,2,99,-99). // Note that this will trigger four new outgoing messages to the STATE_TOPIC. - while (task.received.size < 4 && count < 100) { + while (task.received.size < TestStatefulTask.MESSAGES_RECV_1.length && count < 100) { Thread.sleep(600) count += 1 } assertTrue("Timed out waiting to received messages. Received thus far: " + task.received.size, count < 100) - // Reset the count down latch after the 4 messages come in. + // Reset the count down latch after the 6 messages come in. task.awaitMessage // Send some messages to input stream. - send(task, "4") - send(task, "5") - send(task, "5") + TestStatefulTask.MESSAGES_SEND_2.foreach(m => send(task, m)) + + val expectedMessagesRcvd = TestStatefulTask.MESSAGES_RECV_1 ++ // From initial start. + TestStatefulTask.MESSAGES_RECV_1 ++ // From second startup. + TestStatefulTask.MESSAGES_RECV_2 // From sending in this method. // Validate that messages appear in store stream. - val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, 14, "testShouldRestoreStore") - - assertEquals(15, messages.length) - // From initial start. - assertEquals("1", messages(0)) - assertEquals("2", messages(1)) - assertEquals("3", messages(2)) - assertEquals("2", messages(3)) - assertEquals("99", messages(4)) - assertNull(messages(5)) - // From second startup. - assertEquals("1", messages(6)) - assertEquals("2", messages(7)) - assertEquals("3", messages(8)) - assertEquals("2", messages(9)) - assertEquals("99", messages(10)) - assertNull(messages(11)) - // From sending in this method. - assertEquals("4", messages(12)) - assertEquals("5", messages(13)) - assertEquals("5", messages(14)) + val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, expectedMessagesRcvd.length-1, "testShouldRestoreStore") + + assertEquals(expectedMessagesRcvd, messages) stopJob(job) } From 6802c0cbcc7c702f1cbbbc1fe5b4b2042af415f1 Mon Sep 17 00:00:00 2001 From: Jacob Maes Date: Fri, 11 Aug 2017 15:16:55 -0700 Subject: [PATCH 2/2] Adjust the javadoc for clarity --- .../apache/samza/test/integration/TestStatefulTask.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala index 24c71b3dcf..734487be35 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala @@ -55,13 +55,13 @@ object TestStatefulTask { /** * Test that does the following: * 1. Start a single partition of TestStateStoreTask using ThreadJobFactory. - * 2. Send MESSAGES, which contain a dupe and a delete. + * 2. Send MESSAGES_SEND_1, which contains a dupe and a delete. * 3. Validate that all messages were received by TestStateStoreTask. * 4. Validate that TestStateStoreTask called store.put() for all messages, and that the messages ended up in the mystore topic. * 5. Kill the job. * 6. Start the job again. - * 7. Validate that the job restored all messages (1,2,3) to the store. - * 8. Send three more messages to input (4,5,5), and validate that TestStateStoreTask receives them. + * 7. Validate that the job restored all messages STORE_CONTENTS_1 to the store. + * 8. Send three more messages to input MESSAGES_SEND_2, and validate that TestStateStoreTask receives them. * 9. Kill the job again. */ class TestStatefulTask extends StreamTaskTestUtil {