From 744f8ebb66b2a7288942be139cd7a7e6d1170c80 Mon Sep 17 00:00:00 2001
From: Robert Metzger
Date: Tue, 11 Oct 2016 15:48:32 +0200
Subject: [PATCH 1/2] [hotfix][kafka] Undo DataGenerators changes (use inline kafka producer again)

---
 .../kafka/KafkaTestEnvironmentImpl.java       |   3 -
 .../connectors/kafka/Kafka09ITCase.java       |   9 -
 .../kafka/KafkaConsumerTestBase.java          | 242 +-----------------
 .../kafka/testutils/DataGenerators.java       | 165 +++++-------
 4 files changed, 72 insertions(+), 347 deletions(-)

diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index af6d254a4f161..78fc1c65103cf 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -115,9 +115,6 @@ public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic
 		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return stream.addSink(prod);
-		/* FlinkKafkaProducer010.FlinkKafkaProducer010Configuration sink = FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props, partitioner);
-		sink.setFlushOnCheckpoint(true);
-		return sink; */
 	}

 	@Override
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index fd167a081d054..b9ec18a2ebd16 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -35,15 +35,6 @@ public void testConcurrentProducerConsumerTopology() throws Exception {
 		runSimpleConcurrentProducerConsumerTopology();
 	}

