From c8f152037f2f6c31be2b24613ddad8dd1881e631 Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Fri, 21 Oct 2016 14:23:58 +0800 Subject: [PATCH] [FLINK-4155] [kafka] Move partition list fetching to open() for Kafka producers The fetched partition list from Kafka in open() is sorted by partition id so that subtasks will have the same list across failures. To compensate the original use of the KafkaProducer instantiation in the constructor to eagerly ensure that required producer configs are provided, we check that at least the bootstrap servers are set. This change also includes refactoring of AtLeastOnceProducerTest for a more complete suite of tests on FlinkKafkaProducerBase. --- .../kafka/Kafka08JsonTableSinkTest.java | 4 +- .../connectors/kafka/KafkaProducerTest.java | 11 +- .../kafka/Kafka09JsonTableSinkTest.java | 4 +- .../connectors/kafka/KafkaProducerTest.java | 9 +- .../kafka/FlinkKafkaProducerBase.java | 42 +++--- ...t.java => FlinkKafkaProducerBaseTest.java} | 123 +++++++++++++----- .../kafka/KafkaTableSinkTestBase.java | 16 ++- .../testutils/FakeStandardProducerConfig.java | 34 +++++ 8 files changed, 176 insertions(+), 67 deletions(-) rename flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/{AtLeastOnceProducerTest.java => FlinkKafkaProducerBaseTest.java} (58%) create mode 100644 flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java index 446e1d78ef6e9..6d0b1406ff20c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -41,8 +41,8 @@ protected FlinkKafkaProducerBase createKafkaProducer(String topic, Properti @Override @SuppressWarnings("unchecked") - protected Class> getSerializationSchema() { - return (Class) JsonRowSerializationSchema.class; + protected SerializationSchema getSerializationSchema() { + return new JsonRowSerializationSchema(FIELD_NAMES); } } diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java index 7efa94ea5ccb2..91fc286deb7ee 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java @@ -21,6 +21,7 @@ import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.TestLogger; @@ -37,8 +38,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.util.Arrays; -import java.util.Properties; +import java.util.Collections; import java.util.concurrent.Future; @@ -60,7 +60,8 @@ public void testPropagateExceptions() { // partition setup when(kafkaProducerMock.partitionsFor(anyString())).thenReturn( - Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null))); + // returning a unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour + Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null))); // failure when trying to send an element when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class))) @@ -79,7 +80,7 @@ public Future answer(InvocationOnMock invocation) throws Throwab // (1) producer that propagates errors FlinkKafkaProducer08 producerPropagating = new FlinkKafkaProducer08<>( - "mock_topic", new SimpleStringSchema(), new Properties(), null); + "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null); OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating)); @@ -102,7 +103,7 @@ public Future answer(InvocationOnMock invocation) throws Throwab // (2) producer that only logs errors FlinkKafkaProducer08 producerLogging = new FlinkKafkaProducer08<>( - "mock_topic", new SimpleStringSchema(), new Properties(), null); + "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null); producerLogging.setLogFailuresOnly(true); testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging)); diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java index 068640d1c8921..45f70acf8c6fb 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -41,8 +41,8 @@ protected FlinkKafkaProducerBase createKafkaProducer(String topic, Properti @Override @SuppressWarnings("unchecked") - protected Class> getSerializationSchema() { - return (Class) JsonRowSerializationSchema.class; + protected SerializationSchema getSerializationSchema() { + return new JsonRowSerializationSchema(FIELD_NAMES); } } diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java index 31691d5b09bdc..18b2aec0e5d2e 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java @@ -21,6 +21,7 @@ import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.TestLogger; @@ -40,7 +41,6 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.util.Collections; -import java.util.Properties; import java.util.concurrent.Future; import static org.junit.Assert.assertNotNull; @@ -65,7 +65,8 @@ public void testPropagateExceptions() { // partition setup when(kafkaProducerMock.partitionsFor(anyString())).thenReturn( - Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null))); + // returning a unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour + Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null))); // failure when trying to send an element when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class))) @@ -84,7 +85,7 @@ public Future answer(InvocationOnMock invocation) throws Throwab // (1) producer that propagates errors FlinkKafkaProducer09 producerPropagating = new FlinkKafkaProducer09<>( - "mock_topic", new SimpleStringSchema(), new Properties(), null); + "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null); OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating)); @@ -105,7 +106,7 @@ public Future answer(InvocationOnMock invocation) throws Throwab // (2) producer that only logs errors FlinkKafkaProducer09 producerLogging = new FlinkKafkaProducer09<>( - "mock_topic", new SimpleStringSchema(), new Properties(), null); + "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null); producerLogging.setLogFailuresOnly(true); testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging)); diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java index bede064f9d3d2..33289f88cb3d3 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java @@ -34,7 +34,6 @@ import org.apache.flink.util.NetUtils; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -46,8 +45,11 @@ import org.slf4j.LoggerFactory; import java.util.List; +import java.util.ArrayList; import java.util.Map; import java.util.Properties; +import java.util.Collections; +import java.util.Comparator; import static java.util.Objects.requireNonNull; @@ -76,7 +78,7 @@ public abstract class FlinkKafkaProducerBase extends RichSinkFunction im * Array with the partition ids of the given defaultTopicId * The size of this array is the number of partitions */ - protected final int[] partitions; + protected int[] partitions; /** * User defined properties for the Producer @@ -148,30 +150,22 @@ public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema getPartitionsProd = getKafkaProducer(this.producerConfig)) { - List partitionsList = getPartitionsProd.partitionsFor(defaultTopicId); - - this.partitions = new int[partitionsList.size()]; - for (int i = 0; i < partitions.length; i++) { - partitions[i] = partitionsList.get(i).partition(); - } - getPartitionsProd.close(); + // eagerly ensure that bootstrap servers are set. + if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties."); } this.partitioner = customPartitioner; @@ -218,6 +212,22 @@ protected KafkaProducer getKafkaProducer(Properties props) { public void open(Configuration configuration) { producer = getKafkaProducer(this.producerConfig); + // the fetched list is immutable, so we're creating a mutable copy in order to sort it + List partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId)); + + // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks + Collections.sort(partitionsList, new Comparator() { + @Override + public int compare(PartitionInfo o1, PartitionInfo o2) { + return Integer.compare(o1.partition(), o2.partition()); + } + }); + + partitions = new int[partitionsList.size()]; + for (int i = 0; i < partitions.length; i++) { + partitions[i] = partitionsList.get(i).partition(); + } + RuntimeContext ctx = getRuntimeContext(); if (partitioner != null) { partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions); diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java similarity index 58% rename from flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java rename to flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java index 6d92f9b5ecdf4..2e06160168822 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java @@ -15,23 +15,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; @@ -48,33 +48,83 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -/** - * Test ensuring that the producer is not dropping buffered records - */ -@SuppressWarnings("unchecked") -public class AtLeastOnceProducerTest { +public class FlinkKafkaProducerBaseTest { + + /** + * Tests that the constructor eagerly checks bootstrap servers are set in config + */ + @Test(expected = IllegalArgumentException.class) + public void testInstantiationFailsWhenBootstrapServersMissing() throws Exception { + // no bootstrap servers set in props + Properties props = new Properties(); + // should throw IllegalArgumentException + new DummyFlinkKafkaProducer<>(props, null); + } + + /** + * Tests that constructor defaults to key value serializers in config to byte array deserializers if not set + */ + @Test + public void testKeyValueDeserializersSetIfMissing() throws Exception { + Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345"); + // should set missing key value deserializers + new DummyFlinkKafkaProducer<>(props, null); + + assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); + assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)); + assertTrue(props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName())); + assertTrue(props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName())); + } - // we set a timeout because the test will not finish if the logic is broken + /** + * Tests that partitions list is determinate and correctly provided to custom partitioner + */ + @Test + public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception { + KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class); + RuntimeContext mockRuntimeContext = mock(RuntimeContext.class); + when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0); + when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1); + + DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer( + FakeStandardProducerConfig.get(), mockPartitioner); + producer.setRuntimeContext(mockRuntimeContext); + + producer.open(new Configuration()); + + // the internal mock KafkaProducer will return an out-of-order list of 4 partitions, + // which should be sorted before provided to the custom partitioner's open() method + int[] correctPartitionList = {0, 1, 2, 3}; + verify(mockPartitioner).open(0, 1, correctPartitionList); + } + + /** + * Test ensuring that the producer is not dropping buffered records.; + * we set a timeout because the test will not finish if the logic is broken + */ @Test(timeout=5000) public void testAtLeastOnceProducer() throws Throwable { - runTest(true); + runAtLeastOnceTest(true); } - // This test ensures that the actual test fails if the flushing is disabled + /** + * Ensures that the at least once producing test fails if the flushing is disabled + */ @Test(expected = AssertionError.class, timeout=5000) - public void ensureTestFails() throws Throwable { - runTest(false); + public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable { + runAtLeastOnceTest(false); } - private void runTest(boolean flushOnCheckpoint) throws Throwable { - Properties props = new Properties(); + private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable { final AtomicBoolean snapshottingFinished = new AtomicBoolean(false); - - final TestingKafkaProducer producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, - snapshottingFinished); - + final DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer<>( + FakeStandardProducerConfig.get(), null, snapshottingFinished); producer.setFlushOnCheckpoint(flushOnCheckpoint); OneInputStreamOperatorTestHarness testHarness = @@ -106,7 +156,7 @@ public void run() { // we now check that no records have been confirmed yet Assert.assertEquals(100, pending.size()); Assert.assertFalse("Snapshot method returned before all records were confirmed", - snapshottingFinished.get()); + snapshottingFinished.get()); // now confirm all checkpoints for (Callback c: pending) { @@ -141,17 +191,27 @@ public void run() { } - private static class TestingKafkaProducer extends FlinkKafkaProducerBase { + // ------------------------------------------------------------------------ + + private static class DummyFlinkKafkaProducer extends FlinkKafkaProducerBase { private static final long serialVersionUID = 1L; private transient MockProducer prod; private AtomicBoolean snapshottingFinished; - public TestingKafkaProducer(String defaultTopicId, KeyedSerializationSchema serializationSchema, Properties producerConfig, AtomicBoolean snapshottingFinished) { - super(defaultTopicId, serializationSchema, producerConfig, null); + @SuppressWarnings("unchecked") + public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) { + super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner); this.snapshottingFinished = snapshottingFinished; } + // constructor variant for test irrelated to snapshotting + @SuppressWarnings("unchecked") + public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) { + super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner); + this.snapshottingFinished = new AtomicBoolean(true); + } + @Override protected KafkaProducer getKafkaProducer(Properties props) { this.prod = new MockProducer(); @@ -179,15 +239,8 @@ public MockProducer getProducerInstance() { private static class MockProducer extends KafkaProducer { List pendingCallbacks = new ArrayList<>(); - private static Properties getFakeProperties() { - Properties p = new Properties(); - p.setProperty("bootstrap.servers", "localhost:12345"); - p.setProperty("key.serializer", ByteArraySerializer.class.getName()); - p.setProperty("value.serializer", ByteArraySerializer.class.getName()); - return p; - } public MockProducer() { - super(getFakeProperties()); + super(FakeStandardProducerConfig.get()); } @Override @@ -204,7 +257,11 @@ public Future send(ProducerRecord record, Callback callbac @Override public List partitionsFor(String topic) { List list = new ArrayList<>(); + // deliberately return an out-of-order partition list + list.add(new PartitionInfo(topic, 3, null, null, null)); + list.add(new PartitionInfo(topic, 1, null, null, null)); list.add(new PartitionInfo(topic, 0, null, null, null)); + list.add(new PartitionInfo(topic, 2, null, null, null)); return list; } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index baddab1b7141a..ae0af527ebb78 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.junit.Test; @@ -41,18 +42,23 @@ public abstract class KafkaTableSinkTestBase { private static final String TOPIC = "testTopic"; - private static final String[] FIELD_NAMES = new String[] {"field1", "field2"}; + protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"}; private static final TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class}); private static final KafkaPartitioner PARTITIONER = new CustomPartitioner(); private static final Properties PROPERTIES = createSinkProperties(); - // we have to mock FlinkKafkaProducerBase as it cannot be instantiated without Kafka @SuppressWarnings("unchecked") - private static final FlinkKafkaProducerBase PRODUCER = mock(FlinkKafkaProducerBase.class); + private final FlinkKafkaProducerBase PRODUCER = new FlinkKafkaProducerBase( + TOPIC, new KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, PARTITIONER) { + + @Override + protected void flush() {} + }; @Test @SuppressWarnings("unchecked") public void testKafkaTableSink() throws Exception { DataStream dataStream = mock(DataStream.class); + KafkaTableSink kafkaTableSink = spy(createTableSink()); kafkaTableSink.emitDataStream(dataStream); @@ -61,7 +67,7 @@ public void testKafkaTableSink() throws Exception { verify(kafkaTableSink).createKafkaProducer( eq(TOPIC), eq(PROPERTIES), - any(getSerializationSchema()), + any(getSerializationSchema().getClass()), eq(PARTITIONER)); } @@ -79,7 +85,7 @@ public void testConfiguration() { protected abstract KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner partitioner, FlinkKafkaProducerBase kafkaProducer); - protected abstract Class> getSerializationSchema(); + protected abstract SerializationSchema getSerializationSchema(); private KafkaTableSink createTableSink() { return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER); diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java new file mode 100644 index 0000000000000..055326d9e5d70 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka.testutils; + +import org.apache.kafka.common.serialization.ByteArraySerializer; + +import java.util.Properties; + +public class FakeStandardProducerConfig { + + public static Properties get() { + Properties p = new Properties(); + p.setProperty("bootstrap.servers", "localhost:12345"); + p.setProperty("key.serializer", ByteArraySerializer.class.getName()); + p.setProperty("value.serializer", ByteArraySerializer.class.getName()); + return p; + } + +}