From a0a95d808a0d75f74fb85502f3d4ce16dd78239c Mon Sep 17 00:00:00 2001 From: yew1eb Date: Sat, 25 Nov 2017 01:48:36 +0800 Subject: [PATCH 1/2] [FLINK-8149][kinesis] Replace usages of deprecated SerializationSchema --- .../streaming/connectors/kinesis/FlinkKinesisConsumer.java | 2 +- .../streaming/connectors/kinesis/FlinkKinesisProducer.java | 2 +- .../connectors/kinesis/examples/ConsumeFromKinesis.java | 2 +- .../connectors/kinesis/examples/ProduceIntoKinesis.java | 2 +- .../kinesis/serialization/KinesisDeserializationSchema.java | 2 +- .../serialization/KinesisDeserializationSchemaWrapper.java | 2 +- .../connectors/kinesis/FlinkKinesisConsumerTest.java | 2 +- .../connectors/kinesis/FlinkKinesisProducerTest.java | 4 ++-- .../kinesis/manualtests/ManualConsumerProducerTest.java | 2 +- .../testutils/ExactlyOnceValidatingConsumerThread.java | 2 +- .../testutils/KinesisEventsGeneratorProducerThread.java | 2 +- .../kinesis/testutils/TestableFlinkKinesisConsumer.java | 2 +- .../kinesis/testutils/TestableKinesisDataFetcher.java | 2 +- 13 files changed, 14 insertions(+), 14 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index a3681eca52fee..f6a9bd139bfba 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -19,6 +19,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -41,7 +42,6 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java index 225607396857d..04cb78a9399b0 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kinesis; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -25,7 +26,6 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.util.InstantiationUtil; import com.amazonaws.services.kinesis.producer.Attempt; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java index b1ac05705cd12..29f631d2e95ea 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java @@ -17,12 +17,12 @@ package org.apache.flink.streaming.connectors.kinesis.examples; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import java.util.Properties; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java index 8d21c2caa1f91..68460189c8786 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java @@ -17,13 +17,13 @@ package org.apache.flink.streaming.connectors.kinesis.examples; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.commons.lang3.RandomStringUtils; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java index b06b20fc34b73..c4be96b174925 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java @@ -17,8 +17,8 @@ package org.apache.flink.streaming.connectors.kinesis.serialization; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import java.io.IOException; import java.io.Serializable; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java index 279d410eb9d0f..e0587360d2a01 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java @@ -17,8 +17,8 @@ package org.apache.flink.streaming.connectors.kinesis.serialization; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import java.io.IOException; diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 2b1fcf4caa3fd..a354bb34b6996 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -19,6 +19,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; @@ -43,7 +44,6 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java index 702ab0b6226c6..8351f8e092e25 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java @@ -18,6 +18,8 @@ package org.apache.flink.streaming.connectors.kinesis; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -26,8 +28,6 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.InstantiationUtil; diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java index a7470dc166e2f..526261c0675b6 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.streaming.api.datastream.DataStream; @@ -29,7 +30,6 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.Collector; import java.nio.ByteBuffer; diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java index 1336652226f13..45866bf8ec199 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java @@ -20,13 +20,13 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java index 699c9777d9cf4..89a4c0dc136dc 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java @@ -17,13 +17,13 @@ package org.apache.flink.streaming.connectors.kinesis.testutils; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java index 6c91eafc4fdc0..de3e51d198ccb 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.connectors.kinesis.testutils; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java index b6f3cbc52a002..d111546c138b2 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kinesis.testutils; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; @@ -26,7 +27,6 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; From 2b0e5e5ba22b3ea5ed5ada3150744085a763f43b Mon Sep 17 00:00:00 2001 From: yew1eb Date: Thu, 30 Nov 2017 00:23:45 +0800 Subject: [PATCH 2/2] fix --- .../kinesis/FlinkKinesisConsumer.java | 20 +++++++++++++ .../kinesis/FlinkKinesisProducer.java | 28 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index f6a9bd139bfba..f19b8ed3a98b9 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -116,6 +116,26 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction imple // Constructors // ------------------------------------------------------------------------ + /** + * Creates a new Flink Kinesis Consumer. + * + *

The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming + * from are configured with a {@link Properties} instance.

+ * + * @param stream + * The single AWS Kinesis stream to read from. + * @param deserializer + * The deserializer used to convert raw bytes of Kinesis records to Java objects (without key). + * @param configProps + * The properties used to configure AWS credentials, AWS region, and initial starting position. + * + * @deprecated Use {@link #FlinkKinesisConsumer(String, DeserializationSchema, Properties)} instead. + */ + @Deprecated + public FlinkKinesisConsumer(String stream, org.apache.flink.streaming.util.serialization.DeserializationSchema deserializer, Properties configProps) { + this(stream, new KinesisDeserializationSchemaWrapper<>(deserializer), configProps); + } + /** * Creates a new Flink Kinesis Consumer. * diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java index 04cb78a9399b0..54a9eb361e2ce 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java @@ -88,6 +88,34 @@ public class FlinkKinesisProducer extends RichSinkFunction implements // --------------------------- Initialization and configuration --------------------------- + /** + * Create a new FlinkKinesisProducer. + * This is a constructor supporting Flink's {@see SerializationSchema}. + * + * @param schema Serialization schema for the data type + * @param configProps The properties used to configure KinesisProducer, including AWS credentials and AWS region + * + * @deprecated Use {@link #FlinkKinesisProducer(SerializationSchema, Properties)} instead. + */ + @Deprecated + public FlinkKinesisProducer(final org.apache.flink.streaming.util.serialization.SerializationSchema schema, Properties configProps) { + + // create a simple wrapper for the serialization schema + this(new KinesisSerializationSchema() { + @Override + public ByteBuffer serialize(OUT element) { + // wrap into ByteBuffer + return ByteBuffer.wrap(schema.serialize(element)); + } + // use default stream and hash key + + @Override + public String getTargetStream(OUT element) { + return null; + } + }, configProps); + } + /** * Create a new FlinkKinesisProducer. * This is a constructor supporting Flink's {@see SerializationSchema}.