From bca00e931a4a249608908db7edd9b011e3a6129a Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Tue, 9 May 2017 17:59:45 -0700 Subject: [PATCH 01/25] used ApplicationConfig constant --- .../test/processor/TestZkStreamProcessor.java | 558 ++++++++++++++++++ 1 file changed, 558 insertions(+) create mode 100644 samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java new file mode 100644 index 0000000000..005e83e981 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.test.processor; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import kafka.utils.TestUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.ZkConfig; +import org.apache.samza.processor.StreamProcessor; +import org.apache.samza.processor.StreamProcessorLifecycleListener; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.InitableTask; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.test.StandaloneIntegrationTestHarness; +import org.apache.samza.test.StandaloneTestUtils; +import org.apache.samza.zk.TestZkUtils; +import org.junit.Assert; +import org.junit.Test; + +public class TestZkStreamProcessor extends StandaloneIntegrationTestHarness { + /** + * Testing a basic identity stream task - reads data from a topic and writes it to another topic + * (without any modifications) + * + *

+ * The standalone version in this test uses KafkaSystemFactory and it uses a SingleContainerGrouperFactory. Hence, + * no matter how many tasks are present, it will always be run in a single processor instance. This simplifies testing + */ + + public static StreamProcessorLifecycleListener listener = new StreamProcessorLifecycleListener() { + @Override + public void onStart() { + } + + @Override + public void onShutdown() { + } + + @Override + public void onFailure(Throwable t) { + } + }; + + @Test + public void testSingleStreamProcessor() { + testStreamProcessor(new String[]{"1"}); + } + + @Test + public void testTwoStreamProcessors() { + testStreamProcessor(new String[]{"1", "2"}); + } + + @Test + public void testFiveStreamProcessors() { + testStreamProcessor(new String[]{"1", "2", "3", "4", "5"}); + } + + // main test method for happy path with fixed number of processors + private void testStreamProcessor(String[] processorIds) { + final String testSystem = "test-system"; + final String inputTopic = "numbers"; + final String outputTopic = "output"; + final int messageCount = 40; + + final Map map = createConfigs(testSystem, inputTopic, outputTopic, messageCount); + + // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a + // TopicExistsException since StreamProcessor auto-creates them. 
+ createTopics(inputTopic, outputTopic); + + // create a latch of the size == number of messages + TestStreamTask.endLatch = new CountDownLatch(messageCount); + + StreamProcessor[] streamProcessors = new StreamProcessor[processorIds.length]; + CountDownLatch[] startCountDownLatches = new CountDownLatch[processorIds.length]; + for (int i = 0; i < processorIds.length; i++) { + startCountDownLatches[i] = new CountDownLatch(1); + streamProcessors[i] = createStreamProcessor(processorIds[i], map, startCountDownLatches[i], null); + } + produceMessages(0, inputTopic, messageCount); + + Thread[] threads = new Thread[processorIds.length]; + + for (int i = 0; i < processorIds.length; i++) { + threads[i] = runInThread(streamProcessors[i], TestStreamTask.endLatch); + threads[i].start(); + // wait until the processor reports that it has started + try { + startCountDownLatches[i].await(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Assert.fail("got interrupted while waiting for the " + i + "th processor to start."); + } + } + + // wait until all the events are consumed + try { + TestStreamTask.endLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Assert.fail("endLatch.await failed with an interruption:" + e.getLocalizedMessage()); + } + + // collect all the threads + try { + for (Thread t : threads) { + synchronized (t) { + t.notify(); // to stop the thread + } + t.join(1000); + } + } catch (InterruptedException e) { + Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage()); + } + + // we should get each value one time + Map expectedValues = new HashMap<>(messageCount); + for (int i = 0; i < messageCount; i++) { + expectedValues.put(i, false); + } + verifyNumMessages(outputTopic, expectedValues, messageCount); + } + + @Test + // test with adding another processor + public void testStreamProcessorWithAdd() { + final String testSystem = "test-system1"; + final String inputTopic = "numbers"; + final String outputTopic = 
"output"; + final int messageCount = 40; + + final Map map = createConfigs(testSystem, inputTopic, outputTopic, messageCount); + + // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a + // TopicExistsException since StreamProcessor auto-creates them. + createTopics(inputTopic, outputTopic); + + // set number of events we expect two read by both processes in total: + // p1 - reads 'messageCount' at first and then both p1 and p2 read all ('messageCount') together, since they + // start from the beginnig. + // so we expect total 3 x messageCounts + int totalEventsToGenerate = 3 * messageCount; + TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate); + + // create first processor + CountDownLatch startCountDownLatch1 = new CountDownLatch(1); + StreamProcessor sp = createStreamProcessor("1", map, startCountDownLatch1, null); + + // produce first batch of messages starting with 0 + produceMessages(0, inputTopic, messageCount); + + // start the first processor + Thread t1 = runInThread(sp, TestStreamTask.endLatch); + t1.start(); + + // wait until the processor reports that it has started + try { + startCountDownLatch1.await(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Assert.fail("got interrupted while waiting for the first processor to start."); + } + + // make sure it consumes all the messages from the first batch + int attempts = 5; + while (attempts > 0) { + long leftEventsCount = TestStreamTask.endLatch.getCount(); + System.out.println("current count = " + leftEventsCount); + if (leftEventsCount == totalEventsToGenerate - messageCount) { // read first batch + System.out.println("read all. 
current count = " + leftEventsCount); + break; + } + TestZkUtils.sleepMs(1000); + attempts--; + } + Assert.assertTrue("Didn't read all the events in the first batch in 5 attempts", attempts > 0); + + // start the second processor + CountDownLatch countDownLatch2 = new CountDownLatch(1); + StreamProcessor sp2 = createStreamProcessor("2", map, countDownLatch2, null); + Thread t2 = runInThread(sp2, TestStreamTask.endLatch); + t2.start(); + + // wait until the processor reports that it has started + try { + countDownLatch2.await(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Assert.fail("got interrupted while waiting for the 2nd processor to start."); + } + + // wait for at least one full debounce time to let the system to publish and distribute the new job model + TestZkUtils.sleepMs(3000); + + // produce the second batch of the messages, starting with 'messageCount' + produceMessages(messageCount, inputTopic, messageCount); + + // wait until all the events are consumed + // make sure it consumes all the messages from the first batch + attempts = 5; + while (attempts > 0) { + long leftEventsCount = TestStreamTask.endLatch.getCount(); // how much is left to read + System.out.println("2current count = " + leftEventsCount); + if (leftEventsCount == 0) { // should read all of them + System.out.println("2read all. current count = " + leftEventsCount); + break; + } + TestZkUtils.sleepMs(1000); + attempts--; + } + Assert.assertTrue("Didn't read all the leftover events in 5 attempts", attempts > 0); + + try { + synchronized (t1) { + t1.notify(); + } + synchronized (t2) { + t2.notify(); + } + t1.join(1000); + t2.join(1000); + } catch (InterruptedException e) { + Assert.fail("Failed to join finished threads:" + e.getLocalizedMessage()); + } + + // processor1 will read 40 events, and then processor1 and processor2 will read 80 events together, + // but the expected values are the same 0-79 - we should get each value one time. 
+ // Meanwhile the number of events we gonna get is 80+40=120 + Map expectedValues = new HashMap<>(2 * messageCount); + for (int i = 0; i < 2 * messageCount; i++) { + expectedValues.put(i, false); + } + verifyNumMessages(outputTopic, expectedValues, totalEventsToGenerate); + } + + @Test + // test with a single processor removed + public void testStreamProcessorWithRemove() { + final String testSystem = "test-system2"; + final String inputTopic = "numbers"; + final String outputTopic = "output"; + final int messageCount = 40; + + final Map map = createConfigs(testSystem, inputTopic, outputTopic, messageCount); + + // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a + // TopicExistsException since StreamProcessor auto-creates them. + createTopics(inputTopic, outputTopic); + + // set number of events we expect to read by both processes in total: + // p1 and p2 - both read messageCount at first and p1 is shutdown, new batch of events is generated + // and p2 will read all of them from the beginning (+ 2 x messageCounts, total 3 x) + int totalEventsToGenerate = 3 * messageCount; + TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate); + + // create first processor + CountDownLatch startCountDownLatch1 = new CountDownLatch(1); + CountDownLatch stopCountDownLatch1 = new CountDownLatch(1); + StreamProcessor sp1 = createStreamProcessor("1", map, startCountDownLatch1, stopCountDownLatch1); + + // start the first processor + Thread t1 = runInThread(sp1, TestStreamTask.endLatch); + t1.start(); + + // start the second processor + CountDownLatch countDownLatch2 = new CountDownLatch(1); + StreamProcessor sp2 = createStreamProcessor("2", map, countDownLatch2, null); + Thread t2 = runInThread(sp2, TestStreamTask.endLatch); + t2.start(); + + // wait until the processor reports that it has started + try { + startCountDownLatch1.await(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Assert.fail("got 
interrupted while waiting for the first processor to start."); + } + + // wait until the processor reports that it has started + try { + countDownLatch2.await(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Assert.fail("got interrupted while waiting for the 2nd processor to start."); + } + + // produce first batch of messages starting with 0 + produceMessages(0, inputTopic, messageCount); + + // make sure they consume all the messages from the first batch + int attempts = 5; + while (attempts > 0) { + long leftEventsCount = TestStreamTask.endLatch.getCount(); + System.out.println("current count = " + leftEventsCount); + if (leftEventsCount == totalEventsToGenerate - messageCount) { // read first batch + System.out.println("read all. current count = " + leftEventsCount); + break; + } + TestZkUtils.sleepMs(1000); + attempts--; + } + Assert.assertTrue("Didn't read all the events in the first batch in 5 attempts", attempts > 0); + + // stop the first processor + synchronized (t1) { + t1.notify(); // this should stop it + } + + // wait until it's really down + try { + stopCountDownLatch1.await(1000, TimeUnit.MILLISECONDS); + System.out.println("Processor 1 is down"); + } catch (InterruptedException e) { + Assert.fail("got interrupted while waiting for the 1st processor to stop."); + } + + // wait for at least one full debounce time to let the system to publish and distribute the new job model + TestZkUtils.sleepMs(3000); + + // produce the second batch of the messages, starting with 'messageCount' + produceMessages(messageCount, inputTopic, messageCount); + + // wait until p2 consumes all the message by itself + attempts = 5; + while (attempts > 0) { + long leftEventsCount = TestStreamTask.endLatch.getCount(); + System.out.println("2current count = " + leftEventsCount); + if (leftEventsCount == 0) { // should read all of them + System.out.println("2read all. 
current count = " + leftEventsCount); + break; + } + TestZkUtils.sleepMs(1000); + attempts--; + } + Assert.assertTrue("Didn't read all the leftover events in 5 attempts", attempts > 0); + + // shutdown p2 + + try { + synchronized (t2) { + t2.notify(); + } + t2.join(1000); + } catch (InterruptedException e) { + Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage()); + } + + // processor1 and 2 will both read 20 events (total 40), and then processor2 read 80 events by itself, + // but the expected values are the same 0-79 - we should get each value one time. + // Meanwhile the number of events we gonna get is 40 + 80 + Map expectedValues = new HashMap<>(2 * messageCount); + for (int i = 0; i < 2 * messageCount; i++) { + expectedValues.put(i, false); + } + verifyNumMessages(outputTopic, expectedValues, totalEventsToGenerate); + } + + // auxiliary methods + private StreamProcessor createStreamProcessor(String pId, Map map, + final CountDownLatch startLatchCountDown, final CountDownLatch stopLatchCountDown) { + map.put(ApplicationConfig.PROCESSOR_ID, pId); + + StreamProcessor processor = new StreamProcessor(new MapConfig(map), new HashMap<>(), TestStreamTask::new, + new StreamProcessorLifecycleListener() { + + @Override + public void onStart() { + if (startLatchCountDown != null) { + startLatchCountDown.countDown(); + } + } + + @Override + public void onShutdown() { + if (stopLatchCountDown != null) { + stopLatchCountDown.countDown(); + } + } + + @Override + public void onFailure(Throwable t) { + } + }); + + return processor; + } + + private void createTopics(String inputTopic, String outputTopic) { + TestUtils.createTopic(zkUtils(), inputTopic, 5, 1, servers(), new Properties()); + TestUtils.createTopic(zkUtils(), outputTopic, 5, 1, servers(), new Properties()); + } + + private Map createConfigs(String testSystem, String inputTopic, String outputTopic, + int messageCount) { + Map configs = new HashMap<>(); + configs.putAll(StandaloneTestUtils + 
.getStandaloneConfigs("test-job", "org.apache.samza.test.processor.TestZkStreamProcessor.TestStreamTask")); + configs.putAll(StandaloneTestUtils + .getKafkaSystemConfigs(testSystem, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, + true)); + configs.put("task.inputs", String.format("%s.%s", testSystem, inputTopic)); + configs.put("app.messageCount", String.valueOf(messageCount)); + configs.put("app.outputTopic", outputTopic); + configs.put("app.outputSystem", testSystem); + configs.put(ZkConfig.ZK_CONNECT, zkConnect()); + + configs.put("job.systemstreampartition.grouper.factory", + "org.apache.samza.container.grouper.stream.GroupByPartitionFactory"); + configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); + + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory"); + + return configs; + } + + /** + * Produces the provided number of messages to the topic. + */ + private void produceMessages(final int start, String topic, int numMessages) { + KafkaProducer producer = getKafkaProducer(); + for (int i = start; i < numMessages + start; i++) { + try { + System.out.println("producing " + i); + producer.send(new ProducerRecord(topic, String.valueOf(i).getBytes())).get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + } + } + + /** + * Runs the provided stream processor by starting it, waiting on the provided latch with a timeout, + * and then stopping it. + */ + private Thread runInThread(final StreamProcessor processor, CountDownLatch latch) { + Thread t = new Thread() { + + @Override + public void run() { + processor.start(); + try { + // just wait + synchronized (this) { + this.wait(100000); + } + System.out.println("notifed. 
Abandon the wait."); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println("Stopping the processor"); + processor.stop(); + } + }; + return t; + } + + /** + * Consumes data from the topic until there are no new messages for a while + * and asserts that the number of consumed messages is as expected. + */ + private void verifyNumMessages(String topic, final Map expectedValues, int expectedNumMessages) { + KafkaConsumer consumer = getKafkaConsumer(); + consumer.subscribe(Collections.singletonList(topic)); + + Map map = new HashMap<>(expectedValues); + int count = 0; + int emptyPollCount = 0; + + while (count < expectedNumMessages && emptyPollCount < 5) { + ConsumerRecords records = consumer.poll(5000); + if (!records.isEmpty()) { + Iterator iterator = records.iterator(); + while (iterator.hasNext()) { + ConsumerRecord record = iterator.next(); + //Assert.assertEquals(new String((byte[]) record.value()), String.valueOf(count)); + String val = new String((byte[]) record.value()); + System.out.println("Got value " + val); + map.put(Integer.valueOf(val), true); + count++; + } + } else { + emptyPollCount++; + System.out.println("empty polls " + emptyPollCount); + } + } + // filter out numbers we did not get + long numFalse = map.values().stream().filter(v -> !v).count(); + Assert.assertEquals("didn't get this number of events ", 0, numFalse); + Assert.assertEquals(expectedNumMessages, count); + } + + // StreamTaskClass + public static class TestStreamTask implements StreamTask, InitableTask { + // static field since there's no other way to share state b/w a task instance and + // stream processor when constructed from "task.class". 
+ static CountDownLatch endLatch; + private int processedMessageCount = 0; + private String processorId; + private String outputTopic; + private String outputSystem; + + @Override + public void init(Config config, TaskContext taskContext) + throws Exception { + this.processorId = config.get(ApplicationConfig.PROCESSOR_ID); + this.outputTopic = config.get("app.outputTopic", "output"); + this.outputSystem = config.get("app.outputSystem", "test-system"); + } + + @Override + public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, + TaskCoordinator taskCoordinator) + throws Exception { + messageCollector.send(new OutgoingMessageEnvelope(new SystemStream(outputSystem, outputTopic), + incomingMessageEnvelope.getMessage())); + processedMessageCount++; + String message = (String) incomingMessageEnvelope.getMessage(); + System.out.println( + "Stream processor " + processorId + ";offset=" + incomingMessageEnvelope.getOffset() + ";received " + message + + "; ssp=" + incomingMessageEnvelope.getSystemStreamPartition()); + synchronized (endLatch) { + endLatch.countDown(); + } + } + } +} From d0fbc70ec4d1a3462a93a47b7885bc1d6be299f8 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Wed, 10 May 2017 16:17:27 -0700 Subject: [PATCH 02/25] adding first draft of the test --- .../org/apache/samza/zk/ZkJobCoordinator.java | 10 ++- .../java/org/apache/samza/zk/ZkUtils.java | 23 +++++++ .../test/processor/TestZkStreamProcessor.java | 68 +++++++++++++++++++ 3 files changed, 99 insertions(+), 2 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 37eba2d844..8d474ca443 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -50,9 +50,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { // with 
locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197 private static final int METADATA_CACHE_TTL_MS = 5000; - private final ZkUtils zkUtils; + private ZkUtils zkUtils; private final String processorId; - private final ZkController zkController; + private ZkController zkController; private final Config config; private final CoordinationUtils coordinationUtils; @@ -67,6 +67,10 @@ public ZkJobCoordinator(Config config) { this.processorId = createProcessorId(config); this.coordinationUtils = new ZkCoordinationServiceFactory() .getCoordinationService(new ApplicationConfig(config).getGlobalAppId(), String.valueOf(processorId), config); + + } + + private void init() { this.zkUtils = ((ZkCoordinationUtils) coordinationUtils).getZkUtils(); LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils); leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl()); @@ -75,6 +79,8 @@ public ZkJobCoordinator(Config config) { @Override public void start() { + init(); + streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config); debounceTimer = new ScheduleAfterDebounceTime(throwable -> { LOG.error("Received exception from in JobCoordinator Processing!", throwable); diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index be877a42e6..5abe019a24 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -26,12 +26,14 @@ import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; +import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.samza.SamzaException; import org.apache.samza.job.model.JobModel; import 
org.apache.samza.serializers.model.SamzaObjectMapper; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; @@ -70,6 +72,27 @@ public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeo this.keyBuilder = zkKeyBuilder; this.connectionTimeoutMs = connectionTimeoutMs; this.zkClient = zkClient; + + // subscribe for state changes + zkClient.subscribeStateChanges(new IZkStateListener() { + @Override + public void handleStateChanged(Watcher.Event.KeeperState state) + throws Exception { + System.out.printf("=========================NEW STATE CHANGE:" + state.name()); + } + + @Override + public void handleNewSession() + throws Exception { + System.out.printf("==========================NEW SESSION"); + } + + @Override + public void handleSessionEstablishmentError(Throwable error) + throws Exception { + System.out.printf("===========================NEW EstablishementError"); + } + }); } public void connect() throws ZkInterruptedException { diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java index 005e83e981..4ceb875519 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java @@ -387,6 +387,74 @@ public void testStreamProcessorWithRemove() { verifyNumMessages(outputTopic, expectedValues, totalEventsToGenerate); } + @Test + public void testZkUnavailable() { + final String testSystem = "test-system"; + final String inputTopic = "numbers"; + final String outputTopic = "output"; + final int messageCount = 40; + + String [] processorIds = new String [] { "1", "2" }; + + final Map map = createConfigs(testSystem, inputTopic, outputTopic, messageCount); + map.put(ZkConfig.ZK_CONNECT, "localhost:2222"); + + // Note: 
createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a + // TopicExistsException since StreamProcessor auto-creates them. + createTopics(inputTopic, outputTopic); + + // create a latch of the size == number of messages + TestStreamTask.endLatch = new CountDownLatch(messageCount); + + StreamProcessor[] streamProcessors = new StreamProcessor[processorIds.length]; + CountDownLatch[] startCountDownLatches = new CountDownLatch[processorIds.length]; + for (int i = 0; i < processorIds.length; i++) { + startCountDownLatches[i] = new CountDownLatch(1); + streamProcessors[i] = createStreamProcessor(processorIds[i], map, startCountDownLatches[i], null); + } + produceMessages(0, inputTopic, messageCount); + + Thread[] threads = new Thread[processorIds.length]; + + for (int i = 0; i < processorIds.length; i++) { + threads[i] = runInThread(streamProcessors[i], TestStreamTask.endLatch); + threads[i].start(); + // wait until the processor reports that it has started + try { + startCountDownLatches[i].await(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Assert.fail("got interrupted while waiting for the " + i + "th processor to start."); + } + } + + // wait until all the events are consumed + try { + TestStreamTask.endLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Assert.fail("endLatch.await failed with an interruption:" + e.getLocalizedMessage()); + } + + // collect all the threads + try { + for (Thread t : threads) { + synchronized (t) { + t.notify(); // to stop the thread + } + t.join(1000); + } + } catch (InterruptedException e) { + Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage()); + } + + // we should get each value one time + Map expectedValues = new HashMap<>(messageCount); + for (int i = 0; i < messageCount; i++) { + expectedValues.put(i, false); + } + verifyNumMessages(outputTopic, expectedValues, messageCount); + } + + 
////////////////////////////////////////////////////////////////////////////////////////////////// // auxiliary methods private StreamProcessor createStreamProcessor(String pId, Map map, final CountDownLatch startLatchCountDown, final CountDownLatch stopLatchCountDown) { From 26fba1c6e13cdb47b5ca6cb2e08419263b416758 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Wed, 10 May 2017 16:49:49 -0700 Subject: [PATCH 03/25] set shorter timeout for zk connect --- .../zk/ZkCoordinationServiceFactory.java | 12 +++- .../test/processor/TestZkStreamProcessor.java | 62 ++----------------- 2 files changed, 16 insertions(+), 58 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java index 07da147aeb..82e6039129 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java @@ -18,17 +18,27 @@ */ package org.apache.samza.zk; +import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.ZkClient; +import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.CoordinationServiceFactory; +import org.apache.zookeeper.Watcher; + public class ZkCoordinationServiceFactory implements CoordinationServiceFactory { // TODO - Why should this method be synchronized? 
synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) { ZkConfig zkConfig = new ZkConfig(config); - ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); + ZkClient zkClient; + try { + zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); + } catch (Exception e) { + throw new SamzaException("zkClient failed to connect to ZK at :" + zkConfig.getZkConnect(), e); + } + ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs()); return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new ScheduleAfterDebounceTime()); } diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java index 4ceb875519..d4ca29d80d 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java @@ -387,71 +387,19 @@ public void testStreamProcessorWithRemove() { verifyNumMessages(outputTopic, expectedValues, totalEventsToGenerate); } - @Test + @Test(expected = org.apache.samza.SamzaException.class) public void testZkUnavailable() { final String testSystem = "test-system"; final String inputTopic = "numbers"; final String outputTopic = "output"; final int messageCount = 40; - String [] processorIds = new String [] { "1", "2" }; - final Map map = createConfigs(testSystem, inputTopic, outputTopic, messageCount); - map.put(ZkConfig.ZK_CONNECT, "localhost:2222"); - - // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a - // TopicExistsException since StreamProcessor auto-creates them. 
- createTopics(inputTopic, outputTopic); - - // create a latch of the size == number of messages - TestStreamTask.endLatch = new CountDownLatch(messageCount); - - StreamProcessor[] streamProcessors = new StreamProcessor[processorIds.length]; - CountDownLatch[] startCountDownLatches = new CountDownLatch[processorIds.length]; - for (int i = 0; i < processorIds.length; i++) { - startCountDownLatches[i] = new CountDownLatch(1); - streamProcessors[i] = createStreamProcessor(processorIds[i], map, startCountDownLatches[i], null); - } - produceMessages(0, inputTopic, messageCount); + map.put(ZkConfig.ZK_CONNECT, "localhost:2222"); // non-existing zk + map.put(ZkConfig.ZK_CONNECTION_TIMEOUT_MS, "3000"); - Thread[] threads = new Thread[processorIds.length]; - - for (int i = 0; i < processorIds.length; i++) { - threads[i] = runInThread(streamProcessors[i], TestStreamTask.endLatch); - threads[i].start(); - // wait until the processor reports that it has started - try { - startCountDownLatches[i].await(1000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Assert.fail("got interrupted while waiting for the " + i + "th processor to start."); - } - } - - // wait until all the events are consumed - try { - TestStreamTask.endLatch.await(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Assert.fail("endLatch.await failed with an interruption:" + e.getLocalizedMessage()); - } - - // collect all the threads - try { - for (Thread t : threads) { - synchronized (t) { - t.notify(); // to stop the thread - } - t.join(1000); - } - } catch (InterruptedException e) { - Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage()); - } - - // we should get each value one time - Map expectedValues = new HashMap<>(messageCount); - for (int i = 0; i < messageCount; i++) { - expectedValues.put(i, false); - } - verifyNumMessages(outputTopic, expectedValues, messageCount); + CountDownLatch startLatch = new CountDownLatch(1); + createStreamProcessor("1", map, 
startLatch, null); // this should fail with timeout exception } ////////////////////////////////////////////////////////////////////////////////////////////////// From c58643146c063968c7b2956aea41f625803533d4 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Thu, 11 May 2017 09:53:50 -0700 Subject: [PATCH 04/25] fixed fail test --- .../org/apache/samza/zk/ZkControllerImpl.java | 1 + .../org/apache/samza/zk/ZkJobCoordinator.java | 13 +- .../java/org/apache/samza/zk/ZkUtils.java | 23 --- .../test/processor/TestZkStreamProcessor.java | 157 +++++++++++++++--- 4 files changed, 140 insertions(+), 54 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java index 7821ef9dc6..318bc2edaf 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java @@ -76,6 +76,7 @@ public void stop() { if (isLeader()) { zkLeaderElector.resignLeadership(); } + zkUtils.getZkClient().close(); } @Override diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index adc7177252..0786e03a5e 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -50,9 +50,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { // with locality. 
Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197 private static final int METADATA_CACHE_TTL_MS = 5000; - private ZkUtils zkUtils; + private final ZkUtils zkUtils; private final String processorId; - private ZkController zkController; + private final ZkController zkController; private final Config config; private final CoordinationUtils coordinationUtils; @@ -67,10 +67,6 @@ public ZkJobCoordinator(Config config) { this.processorId = createProcessorId(config); this.coordinationUtils = new ZkCoordinationServiceFactory() .getCoordinationService(new ApplicationConfig(config).getGlobalAppId(), String.valueOf(processorId), config); - - } - - private void init() { this.zkUtils = ((ZkCoordinationUtils) coordinationUtils).getZkUtils(); LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils); leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl()); @@ -79,8 +75,6 @@ private void init() { @Override public void start() { - init(); - streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config); debounceTimer = new ScheduleAfterDebounceTime(throwable -> { LOG.error("Received exception from in JobCoordinator Processing!", throwable); @@ -92,9 +86,6 @@ public void start() { @Override public synchronized void stop() { - if (coordinatorListener != null) { - coordinatorListener.onJobModelExpired(); - } debounceTimer.stopScheduler(); zkController.stop(); diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index db1fc4ae99..5c8fcf3d37 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -21,14 +21,12 @@ import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; -import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import 
org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.samza.SamzaException; import org.apache.samza.job.model.JobModel; import org.apache.samza.serializers.model.SamzaObjectMapper; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; @@ -73,27 +71,6 @@ public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeo this.keyBuilder = zkKeyBuilder; this.connectionTimeoutMs = connectionTimeoutMs; this.zkClient = zkClient; - - // subscribe for state changes - zkClient.subscribeStateChanges(new IZkStateListener() { - @Override - public void handleStateChanged(Watcher.Event.KeeperState state) - throws Exception { - System.out.printf("=========================NEW STATE CHANGE:" + state.name()); - } - - @Override - public void handleNewSession() - throws Exception { - System.out.printf("==========================NEW SESSION"); - } - - @Override - public void handleSessionEstablishmentError(Throwable error) - throws Exception { - System.out.printf("===========================NEW EstablishementError"); - } - }); } public void connect() throws ZkInterruptedException { diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java index d4ca29d80d..419f51cc5d 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java @@ -55,6 +55,7 @@ import org.junit.Test; public class TestZkStreamProcessor extends StandaloneIntegrationTestHarness { + public static int BAD_MESSAGE_KEY = 1000; /** * Testing a basic identity stream task - reads data from a topic and writes it to another topic * (without any modifications) @@ -64,20 +65,6 @@ public class TestZkStreamProcessor extends StandaloneIntegrationTestHarness { * no 
matter how many tasks are present, it will always be run in a single processor instance. This simplifies testing */ - public static StreamProcessorLifecycleListener listener = new StreamProcessorLifecycleListener() { - @Override - public void onStart() { - } - - @Override - public void onShutdown() { - } - - @Override - public void onFailure(Throwable t) { - } - }; - @Test public void testSingleStreamProcessor() { testStreamProcessor(new String[]{"1"}); @@ -402,9 +389,126 @@ public void testZkUnavailable() { createStreamProcessor("1", map, startLatch, null); // this should fail with timeout exception } + @Test + // test with a single processor failing + public void testFailStreamProcessor() { + final String testSystem = "test-system2"; + final String inputTopic = "numbers"; + final String outputTopic = "output"; + final int messageCount = 40; + final int numBadMessages = 4; + + final Map map = createConfigs(testSystem, inputTopic, outputTopic, messageCount); + + // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a + // TopicExistsException since StreamProcessor auto-creates them. + createTopics(inputTopic, outputTopic); + + // set number of events we expect to read by both processes in total: + // p1 will read messageCount messages, then it will die. 
+ // p2 will read messageCount messages + numBadMessages/2, then a new job model will arrive, + // and p2 will read messageCount messages again, + numBadMessages + // total 2 x messageCount + messageCount + numBadMessages + numBadMessages/2 + int totalEventsToGenerate = 3 * messageCount + numBadMessages + numBadMessages/2; + TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate); + + // create first processor + CountDownLatch startCountDownLatch1 = new CountDownLatch(1); + CountDownLatch stopCountDownLatch1 = new CountDownLatch(1); + StreamProcessor sp1 = createStreamProcessor("1", map, startCountDownLatch1, stopCountDownLatch1); + + // start the first processor + Thread t1 = runInThread(sp1, TestStreamTask.endLatch); + t1.start(); + + // start the second processor + CountDownLatch countDownLatch2 = new CountDownLatch(1); + StreamProcessor sp2 = createStreamProcessor("2", map, countDownLatch2, null); + Thread t2 = runInThread(sp2, TestStreamTask.endLatch); + t2.start(); + + // wait until the processor reports that it has started + try { + startCountDownLatch1.await(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Assert.fail("got interrupted while waiting for the first processor to start."); + } + + // wait until the processor reports that it has started + try { + countDownLatch2.await(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Assert.fail("got interrupted while waiting for the 2nd processor to start."); + } + + // produce first batch of messages starting with 0 + produceMessages(0, inputTopic, messageCount); + + // make sure they consume all the messages + int attempts = 5; + while (attempts > 0) { + long leftEventsCount = TestStreamTask.endLatch.getCount(); + System.out.println("left to read = " + leftEventsCount); + if (leftEventsCount == totalEventsToGenerate - messageCount) { // read first batch + System.out.println("read all. 
left to read = " + leftEventsCount); + break; + } + TestZkUtils.sleepMs(1000); + attempts--; + } + Assert.assertTrue("Didn't read all the events in the first batch in 5 attempts", attempts > 0); + + // produce a bad message + produceMessages(BAD_MESSAGE_KEY, inputTopic, 4); + + // wait for at least one full debounce time to let the system to publish and distribute the new job model + TestZkUtils.sleepMs(3000); + + // produce the second batch of the messages, starting with 'messageCount' + produceMessages(messageCount, inputTopic, messageCount); + + // wait until p2 consumes all the message by itself + attempts = 5; + while (attempts > 0) { + long leftEventsCount = TestStreamTask.endLatch.getCount(); + System.out.println("2nd left to read = " + leftEventsCount); + if (leftEventsCount == 0) { // should've read all of them + System.out.println("2nd read all. left to red = " + leftEventsCount); + break; + } + TestZkUtils.sleepMs(1000); + attempts--; + } + Assert.assertTrue("Didn't read all the leftover events in 5 attempts", attempts > 0); + + // shutdown p2 + try { + synchronized (t2) { + t2.notify(); + } + t2.join(1000); + } catch (InterruptedException e) { + Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage()); + } + + // processor1 and 2 will both read 20 events (total 40), and then processor2 read 80 events by itself, + // but the expected values are the same 0-79 - we should get each value one time. 
+ // Meanwhile the number of events we gonna get is 40 + 80 + Map expectedValues = new HashMap<>(2 * messageCount); + for (int i = 0; i < 2 * messageCount; i++) { + expectedValues.put(i, false); + } + + for (int i = BAD_MESSAGE_KEY; i < numBadMessages + BAD_MESSAGE_KEY; i++) { + expectedValues.put(i, false); + } + verifyNumMessages(outputTopic, expectedValues, totalEventsToGenerate); + } + + ////////////////////////////////////////////////////////////////////////////////////////////////// // auxiliary methods - private StreamProcessor createStreamProcessor(String pId, Map map, + private StreamProcessor createStreamProcessor(final String pId, Map map, final CountDownLatch startLatchCountDown, final CountDownLatch stopLatchCountDown) { map.put(ApplicationConfig.PROCESSOR_ID, pId); @@ -423,10 +527,13 @@ public void onShutdown() { if (stopLatchCountDown != null) { stopLatchCountDown.countDown(); } + System.out.println("ON STOP. PID = " + pId + " in thread " + Thread.currentThread()); + } @Override public void onFailure(Throwable t) { + } }); @@ -491,9 +598,9 @@ public void run() { synchronized (this) { this.wait(100000); } - System.out.println("notifed. Abandon the wait."); + System.out.println("notified. 
Abandon the wait."); } catch (InterruptedException e) { - e.printStackTrace(); + System.out.println("wait interrupted" + e); } System.out.println("Stopping the processor"); processor.stop(); @@ -559,10 +666,20 @@ public void init(Config config, TaskContext taskContext) public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator) throws Exception { - messageCollector.send(new OutgoingMessageEnvelope(new SystemStream(outputSystem, outputTopic), - incomingMessageEnvelope.getMessage())); + + Object message = incomingMessageEnvelope.getMessage(); + + messageCollector.send(new OutgoingMessageEnvelope(new SystemStream(outputSystem, outputTopic), message)); processedMessageCount++; - String message = (String) incomingMessageEnvelope.getMessage(); + + // inject a failure + if(Integer.valueOf((String)message) >= BAD_MESSAGE_KEY) { + if(processorId.equals("1")) { + System.out.println("================================ FAILING for msg=" + message); + throw new Exception("Processing in the processor " + processorId + " failed "); + } + } + System.out.println( "Stream processor " + processorId + ";offset=" + incomingMessageEnvelope.getOffset() + ";received " + message + "; ssp=" + incomingMessageEnvelope.getSystemStreamPartition()); From fa46418aa27263a41c4af224e1e982aa6195e893 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Thu, 11 May 2017 10:58:29 -0700 Subject: [PATCH 05/25] restore deleted code --- .../src/main/java/org/apache/samza/zk/ZkJobCoordinator.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 0786e03a5e..6ad10d2c0a 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -86,6 +86,10 @@ public void start() { @Override public synchronized void 
stop() { + if (coordinatorListener != null) { + coordinatorListener.onJobModelExpired(); + } + debounceTimer.stopScheduler(); zkController.stop(); From 57e5410fa3858bb52cc61b97fbdfa912fa5a42fd Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Thu, 11 May 2017 11:39:35 -0700 Subject: [PATCH 06/25] changed name of constant --- .../samza/test/processor/TestZkStreamProcessor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java index 419f51cc5d..ba8ea3eae9 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java @@ -55,7 +55,7 @@ import org.junit.Test; public class TestZkStreamProcessor extends StandaloneIntegrationTestHarness { - public static int BAD_MESSAGE_KEY = 1000; + public static int BAD_MESSAGE_START = 1000; /** * Testing a basic identity stream task - reads data from a topic and writes it to another topic * (without any modifications) @@ -459,7 +459,7 @@ public void testFailStreamProcessor() { Assert.assertTrue("Didn't read all the events in the first batch in 5 attempts", attempts > 0); // produce a bad message - produceMessages(BAD_MESSAGE_KEY, inputTopic, 4); + produceMessages(BAD_MESSAGE_START, inputTopic, 4); // wait for at least one full debounce time to let the system to publish and distribute the new job model TestZkUtils.sleepMs(3000); @@ -499,7 +499,7 @@ public void testFailStreamProcessor() { expectedValues.put(i, false); } - for (int i = BAD_MESSAGE_KEY; i < numBadMessages + BAD_MESSAGE_KEY; i++) { + for (int i = BAD_MESSAGE_START; i < numBadMessages + BAD_MESSAGE_START; i++) { expectedValues.put(i, false); } verifyNumMessages(outputTopic, expectedValues, totalEventsToGenerate); @@ -673,7 +673,7 @@ public void 
process(IncomingMessageEnvelope incomingMessageEnvelope, MessageColl processedMessageCount++; // inject a failure - if(Integer.valueOf((String)message) >= BAD_MESSAGE_KEY) { + if(Integer.valueOf((String)message) >= BAD_MESSAGE_START) { if(processorId.equals("1")) { System.out.println("================================ FAILING for msg=" + message); throw new Exception("Processing in the processor " + processorId + " failed "); From 8f719d701f1e85e09a480501f94d407ed7f807c5 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Thu, 11 May 2017 12:13:40 -0700 Subject: [PATCH 07/25] cleanup --- .../test/processor/TestZkStreamProcessor.java | 213 ++++-------------- 1 file changed, 43 insertions(+), 170 deletions(-) diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java index ba8ea3eae9..fd4470a137 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java @@ -52,18 +52,20 @@ import org.apache.samza.test.StandaloneTestUtils; import org.apache.samza.zk.TestZkUtils; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +/** + * Happy path tests. + * Start 1, 2, 5 processors and make sure they all consume all the events. + */ public class TestZkStreamProcessor extends StandaloneIntegrationTestHarness { - public static int BAD_MESSAGE_START = 1000; - /** - * Testing a basic identity stream task - reads data from a topic and writes it to another topic - * (without any modifications) - * - *

- * The standalone version in this test uses KafkaSystemFactory and it uses a SingleContainerGrouperFactory. Hence, - * no matter how many tasks are present, it will always be run in a single processor instance. This simplifies testing - */ + + @Before + public void setupTest() { + + } + @Test public void testSingleStreamProcessor() { @@ -82,9 +84,9 @@ public void testFiveStreamProcessors() { // main test method for happy path with fixed number of processors private void testStreamProcessor(String[] processorIds) { - final String testSystem = "test-system"; - final String inputTopic = "numbers"; - final String outputTopic = "output"; + final String testSystem = "test-system" + processorIds.length; // making system unique per test + final String inputTopic = "numbers" + processorIds.length; // making input topic unique per test + final String outputTopic = "output" + processorIds.length; // making output topic unique per test final int messageCount = 40; final Map map = createConfigs(testSystem, inputTopic, outputTopic, messageCount); @@ -96,16 +98,20 @@ private void testStreamProcessor(String[] processorIds) { // create a latch of the size == number of messages TestStreamTask.endLatch = new CountDownLatch(messageCount); + // initialize the the processors + // we need startLatch to know when the processor has been completely initialized StreamProcessor[] streamProcessors = new StreamProcessor[processorIds.length]; CountDownLatch[] startCountDownLatches = new CountDownLatch[processorIds.length]; for (int i = 0; i < processorIds.length; i++) { startCountDownLatches[i] = new CountDownLatch(1); streamProcessors[i] = createStreamProcessor(processorIds[i], map, startCountDownLatches[i], null); } + + // produce messageCount messages, starting with key '0' produceMessages(0, inputTopic, messageCount); + // run the processors in a separate threads Thread[] threads = new Thread[processorIds.length]; - for (int i = 0; i < processorIds.length; i++) { threads[i] = 
runInThread(streamProcessors[i], TestStreamTask.endLatch); threads[i].start(); @@ -137,6 +143,7 @@ private void testStreamProcessor(String[] processorIds) { } // we should get each value one time + // create a map of all expected values to validate Map expectedValues = new HashMap<>(messageCount); for (int i = 0; i < messageCount; i++) { expectedValues.put(i, false); @@ -145,11 +152,13 @@ private void testStreamProcessor(String[] processorIds) { } @Test - // test with adding another processor + /** + * Similar to the previous tests, but add another processor in the middle + */ public void testStreamProcessorWithAdd() { final String testSystem = "test-system1"; - final String inputTopic = "numbers"; - final String outputTopic = "output"; + final String inputTopic = "numbers_add"; + final String outputTopic = "output_add"; final int messageCount = 40; final Map map = createConfigs(testSystem, inputTopic, outputTopic, messageCount); @@ -158,9 +167,9 @@ public void testStreamProcessorWithAdd() { // TopicExistsException since StreamProcessor auto-creates them. createTopics(inputTopic, outputTopic); - // set number of events we expect two read by both processes in total: - // p1 - reads 'messageCount' at first and then both p1 and p2 read all ('messageCount') together, since they - // start from the beginnig. + // set number of events we expect wo read by both processes in total: + // p1 - reads 'messageCount' at first + // p1 and p2 read all messageCount together, since they start from the beginning. 
// so we expect total 3 x messageCounts int totalEventsToGenerate = 3 * messageCount; TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate); @@ -187,9 +196,9 @@ public void testStreamProcessorWithAdd() { int attempts = 5; while (attempts > 0) { long leftEventsCount = TestStreamTask.endLatch.getCount(); - System.out.println("current count = " + leftEventsCount); + System.out.println("messages left to consume = " + leftEventsCount); if (leftEventsCount == totalEventsToGenerate - messageCount) { // read first batch - System.out.println("read all. current count = " + leftEventsCount); + System.out.println("read first batch. left to consume = " + leftEventsCount); break; } TestZkUtils.sleepMs(1000); @@ -221,9 +230,9 @@ public void testStreamProcessorWithAdd() { attempts = 5; while (attempts > 0) { long leftEventsCount = TestStreamTask.endLatch.getCount(); // how much is left to read - System.out.println("2current count = " + leftEventsCount); + System.out.println("2 processors together. left to consume = " + leftEventsCount); if (leftEventsCount == 0) { // should read all of them - System.out.println("2read all. current count = " + leftEventsCount); + System.out.println("2 processors together. read all. left " + leftEventsCount); break; } TestZkUtils.sleepMs(1000); @@ -231,6 +240,7 @@ public void testStreamProcessorWithAdd() { } Assert.assertTrue("Didn't read all the leftover events in 5 attempts", attempts > 0); + // shutdown both try { synchronized (t1) { t1.notify(); @@ -244,9 +254,9 @@ public void testStreamProcessorWithAdd() { Assert.fail("Failed to join finished threads:" + e.getLocalizedMessage()); } - // processor1 will read 40 events, and then processor1 and processor2 will read 80 events together, - // but the expected values are the same 0-79 - we should get each value one time. 
- // Meanwhile the number of events we gonna get is 80+40=120 + // p1 will read messageCount events, and then p1 and p2 will read 2xmessageCount events together, + // but the expected values are the same 0-79, they will appear in the output more then onces, but we should mark then only one time. + // total number of events we gonna get is 80+40=120 Map expectedValues = new HashMap<>(2 * messageCount); for (int i = 0; i < 2 * messageCount; i++) { expectedValues.put(i, false); @@ -255,11 +265,13 @@ public void testStreamProcessorWithAdd() { } @Test - // test with a single processor removed + /** + * same as other happy path messages, but with one processor removed in the middle + */ public void testStreamProcessorWithRemove() { final String testSystem = "test-system2"; - final String inputTopic = "numbers"; - final String outputTopic = "output"; + final String inputTopic = "numbers_rm"; + final String outputTopic = "output_rm"; final int messageCount = 40; final Map map = createConfigs(testSystem, inputTopic, outputTopic, messageCount); @@ -374,137 +386,6 @@ public void testStreamProcessorWithRemove() { verifyNumMessages(outputTopic, expectedValues, totalEventsToGenerate); } - @Test(expected = org.apache.samza.SamzaException.class) - public void testZkUnavailable() { - final String testSystem = "test-system"; - final String inputTopic = "numbers"; - final String outputTopic = "output"; - final int messageCount = 40; - - final Map map = createConfigs(testSystem, inputTopic, outputTopic, messageCount); - map.put(ZkConfig.ZK_CONNECT, "localhost:2222"); // non-existing zk - map.put(ZkConfig.ZK_CONNECTION_TIMEOUT_MS, "3000"); - - CountDownLatch startLatch = new CountDownLatch(1); - createStreamProcessor("1", map, startLatch, null); // this should fail with timeout exception - } - - @Test - // test with a single processor failing - public void testFailStreamProcessor() { - final String testSystem = "test-system2"; - final String inputTopic = "numbers"; - final String 
outputTopic = "output"; - final int messageCount = 40; - final int numBadMessages = 4; - - final Map map = createConfigs(testSystem, inputTopic, outputTopic, messageCount); - - // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a - // TopicExistsException since StreamProcessor auto-creates them. - createTopics(inputTopic, outputTopic); - - // set number of events we expect to read by both processes in total: - // p1 will read messageCount messages, then it will die. - // p2 will read messageCount messages + numBadMessages/2, then a new job model will arrive, - // and p2 will read messageCount messages again, + numBadMessages - // total 2 x messageCount + messageCount + numBadMessages + numBadMessages/2 - int totalEventsToGenerate = 3 * messageCount + numBadMessages + numBadMessages/2; - TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate); - - // create first processor - CountDownLatch startCountDownLatch1 = new CountDownLatch(1); - CountDownLatch stopCountDownLatch1 = new CountDownLatch(1); - StreamProcessor sp1 = createStreamProcessor("1", map, startCountDownLatch1, stopCountDownLatch1); - - // start the first processor - Thread t1 = runInThread(sp1, TestStreamTask.endLatch); - t1.start(); - - // start the second processor - CountDownLatch countDownLatch2 = new CountDownLatch(1); - StreamProcessor sp2 = createStreamProcessor("2", map, countDownLatch2, null); - Thread t2 = runInThread(sp2, TestStreamTask.endLatch); - t2.start(); - - // wait until the processor reports that it has started - try { - startCountDownLatch1.await(1000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Assert.fail("got interrupted while waiting for the first processor to start."); - } - - // wait until the processor reports that it has started - try { - countDownLatch2.await(1000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Assert.fail("got interrupted while waiting for the 2nd processor to 
start."); - } - - // produce first batch of messages starting with 0 - produceMessages(0, inputTopic, messageCount); - - // make sure they consume all the messages - int attempts = 5; - while (attempts > 0) { - long leftEventsCount = TestStreamTask.endLatch.getCount(); - System.out.println("left to read = " + leftEventsCount); - if (leftEventsCount == totalEventsToGenerate - messageCount) { // read first batch - System.out.println("read all. left to read = " + leftEventsCount); - break; - } - TestZkUtils.sleepMs(1000); - attempts--; - } - Assert.assertTrue("Didn't read all the events in the first batch in 5 attempts", attempts > 0); - - // produce a bad message - produceMessages(BAD_MESSAGE_START, inputTopic, 4); - - // wait for at least one full debounce time to let the system to publish and distribute the new job model - TestZkUtils.sleepMs(3000); - - // produce the second batch of the messages, starting with 'messageCount' - produceMessages(messageCount, inputTopic, messageCount); - - // wait until p2 consumes all the message by itself - attempts = 5; - while (attempts > 0) { - long leftEventsCount = TestStreamTask.endLatch.getCount(); - System.out.println("2nd left to read = " + leftEventsCount); - if (leftEventsCount == 0) { // should've read all of them - System.out.println("2nd read all. left to red = " + leftEventsCount); - break; - } - TestZkUtils.sleepMs(1000); - attempts--; - } - Assert.assertTrue("Didn't read all the leftover events in 5 attempts", attempts > 0); - - // shutdown p2 - try { - synchronized (t2) { - t2.notify(); - } - t2.join(1000); - } catch (InterruptedException e) { - Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage()); - } - - // processor1 and 2 will both read 20 events (total 40), and then processor2 read 80 events by itself, - // but the expected values are the same 0-79 - we should get each value one time. 
- // Meanwhile the number of events we gonna get is 40 + 80 - Map expectedValues = new HashMap<>(2 * messageCount); - for (int i = 0; i < 2 * messageCount; i++) { - expectedValues.put(i, false); - } - - for (int i = BAD_MESSAGE_START; i < numBadMessages + BAD_MESSAGE_START; i++) { - expectedValues.put(i, false); - } - verifyNumMessages(outputTopic, expectedValues, totalEventsToGenerate); - } - ////////////////////////////////////////////////////////////////////////////////////////////////// // auxiliary methods @@ -672,17 +553,9 @@ public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageColl messageCollector.send(new OutgoingMessageEnvelope(new SystemStream(outputSystem, outputTopic), message)); processedMessageCount++; - // inject a failure - if(Integer.valueOf((String)message) >= BAD_MESSAGE_START) { - if(processorId.equals("1")) { - System.out.println("================================ FAILING for msg=" + message); - throw new Exception("Processing in the processor " + processorId + " failed "); - } - } - System.out.println( - "Stream processor " + processorId + ";offset=" + incomingMessageEnvelope.getOffset() + ";received " + message - + "; ssp=" + incomingMessageEnvelope.getSystemStreamPartition()); + "Stream processor " + processorId + "; offset=" + incomingMessageEnvelope.getOffset() + "; totalRcvd=" + processedMessageCount + + ";received " + message + "; ssp=" + incomingMessageEnvelope.getSystemStreamPartition()); synchronized (endLatch) { endLatch.countDown(); } From 8d16bbd68f430570926543d54d407c423eb0969d Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Thu, 11 May 2017 12:23:56 -0700 Subject: [PATCH 08/25] imports --- .../org/apache/samza/zk/ZkCoordinationServiceFactory.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java index 82e6039129..997173270a 
100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java @@ -18,14 +18,12 @@ */ package org.apache.samza.zk; -import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.ZkClient; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.ZkConfig; -import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.CoordinationServiceFactory; -import org.apache.zookeeper.Watcher; +import org.apache.samza.coordinator.CoordinationUtils; public class ZkCoordinationServiceFactory implements CoordinationServiceFactory { From 0f5691bf3094836790f530a6db25366000a85d97 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Thu, 11 May 2017 14:04:16 -0700 Subject: [PATCH 09/25] cleanup --- .../test/processor/TestZkStreamProcessor.java | 54 +++++++++---------- 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java index fd4470a137..c83c7cc027 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java @@ -61,6 +61,8 @@ */ public class TestZkStreamProcessor extends StandaloneIntegrationTestHarness { + private final static int ATTEMPTS_NUMBER = 5; // to avoid long sleeps, we rather use multiple attempts with shorter sleeps + @Before public void setupTest() { @@ -142,13 +144,7 @@ private void testStreamProcessor(String[] processorIds) { Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage()); } - // we should get each value one time - // create a map of all expected values to validate - Map expectedValues = new HashMap<>(messageCount); - for (int i = 0; i < 
messageCount; i++) { - expectedValues.put(i, false); - } - verifyNumMessages(outputTopic, expectedValues, messageCount); + verifyNumMessages(outputTopic, messageCount, messageCount); } @Test @@ -193,7 +189,7 @@ public void testStreamProcessorWithAdd() { } // make sure it consumes all the messages from the first batch - int attempts = 5; + int attempts = ATTEMPTS_NUMBER; while (attempts > 0) { long leftEventsCount = TestStreamTask.endLatch.getCount(); System.out.println("messages left to consume = " + leftEventsCount); @@ -204,7 +200,7 @@ public void testStreamProcessorWithAdd() { TestZkUtils.sleepMs(1000); attempts--; } - Assert.assertTrue("Didn't read all the events in the first batch in 5 attempts", attempts > 0); + Assert.assertTrue("Didn't read all the events in the first batch in " + ATTEMPTS_NUMBER + " attempts", attempts > 0); // start the second processor CountDownLatch countDownLatch2 = new CountDownLatch(1); @@ -227,7 +223,7 @@ public void testStreamProcessorWithAdd() { // wait until all the events are consumed // make sure it consumes all the messages from the first batch - attempts = 5; + attempts = ATTEMPTS_NUMBER; while (attempts > 0) { long leftEventsCount = TestStreamTask.endLatch.getCount(); // how much is left to read System.out.println("2 processors together. left to consume = " + leftEventsCount); @@ -238,7 +234,7 @@ public void testStreamProcessorWithAdd() { TestZkUtils.sleepMs(1000); attempts--; } - Assert.assertTrue("Didn't read all the leftover events in 5 attempts", attempts > 0); + Assert.assertTrue("Didn't read all the leftover events in " + ATTEMPTS_NUMBER + " attempts", attempts > 0); // shutdown both try { @@ -255,13 +251,9 @@ public void testStreamProcessorWithAdd() { } // p1 will read messageCount events, and then p1 and p2 will read 2xmessageCount events together, - // but the expected values are the same 0-79, they will appear in the output more then onces, but we should mark then only one time. 
+ // but the expected values are the same 0-79, they will appear in the output more then once, but we should mark then only one time. // total number of events we gonna get is 80+40=120 - Map expectedValues = new HashMap<>(2 * messageCount); - for (int i = 0; i < 2 * messageCount; i++) { - expectedValues.put(i, false); - } - verifyNumMessages(outputTopic, expectedValues, totalEventsToGenerate); + verifyNumMessages(outputTopic, 2*messageCount, totalEventsToGenerate); } @Test @@ -319,7 +311,7 @@ public void testStreamProcessorWithRemove() { produceMessages(0, inputTopic, messageCount); // make sure they consume all the messages from the first batch - int attempts = 5; + int attempts = ATTEMPTS_NUMBER; while (attempts > 0) { long leftEventsCount = TestStreamTask.endLatch.getCount(); System.out.println("current count = " + leftEventsCount); @@ -330,7 +322,7 @@ public void testStreamProcessorWithRemove() { TestZkUtils.sleepMs(1000); attempts--; } - Assert.assertTrue("Didn't read all the events in the first batch in 5 attempts", attempts > 0); + Assert.assertTrue("Didn't read all the events in the first batch in " + ATTEMPTS_NUMBER + " attempts", attempts > 0); // stop the first processor synchronized (t1) { @@ -351,8 +343,8 @@ public void testStreamProcessorWithRemove() { // produce the second batch of the messages, starting with 'messageCount' produceMessages(messageCount, inputTopic, messageCount); - // wait until p2 consumes all the message by itself - attempts = 5; + // wait until p2 consumes all the message by itself; + attempts = ATTEMPTS_NUMBER; while (attempts > 0) { long leftEventsCount = TestStreamTask.endLatch.getCount(); System.out.println("2current count = " + leftEventsCount); @@ -363,7 +355,7 @@ public void testStreamProcessorWithRemove() { TestZkUtils.sleepMs(1000); attempts--; } - Assert.assertTrue("Didn't read all the leftover events in 5 attempts", attempts > 0); + Assert.assertTrue("Didn't read all the leftover events in " + ATTEMPTS_NUMBER + " 
attempts", attempts > 0); // shutdown p2 @@ -379,11 +371,7 @@ public void testStreamProcessorWithRemove() { // processor1 and 2 will both read 20 events (total 40), and then processor2 read 80 events by itself, // but the expected values are the same 0-79 - we should get each value one time. // Meanwhile the number of events we gonna get is 40 + 80 - Map expectedValues = new HashMap<>(2 * messageCount); - for (int i = 0; i < 2 * messageCount; i++) { - expectedValues.put(i, false); - } - verifyNumMessages(outputTopic, expectedValues, totalEventsToGenerate); + verifyNumMessages(outputTopic, 2 * messageCount, totalEventsToGenerate); } @@ -490,6 +478,17 @@ public void run() { return t; } + // for sequential values we can generate them automatically + private void verifyNumMessages(String topic, int numberOfSequentialValues, int exectedNumMessages) { + // we should get each value one time + // create a map of all expected values to validate + Map expectedValues = new HashMap<>(numberOfSequentialValues); + for (int i = 0; i < numberOfSequentialValues; i++) { + expectedValues.put(i, false); + } + verifyNumMessages(topic, expectedValues, exectedNumMessages); + } + /** * Consumes data from the topic until there are no new messages for a while * and asserts that the number of consumed messages is as expected. 
@@ -508,7 +507,6 @@ private void verifyNumMessages(String topic, final Map expecte Iterator iterator = records.iterator(); while (iterator.hasNext()) { ConsumerRecord record = iterator.next(); - //Assert.assertEquals(new String((byte[]) record.value()), String.valueOf(count)); String val = new String((byte[]) record.value()); System.out.println("Got value " + val); map.put(Integer.valueOf(val), true); From 7fccf8895a63669d2e4c7cb399c95418b536ea51 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Thu, 11 May 2017 16:50:05 -0700 Subject: [PATCH 10/25] checkstyle --- .../org/apache/samza/test/processor/TestZkStreamProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java index c83c7cc027..cd53b6f196 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java @@ -253,7 +253,7 @@ public void testStreamProcessorWithAdd() { // p1 will read messageCount events, and then p1 and p2 will read 2xmessageCount events together, // but the expected values are the same 0-79, they will appear in the output more then once, but we should mark then only one time. 
// total number of events we gonna get is 80+40=120 - verifyNumMessages(outputTopic, 2*messageCount, totalEventsToGenerate); + verifyNumMessages(outputTopic, 2 * messageCount, totalEventsToGenerate); } @Test From 2ae4854470ceccc14ec2a00b8f735363913972a1 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Thu, 11 May 2017 17:34:02 -0700 Subject: [PATCH 11/25] put aux methods into a separate class --- .../test/processor/TestZkStreamProcessor.java | 224 +---------------- .../processor/TestZkStreamProcessorBase.java | 232 ++++++++++++++++++ 2 files changed, 237 insertions(+), 219 deletions(-) create mode 100644 samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java index cd53b6f196..941613dcdf 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java @@ -19,37 +19,10 @@ package org.apache.samza.test.processor; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; import java.util.Map; -import java.util.Properties; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import kafka.utils.TestUtils; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.samza.config.ApplicationConfig; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobCoordinatorConfig; -import org.apache.samza.config.MapConfig; -import org.apache.samza.config.ZkConfig; import 
org.apache.samza.processor.StreamProcessor; -import org.apache.samza.processor.StreamProcessorLifecycleListener; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.SystemStream; -import org.apache.samza.task.InitableTask; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.apache.samza.test.StandaloneIntegrationTestHarness; -import org.apache.samza.test.StandaloneTestUtils; import org.apache.samza.zk.TestZkUtils; import org.junit.Assert; import org.junit.Before; @@ -59,7 +32,7 @@ * Happy path tests. * Start 1, 2, 5 processors and make sure they all consume all the events. */ -public class TestZkStreamProcessor extends StandaloneIntegrationTestHarness { +public class TestZkStreamProcessor extends TestZkStreamProcessorBase { private final static int ATTEMPTS_NUMBER = 5; // to avoid long sleeps, we rather use multiple attempts with shorter sleeps @@ -68,7 +41,6 @@ public void setupTest() { } - @Test public void testSingleStreamProcessor() { testStreamProcessor(new String[]{"1"}); @@ -98,7 +70,7 @@ private void testStreamProcessor(String[] processorIds) { createTopics(inputTopic, outputTopic); // create a latch of the size == number of messages - TestStreamTask.endLatch = new CountDownLatch(messageCount); + TestZkStreamProcessorBase.TestStreamTask.endLatch = new CountDownLatch(messageCount); // initialize the the processors // we need startLatch to know when the processor has been completely initialized @@ -115,7 +87,7 @@ private void testStreamProcessor(String[] processorIds) { // run the processors in a separate threads Thread[] threads = new Thread[processorIds.length]; for (int i = 0; i < processorIds.length; i++) { - threads[i] = runInThread(streamProcessors[i], TestStreamTask.endLatch); + threads[i] = 
runInThread(streamProcessors[i], TestZkStreamProcessorBase.TestStreamTask.endLatch); threads[i].start(); // wait until the processor reports that it has started try { @@ -127,7 +99,7 @@ private void testStreamProcessor(String[] processorIds) { // wait until all the events are consumed try { - TestStreamTask.endLatch.await(10, TimeUnit.SECONDS); + TestZkStreamProcessorBase.TestStreamTask.endLatch.await(10, TimeUnit.SECONDS); } catch (InterruptedException e) { Assert.fail("endLatch.await failed with an interruption:" + e.getLocalizedMessage()); } @@ -346,7 +318,7 @@ public void testStreamProcessorWithRemove() { // wait until p2 consumes all the message by itself; attempts = ATTEMPTS_NUMBER; while (attempts > 0) { - long leftEventsCount = TestStreamTask.endLatch.getCount(); + long leftEventsCount = TestZkStreamProcessorBase.TestStreamTask.endLatch.getCount(); System.out.println("2current count = " + leftEventsCount); if (leftEventsCount == 0) { // should read all of them System.out.println("2read all. 
current count = " + leftEventsCount); @@ -373,190 +345,4 @@ public void testStreamProcessorWithRemove() { // Meanwhile the number of events we gonna get is 40 + 80 verifyNumMessages(outputTopic, 2 * messageCount, totalEventsToGenerate); } - - - ////////////////////////////////////////////////////////////////////////////////////////////////// - // auxiliary methods - private StreamProcessor createStreamProcessor(final String pId, Map map, - final CountDownLatch startLatchCountDown, final CountDownLatch stopLatchCountDown) { - map.put(ApplicationConfig.PROCESSOR_ID, pId); - - StreamProcessor processor = new StreamProcessor(new MapConfig(map), new HashMap<>(), TestStreamTask::new, - new StreamProcessorLifecycleListener() { - - @Override - public void onStart() { - if (startLatchCountDown != null) { - startLatchCountDown.countDown(); - } - } - - @Override - public void onShutdown() { - if (stopLatchCountDown != null) { - stopLatchCountDown.countDown(); - } - System.out.println("ON STOP. PID = " + pId + " in thread " + Thread.currentThread()); - - } - - @Override - public void onFailure(Throwable t) { - - } - }); - - return processor; - } - - private void createTopics(String inputTopic, String outputTopic) { - TestUtils.createTopic(zkUtils(), inputTopic, 5, 1, servers(), new Properties()); - TestUtils.createTopic(zkUtils(), outputTopic, 5, 1, servers(), new Properties()); - } - - private Map createConfigs(String testSystem, String inputTopic, String outputTopic, - int messageCount) { - Map configs = new HashMap<>(); - configs.putAll(StandaloneTestUtils - .getStandaloneConfigs("test-job", "org.apache.samza.test.processor.TestZkStreamProcessor.TestStreamTask")); - configs.putAll(StandaloneTestUtils - .getKafkaSystemConfigs(testSystem, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, - true)); - configs.put("task.inputs", String.format("%s.%s", testSystem, inputTopic)); - configs.put("app.messageCount", String.valueOf(messageCount)); - 
configs.put("app.outputTopic", outputTopic); - configs.put("app.outputSystem", testSystem); - configs.put(ZkConfig.ZK_CONNECT, zkConnect()); - - configs.put("job.systemstreampartition.grouper.factory", - "org.apache.samza.container.grouper.stream.GroupByPartitionFactory"); - configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); - - configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory"); - - return configs; - } - - /** - * Produces the provided number of messages to the topic. - */ - private void produceMessages(final int start, String topic, int numMessages) { - KafkaProducer producer = getKafkaProducer(); - for (int i = start; i < numMessages + start; i++) { - try { - System.out.println("producing " + i); - producer.send(new ProducerRecord(topic, String.valueOf(i).getBytes())).get(); - } catch (InterruptedException | ExecutionException e) { - e.printStackTrace(); - } - } - } - - /** - * Runs the provided stream processor by starting it, waiting on the provided latch with a timeout, - * and then stopping it. - */ - private Thread runInThread(final StreamProcessor processor, CountDownLatch latch) { - Thread t = new Thread() { - - @Override - public void run() { - processor.start(); - try { - // just wait - synchronized (this) { - this.wait(100000); - } - System.out.println("notified. 
Abandon the wait."); - } catch (InterruptedException e) { - System.out.println("wait interrupted" + e); - } - System.out.println("Stopping the processor"); - processor.stop(); - } - }; - return t; - } - - // for sequential values we can generate them automatically - private void verifyNumMessages(String topic, int numberOfSequentialValues, int exectedNumMessages) { - // we should get each value one time - // create a map of all expected values to validate - Map expectedValues = new HashMap<>(numberOfSequentialValues); - for (int i = 0; i < numberOfSequentialValues; i++) { - expectedValues.put(i, false); - } - verifyNumMessages(topic, expectedValues, exectedNumMessages); - } - - /** - * Consumes data from the topic until there are no new messages for a while - * and asserts that the number of consumed messages is as expected. - */ - private void verifyNumMessages(String topic, final Map expectedValues, int expectedNumMessages) { - KafkaConsumer consumer = getKafkaConsumer(); - consumer.subscribe(Collections.singletonList(topic)); - - Map map = new HashMap<>(expectedValues); - int count = 0; - int emptyPollCount = 0; - - while (count < expectedNumMessages && emptyPollCount < 5) { - ConsumerRecords records = consumer.poll(5000); - if (!records.isEmpty()) { - Iterator iterator = records.iterator(); - while (iterator.hasNext()) { - ConsumerRecord record = iterator.next(); - String val = new String((byte[]) record.value()); - System.out.println("Got value " + val); - map.put(Integer.valueOf(val), true); - count++; - } - } else { - emptyPollCount++; - System.out.println("empty polls " + emptyPollCount); - } - } - // filter out numbers we did not get - long numFalse = map.values().stream().filter(v -> !v).count(); - Assert.assertEquals("didn't get this number of events ", 0, numFalse); - Assert.assertEquals(expectedNumMessages, count); - } - - // StreamTaskClass - public static class TestStreamTask implements StreamTask, InitableTask { - // static field since there's no 
other way to share state b/w a task instance and - // stream processor when constructed from "task.class". - static CountDownLatch endLatch; - private int processedMessageCount = 0; - private String processorId; - private String outputTopic; - private String outputSystem; - - @Override - public void init(Config config, TaskContext taskContext) - throws Exception { - this.processorId = config.get(ApplicationConfig.PROCESSOR_ID); - this.outputTopic = config.get("app.outputTopic", "output"); - this.outputSystem = config.get("app.outputSystem", "test-system"); - } - - @Override - public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, - TaskCoordinator taskCoordinator) - throws Exception { - - Object message = incomingMessageEnvelope.getMessage(); - - messageCollector.send(new OutgoingMessageEnvelope(new SystemStream(outputSystem, outputTopic), message)); - processedMessageCount++; - - System.out.println( - "Stream processor " + processorId + "; offset=" + incomingMessageEnvelope.getOffset() + "; totalRcvd=" + processedMessageCount - + ";received " + message + "; ssp=" + incomingMessageEnvelope.getSystemStreamPartition()); - synchronized (endLatch) { - endLatch.countDown(); - } - } - } } diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java new file mode 100644 index 0000000000..35fabb9d0c --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java @@ -0,0 +1,232 @@ +package org.apache.samza.test.processor; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import kafka.utils.TestUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import 
org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.ZkConfig; +import org.apache.samza.processor.StreamProcessor; +import org.apache.samza.processor.StreamProcessorLifecycleListener; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.InitableTask; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.test.StandaloneIntegrationTestHarness; +import org.apache.samza.test.StandaloneTestUtils; +import org.junit.Assert; + + +public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness { + public final static int BAD_MESSAGE_KEY = 1000; + + // auxiliary methods + protected StreamProcessor createStreamProcessor(final String pId, Map map, + final CountDownLatch startLatchCountDown, final CountDownLatch stopLatchCountDown) { + map.put(ApplicationConfig.PROCESSOR_ID, pId); + + StreamProcessor processor = new StreamProcessor(new MapConfig(map), new HashMap<>(), TestStreamTask::new, + new StreamProcessorLifecycleListener() { + + @Override + public void onStart() { + if (startLatchCountDown != null) { + startLatchCountDown.countDown(); + } + } + + @Override + public void onShutdown() { + if (stopLatchCountDown != null) { + stopLatchCountDown.countDown(); + } + System.out.println("ON STOP. 
PID = " + pId + " in thread " + Thread.currentThread()); + + } + + @Override + public void onFailure(Throwable t) { + + } + }); + + return processor; + } + + protected void createTopics(String inputTopic, String outputTopic) { + TestUtils.createTopic(zkUtils(), inputTopic, 5, 1, servers(), new Properties()); + TestUtils.createTopic(zkUtils(), outputTopic, 5, 1, servers(), new Properties()); + } + + protected Map createConfigs(String testSystem, String inputTopic, String outputTopic, + int messageCount) { + Map configs = new HashMap<>(); + configs.putAll(StandaloneTestUtils + .getStandaloneConfigs("test-job", "org.apache.samza.test.processor.TestZkStreamProcessor.TestStreamTask")); + configs.putAll(StandaloneTestUtils + .getKafkaSystemConfigs(testSystem, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, + true)); + configs.put("task.inputs", String.format("%s.%s", testSystem, inputTopic)); + configs.put("app.messageCount", String.valueOf(messageCount)); + configs.put("app.outputTopic", outputTopic); + configs.put("app.outputSystem", testSystem); + configs.put(ZkConfig.ZK_CONNECT, zkConnect()); + + configs.put("job.systemstreampartition.grouper.factory", + "org.apache.samza.container.grouper.stream.GroupByPartitionFactory"); + configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); + + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory"); + + return configs; + } + + /** + * Produces the provided number of messages to the topic. 
+ */ + protected void produceMessages(final int start, String topic, int numMessages) { + KafkaProducer producer = getKafkaProducer(); + for (int i = start; i < numMessages + start; i++) { + try { + System.out.println("producing " + i); + producer.send(new ProducerRecord(topic, String.valueOf(i).getBytes())).get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + } + } + + /** + * Runs the provided stream processor by starting it, waiting on the provided latch with a timeout, + * and then stopping it. + */ + protected Thread runInThread(final StreamProcessor processor, CountDownLatch latch) { + Thread t = new Thread() { + + @Override + public void run() { + processor.start(); + try { + // just wait + synchronized (this) { + this.wait(100000); + } + System.out.println("notified. Abandon the wait."); + } catch (InterruptedException e) { + System.out.println("wait interrupted" + e); + } + System.out.println("Stopping the processor"); + processor.stop(); + } + }; + return t; + } + + // for sequential values we can generate them automatically + protected void verifyNumMessages(String topic, int numberOfSequentialValues, int exectedNumMessages) { + // we should get each value one time + // create a map of all expected values to validate + Map expectedValues = new HashMap<>(numberOfSequentialValues); + for (int i = 0; i < numberOfSequentialValues; i++) { + expectedValues.put(i, false); + } + verifyNumMessages(topic, expectedValues, exectedNumMessages); + } + + /** + * Consumes data from the topic until there are no new messages for a while + * and asserts that the number of consumed messages is as expected. 
+ */ + protected void verifyNumMessages(String topic, final Map expectedValues, int expectedNumMessages) { + KafkaConsumer consumer = getKafkaConsumer(); + consumer.subscribe(Collections.singletonList(topic)); + + Map map = new HashMap<>(expectedValues); + int count = 0; + int emptyPollCount = 0; + + while (count < expectedNumMessages && emptyPollCount < 5) { + ConsumerRecords records = consumer.poll(5000); + if (!records.isEmpty()) { + Iterator iterator = records.iterator(); + while (iterator.hasNext()) { + ConsumerRecord record = iterator.next(); + String val = new String((byte[]) record.value()); + System.out.println("Got value " + val); + map.put(Integer.valueOf(val), true); + count++; + } + } else { + emptyPollCount++; + System.out.println("empty polls " + emptyPollCount); + } + } + // filter out numbers we did not get + long numFalse = map.values().stream().filter(v -> !v).count(); + Assert.assertEquals("didn't get this number of events ", 0, numFalse); + Assert.assertEquals(expectedNumMessages, count); + } + + // StreamTaskClass + public static class TestStreamTask implements StreamTask, InitableTask { + // static field since there's no other way to share state b/w a task instance and + // stream processor when constructed from "task.class". 
+ static CountDownLatch endLatch; + private int processedMessageCount = 0; + private String processorId; + private String outputTopic; + private String outputSystem; + + @Override + public void init(Config config, TaskContext taskContext) + throws Exception { + this.processorId = config.get(ApplicationConfig.PROCESSOR_ID); + this.outputTopic = config.get("app.outputTopic", "output"); + this.outputSystem = config.get("app.outputSystem", "test-system"); + } + + @Override + public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, + TaskCoordinator taskCoordinator) + throws Exception { + + Object message = incomingMessageEnvelope.getMessage(); + + // inject a failure + if (Integer.valueOf((String) message) >= BAD_MESSAGE_KEY && processorId.equals("1")) { + System.out.println("================================ FAILING for msg=" + message); + throw new Exception("Processing in the processor " + processorId + " failed "); + } + + System.out.println(processorId + " is writing out " + message); + messageCollector.send(new OutgoingMessageEnvelope(new SystemStream(outputSystem, outputTopic), message)); + processedMessageCount++; + + + System.out.println( + "Stream processor " + processorId + ";offset=" + incomingMessageEnvelope.getOffset() + "; totalRcvd=" + + processedMessageCount + ";received " + message + "; ssp=" + incomingMessageEnvelope + .getSystemStreamPartition()); + + synchronized (endLatch) { + endLatch.countDown(); + } + } + } +} From f2053fbe40fe92a2edcf6fc6a3021eea3a0891dd Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Thu, 11 May 2017 17:34:46 -0700 Subject: [PATCH 12/25] format --- .../test/processor/TestZkStreamProcessor.java | 18 ++++++++++-------- .../processor/TestZkStreamProcessorBase.java | 2 -- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java index 941613dcdf..56c91e817a 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java @@ -28,14 +28,16 @@ import org.junit.Before; import org.junit.Test; + /** * Happy path tests. * Start 1, 2, 5 processors and make sure they all consume all the events. */ public class TestZkStreamProcessor extends TestZkStreamProcessorBase { - private final static int ATTEMPTS_NUMBER = 5; // to avoid long sleeps, we rather use multiple attempts with shorter sleeps - + private final static int ATTEMPTS_NUMBER = 5; + // to avoid long sleeps, we rather use multiple attempts with shorter sleeps + @Before public void setupTest() { @@ -122,8 +124,7 @@ private void testStreamProcessor(String[] processorIds) { @Test /** * Similar to the previous tests, but add another processor in the middle - */ - public void testStreamProcessorWithAdd() { + */ public void testStreamProcessorWithAdd() { final String testSystem = "test-system1"; final String inputTopic = "numbers_add"; final String outputTopic = "output_add"; @@ -172,7 +173,8 @@ public void testStreamProcessorWithAdd() { TestZkUtils.sleepMs(1000); attempts--; } - Assert.assertTrue("Didn't read all the events in the first batch in " + ATTEMPTS_NUMBER + " attempts", attempts > 0); + Assert + .assertTrue("Didn't read all the events in the first batch in " + ATTEMPTS_NUMBER + " attempts", attempts > 0); // start the second processor CountDownLatch countDownLatch2 = new CountDownLatch(1); @@ -231,8 +233,7 @@ public void testStreamProcessorWithAdd() { @Test /** * same as other happy path messages, but with one processor removed in the middle - */ - public void testStreamProcessorWithRemove() { + */ public void testStreamProcessorWithRemove() { final String testSystem = "test-system2"; final String inputTopic = "numbers_rm"; final 
String outputTopic = "output_rm"; @@ -294,7 +295,8 @@ public void testStreamProcessorWithRemove() { TestZkUtils.sleepMs(1000); attempts--; } - Assert.assertTrue("Didn't read all the events in the first batch in " + ATTEMPTS_NUMBER + " attempts", attempts > 0); + Assert + .assertTrue("Didn't read all the events in the first batch in " + ATTEMPTS_NUMBER + " attempts", attempts > 0); // stop the first processor synchronized (t1) { diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java index 35fabb9d0c..5b3b867766 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java @@ -57,7 +57,6 @@ public void onShutdown() { stopLatchCountDown.countDown(); } System.out.println("ON STOP. PID = " + pId + " in thread " + Thread.currentThread()); - } @Override @@ -218,7 +217,6 @@ public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageColl messageCollector.send(new OutgoingMessageEnvelope(new SystemStream(outputSystem, outputTopic), message)); processedMessageCount++; - System.out.println( "Stream processor " + processorId + ";offset=" + incomingMessageEnvelope.getOffset() + "; totalRcvd=" + processedMessageCount + ";received " + message + "; ssp=" + incomingMessageEnvelope From efe13cd49bebcc287cc3c40dc12fa7e07fe360f4 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Mon, 15 May 2017 11:34:24 -0700 Subject: [PATCH 13/25] move some of the init code into @Before --- .../test/processor/TestZkStreamProcessor.java | 52 ++++++++----------- .../processor/TestZkStreamProcessorBase.java | 24 +++++---- 2 files changed, 34 insertions(+), 42 deletions(-) diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java index 56c91e817a..e534f11b55 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.samza.processor.StreamProcessor; import org.apache.samza.zk.TestZkUtils; import org.junit.Assert; @@ -35,12 +36,31 @@ */ public class TestZkStreamProcessor extends TestZkStreamProcessorBase { + // to avoid long sleeps, we rather use multiple attempts with shorter sleeps private final static int ATTEMPTS_NUMBER = 5; - // to avoid long sleeps, we rather use multiple attempts with shorter sleeps + + private AtomicInteger counter = new AtomicInteger(1); + private String testSystem = "test-system"; + private String inputTopic = "numbers"; + private String outputTopic = "output"; + private int messageCount = 40; + + private Map map; + @Before public void setupTest() { + // for each tests - make the common parts unique + int seqNum = counter.getAndAdd(1); + testSystem = "test-system" + seqNum; + inputTopic = "numbers" + seqNum; + outputTopic = "output" + seqNum; + map = createConfigs(testSystem, inputTopic, outputTopic, messageCount); + + // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a + // TopicExistsException since StreamProcessor auto-creates them. 
+ createTopics(inputTopic, outputTopic); } @Test @@ -60,16 +80,6 @@ public void testFiveStreamProcessors() { // main test method for happy path with fixed number of processors private void testStreamProcessor(String[] processorIds) { - final String testSystem = "test-system" + processorIds.length; // making system unique per test - final String inputTopic = "numbers" + processorIds.length; // making input topic unique per test - final String outputTopic = "output" + processorIds.length; // making output topic unique per test - final int messageCount = 40; - - final Map map = createConfigs(testSystem, inputTopic, outputTopic, messageCount); - - // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a - // TopicExistsException since StreamProcessor auto-creates them. - createTopics(inputTopic, outputTopic); // create a latch of the size == number of messages TestZkStreamProcessorBase.TestStreamTask.endLatch = new CountDownLatch(messageCount); @@ -125,16 +135,6 @@ private void testStreamProcessor(String[] processorIds) { /** * Similar to the previous tests, but add another processor in the middle */ public void testStreamProcessorWithAdd() { - final String testSystem = "test-system1"; - final String inputTopic = "numbers_add"; - final String outputTopic = "output_add"; - final int messageCount = 40; - - final Map map = createConfigs(testSystem, inputTopic, outputTopic, messageCount); - - // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a - // TopicExistsException since StreamProcessor auto-creates them. 
- createTopics(inputTopic, outputTopic); // set number of events we expect wo read by both processes in total: // p1 - reads 'messageCount' at first @@ -234,16 +234,6 @@ private void testStreamProcessor(String[] processorIds) { /** * same as other happy path messages, but with one processor removed in the middle */ public void testStreamProcessorWithRemove() { - final String testSystem = "test-system2"; - final String inputTopic = "numbers_rm"; - final String outputTopic = "output_rm"; - final int messageCount = 40; - - final Map map = createConfigs(testSystem, inputTopic, outputTopic, messageCount); - - // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a - // TopicExistsException since StreamProcessor auto-creates them. - createTopics(inputTopic, outputTopic); // set number of events we expect to read by both processes in total: // p1 and p2 - both read messageCount at first and p1 is shutdown, new batch of events is generated diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java index 5b3b867766..d22c724d07 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java @@ -31,9 +31,12 @@ import org.apache.samza.test.StandaloneIntegrationTestHarness; import org.apache.samza.test.StandaloneTestUtils; import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness { + public final static Logger LOG = LoggerFactory.getLogger(TestZkStreamProcessorBase.class); public final static int BAD_MESSAGE_KEY = 1000; // auxiliary methods @@ -56,12 +59,12 @@ public void onShutdown() { if (stopLatchCountDown != null) { stopLatchCountDown.countDown(); } - 
System.out.println("ON STOP. PID = " + pId + " in thread " + Thread.currentThread()); + LOG.info("onShutdown is called for pid=" + pId); } @Override public void onFailure(Throwable t) { - + LOG.info("onFailure is called for pid=" + pId); } }); @@ -103,7 +106,7 @@ protected void produceMessages(final int start, String topic, int numMessages) { KafkaProducer producer = getKafkaProducer(); for (int i = start; i < numMessages + start; i++) { try { - System.out.println("producing " + i); + LOG.info("producing " + i); producer.send(new ProducerRecord(topic, String.valueOf(i).getBytes())).get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); @@ -126,11 +129,11 @@ public void run() { synchronized (this) { this.wait(100000); } - System.out.println("notified. Abandon the wait."); + LOG.info("notified. Abandon the wait."); } catch (InterruptedException e) { - System.out.println("wait interrupted" + e); + LOG.error("wait interrupted" + e); } - System.out.println("Stopping the processor"); + LOG.info("Stopping the processor"); processor.stop(); } }; @@ -167,13 +170,13 @@ protected void verifyNumMessages(String topic, final Map expec while (iterator.hasNext()) { ConsumerRecord record = iterator.next(); String val = new String((byte[]) record.value()); - System.out.println("Got value " + val); + LOG.info("Got value " + val); map.put(Integer.valueOf(val), true); count++; } } else { emptyPollCount++; - System.out.println("empty polls " + emptyPollCount); + LOG.warn("empty polls " + emptyPollCount); } } // filter out numbers we did not get @@ -209,15 +212,14 @@ public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageColl // inject a failure if (Integer.valueOf((String) message) >= BAD_MESSAGE_KEY && processorId.equals("1")) { - System.out.println("================================ FAILING for msg=" + message); + LOG.info("process method will fail for msg=" + message); throw new Exception("Processing in the processor " + processorId + 
" failed "); } - System.out.println(processorId + " is writing out " + message); messageCollector.send(new OutgoingMessageEnvelope(new SystemStream(outputSystem, outputTopic), message)); processedMessageCount++; - System.out.println( + LOG.info( "Stream processor " + processorId + ";offset=" + incomingMessageEnvelope.getOffset() + "; totalRcvd=" + processedMessageCount + ";received " + message + "; ssp=" + incomingMessageEnvelope .getSystemStreamPartition()); From 17e671bdcc47835794b35c555ee59e66851ee98b Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Mon, 15 May 2017 13:37:05 -0700 Subject: [PATCH 14/25] cleanup --- .../test/processor/TestZkStreamProcessor.java | 63 +++---------------- .../processor/TestZkStreamProcessorBase.java | 42 ++++++++++++- 2 files changed, 47 insertions(+), 58 deletions(-) diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java index e534f11b55..b409532f04 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java @@ -36,13 +36,10 @@ */ public class TestZkStreamProcessor extends TestZkStreamProcessorBase { - // to avoid long sleeps, we rather use multiple attempts with shorter sleeps - private final static int ATTEMPTS_NUMBER = 5; - private AtomicInteger counter = new AtomicInteger(1); - private String testSystem = "test-system"; - private String inputTopic = "numbers"; - private String outputTopic = "output"; + private String testSystem; + private String inputTopic; + private String outputTopic; private int messageCount = 40; private Map map; @@ -162,19 +159,7 @@ private void testStreamProcessor(String[] processorIds) { } // make sure it consumes all the messages from the first batch - int attempts = ATTEMPTS_NUMBER; - while (attempts > 0) { - long leftEventsCount = 
TestStreamTask.endLatch.getCount(); - System.out.println("messages left to consume = " + leftEventsCount); - if (leftEventsCount == totalEventsToGenerate - messageCount) { // read first batch - System.out.println("read first batch. left to consume = " + leftEventsCount); - break; - } - TestZkUtils.sleepMs(1000); - attempts--; - } - Assert - .assertTrue("Didn't read all the events in the first batch in " + ATTEMPTS_NUMBER + " attempts", attempts > 0); + waitUntilConsumedN(totalEventsToGenerate - messageCount); // start the second processor CountDownLatch countDownLatch2 = new CountDownLatch(1); @@ -197,18 +182,7 @@ private void testStreamProcessor(String[] processorIds) { // wait until all the events are consumed // make sure it consumes all the messages from the first batch - attempts = ATTEMPTS_NUMBER; - while (attempts > 0) { - long leftEventsCount = TestStreamTask.endLatch.getCount(); // how much is left to read - System.out.println("2 processors together. left to consume = " + leftEventsCount); - if (leftEventsCount == 0) { // should read all of them - System.out.println("2 processors together. read all. left " + leftEventsCount); - break; - } - TestZkUtils.sleepMs(1000); - attempts--; - } - Assert.assertTrue("Didn't read all the leftover events in " + ATTEMPTS_NUMBER + " attempts", attempts > 0); + waitUntilConsumedN(0); // shutdown both try { @@ -274,19 +248,7 @@ private void testStreamProcessor(String[] processorIds) { produceMessages(0, inputTopic, messageCount); // make sure they consume all the messages from the first batch - int attempts = ATTEMPTS_NUMBER; - while (attempts > 0) { - long leftEventsCount = TestStreamTask.endLatch.getCount(); - System.out.println("current count = " + leftEventsCount); - if (leftEventsCount == totalEventsToGenerate - messageCount) { // read first batch - System.out.println("read all. 
current count = " + leftEventsCount); - break; - } - TestZkUtils.sleepMs(1000); - attempts--; - } - Assert - .assertTrue("Didn't read all the events in the first batch in " + ATTEMPTS_NUMBER + " attempts", attempts > 0); + waitUntilConsumedN(totalEventsToGenerate - messageCount); // stop the first processor synchronized (t1) { @@ -308,18 +270,7 @@ private void testStreamProcessor(String[] processorIds) { produceMessages(messageCount, inputTopic, messageCount); // wait until p2 consumes all the message by itself; - attempts = ATTEMPTS_NUMBER; - while (attempts > 0) { - long leftEventsCount = TestZkStreamProcessorBase.TestStreamTask.endLatch.getCount(); - System.out.println("2current count = " + leftEventsCount); - if (leftEventsCount == 0) { // should read all of them - System.out.println("2read all. current count = " + leftEventsCount); - break; - } - TestZkUtils.sleepMs(1000); - attempts--; - } - Assert.assertTrue("Didn't read all the leftover events in " + ATTEMPTS_NUMBER + " attempts", attempts > 0); + waitUntilConsumedN(0); // shutdown p2 diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java index d22c724d07..9320feb414 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.test.processor; import java.util.Collections; @@ -30,6 +49,7 @@ import org.apache.samza.task.TaskCoordinator; import org.apache.samza.test.StandaloneIntegrationTestHarness; import org.apache.samza.test.StandaloneTestUtils; +import org.apache.samza.zk.TestZkUtils; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +58,10 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness { public final static Logger LOG = LoggerFactory.getLogger(TestZkStreamProcessorBase.class); public final static int BAD_MESSAGE_KEY = 1000; + // to avoid long sleeps, we rather use multiple attempts with shorter sleeps + private final static int ATTEMPTS_NUMBER = 5; + + // auxiliary methods protected StreamProcessor createStreamProcessor(final String pId, Map map, @@ -219,8 +243,7 @@ public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageColl messageCollector.send(new OutgoingMessageEnvelope(new SystemStream(outputSystem, outputTopic), message)); processedMessageCount++; - LOG.info( - "Stream processor " + processorId + ";offset=" + incomingMessageEnvelope.getOffset() + "; totalRcvd=" + LOG.info("Stream processor " + processorId + ";offset=" + incomingMessageEnvelope.getOffset() + "; totalRcvd=" + processedMessageCount + ";received " + message + "; ssp=" + incomingMessageEnvelope .getSystemStreamPartition()); @@ -229,4 +252,19 @@ public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageColl } } } + + protected void waitUntilConsumedN(int 
untilLeft) { + int attempts = ATTEMPTS_NUMBER; + while (attempts > 0) { + long leftEventsCount = TestZkStreamProcessorBase.TestStreamTask.endLatch.getCount(); + //System.out.println("2current count = " + leftEventsCount); + if (leftEventsCount == untilLeft) { // should read all of them + //System.out.println("2read all. current count = " + leftEventsCount); + break; + } + TestZkUtils.sleepMs(1000); + attempts--; + } + Assert.assertTrue("Didn't read all the leftover events in " + ATTEMPTS_NUMBER + " attempts", attempts > 0); + } } From ff44db4ca046f598884f829c30ec3666b478eb91 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Mon, 15 May 2017 15:31:52 -0700 Subject: [PATCH 15/25] added configs --- .../versioned/jobs/configuration-table.html | 49 +++++++++++++++++++ .../org/apache/samza/config/ZkConfig.java | 6 ++- .../zk/TestZkBarrierForVersionUpgrade.java | 2 +- 3 files changed, 54 insertions(+), 3 deletions(-) diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index afa42f5942..c94d3b58b4 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -409,6 +409,54 @@

Samza Configuration Reference

stores.store-name.changelog. + + + StandAlone job configuration + + + coordinator.zk.connect + + + Required for Zookeeper-based StandAlone applicaiton. Zookeeper coordinates to be used by + the application coordination. Usually in "host:port" format. + + + + coordinator.zk.session-timeout-ms + 30000 + + Zookeeper session timeout for all the ZK connections in milliseconds. + + + + coordinator.zk.connection-timeout-ms + 60000 + + Zookeeper connection timeout in milliseconds. + + + + coordinator.zk.new-jobmodel-consensus-timeout-ms + 40000 + + How long the processors will wait until all of them report acceptance of a the new job model before rolling back. + + + + job.coordinator.factory + + + Class to use for job coordination. Currently available values are: +
+
org.apache.samza.standalone.StandaloneJobCoordinatorFactory
+
For use in StandAlone, no Zookeeper, fixed partition mapping application.
+
org.apache.samza.zk.ZkJobCoordinatorFactory
+
For use in StandAlone, Zookeeper-based coordination application.
+
+ Required only for StandAlone applications. Please see the required value for task-name-groupper-factory + + + Task configuration @@ -647,6 +695,7 @@

Samza Configuration Reference

The fully-qualified name of the Java class which determines the factory class which will build the TaskNameGrouper. The default configuration value if the property is not present is task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerCountFactory.
The user can specify a custom implementation of the TaskNameGrouperFactory where a custom logic is implemented for grouping the tasks. +

Note. For StandAlone Application one must use org.apache.samza.container.grouper.task.GroupByContainerIdsFactory diff --git a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java index fc483eb941..a5e6dad23a 100644 --- a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java @@ -27,7 +27,9 @@ public class ZkConfig extends MapConfig { public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000; public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000; - public static final String ZK_BARRIER_TIMEOUT_MS = "coordinator.zk.barrier-timeout-ms"; + + + public static final String ZK_NEW_JOBMODEL_CONSENSUS_TIMEOUT_MS = "coordinator.zk.new-jobmodel-consensus-timeout-ms"; public static final int DEFAULT_BARRIER_TIMEOUT_MS = 40000; public ZkConfig(Config config) { @@ -50,6 +52,6 @@ public int getZkConnectionTimeoutMs() { } public int getZkBarrierTimeoutMs() { - return getInt(ZK_BARRIER_TIMEOUT_MS, DEFAULT_BARRIER_TIMEOUT_MS); + return getInt(ZK_NEW_JOBMODEL_CONSENSUS_TIMEOUT_MS, DEFAULT_BARRIER_TIMEOUT_MS); } } diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java index f1bb80454a..eadce37d06 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java @@ -57,7 +57,7 @@ public void testSetup() { String processorId = "p1"; Map map = new HashMap<>(); map.put(ZkConfig.ZK_CONNECT, testZkConnectionString); - map.put(ZkConfig.ZK_BARRIER_TIMEOUT_MS, "200"); + map.put(ZkConfig.ZK_NEW_JOBMODEL_CONSENSUS_TIMEOUT_MS, "200"); Config config = new MapConfig(map); CoordinationServiceFactory serviceFactory = new ZkCoordinationServiceFactory(); From b7d6ca1fde5b16aa4c2160e7f34f97acc41c0a3c Mon Sep 17 
00:00:00 2001 From: Boris Shkolnik Date: Tue, 16 May 2017 11:35:15 -0700 Subject: [PATCH 16/25] fake update to trigger another build --- .../learn/documentation/versioned/jobs/configuration-table.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index c94d3b58b4..de3e046bdd 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -417,7 +417,7 @@

Samza Configuration Reference

coordinator.zk.connect - Required for Zookeeper-based StandAlone applicaiton. Zookeeper coordinates to be used by + Required for Zookeeper-based StandAlone applicaiton. Zookeeper coordinates to be used by the application coordination. Usually in "host:port" format. From c1923acf8d9409d3ce283e6e92315c0023c0e012 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Tue, 16 May 2017 17:19:46 -0700 Subject: [PATCH 17/25] review comments --- .../versioned/jobs/configuration-table.html | 40 +++++++++---------- .../org/apache/samza/config/ZkConfig.java | 8 ++-- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index de3e046bdd..21ec124ea5 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -409,12 +409,27 @@

Samza Configuration Reference

stores.store-name.changelog. + + job.coordinator.factory + + + Class to use for job coordination. Currently available values are: +
+
org.apache.samza.standalone.StandaloneJobCoordinatorFactory
+
Fixed partition mapping application. No Zookeeper.
+
org.apache.samza.zk.ZkJobCoordinatorFactory
+
Zookeeper-based coordination application.
+
+ Required only for StandAlone applications. Please see the required value for task-name-groupper-factory + + + - StandAlone job configuration + Zookeeper-based job configuration - coordinator.zk.connect + job.coordinator.zk.connect Required for Zookeeper-based StandAlone applicaiton. Zookeeper coordinates to be used by @@ -422,41 +437,26 @@

Samza Configuration Reference

- coordinator.zk.session-timeout-ms + job.coordinator.zk.session.timeout.ms 30000 Zookeeper session timeout for all the ZK connections in milliseconds. - coordinator.zk.connection-timeout-ms + job.coordinator.zk.connection.timeout.ms 60000 Zookeeper connection timeout in milliseconds. - coordinator.zk.new-jobmodel-consensus-timeout-ms + job.coordinator.zk.consensus.timeout.ms 40000 How long the processors will wait until all of them report acceptance of a the new job model before rolling back. - - job.coordinator.factory - - - Class to use for job coordination. Currently available values are: -
-
org.apache.samza.standalone.StandaloneJobCoordinatorFactory
-
For use in StandAlone, no Zookeeper, fixed partition mapping application.
-
org.apache.samza.zk.ZkJobCoordinatorFactory
-
For use in StandAlone, Zookeeper-based coordination application.
-
- Required only for StandAlone applications. Please see the required value for task-name-groupper-factory - - - Task configuration diff --git a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java index a5e6dad23a..31beca6e40 100644 --- a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java @@ -21,15 +21,15 @@ public class ZkConfig extends MapConfig { // Connection string for ZK, format: ::,..." - public static final String ZK_CONNECT = "coordinator.zk.connect"; - public static final String ZK_SESSION_TIMEOUT_MS = "coordinator.zk.session-timeout-ms"; - public static final String ZK_CONNECTION_TIMEOUT_MS = "coordinator.zk.connection-timeout-ms"; + public static final String ZK_CONNECT = "job.coordinator.zk.connect"; + public static final String ZK_SESSION_TIMEOUT_MS = "job.coordinator.zk.session.timeout.ms"; + public static final String ZK_CONNECTION_TIMEOUT_MS = "job.coordinator.zk.connection.timeout.ms"; public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000; public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000; - public static final String ZK_NEW_JOBMODEL_CONSENSUS_TIMEOUT_MS = "coordinator.zk.new-jobmodel-consensus-timeout-ms"; + public static final String ZK_NEW_JOBMODEL_CONSENSUS_TIMEOUT_MS = "job.coordinator.zk.consensus.timeout.ms"; public static final int DEFAULT_BARRIER_TIMEOUT_MS = 40000; public ZkConfig(Config config) { From 8512ab7419a37f1dee871ac7c3191c068aa042ed Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Tue, 16 May 2017 17:23:49 -0700 Subject: [PATCH 18/25] rename --- .../learn/documentation/versioned/jobs/configuration-table.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 21ec124ea5..b1b3e293b8 100644 --- 
a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -420,7 +420,7 @@

Samza Configuration Reference

org.apache.samza.zk.ZkJobCoordinatorFactory
Zookeeper-based coordination application.
- Required only for StandAlone applications. Please see the required value for task-name-groupper-factory + Required only for non-cluster-managed applications. Please see the required value for task-name-groupper-factory From 7845d83c137a9c91813c811330972ea61dc080fa Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Wed, 17 May 2017 14:00:39 -0700 Subject: [PATCH 19/25] small refactoring --- .../samza/zk/ZkCoordinationServiceFactory.java | 8 +------- .../main/java/org/apache/samza/zk/ZkUtils.java | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java index 997173270a..854c84927e 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java @@ -30,13 +30,7 @@ public class ZkCoordinationServiceFactory implements CoordinationServiceFactory // TODO - Why should this method be synchronized? 
synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) { ZkConfig zkConfig = new ZkConfig(config); - ZkClient zkClient; - try { - zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); - } catch (Exception e) { - throw new SamzaException("zkClient failed to connect to ZK at :" + zkConfig.getZkConnect(), e); - } - + ZkClient zkClient = ZkUtils.createZkClient(zkConfig); ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs()); return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new ScheduleAfterDebounceTime()); } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 5c8fcf3d37..1c74ff36ae 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -25,6 +25,7 @@ import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.samza.SamzaException; +import org.apache.samza.config.ZkConfig; import org.apache.samza.job.model.JobModel; import org.apache.samza.serializers.model.SamzaObjectMapper; import org.apache.zookeeper.data.Stat; @@ -119,6 +120,22 @@ public synchronized String registerProcessorAndGetId(final ProcessorData data) { } } + /** + * create an instance of ZkClient + * @param zkConfig + * @return an instance of zkClient + */ + public static ZkClient createZkClient(ZkConfig zkConfig) { + try { + ZkClient zkClient = + new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); + return zkClient; + } catch (Exception e) { + // ZkClient constructor may throw a varaity of different exceptions, not all of them Zk based. 
+ throw new SamzaException("zkClient failed to connect to ZK at :" + zkConfig.getZkConnect(), e); + } + } + public synchronized String getEphemeralPath() { return ephemeralPath; } From fae7283e5b91c59ece6f137c409e61b541ddeac6 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Wed, 17 May 2017 14:49:16 -0700 Subject: [PATCH 20/25] small refactoring --- samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 1c74ff36ae..403e1d660b 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -127,8 +127,7 @@ public synchronized String registerProcessorAndGetId(final ProcessorData data) { */ public static ZkClient createZkClient(ZkConfig zkConfig) { try { - ZkClient zkClient = - new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); + ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); return zkClient; } catch (Exception e) { // ZkClient constructor may throw a varaity of different exceptions, not all of them Zk based. 
From 7b2903105ce6f2ace86a63ac5d4f838c6b7652a1 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Wed, 17 May 2017 16:32:14 -0700 Subject: [PATCH 21/25] checkStyle --- samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 403e1d660b..6fc16c9fe3 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -122,7 +122,7 @@ public synchronized String registerProcessorAndGetId(final ProcessorData data) { /** * create an instance of ZkClient - * @param zkConfig + * @param zkConfig Zookeeper config * @return an instance of zkClient */ public static ZkClient createZkClient(ZkConfig zkConfig) { From a32364fa08a37f8cf5a0042a883e09ebff5b5115 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Wed, 17 May 2017 18:21:08 -0700 Subject: [PATCH 22/25] checkstyle --- .../java/org/apache/samza/zk/ZkCoordinationServiceFactory.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java index 854c84927e..0fbf052b88 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java @@ -19,7 +19,6 @@ package org.apache.samza.zk; import org.I0Itec.zkclient.ZkClient; -import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.CoordinationServiceFactory; From eef9c548337828e56f188f0c7962c9ff3b52c010 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Fri, 19 May 2017 11:32:32 -0700 Subject: [PATCH 23/25] create path in ZK at start --- 
.../versioned/jobs/configuration-table.html | 49 ------------------- .../org/apache/samza/config/ZkConfig.java | 12 ++--- .../zk/ZkCoordinationServiceFactory.java | 1 - .../java/org/apache/samza/zk/ZkUtils.java | 33 ++++++++++--- .../zk/TestZkBarrierForVersionUpgrade.java | 2 +- .../apache/samza/zk/TestZkLeaderElector.java | 12 ++--- .../apache/samza/zk/TestZkProcessorLatch.java | 14 +++--- .../java/org/apache/samza/zk/TestZkUtils.java | 27 ++++++++-- 8 files changed, 67 insertions(+), 83 deletions(-) diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index b1b3e293b8..afa42f5942 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -408,54 +408,6 @@

Samza Configuration Reference

You can override this system by specifying both the system and the stream in stores.store-name.changelog. - - - job.coordinator.factory - - - Class to use for job coordination. Currently available values are: -
-
org.apache.samza.standalone.StandaloneJobCoordinatorFactory
-
Fixed partition mapping application. No Zookeeper.
-
org.apache.samza.zk.ZkJobCoordinatorFactory
-
Zookeeper-based coordination application.
-
- Required only for non-cluster-managed applications. Please see the required value for task-name-groupper-factory - - - - - - Zookeeper-based job configuration - - - job.coordinator.zk.connect - - - Required for Zookeeper-based StandAlone applicaiton. Zookeeper coordinates to be used by - the application coordination. Usually in "host:port" format. - - - - job.coordinator.zk.session.timeout.ms - 30000 - - Zookeeper session timeout for all the ZK connections in milliseconds. - - - - job.coordinator.zk.connection.timeout.ms - 60000 - - Zookeeper connection timeout in milliseconds. - - - - job.coordinator.zk.consensus.timeout.ms - 40000 - - How long the processors will wait until all of them report acceptance of a the new job model before rolling back. - Task configuration @@ -695,7 +647,6 @@

Samza Configuration Reference

The fully-qualified name of the Java class which determines the factory class which will build the TaskNameGrouper. The default configuration value if the property is not present is task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerCountFactory.
The user can specify a custom implementation of the TaskNameGrouperFactory where a custom logic is implemented for grouping the tasks. -

Note. For StandAlone Application one must use org.apache.samza.container.grouper.task.GroupByContainerIdsFactory diff --git a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java index 31beca6e40..fc483eb941 100644 --- a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java @@ -21,15 +21,13 @@ public class ZkConfig extends MapConfig { // Connection string for ZK, format: ::,..." - public static final String ZK_CONNECT = "job.coordinator.zk.connect"; - public static final String ZK_SESSION_TIMEOUT_MS = "job.coordinator.zk.session.timeout.ms"; - public static final String ZK_CONNECTION_TIMEOUT_MS = "job.coordinator.zk.connection.timeout.ms"; + public static final String ZK_CONNECT = "coordinator.zk.connect"; + public static final String ZK_SESSION_TIMEOUT_MS = "coordinator.zk.session-timeout-ms"; + public static final String ZK_CONNECTION_TIMEOUT_MS = "coordinator.zk.connection-timeout-ms"; public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000; public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000; - - - public static final String ZK_NEW_JOBMODEL_CONSENSUS_TIMEOUT_MS = "job.coordinator.zk.consensus.timeout.ms"; + public static final String ZK_BARRIER_TIMEOUT_MS = "coordinator.zk.barrier-timeout-ms"; public static final int DEFAULT_BARRIER_TIMEOUT_MS = 40000; public ZkConfig(Config config) { @@ -52,6 +50,6 @@ public int getZkConnectionTimeoutMs() { } public int getZkBarrierTimeoutMs() { - return getInt(ZK_NEW_JOBMODEL_CONSENSUS_TIMEOUT_MS, DEFAULT_BARRIER_TIMEOUT_MS); + return getInt(ZK_BARRIER_TIMEOUT_MS, DEFAULT_BARRIER_TIMEOUT_MS); } } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java index 854c84927e..0fbf052b88 100644 --- 
a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java @@ -19,7 +19,6 @@ package org.apache.samza.zk; import org.I0Itec.zkclient.ZkClient; -import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.CoordinationServiceFactory; diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 01e35078d5..5db913f5f1 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -19,6 +19,12 @@ package org.apache.samza.zk; +import com.google.common.base.Strings; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; @@ -28,17 +34,12 @@ import org.apache.samza.config.ZkConfig; import org.apache.samza.job.model.JobModel; import org.apache.samza.serializers.model.SamzaObjectMapper; +import org.apache.zookeeper.client.ConnectStringParser; import org.apache.zookeeper.data.Stat; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; - /** * Util class to help manage Zk connection and ZkClient. * It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree. 
@@ -104,6 +105,7 @@ public static ZkClient createZkClient(String connectString, int sessionTimeoutMS public static ZkClient createZkClient(ZkConfig zkConfig) { try { ZkClient zkClient = createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); + initZkPath(zkConfig.getZkConnect(), zkClient); return zkClient; } catch (Exception e) { // ZkClient constructor may throw a variety of different exceptions, not all of them Zk based. @@ -111,6 +113,25 @@ public static ZkClient createZkClient(ZkConfig zkConfig) { } } + /** + * if ZkConnectString contains some path at the end, it needs to be created when connecting for the first time. + * @param zkConnect - connect string + * @param zkClient - zkClient object to talk to the ZK + */ + public static void initZkPath(String zkConnect, ZkClient zkClient) { + ConnectStringParser parser = new ConnectStringParser(zkConnect); + + String path = parser.getChrootPath(); + LOG.info("path =" + path); + if (!Strings.isNullOrEmpty(path)) { + // create this path in zk + LOG.info("first connect. 
creating path =" + path + " in ZK " + parser.getServerAddresses()); + if (!zkClient.exists(path)) { + zkClient.createPersistent(path, true); // will create parents if needed and will not throw exception if exists + } + } + } + ZkClient getZkClient() { return zkClient; } diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java index eadce37d06..f1bb80454a 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java @@ -57,7 +57,7 @@ public void testSetup() { String processorId = "p1"; Map map = new HashMap<>(); map.put(ZkConfig.ZK_CONNECT, testZkConnectionString); - map.put(ZkConfig.ZK_NEW_JOBMODEL_CONSENSUS_TIMEOUT_MS, "200"); + map.put(ZkConfig.ZK_BARRIER_TIMEOUT_MS, "200"); Config config = new MapConfig(map); CoordinationServiceFactory serviceFactory = new ZkCoordinationServiceFactory(); diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java index 0f9b2907ac..ab13c5179f 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -18,9 +18,13 @@ */ package org.apache.samza.zk; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.samza.SamzaException; import org.apache.samza.testUtils.EmbeddedZookeeper; @@ -33,12 +37,6 @@ import org.mockito.Mockito; import org.slf4j.Logger; -import java.util.ArrayList; 
-import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java index 84c11f4b4a..de80bf299a 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java @@ -18,8 +18,13 @@ */ package org.apache.samza.zk; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; import org.apache.samza.coordinator.Latch; import org.apache.samza.testUtils.EmbeddedZookeeper; import org.junit.After; @@ -29,13 +34,6 @@ import org.junit.BeforeClass; import org.junit.Test; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - /** * The ZkProcessorLatch uses a shared Znode as a latch. Each participant await existence of a target znode under the * shared latch, which is a persistent, sequential target znode with value (latchSize - 1). 
latchSize is the minimum diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index 63e2361b0d..e7b8ceec99 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -18,7 +18,10 @@ */ package org.apache.samza.zk; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.function.BooleanSupplier; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; @@ -35,10 +38,6 @@ import org.junit.BeforeClass; import org.junit.Test; -import java.util.HashMap; -import java.util.Map; -import java.util.function.BooleanSupplier; - public class TestZkUtils { private static EmbeddedZookeeper zkServer = null; private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test"); @@ -87,6 +86,26 @@ public static void teardown() { zkServer.teardown(); } + + @Test + public void testInitZkPath() { + String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1"; + ZkUtils.initZkPath(zkConnect, zkClient); + + Assert.assertTrue(zkClient.exists("/samza1")); + + zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1/samza2"; + ZkUtils.initZkPath(zkConnect, zkClient); + + Assert.assertTrue(zkClient.exists("/samza1/samza2")); + + + zkConnect = "127.0.0.1:" + zkServer.getPort(); // empty path. 
+ ZkUtils.initZkPath(zkConnect, zkClient); + + Assert.assertTrue(zkClient.exists("/")); + } + @Test public void testRegisterProcessorId() { String assignedPath = zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1")); From 0b08f325cc56758a734781243bc64680d7912d5a Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Fri, 19 May 2017 14:19:38 -0700 Subject: [PATCH 24/25] moved createClient and createZkPath methods into CoordinationUtilsFactory --- .../zk/ZkCoordinationServiceFactory.java | 51 ++++++++++++++++++- .../java/org/apache/samza/zk/ZkUtils.java | 49 ------------------ .../apache/samza/zk/TestZkLeaderElector.java | 2 +- .../apache/samza/zk/TestZkProcessorLatch.java | 3 +- .../java/org/apache/samza/zk/TestZkUtils.java | 6 +-- 5 files changed, 56 insertions(+), 55 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java index 0fbf052b88..6930a30384 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java @@ -18,20 +18,69 @@ */ package org.apache.samza.zk; +import com.google.common.base.Strings; import org.I0Itec.zkclient.ZkClient; +import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.CoordinationServiceFactory; import org.apache.samza.coordinator.CoordinationUtils; +import org.apache.zookeeper.client.ConnectStringParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ZkCoordinationServiceFactory implements CoordinationServiceFactory { + public final static Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class); + // TODO - Why should this method be synchronized? 
synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) { ZkConfig zkConfig = new ZkConfig(config); - ZkClient zkClient = ZkUtils.createZkClient(zkConfig); + + ZkClient zkClient = createZkClient(zkConfig.getZkConnect(), + zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); + + // make sure the 'path' exists + createZkPath(zkConfig.getZkConnect(), zkClient); + ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs()); + return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new ScheduleAfterDebounceTime()); } + /** + * helper method to create zkClient + * @param connectString - zkConnect string + * @param sessionTimeoutMS - session timeout + * @param connectionTimeoutMs - connection timeout + * @return zkClient object + */ + public static ZkClient createZkClient(String connectString, int sessionTimeoutMS, int connectionTimeoutMs) { + try { + return new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs); + } catch (Exception e) { + // ZkClient constructor may throw a variety of different exceptions, not all of them Zk based. + throw new SamzaException("zkClient failed to connect to ZK at :" + connectString, e); + } + } + + /** + * if ZkConnectString contains some path at the end, it needs to be created when connecting for the first time. + * @param zkConnect - connect string + * @param zkClient - zkClient object to talk to the ZK + */ + public static void createZkPath(String zkConnect, ZkClient zkClient) { + ConnectStringParser parser = new ConnectStringParser(zkConnect); + + String path = parser.getChrootPath(); + LOG.info("path =" + path); + if (!Strings.isNullOrEmpty(path)) { + // create this path in zk + LOG.info("first connect. 
creating path =" + path + " in ZK " + parser.getServerAddresses()); + if (!zkClient.exists(path)) { + zkClient.createPersistent(path, true); // will create parents if needed and will not throw exception if exists + } + } + } + } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 5db913f5f1..c54790159a 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -19,7 +19,6 @@ package org.apache.samza.zk; -import com.google.common.base.Strings; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -31,10 +30,8 @@ import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.samza.SamzaException; -import org.apache.samza.config.ZkConfig; import org.apache.samza.job.model.JobModel; import org.apache.samza.serializers.model.SamzaObjectMapper; -import org.apache.zookeeper.client.ConnectStringParser; import org.apache.zookeeper.data.Stat; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; @@ -86,52 +83,6 @@ public static ZkConnection createZkConnection(String zkConnectString, int sessio return new ZkConnection(zkConnectString, sessionTimeoutMs); } - /** - * helper method to create zkClient - * @param connectString - zkConnect string - * @param sessionTimeoutMS - session timeout - * @param connectionTimeoutMs - connection timeout - * @return zkClient object - */ - public static ZkClient createZkClient(String connectString, int sessionTimeoutMS, int connectionTimeoutMs) { - return new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs); - } - - /** - * create an instance of ZkClient - * @param zkConfig Zookeeper config - * @return an instance of zkClient - */ - public static ZkClient createZkClient(ZkConfig zkConfig) { - try { - ZkClient zkClient = createZkClient(zkConfig.getZkConnect(), 
zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); - initZkPath(zkConfig.getZkConnect(), zkClient); - return zkClient; - } catch (Exception e) { - // ZkClient constructor may throw a variety of different exceptions, not all of them Zk based. - throw new SamzaException("zkClient failed to connect to ZK at :" + zkConfig.getZkConnect(), e); - } - } - - /** - * if ZkConnectString contains some path at the end, it needs to be created when connecting for the first time. - * @param zkConnect - connect string - * @param zkClient - zkClient object to talk to the ZK - */ - public static void initZkPath(String zkConnect, ZkClient zkClient) { - ConnectStringParser parser = new ConnectStringParser(zkConnect); - - String path = parser.getChrootPath(); - LOG.info("path =" + path); - if (!Strings.isNullOrEmpty(path)) { - // create this path in zk - LOG.info("first connect. creating path =" + path + " in ZK " + parser.getServerAddresses()); - if (!zkClient.exists(path)) { - zkClient.createPersistent(path, true); // will create parents if needed and will not throw exception if exists - } - } - } - ZkClient getZkClient() { return zkClient; } diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java index ab13c5179f..393d733793 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -432,7 +432,7 @@ public void testAmILeader() { } private ZkUtils getZkUtilsWithNewClient() { - ZkClient zkClient = ZkUtils.createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); + ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); return new ZkUtils( KEY_BUILDER, zkClient, diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java index de80bf299a..9f089a08a0 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java @@ -214,7 +214,8 @@ public void testLatchExpires() { } private ZkUtils getZkUtilsWithNewClient(String processorId) { - ZkClient zkClient = ZkUtils.createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); + ZkClient zkClient = ZkCoordinationServiceFactory + .createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); return new ZkUtils( KEY_BUILDER, zkClient, diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index e7b8ceec99..173b8a6037 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -90,18 +90,18 @@ public static void teardown() { @Test public void testInitZkPath() { String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1"; - ZkUtils.initZkPath(zkConnect, zkClient); + ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient); Assert.assertTrue(zkClient.exists("/samza1")); zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1/samza2"; - ZkUtils.initZkPath(zkConnect, zkClient); + ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient); Assert.assertTrue(zkClient.exists("/samza1/samza2")); zkConnect = "127.0.0.1:" + zkServer.getPort(); // empty path. 
- ZkUtils.initZkPath(zkConnect, zkClient); + ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient); Assert.assertTrue(zkClient.exists("/")); } From 7f704edba4b0ba2e096f27882a85d264b91f6ae8 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Mon, 22 May 2017 09:54:51 -0700 Subject: [PATCH 25/25] made LOG private --- .../java/org/apache/samza/zk/ZkCoordinationServiceFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java index 6930a30384..661650dfaf 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java @@ -31,7 +31,7 @@ public class ZkCoordinationServiceFactory implements CoordinationServiceFactory { - public final static Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class); + private final static Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class); // TODO - Why should this method be synchronized? synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {