From 96f0799aaece6ae226aeef17ff96c2fe2debc3e0 Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Mon, 14 Aug 2017 15:16:54 +0800 Subject: [PATCH 1/2] [FLINK-7440] [kinesis] Eagerly check serializability of deserialization schema in FlinkKinesisConsumer This commit also adds tests for verifying that the FlinkKinesisConsumer itself is serializable. --- .../kinesis/FlinkKinesisConsumer.java | 5 ++ .../kinesis/FlinkKinesisConsumerTest.java | 66 +++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index d127f2b6fb710..07f67401f78ae 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -45,6 +45,7 @@ import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -176,6 +177,10 @@ public FlinkKinesisConsumer(List streams, KinesisDeserializationSchema("test-stream", new NonSerializableDeserializationSchema(), testConfig); + } + + @Test + public void testCreateWithSerializableDeserializer() { + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + + new FlinkKinesisConsumer<>("test-stream", new SerializableDeserializationSchema(), testConfig); + } + + @Test 
+	public void testConsumerIsSerializable() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("test-stream", new SimpleStringSchema(), testConfig);
+		assertTrue(InstantiationUtil.isSerializable(consumer));
+	}
+
 	// ----------------------------------------------------------------------
 	// Tests related to state initialization
 	// ----------------------------------------------------------------------
@@ -1062,4 +1104,28 @@ private HashMap getFakeRestoredStore(String s
 
 		return fakeRestoredState;
 	}
+
+	private final class NonSerializableDeserializationSchema implements KinesisDeserializationSchema<String> { // non-static on purpose: the implicit reference to the test instance is presumably what breaks serialization
+		@Override
+		public String deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException {
+			return new String(recordValue);
+		}
+
+		@Override
+		public TypeInformation<String> getProducedType() {
+			return BasicTypeInfo.STRING_TYPE_INFO;
+		}
+	}
+
+	private static final class SerializableDeserializationSchema implements KinesisDeserializationSchema<String> { // static: no reference to the enclosing test instance
+		@Override
+		public String deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException {
+			return new String(recordValue);
+		}
+
+		@Override
+		public TypeInformation<String> getProducedType() {
+			return BasicTypeInfo.STRING_TYPE_INFO;
+		}
+	}
 }

From 9faee3dc530c9b9021e33121f502f1e67deceaf0 Mon Sep 17 00:00:00 2001
From: "Tzu-Li (Gordon) Tai"
Date: Mon, 14 Aug 2017 15:35:43 +0800
Subject: [PATCH 2/2] [FLINK-7440] [kinesis] Eagerly check that provided schema and partitioner is serializable in FlinkKinesisProducer

This commit also adds a test to verify that the FlinkKinesisProducer
is serializable.
---
 .../kinesis/FlinkKinesisProducer.java         |  20 ++-
 .../kinesis/FlinkKinesisProducerTest.java     | 146 ++++++++++++++++++
 2 files changed, 159 insertions(+), 7 deletions(-)
 create mode 100644 flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index 04d7055ca8230..65114fc292f2d 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kinesis;
 
-import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
@@ -25,6 +24,7 @@
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.PropertiesUtil;
 
 import com.amazonaws.services.kinesis.producer.Attempt;
@@ -40,9 +40,9 @@
 
 import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.Objects;
 import java.util.Properties;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -123,8 +123,11 @@ public FlinkKinesisProducer(KinesisSerializationSchema schema, Properties c
 		// check the configuration properties for any conflicting settings
 		KinesisConfigUtil.validateProducerConfiguration(this.configProps);
 
-		ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema));
-		this.schema = schema;
+		// Null-check first: building the failure message below dereferences schema.
+		this.schema = checkNotNull(schema, "The serialization schema must not be null.");
+		checkArgument(InstantiationUtil.isSerializable(schema),
+			"The provided serialization schema is not serializable: " + schema.getClass().getName() + ". " +
+				"Please check that it does not contain references to non-serializable instances.");
 	}
 
 	/**
@@ -154,9 +157,12 @@ public void setDefaultPartition(String defaultPartition) {
 	}
 
 	public void setCustomPartitioner(KinesisPartitioner partitioner) {
-		Objects.requireNonNull(partitioner);
-		ClosureCleaner.ensureSerializable(partitioner);
-		this.customPartitioner = partitioner;
+		// Null-check first: building the failure message below dereferences partitioner.
+		checkNotNull(partitioner, "The custom partitioner must not be null.");
+		checkArgument(InstantiationUtil.isSerializable(partitioner),
+			"The provided custom partitioner is not serializable: " + partitioner.getClass().getName() + ". " +
+				"Please check that it does not contain references to non-serializable instances.");
+		this.customPartitioner = partitioner;
 	}
 
 	// --------------------------- Lifecycle methods ---------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
new file mode 100644
index 0000000000000..d319dc11eb11a
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.nio.ByteBuffer;
+import java.util.Properties;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Suite of {@link FlinkKinesisProducer} tests; currently covers the eager serializability checks on the producer, its schema and its custom partitioner.
+ */
+public class FlinkKinesisProducerTest {
+
+	@Rule
+	public ExpectedException exception = ExpectedException.none();
+
+	// ----------------------------------------------------------------------
+	// Tests to verify serializability
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testCreateWithNonSerializableSerializationSchemaFails() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("The provided serialization schema is not serializable");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), testConfig);
+	}
+
+	@Test
+	public void testCreateWithSerializableSerializationSchema() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		new FlinkKinesisProducer<>(new SerializableSerializationSchema(), testConfig);
+	}
+
+	@Test
+	public void testConfigureWithNonSerializableCustomPartitionerFails() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("The provided custom partitioner is not serializable");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
+			.setCustomPartitioner(new NonSerializableCustomPartitioner());
+	}
+
+	@Test
+	public void testConfigureWithSerializableCustomPartitioner() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
+			.setCustomPartitioner(new SerializableCustomPartitioner());
+	}
+
+	@Test
+	public void testProducerIsSerializable() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig);
+		assertTrue(InstantiationUtil.isSerializable(producer));
+	}
+
+	// ----------------------------------------------------------------------
+	// Utility test classes (the non-static inner variants keep a reference to this non-serializable test instance)
+	// ----------------------------------------------------------------------
+
+	private final class NonSerializableSerializationSchema implements KinesisSerializationSchema<String> {
+		@Override
+		public ByteBuffer serialize(String element) {
+			return ByteBuffer.wrap(element.getBytes());
+		}
+
+		@Override
+		public String getTargetStream(String element) {
+			return "test-stream";
+		}
+	}
+
+	private static final class SerializableSerializationSchema implements KinesisSerializationSchema<String> {
+		@Override
+		public ByteBuffer serialize(String element) {
+			return ByteBuffer.wrap(element.getBytes());
+		}
+
+		@Override
+		public String getTargetStream(String element) {
+			return "test-stream";
+		}
+	}
+
+	private final class NonSerializableCustomPartitioner extends KinesisPartitioner<String> {
+		@Override
+		public String getPartitionId(String element) {
+			return "test-partition";
+		}
+	}
+
+	private static final class SerializableCustomPartitioner extends KinesisPartitioner<String> {
+		@Override
+		public String getPartitionId(String element) {
+			return "test-partition";
+		}
+	}
+}