-//	@Test(timeout = 60000)
-//	public void testPunctuatedExplicitWMConsumer() throws Exception {
-//		runExplicitPunctuatedWMgeneratingConsumerTest(false);
-//	}
-
-//	@Test(timeout = 60000)
-//	public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
-//		runExplicitPunctuatedWMgeneratingConsumerTest(true);
-//	}

 	@Test(timeout = 60000)
 	public void testKeyValueSupport() throws Exception {
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 3c967ba82f594..0810a3e99e0df 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -18,8 +18,6 @@

 package org.apache.flink.streaming.connectors.kafka;

-import com.fasterxml.jackson.databind.ObjectMapper;
-import
com.fasterxml.jackson.databind.node.ObjectNode; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; @@ -31,7 +29,6 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -39,13 +36,10 @@ import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.table.StreamTableEnvironment; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.Table; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.DataInputView; @@ -68,7 +62,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -92,7 +85,6 @@ import org.apache.flink.testutils.junit.RetryOnException; import org.apache.flink.testutils.junit.RetryRule; import org.apache.flink.util.Collector; -import org.apache.flink.util.StringUtils; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.errors.TimeoutException; import org.junit.Assert; @@ -116,7 +108,6 @@ import java.util.Random; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.test.util.TestUtils.tryExecute; @@ -517,7 +508,7 @@ public void runCancelingOnFullInputTest() throws Exception { // launch a producer thread DataGenerators.InfiniteStringsGenerator generator = - new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic, flinkPort); + new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic); generator.start(); // launch a consumer asynchronously @@ -571,7 +562,6 @@ public void run() { assertTrue(failueCause.getMessage().contains("Job was cancelled")); if (generator.isAlive()) { - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "String generator"); generator.shutdown(); generator.join(); } @@ -1723,234 +1713,4 @@ public void restoreState(Integer state) { this.numElementsTotal = state; } } - - ///////////// Testing the Kafka consumer with embeded watermark generation functionality /////////////// - -// @RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class) -// public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws Exception { -// -// final String topic1 = "wmExtractorTopic1_" + 
UUID.randomUUID().toString(); -// final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString(); -// -// final Map topics = new HashMap<>(); -// topics.put(topic1, false); -// topics.put(topic2, emptyPartition); -// -// final int noOfTopcis = topics.size(); -// final int partitionsPerTopic = 1; -// final int elementsPerPartition = 100 + 1; -// -// final int totalElements = emptyPartition ? -// partitionsPerTopic * elementsPerPartition : -// noOfTopcis * partitionsPerTopic * elementsPerPartition; -// -// createTestTopic(topic1, partitionsPerTopic, 1); -// createTestTopic(topic2, partitionsPerTopic, 1); -// -// final StreamExecutionEnvironment env = -// StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); -// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); -// env.setParallelism(partitionsPerTopic); -// env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately -// env.getConfig().disableSysoutLogging(); -// -// TypeInformation> longIntType = TypeInfoParser.parse("Tuple2"); -// -// Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); -// producerProperties.setProperty("retries", "0"); -// -// putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType); -// -// List topicTitles = new ArrayList<>(topics.keySet()); -// runPunctuatedComsumer(env, topicTitles, totalElements, longIntType); -// -// executeAndCatchException(env, "runComsumerWithPunctuatedExplicitWMTest"); -// -// for(String topic: topicTitles) { -// deleteTestTopic(topic); -// } -// } -// -// private void executeAndCatchException(StreamExecutionEnvironment env, String execName) throws Exception { -// try { -// tryExecutePropagateExceptions(env, execName); -// } -// catch (ProgramInvocationException | JobExecutionException e) { -// // look for NotLeaderForPartitionException -// Throwable cause = e.getCause(); -// -// // search for nested SuccessExceptions -// int depth = 0; -// while (cause != null && depth++ < 20) { -// if (cause instanceof kafka.common.NotLeaderForPartitionException) { -// throw (Exception) cause; -// } -// cause = cause.getCause(); -// } -// throw e; -// } -// } -// -// private void putDataInTopics(StreamExecutionEnvironment env, -// Properties producerProperties, -// final int elementsPerPartition, -// Map topics, -// TypeInformation> outputTypeInfo) { -// if(topics.size() != 2) { -// throw new RuntimeException("This method accepts two topics as arguments."); -// } -// -// TypeInformationSerializationSchema> sinkSchema = -// new TypeInformationSerializationSchema<>(outputTypeInfo, env.getConfig()); -// -// DataStream> stream = env -// .addSource(new RichParallelSourceFunction>() { -// private boolean running = true; -// -// @Override -// public void run(SourceContext> ctx) throws InterruptedException { -// int topic = 0; -// int currentTs = 1; -// -// while (running && currentTs < elementsPerPartition) { -// long timestamp = (currentTs % 10 == 0) ? 
-1L : currentTs; -// ctx.collect(new Tuple2(timestamp, topic)); -// currentTs++; -// } -// -// Tuple2 toWrite2 = new Tuple2(-1L, topic); -// ctx.collect(toWrite2); -// } -// -// @Override -// public void cancel() { -// running = false; -// } -// }).setParallelism(1); -// -// List> topicsL = new ArrayList<>(topics.entrySet()); -// -// stream = stream.map(new MapFunction, Tuple2>() { -// -// @Override -// public Tuple2 map(Tuple2 value) throws Exception { -// return value; -// } -// }).setParallelism(1); -// kafkaServer.produceIntoKafka(stream, topicsL.get(0).getKey(), -// new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null).setParallelism(1); -// -// if(!topicsL.get(1).getValue()) { -// stream.map(new MapFunction, Tuple2>() { -// -// @Override -// public Tuple2 map(Tuple2 value) throws Exception { -// long timestamp = (value.f0 == -1) ? -1L : 1000 + value.f0; -// return new Tuple2<>(timestamp, 1); -// } -// }).setParallelism(1).addSink(kafkaServer.produceIntoKafka(topicsL.get(1).getKey(), -// new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)).setParallelism(1); -// } -// } - - private DataStreamSink> runPunctuatedComsumer(StreamExecutionEnvironment env, - List topics, - final int totalElementsToExpect, - TypeInformation> inputTypeInfo) { - - TypeInformationSerializationSchema> sourceSchema = - new TypeInformationSerializationSchema<>(inputTypeInfo, env.getConfig()); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - FlinkKafkaConsumerBase> source = kafkaServer - .getConsumer(topics, sourceSchema, props) - .assignTimestampsAndWatermarks(new TestPunctuatedTSExtractor()); - - DataStreamSource> consuming = env.setParallelism(1).addSource(source); - - return consuming - .transform("testingWatermarkOperator", inputTypeInfo, new WMTestingOperator()) - .addSink(new RichSinkFunction>() { - - private int elementCount = 0; - - @Override - public void invoke(Tuple2 value) throws Exception { - elementCount++; - if (elementCount == totalElementsToExpect) { - throw new SuccessException(); - } - } - - @Override - public void close() throws Exception { - super.close(); - } - }); - } - - /** An extractor that emits a Watermark whenever the timestamp in the record is equal to {@code -1}. */ - private static class TestPunctuatedTSExtractor implements AssignerWithPunctuatedWatermarks> { - - @Override - public Watermark checkAndGetNextWatermark(Tuple2 lastElement, long extractedTimestamp) { - return (lastElement.f0 == -1) ? 
new Watermark(extractedTimestamp) : null; - } - - @Override - public long extractTimestamp(Tuple2 element, long previousElementTimestamp) { - return element.f0; - } - } - - private static class WMTestingOperator extends AbstractStreamOperator> implements OneInputStreamOperator, Tuple2> { - - private long lastReceivedWatermark = Long.MIN_VALUE; - - private Map isEligible = new HashMap<>(); - private Map perPartitionMaxTs = new HashMap<>(); - - WMTestingOperator() { - isEligible = new HashMap<>(); - perPartitionMaxTs = new HashMap<>(); - } - - @Override - public void processElement(StreamRecord> element) throws Exception { - int partition = element.getValue().f1; - Long maxTs = perPartitionMaxTs.get(partition); - if(maxTs == null || maxTs < element.getValue().f0) { - perPartitionMaxTs.put(partition, element.getValue().f0); - isEligible.put(partition, element.getValue().f0 > lastReceivedWatermark); - } - output.collect(element); - } - - @Override - public void processWatermark(Watermark mark) throws Exception { - int partition = -1; - long minTS = Long.MAX_VALUE; - for (Integer part : perPartitionMaxTs.keySet()) { - Long ts = perPartitionMaxTs.get(part); - if (ts < minTS && isEligible.get(part)) { - partition = part; - minTS = ts; - lastReceivedWatermark = ts; - } - } - isEligible.put(partition, false); - - assertEquals(minTS, mark.getTimestamp()); - output.emitWatermark(mark); - } - - @Override - public void close() throws Exception { - super.close(); - perPartitionMaxTs.clear(); - isEligible.clear(); - } - } } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java index 3f035fdc70f8e..ba75212e7fdf7 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java @@ -18,74 +18,35 @@ package org.apache.flink.streaming.connectors.kafka.testutils; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase; import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment; import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; -import java.io.Serializable; +import java.util.Collection; import java.util.Properties; import java.util.Random; @SuppressWarnings("serial") public class DataGenerators { - - public static void generateLongStringTupleSequence(StreamExecutionEnvironment env, - KafkaTestEnvironment testServer, String topic, - int numPartitions, - final int from, final int to) throws Exception { - TypeInformation> resultType = TypeInfoParser.parse("Tuple2"); - - env.setParallelism(numPartitions); - env.getConfig().disableSysoutLogging(); - env.setRestartStrategy(RestartStrategies.noRestart()); - - DataStream> stream =env.addSource( - new RichParallelSourceFunction>() { - - private volatile boolean running = true; - - @Override - public void run(SourceContext> ctx) throws Exception { - int cnt = from; - int partition = getRuntimeContext().getIndexOfThisSubtask(); - - while (running && cnt <= to) { - ctx.collect(new Tuple2<>(partition, cnt)); - cnt++; - } - } - - @Override - public void cancel() { - running = false; - } - }); - - testServer.produceIntoKafka(stream, topic, - new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(resultType, env.getConfig())), - FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()), - new Tuple2Partitioner(numPartitions) - ); - - env.execute("Data generator (Int, Int) stream to topic " + topic); - } - - // ------------------------------------------------------------------------ - public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env, KafkaTestEnvironment testServer, String topic, final int numPartitions, @@ -105,9 +66,9 @@ public void run(SourceContext ctx) { // create a sequence int[] elements = new int[numElements]; for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask(); - i < numElements; - i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) { - + i < numElements; + i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) { + elements[i] = val; } @@ -116,7 +77,7 @@ public void run(SourceContext ctx) { Random rnd = new Random(); for (int i = 0; i < elements.length; i++) { int otherPos = rnd.nextInt(elements.length); - + int tmp = elements[i]; elements[i] = elements[otherPos]; elements[otherPos] = tmp; @@ -142,7 +103,7 @@ public void cancel() { if(secureProps != null) { props.putAll(testServer.getSecureProperties()); } - + stream = stream.rebalance(); testServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())), @@ -156,63 +117,55 @@ public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, env.execute("Scrambles int sequence generator"); } - + // ------------------------------------------------------------------------ - - public static class InfiniteStringsGenerator extends Thread implements Serializable { - private transient KafkaTestEnvironment server; - - private final String topic; + public static class InfiniteStringsGenerator extends Thread { - private final int flinkPort; + private final KafkaTestEnvironment server; + + private final String 
topic; private volatile Throwable error; - + private volatile boolean running = true; - - public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic, int flinkPort) { + + public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) { this.server = server; this.topic = topic; - this.flinkPort = flinkPort; } @Override public void run() { // we manually feed data into the Kafka sink - FlinkKafkaProducerBase producer = null; + RichFunction producer = null; try { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - DataStream stream = env.addSource(new SourceFunction() { - @Override - public void run(SourceContext ctx) throws Exception { - final StringBuilder bld = new StringBuilder(); - final Random rnd = new Random(); - while (running) { - bld.setLength(0); - int len = rnd.nextInt(100) + 1; - for (int i = 0; i < len; i++) { - bld.append((char) (rnd.nextInt(20) + 'a')); - } + Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString()); + producerProperties.setProperty("retries", "3"); + StreamTransformation mockTransform = new MockStreamTransformation(); + DataStream stream = new DataStream<>(new DummyStreamExecutionEnvironment(), mockTransform); + DataStreamSink sink = server.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), + producerProperties, new FixedPartitioner()); + StreamSink producerOperator = sink.getTransformation().getOperator(); + producer = (RichFunction) producerOperator.getUserFunction(); + producer.setRuntimeContext(new MockRuntimeContext(1,0)); + producer.open(new Configuration()); - String next = bld.toString(); - ctx.collect(next); - } - } + final StringBuilder bld = new StringBuilder(); + final Random rnd = new Random(); - @Override - public void cancel() { - running = false; + while (running) { + bld.setLength(0); + + int len = rnd.nextInt(100) + 1; + for (int i = 0; i < len; i++) { + bld.append((char) (rnd.nextInt(20) + 'a') ); } - }); - Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString()); - producerProperties.setProperty("retries", "3"); - server.produceIntoKafka(stream, topic, - new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), - producerProperties, new FixedPartitioner()); - env.execute("String generator"); + String next = bld.toString(); + producerOperator.processElement(new StreamRecord<>(next)); + } } catch (Throwable t) { this.error = t; @@ -228,14 +181,38 @@ public void cancel() { } } } - + public void shutdown() { this.running = false; this.interrupt(); } - + public Throwable getError() { return this.error; } + + private static class MockStreamTransformation extends StreamTransformation { + public MockStreamTransformation() { + super("MockTransform", TypeInfoParser.parse("String"), 1); + } + + @Override + public void setChainingStrategy(ChainingStrategy strategy) { + + } + + @Override + public Collection> getTransitivePredecessors() { + return null; + } + } + + public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { + + @Override + public JobExecutionResult execute(String jobName) throws Exception { + return null; + } + } } -} +} \ No newline at end of file From ce26248ad0a4672eeb556863a061a478987694e9 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Wed, 12 Oct 2016 14:03:01 +0200 Subject: [PATCH 2/2] [hotfix][kafka] Backport Kafka09FetcherTest 
for Kafka010 --- .../connectors/kafka/Kafka010FetcherTest.java | 300 ++++++++++++++++++ 1 file changed, 300 insertions(+) create mode 100644 flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java new file mode 100644 index 0000000000000..8f0b17054f543 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; 
+import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +/** + * Unit tests for the {@link Kafka010Fetcher}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(Kafka010Fetcher.class) +public class Kafka010FetcherTest { + + @Test + public void testCommitDoesNotBlock() throws Exception { + + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + final Map testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // to synchronize when the consumer is in its blocking method + final OneShotLatch sync = new OneShotLatch(); + + // ----- the mock consumer with blocking poll calls ---- + final MultiShotLatch blockerLatch = new MultiShotLatch(); + + KafkaConsumer mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer>() { + + @Override + public ConsumerRecords answer(InvocationOnMock invocation) throws InterruptedException { + sync.trigger(); + blockerLatch.await(); + return ConsumerRecords.empty(); + } + }); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + blockerLatch.trigger(); + return null; + } + }).when(mockConsumer).wakeup(); + + // make sure the fetcher creates the mock consumer + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // ----- create the test fetcher ----- + + @SuppressWarnings("unchecked") + SourceContext sourceContext = mock(SourceContext.class); + List topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); + KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); + + final Kafka010Fetcher fetcher = new Kafka010Fetcher<>( + sourceContext, topics, null, null, context, schema, new Properties(), 0L, false); + + // ----- run the fetcher ----- + + final AtomicReference error = new AtomicReference<>(); + final Thread fetcherRunner = new Thread("fetcher runner") { + + @Override + public void run() { + try { + fetcher.runFetchLoop(); + } catch (Throwable t) { + error.set(t); + } + } + }; + fetcherRunner.start(); + + // wait until the fetcher has reached the method of interest + sync.await(); + + // ----- trigger the offset commit ----- + + final AtomicReference commitError = new AtomicReference<>(); + final Thread committer = new Thread("committer runner") { + @Override + public void run() { + try { + fetcher.commitSpecificOffsetsToKafka(testCommitData); + } catch (Throwable t) { + commitError.set(t); + } + } + }; + committer.start(); + + // ----- ensure that the committer finishes in time ----- + committer.join(30000); + assertFalse("The committer did not finish in time", committer.isAlive()); + + // ----- test done, wait till the fetcher is done for a clean shutdown ----- + fetcher.cancel(); + fetcherRunner.join(); + + // check that there were no errors in the fetcher + final Throwable caughtError = error.get(); + if (caughtError != null) { + throw new Exception("Exception in the fetcher", caughtError); + } + } + + @Test + public void ensureOffsetsGetCommitted() throws Exception { + + // test data + final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42); + final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99); + + final Map testCommitData1 = new HashMap<>(); + testCommitData1.put(testPartition1, 11L); + testCommitData1.put(testPartition2, 18L); + 
+ final Map testCommitData2 = new HashMap<>(); + testCommitData2.put(testPartition1, 19L); + testCommitData2.put(testPartition2, 28L); + + final BlockingQueue> commitStore = new LinkedBlockingQueue<>(); + + + // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ---- + + final MultiShotLatch blockerLatch = new MultiShotLatch(); + + KafkaConsumer mockConsumer = mock(KafkaConsumer.class); + + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer>() { + @Override + public ConsumerRecords answer(InvocationOnMock invocation) throws InterruptedException { + blockerLatch.await(); + return ConsumerRecords.empty(); + } + }); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + blockerLatch.trigger(); + return null; + } + }).when(mockConsumer).wakeup(); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + Map offsets = + (Map) invocation.getArguments()[0]; + + OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1]; + + commitStore.add(offsets); + callback.onComplete(offsets, null); + + return null; + } + }).when(mockConsumer).commitAsync( + Mockito.>any(), any(OffsetCommitCallback.class)); + + // make sure the fetcher creates the mock consumer + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // ----- create the test fetcher ----- + + @SuppressWarnings("unchecked") + SourceContext sourceContext = mock(SourceContext.class); + List topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); + KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); + + final Kafka010Fetcher fetcher = new Kafka010Fetcher<>( + sourceContext, topics, null, null, context, schema, new Properties(), 0L, false); + + // ----- run the fetcher ----- + + final AtomicReference error = new AtomicReference<>(); + final Thread fetcherRunner = new Thread("fetcher runner") { + + @Override + public void run() { + try { + fetcher.runFetchLoop(); + } catch (Throwable t) { + error.set(t); + } + } + }; + fetcherRunner.start(); + + // ----- trigger the first offset commit ----- + + fetcher.commitSpecificOffsetsToKafka(testCommitData1); + Map result1 = commitStore.take(); + + for (Entry entry : result1.entrySet()) { + TopicPartition partition = entry.getKey(); + if (partition.topic().equals("test")) { + assertEquals(42, partition.partition()); + assertEquals(12L, entry.getValue().offset()); + } + else if (partition.topic().equals("another")) { + assertEquals(99, partition.partition()); + assertEquals(18L, entry.getValue().offset()); + } + } + + // ----- trigger the second offset commit ----- + + fetcher.commitSpecificOffsetsToKafka(testCommitData2); + Map result2 = commitStore.take(); + + for (Entry entry : result2.entrySet()) { + TopicPartition partition = entry.getKey(); + if (partition.topic().equals("test")) { + assertEquals(42, partition.partition()); + assertEquals(20L, entry.getValue().offset()); + } + else if (partition.topic().equals("another")) { + assertEquals(99, partition.partition()); + assertEquals(28L, entry.getValue().offset()); + } + } + + // ----- test done, wait till the fetcher is done for a clean shutdown ----- + fetcher.cancel(); + fetcherRunner.join(); + + // check that there were no errors in the fetcher + final Throwable caughtError = error.get(); + if (caughtError != null) { + throw new 
Exception("Exception in the fetcher", caughtError); + } + } +} \ No newline at end of file