From 41ebad1eff5f3989703c87cd6d6ff151d9dc648c Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Tue, 5 Jul 2016 22:00:18 +0100 Subject: [PATCH 01/14] [FLINK-3874] Implement KafkaJsonTableSink --- .../org/apache/flink/api/table/Row.scala | 4 + .../flink/api/table/sinks/CsvTableSink.scala | 4 +- .../flink/api/table/sinks/TableSink.scala | 48 +----- .../flink/api/table/sinks/TableSinkBase.scala | 68 +++++++++ .../org/apache/flink/api/table/table.scala | 4 +- .../kafka/Kafka08JsonTableSink.java | 62 ++++++++ .../kafka/Kafka08JsonTableSinkTest.java | 38 +++++ .../kafka/Kafka09JsonTableSink.java | 61 ++++++++ .../kafka/Kafka09JsonTableSinkTest.java | 37 +++++ .../kafka/KafkaJsonTableSinkBase.java | 59 ++++++++ .../connectors/kafka/KafkaTableSink.java | 131 +++++++++++++++++ .../AvroSerializationSchema.java | 127 ++++++++++++++++ .../JsonRowSerializationSchema.java | 52 +++++++ .../KeyedSerializationSchemaWrapper.java | 4 +- .../kafka/AvroSerializationSchemaTest.java | 114 +++++++++++++++ .../kafka/JsonRowSerializationSchemaTest.java | 52 +++++++ .../kafka/KafkaTableSinkTestBase.java | 137 ++++++++++++++++++ 17 files changed, 954 insertions(+), 48 deletions(-) create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala create mode 100644 flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java create mode 100644 flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java create mode 100644 flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java create mode 100644 flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java create mode 100644 flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSinkBase.java create mode 100644 flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java create mode 100644 flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroSerializationSchema.java create mode 100644 flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java create mode 100644 flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java create mode 100644 flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java create mode 100644 flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala index e3baab3f9a3ca..91cde9d44643d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala @@ -31,6 +31,10 @@ class Row(arity: Int) extends Product { def setField(i: Int, value: Any): Unit = fields(i) = value 
+ def getField(i: Int): Any = fields(i) + + def getFieldNumber(): Int = fields.length + def canEqual(that: Any) = false override def toString = fields.mkString(",") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala index ed05caf7ff36e..7567ba8169c37 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala @@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream class CsvTableSink( path: String, fieldDelim: String = ",") - extends BatchTableSink[Row] with StreamTableSink[Row] { + extends TableSinkBase[Row] with BatchTableSink[Row] with StreamTableSink[Row] { override def emitDataSet(dataSet: DataSet[Row]): Unit = { dataSet @@ -48,7 +48,7 @@ class CsvTableSink( .writeAsText(path) } - override protected def copy: TableSink[Row] = { + override protected def copy: TableSinkBase[Row] = { new CsvTableSink(path, fieldDelim) } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala index 12e57de67d31d..f50e1185dfd21 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala @@ -29,9 +29,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation */ trait TableSink[T] { - private var fieldNames: Option[Array[String]] = None - private var fieldTypes: Option[Array[TypeInformation[_]]] = None - /** * Return the type expected by this [[TableSink]]. * @@ -41,46 +38,9 @@ trait TableSink[T] { */ def getOutputType: TypeInformation[T] - /** Return a deep copy of the [[TableSink]]. */ - protected def copy: TableSink[T] - - /** - * Return the field names of the [[org.apache.flink.api.table.Table]] to emit. */ - protected final def getFieldNames: Array[String] = { - fieldNames match { - case Some(n) => n - case None => throw new IllegalStateException( - "TableSink must be configured to retrieve field names.") - } - } - - /** Return the field types of the [[org.apache.flink.api.table.Table]] to emit. */ - protected final def getFieldTypes: Array[TypeInformation[_]] = { - fieldTypes match { - case Some(t) => t - case None => throw new IllegalStateException( - "TableSink must be configured to retrieve field types.") - } - } - - /** - * Return a copy of this [[TableSink]] configured with the field names and types of the - * [[org.apache.flink.api.table.Table]] to emit. - * - * @param fieldNames The field names of the table to emit. - * @param fieldTypes The field types of the table to emit. - * @return A copy of this [[TableSink]] configured with the field names and types of the - * [[org.apache.flink.api.table.Table]] to emit. - */ - private[flink] final def configure( - fieldNames: Array[String], - fieldTypes: Array[TypeInformation[_]]): TableSink[T] = { - - val configuredSink = this.copy - configuredSink.fieldNames = Some(fieldNames) - configuredSink.fieldTypes = Some(fieldTypes) - - configuredSink - } + /** Returns the names of the table fields. */ + def getFieldNames: Array[String] + /** Returns the types of the table fields. 
*/ + def getFieldTypes: Array[TypeInformation[_]] } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala new file mode 100644 index 0000000000000..9d7976da61d81 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.sinks + +import org.apache.flink.api.common.typeinfo.TypeInformation + +trait TableSinkBase[T] extends TableSink[T] { + + private var fieldNames: Option[Array[String]] = None + private var fieldTypes: Option[Array[TypeInformation[_]]] = None + + /** Return a deep copy of the [[TableSink]]. */ + protected def copy: TableSinkBase[T] + + /** + * Return the field names of the [[org.apache.flink.api.table.Table]] to emit. */ + def getFieldNames: Array[String] = { + fieldNames match { + case Some(n) => n + case None => throw new IllegalStateException( + "TableSink must be configured to retrieve field names.") + } + } + + /** Return the field types of the [[org.apache.flink.api.table.Table]] to emit. */ + def getFieldTypes: Array[TypeInformation[_]] = { + fieldTypes match { + case Some(t) => t + case None => throw new IllegalStateException( + "TableSink must be configured to retrieve field types.") + } + } + + /** + * Return a copy of this [[TableSink]] configured with the field names and types of the + * [[org.apache.flink.api.table.Table]] to emit. + * + * @param fieldNames The field names of the table to emit. + * @param fieldTypes The field types of the table to emit. + * @return A copy of this [[TableSink]] configured with the field names and types of the + * [[org.apache.flink.api.table.Table]] to emit. 
+ */ + private[flink] final def configure( + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]]): TableSink[T] = { + + val configuredSink = this.copy + configuredSink.fieldNames = Some(fieldNames) + configuredSink.fieldTypes = Some(fieldTypes) + + configuredSink + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index cbb9a077b27f3..4b919d88f031d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -26,6 +26,8 @@ import org.apache.flink.api.table.plan.logical._ import org.apache.flink.api.table.sinks.TableSink import scala.collection.JavaConverters._ +import org.apache.flink.api.table.sinks.{TableSink, TableSinkBase} +import org.apache.flink.api.table.typeutils.TypeConverter /** * A Table is the core component of the Table API. @@ -590,7 +592,7 @@ class Table( .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray // configure the table sink - val configuredSink = sink.configure(fieldNames, fieldTypes) + val configuredSink = sink.asInstanceOf[TableSinkBase[T]].configure(fieldNames, fieldTypes) // emit the table to the configured table sink tableEnv.writeToSink(this, configuredSink) diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java new file mode 100644 index 0000000000000..43a87c00c5ba4 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import java.util.Properties; + +/** + * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format. 
+ */
+public class Kafka08JsonTableSink extends KafkaJsonTableSinkBase {
+	/**
+	 * Creates {@link KafkaTableSink} for Kafka 0.8
+	 *
+	 * @param topic topic in Kafka
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 * @param fieldNames row field names
+	 * @param fieldTypes row field types
+	 */
+	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, Class[] fieldTypes) {
+		super(topic, properties, partitioner, fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Creates {@link KafkaTableSink} for Kafka 0.8
+	 *
+	 * @param topic topic in Kafka
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 * @param fieldNames row field names
+	 * @param fieldTypes row field types
+	 */
+	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, TypeInformation[] fieldTypes) {
+		super(topic, properties, partitioner, fieldNames, fieldTypes);
+	}
+
+	@Override
+	protected FlinkKafkaProducerBase createKafkaProducer(String topic, Properties properties, SerializationSchema serializationSchema, KafkaPartitioner partitioner) {
+		return new FlinkKafkaProducer08(topic, serializationSchema, properties, partitioner);
+	}
+}
+
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
new file mode 100644
index 0000000000000..4a35ef6864a6c
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import org.junit.Test; + +public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase { + @Test + public void kafka08JsonTableSinkTest() throws Exception { + testKafkaTableSink(); + } + + @Override + protected KafkaTableSink createTableSink() { + return new Kafka08JsonTableSink( + TOPIC, + createSinkProperties(), + createPartitioner(), + FIELD_NAMES, + FIELD_TYPES); + } +} + diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java new file mode 100644 index 0000000000000..5c8614b020ac3 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import java.util.Properties; + +/** + * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format. 
+ */
+public class Kafka09JsonTableSink extends KafkaJsonTableSinkBase {
+	/**
+	 * Creates {@link KafkaTableSink} for Kafka 0.9
+	 *
+	 * @param topic topic in Kafka
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 * @param fieldNames row field names
+	 * @param fieldTypes row field types
+	 */
+	public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, Class[] fieldTypes) {
+		super(topic, properties, partitioner, fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Creates {@link KafkaTableSink} for Kafka 0.9
+	 *
+	 * @param topic topic in Kafka
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 * @param fieldNames row field names
+	 * @param fieldTypes row field types
+	 */
+	public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, TypeInformation[] fieldTypes) {
+		super(topic, properties, partitioner, fieldNames, fieldTypes);
+	}
+
+	@Override
+	protected FlinkKafkaProducerBase createKafkaProducer(String topic, Properties properties, SerializationSchema serializationSchema, KafkaPartitioner partitioner) {
+		return new FlinkKafkaProducer09(topic, serializationSchema, properties, partitioner);
+	}
+}
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
new file mode 100644
index 0000000000000..493b154e04fae
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import org.junit.Test; + +public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase { + @Test + public void kafka09JsonTableSinkTest() throws Exception { + testKafkaTableSink(); + } + + @Override + protected KafkaTableSink createTableSink() { + return new Kafka09JsonTableSink( + TOPIC, + createSinkProperties(), + createPartitioner(), + FIELD_NAMES, + FIELD_TYPES); + } +} diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSinkBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSinkBase.java new file mode 100644 index 0000000000000..73ee8ea8619c7 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSinkBase.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.AvroSerializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Base class for {@link KafkaTableSink} that serializes data in JSON format.
+ */
+public abstract class KafkaJsonTableSinkBase extends KafkaTableSink {
+
+	/**
+	 * Creates KafkaJsonTableSinkBase
+	 *
+	 * @param topic topic in Kafka
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 * @param fieldNames row field names
+	 * @param fieldTypes row field types
+	 */
+	public KafkaJsonTableSinkBase(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, Class[] fieldTypes) {
+		super(topic, properties, new JsonRowSerializationSchema(fieldNames), partitioner, fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Creates KafkaJsonTableSinkBase
+	 *
+	 * @param topic topic in Kafka
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 * @param fieldNames row field names
+	 * @param fieldTypes row field types
+	 */
+	public KafkaJsonTableSinkBase(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, TypeInformation[] fieldTypes) {
+		super(topic, properties, new JsonRowSerializationSchema(fieldNames), partitioner, fieldNames, fieldTypes);
+	}
+
+}
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
new file mode 100644
index 0000000000000..ba33824ada3a9
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sinks.StreamTableSink; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.util.Preconditions; + +import java.util.Properties; + +/** + * A version-agnostic Kafka {@link StreamTableSink}. + * + *
The version-specific Kafka producers need to extend this class and
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}.
+ */
+public abstract class KafkaTableSink implements StreamTableSink {
+
+	private final String topic;
+	private final Properties properties;
+	private final SerializationSchema serializationSchema;
+	private final KafkaPartitioner partitioner;
+	private final String[] fieldNames;
+	private final TypeInformation[] fieldTypes;
+
+	/**
+	 * Creates KafkaTableSink
+	 *
+	 * @param topic Kafka topic to write to.
+	 * @param properties Properties for the Kafka producer.
+	 * @param serializationSchema Serialization schema for emitted items
+	 * @param partitioner Partitioner to select Kafka partition for each item
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types.
+	 */
+	public KafkaTableSink(
+			String topic,
+			Properties properties,
+			SerializationSchema serializationSchema,
+			KafkaPartitioner partitioner,
+			String[] fieldNames,
+			Class[] fieldTypes) {
+		this(topic, properties, serializationSchema, partitioner, fieldNames, toTypeInfo(fieldTypes));
+	}
+
+	/**
+	 * Creates KafkaTableSink
+	 *
+	 * @param topic Kafka topic to write to.
+	 * @param properties Properties for the Kafka producer.
+	 * @param serializationSchema Serialization schema for emitted items
+	 * @param partitioner Partitioner to select Kafka partition for each item
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types.
+	 */
+	public KafkaTableSink(
+			String topic,
+			Properties properties,
+			SerializationSchema serializationSchema,
+			KafkaPartitioner partitioner,
+			String[] fieldNames,
+			TypeInformation[] fieldTypes) {
+
+		this.topic = Preconditions.checkNotNull(topic, "topic");
+		this.properties = Preconditions.checkNotNull(properties, "properties");
+		this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema");
+		this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
+		this.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
+		this.fieldTypes = Preconditions.checkNotNull(fieldTypes);
+
+		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+			"Number of provided field names and types does not match.");
+	}
+
+	@Override
+	public void emitDataStream(DataStream dataStream) {
+		FlinkKafkaProducerBase kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
+		dataStream.addSink(kafkaProducer);
+	}
+
+	protected abstract FlinkKafkaProducerBase createKafkaProducer(
+		String topic, Properties properties,
+		SerializationSchema serializationSchema,
+		KafkaPartitioner partitioner);
+
+	@Override
+	public TypeInformation getOutputType() {
+		return new RowTypeInfo(getFieldTypes(), getFieldNames());
+	}
+
+	public String[] getFieldNames() {
+		return fieldNames;
+	}
+
+	@Override
+	public TypeInformation[] getFieldTypes() {
+		return fieldTypes;
+	}
+
+	/**
+	 * Creates TypeInformation array for an array of Classes.
+ */ + private static TypeInformation[] toTypeInfo(Class[] fieldTypes) { + TypeInformation[] typeInfos = new TypeInformation[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]); + } + return typeInfos; + } +} diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroSerializationSchema.java new file mode 100644 index 0000000000000..da7a519553fc5 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroSerializationSchema.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.BlockingBinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.util.ByteBufferOutputStream; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.typeutils.runtime.AvroSerializer; +import org.apache.flink.api.java.typeutils.runtime.DataInputDecoder; +import org.apache.flink.api.java.typeutils.runtime.DataOutputEncoder; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.util.DataOutputSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +/** + * Abstract class for implementing serialization schemas for Kafka + * using Avro serialization library. 
+ */ +@PublicEvolving +public class AvroSerializationSchema implements SerializationSchema { + + private final Class klass; + private final AvroSerializer avroSerializer; + + public AvroSerializationSchema(Class klass) { + this.avroSerializer = new AvroSerializer(klass); + this.klass = klass; + } + + @Override + public byte[] serialize(T element) { + return serialize1(element); + } + + private byte[] serialize1(T object) { + + + // get the reflected schema for packets + Schema schema = ReflectData.get().getSchema(klass); + + // create a file of packets + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DatumWriter writer = new ReflectDatumWriter(klass); + final Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null); + + try { + writer.write(object, encoder); + out.close(); + } catch (IOException e) { + e.printStackTrace(); + } + + return out.toByteArray(); + } + + private static final class TestOutputView extends DataOutputStream implements DataOutputView { + + public TestOutputView() { + super(new ByteArrayOutputStream(4096)); + } + + public TestInputView getInputView() { + ByteArrayOutputStream baos = (ByteArrayOutputStream) out; + return new TestInputView(baos.toByteArray()); + } + + @Override + public void skipBytesToWrite(int numBytes) throws IOException { + for (int i = 0; i < numBytes; i++) { + write(0); + } + } + + @Override + public void write(DataInputView source, int numBytes) throws IOException { + byte[] buffer = new byte[numBytes]; + source.readFully(buffer); + write(buffer); + } + } + + private static final class TestInputView extends DataInputStream implements DataInputView { + + public TestInputView(byte[] data) { + super(new ByteArrayInputStream(data)); + } + + @Override + public void skipBytesToRead(int numBytes) throws IOException { + while (numBytes > 0) { + int skipped = skipBytes(numBytes); + numBytes -= skipped; + } + } + } +} diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java new file mode 100644 index 0000000000000..112ea4f1e1512 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.table.Row;
+
+
+/**
+ * Serialization schema that serializes an object into JSON bytes.
+ *
+ */ +public class JsonRowSerializationSchema implements SerializationSchema { + private final String[] fieldNames; + private static ObjectMapper mapper = new ObjectMapper(); + + public JsonRowSerializationSchema(String[] fieldNames) { + this.fieldNames = fieldNames; + } + + @Override + public byte[] serialize(Row row) { + + ObjectNode objectNode = mapper.createObjectNode(); + for (int i = 0; i < row.getFieldNumber(); i++) { + JsonNode node = mapper.valueToTree(row.getField(i)); + objectNode.set(fieldNames[i], node); + } + + try { + return mapper.writeValueAsBytes(objectNode); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java index 1b3e4860fbfac..4399b60f43514 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java @@ -16,12 +16,14 @@ */ package org.apache.flink.streaming.util.serialization; +import java.io.Serializable; + /** * A simple wrapper for using the SerializationSchema with the KeyedDeserializationSchema * interface * @param The type to serialize */ -public class KeyedSerializationSchemaWrapper implements KeyedSerializationSchema { +public class KeyedSerializationSchemaWrapper implements KeyedSerializationSchema, Serializable { private static final long serialVersionUID = 1351665280744549933L; diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java new file mode 100644 index 0000000000000..1ff8248a7eb58 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.flink.streaming.util.serialization.AvroSerializationSchema; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Objects; + +import static javafx.scene.input.KeyCode.T; +import static org.junit.Assert.assertEquals; + +public class AvroSerializationSchemaTest { +// @Test +// public void serializeValue() throws IOException { +// Schema schema = ReflectData.get().getSchema(TestType.class); +// AvroSerializationSchema serializationSchema = new AvroSerializationSchema(TestType.class); +// +// TestType inputValue = new TestType("key", "value"); +// byte[] serializedKey = serializationSchema.serialize(inputValue); +// TestType resultValue = deserialize(schema, serializedKey); +// assertEquals(inputValue, resultValue); +// } + + private TestType deserialize(Schema schema, byte[] serializedKey) throws IOException { + ByteArrayInputStream inputStream = new ByteArrayInputStream(serializedKey); + ReflectDatumReader reflectDatumReader = new ReflectDatumReader(schema); + DataFileStream reader = new DataFileStream(inputStream, reflectDatumReader); + TestType key = reader.next(); + reader.close(); + inputStream.close(); + return key; + } + + @Test + public void test() throws IOException { + + TestObject testObject = new TestObject(); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DatumWriter writer = new ReflectDatumWriter(TestObject.class); + final Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null); + + writer.write(testObject, encoder); + out.close(); + byte[] res = out.toByteArray(); + } +} + +class TestObject { +// Object[] arr = new Object[] {1, "str", false}; + String str = "str"; +} + +class TestType { + private String key; + private String value; + + // To make Avro happy + public TestType() {} + + public TestType(String key, String value) { + this.key = key; + this.value = value; + } + + public String getKey() { + return key; + } + + public String getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TestType testType = (TestType) o; + return Objects.equals(key, testType.key) && + Objects.equals(value, testType.value); + } + + @Override + public int hashCode() { + return Objects.hash(key, value); + } +} diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java new file mode 100644 index 0000000000000..151396b785bb1 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class JsonRowSerializationSchemaTest { + @Test + public void testRowSerialization() throws IOException { + String[] fieldNames = new String[] {"f1", "f2", "f3"}; + Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class}; + Row row = new Row(3); + row.setField(0, 1); + row.setField(1, true); + row.setField(2, "str"); + + + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes); + + byte[] bytes = serializationSchema.serialize(row); + Row resultRow = deserializationSchema.deserialize(bytes); + + assertEquals("Deserialized row should have expected number of fields", + row.getFieldNumber(), resultRow.getFieldNumber()); + for (int i = 0; i < row.getFieldNumber(); i++) { + assertEquals(String.format("Field number %d should be as in the original row", i), + row.getField(i), resultRow.getField(i)); + } + } +} diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java new file mode 100644 index 0000000000000..85c6e7dc65044 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; +import org.apache.flink.test.util.SuccessException; + +import java.io.Serializable; +import java.util.HashSet; +import java.util.Properties; + +import static org.apache.flink.test.util.TestUtils.tryExecute; + +public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable { + + protected final static String TOPIC = "customPartitioningTestTopic"; + protected final static int PARALLELISM = 1; + protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"}; + protected final static Class[] FIELD_TYPES = new Class[] {Integer.class, String.class}; + + public void testKafkaTableSink() throws Exception { + LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()"); + + createTestTopic(TOPIC, PARALLELISM, 1); + StreamExecutionEnvironment env = createEnvironment(); + + createProducingTopology(env); + createConsumingTopology(env); + + tryExecute(env, "custom partitioning test"); + deleteTestTopic(TOPIC); + LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()"); + } + + private StreamExecutionEnvironment createEnvironment() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.getConfig().disableSysoutLogging(); + return env; + } + + private void createConsumingTopology(StreamExecutionEnvironment env) { + JsonRowDeserializationSchema jsonDeserializationSchema = new JsonRowDeserializationSchema( + FIELD_NAMES, FIELD_TYPES); + + FlinkKafkaConsumerBase source = kafkaServer.getConsumer(TOPIC, jsonDeserializationSchema, standardProps); + + env.addSource(source).setParallelism(PARALLELISM) + .map(new RichMapFunction() { + @Override + public Integer map(Row value) { + return (Integer) value.getField(0); + } + }).setParallelism(PARALLELISM) + + .addSink(new SinkFunction() { + HashSet ids = new HashSet<>(); + @Override + public void invoke(Integer value) throws Exception { + ids.add(value); + + if (ids.size() == 100) { + throw new SuccessException(); + } + } + }).setParallelism(1); + } + + private void createProducingTopology(StreamExecutionEnvironment env) { + DataStream stream = env.addSource(new SourceFunction() { + + private boolean running = true; + + @Override + public void run(SourceContext ctx) throws Exception { + long cnt = 0; + while (running) { + Row row = new Row(2); + row.setField(0, cnt); + row.setField(1, "kafka-" + cnt); + ctx.collect(row); + cnt++; + } + } + + @Override + public void cancel() { + running = false; + } + }) + .setParallelism(1); + + KafkaTableSink kafkaTableSinkBase = createTableSink(); + + kafkaTableSinkBase.emitDataStream(stream); + } + + protected KafkaPartitioner createPartitioner() { + return new CustomPartitioner(); + } + + protected Properties createSinkProperties() { + return 
FlinkKafkaProducerBase.getPropertiesFromBrokerList(KafkaTestBase.brokerConnectionStrings); + } + + protected abstract KafkaTableSink createTableSink(); + + public static class CustomPartitioner extends KafkaPartitioner implements Serializable { + @Override + public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { + return 0; + } + } +} From b33ad7d7fe53ed262263f1d67255084dbe4beb8e Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Wed, 13 Jul 2016 22:43:13 +0100 Subject: [PATCH 02/14] [FLINK-3874] Implement tests for CsvTableSink --- .../api/table/sink/CsvTableSinkTest.scala | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala new file mode 100644 index 0000000000000..68cfa1d6af4c5 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.sink + +import java.io.File +import java.nio.file.Files +import java.util.Collections + +import org.apache.flink.api.java.ExecutionEnvironment +import org.apache.flink.api.java.operators.DataSource +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.sinks.CsvTableSink +import org.junit.rules.TemporaryFolder +import org.junit.{Rule, Test} +import org.junit.Assert._ + +class CsvTableSinkTest { + + val _folder = new TemporaryFolder + + @Rule + def folder = _folder + + @Test + def saveDataSetToCsvFileWithDefaultDelimiter(): Unit = { + val file = new File(folder.getRoot, "test.csv") + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val ds = createDataset(env) + + val sink = new CsvTableSink(file.getAbsolutePath) + writeToCsv(env, ds, sink) + + val lines = Files.readAllLines(file.toPath) + assertEquals(Collections.singletonList("1,str,false"), lines) + } + + @Test + def saveDataSetToCsvFileWithCustomDelimiter(): Unit = { + val file = new File(folder.getRoot, "test.csv") + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val ds = createDataset(env) + + val sink = new CsvTableSink(file.getAbsolutePath, "|") + writeToCsv(env, ds, sink) + + val lines = Files.readAllLines(file.toPath) + assertEquals(Collections.singletonList("1|str|false"), lines) + } + + def writeToCsv(env: ExecutionEnvironment, ds: DataSource[Row], sink: CsvTableSink): Unit = { + sink.emitDataSet(ds) + env.execute("job") + } + + def createDataset(env: ExecutionEnvironment): DataSource[Row] = { + val row = new Row(3) + row.setField(0, 1) + row.setField(1, "str") + row.setField(2, false) + env.fromCollection(Collections.singletonList(row)) + } +} From 3164dc1b4f14f2e852a75804be00da92c0cb3bc0 Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Thu, 14 Jul 2016 20:40:24 +0100 Subject: [PATCH 03/14] [FLINK-3874] Fix according to PR. 
Fix build --- .../api/table/sink/CsvTableSinkTest.scala | 3 +- .../kafka/KafkaJsonTableSinkBase.java | 1 - .../connectors/kafka/KafkaTableSink.java | 1 - .../AvroSerializationSchema.java | 127 ------------------ .../kafka/AvroSerializationSchemaTest.java | 114 ---------------- 5 files changed, 2 insertions(+), 244 deletions(-) delete mode 100644 flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroSerializationSchema.java delete mode 100644 flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala index 68cfa1d6af4c5..68583074c3b10 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala @@ -19,6 +19,7 @@ package org.apache.flink.api.table.sink import java.io.File +import java.nio.charset.Charset import java.nio.file.Files import java.util.Collections @@ -59,7 +60,7 @@ class CsvTableSinkTest { val sink = new CsvTableSink(file.getAbsolutePath, "|") writeToCsv(env, ds, sink) - val lines = Files.readAllLines(file.toPath) + val lines = Files.readAllLines(file.toPath, Charset.defaultCharset()) assertEquals(Collections.singletonList("1|str|false"), lines) } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSinkBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSinkBase.java index 73ee8ea8619c7..5a8587b80cb8a 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSinkBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSinkBase.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.table.Row; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.AvroSerializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import java.util.Properties; diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index ba33824ada3a9..d8ea1eafba72f 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -88,7 +88,6 @@ public KafkaTableSink( this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner"); this.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames"); this.fieldTypes = Preconditions.checkNotNull(fieldTypes); - Preconditions.checkArgument(fieldNames.length == fieldTypes.length, "Number of provided field names and 
types does not match."); } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroSerializationSchema.java deleted file mode 100644 index da7a519553fc5..0000000000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroSerializationSchema.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.util.serialization; - -import org.apache.avro.Schema; -import org.apache.avro.file.CodecFactory; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.io.BinaryEncoder; -import org.apache.avro.io.BlockingBinaryEncoder; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.Encoder; -import org.apache.avro.io.EncoderFactory; -import org.apache.avro.reflect.ReflectData; -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.avro.util.ByteBufferOutputStream; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.java.typeutils.runtime.AvroSerializer; -import org.apache.flink.api.java.typeutils.runtime.DataInputDecoder; -import org.apache.flink.api.java.typeutils.runtime.DataOutputEncoder; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.util.DataOutputSerializer; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -/** - * Abstract class for implementing serialization schemas for Kafka - * using Avro serialization library. 
- */ -@PublicEvolving -public class AvroSerializationSchema implements SerializationSchema { - - private final Class klass; - private final AvroSerializer avroSerializer; - - public AvroSerializationSchema(Class klass) { - this.avroSerializer = new AvroSerializer(klass); - this.klass = klass; - } - - @Override - public byte[] serialize(T element) { - return serialize1(element); - } - - private byte[] serialize1(T object) { - - - // get the reflected schema for packets - Schema schema = ReflectData.get().getSchema(klass); - - // create a file of packets - ByteArrayOutputStream out = new ByteArrayOutputStream(); - DatumWriter writer = new ReflectDatumWriter(klass); - final Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null); - - try { - writer.write(object, encoder); - out.close(); - } catch (IOException e) { - e.printStackTrace(); - } - - return out.toByteArray(); - } - - private static final class TestOutputView extends DataOutputStream implements DataOutputView { - - public TestOutputView() { - super(new ByteArrayOutputStream(4096)); - } - - public TestInputView getInputView() { - ByteArrayOutputStream baos = (ByteArrayOutputStream) out; - return new TestInputView(baos.toByteArray()); - } - - @Override - public void skipBytesToWrite(int numBytes) throws IOException { - for (int i = 0; i < numBytes; i++) { - write(0); - } - } - - @Override - public void write(DataInputView source, int numBytes) throws IOException { - byte[] buffer = new byte[numBytes]; - source.readFully(buffer); - write(buffer); - } - } - - private static final class TestInputView extends DataInputStream implements DataInputView { - - public TestInputView(byte[] data) { - super(new ByteArrayInputStream(data)); - } - - @Override - public void skipBytesToRead(int numBytes) throws IOException { - while (numBytes > 0) { - int skipped = skipBytes(numBytes); - numBytes -= skipped; - } - } - } -} diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java deleted file mode 100644 index 1ff8248a7eb58..0000000000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileStream; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.Encoder; -import org.apache.avro.io.EncoderFactory; -import org.apache.avro.reflect.ReflectData; -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.flink.streaming.util.serialization.AvroSerializationSchema; -import org.junit.Test; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Objects; - -import static javafx.scene.input.KeyCode.T; -import static org.junit.Assert.assertEquals; - -public class AvroSerializationSchemaTest { -// @Test -// public void serializeValue() throws IOException { -// Schema schema = ReflectData.get().getSchema(TestType.class); -// AvroSerializationSchema serializationSchema = new AvroSerializationSchema(TestType.class); -// -// TestType inputValue = new TestType("key", "value"); -// byte[] serializedKey = serializationSchema.serialize(inputValue); -// TestType resultValue = deserialize(schema, serializedKey); -// assertEquals(inputValue, resultValue); -// } - - private TestType deserialize(Schema schema, byte[] serializedKey) throws IOException { - ByteArrayInputStream inputStream = new ByteArrayInputStream(serializedKey); - ReflectDatumReader reflectDatumReader = new ReflectDatumReader(schema); - DataFileStream reader = new DataFileStream(inputStream, reflectDatumReader); - TestType key = reader.next(); - reader.close(); - inputStream.close(); - return key; - } - - @Test - public void test() throws IOException { - - TestObject testObject = new TestObject(); - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - DatumWriter writer = new ReflectDatumWriter(TestObject.class); - final Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null); - - writer.write(testObject, encoder); - out.close(); - byte[] res = out.toByteArray(); - } -} - -class TestObject { -// Object[] arr = new Object[] {1, "str", false}; - String str = "str"; -} - -class TestType { - private String key; - private String value; - - // To make Avro happy - public TestType() {} - - public TestType(String key, String value) { - this.key = key; - this.value = value; - } - - public String getKey() { - return key; - } - - public String getValue() { - return value; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - TestType testType = (TestType) o; - return Objects.equals(key, testType.key) && - Objects.equals(value, testType.value); - } - - @Override - public int hashCode() { - return Objects.hash(key, value); - } -} From 31d26517c73880ddadef5eb3f71d5893ff7d1eb6 Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Thu, 21 Jul 2016 21:54:14 +0100 Subject: [PATCH 04/14] [FLINK-3874] Minor refactoring --- .../kafka/Kafka08JsonTableSink.java | 2 +- .../kafka/Kafka08JsonTableSinkTest.java | 8 +++ .../kafka/Kafka09JsonTableSink.java | 2 +- .../kafka/Kafka09JsonTableSinkTest.java | 8 +++ ...eSinkBase.java => KafkaJsonTableSink.java} | 10 ++-- .../connectors/kafka/KafkaTableSink.java | 14 +---- .../connectors/kafka/KafkaTableSource.java | 15 +---- .../apache/flink/streaming/util/TypeUtil.java | 38 ++++++++++++ .../kafka/KafkaTableSinkTestBase.java | 59 ++++++++++--------- 9 files changed, 95 insertions(+), 61 deletions(-) rename 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/{KafkaJsonTableSinkBase.java => KafkaJsonTableSink.java} (80%) create mode 100644 flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/TypeUtil.java diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java index 43a87c00c5ba4..02b931a32df33 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -27,7 +27,7 @@ /** * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format. */ -public class Kafka08JsonTableSink extends KafkaJsonTableSinkBase { +public class Kafka08JsonTableSink extends KafkaJsonTableSink { /** * Creates {@link KafkaTableSink} for Kafka 0.8 * diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java index 4a35ef6864a6c..96661b30af5eb 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -17,6 +17,9 @@ */ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; import org.junit.Test; public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase { @@ -34,5 +37,10 @@ protected KafkaTableSink createTableSink() { FIELD_NAMES, FIELD_TYPES); } + + protected DeserializationSchema createRowDeserializationSchema() { + return new JsonRowDeserializationSchema( + FIELD_NAMES, FIELD_TYPES); + } } diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java index 5c8614b020ac3..fc1974f2673f9 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -27,7 +27,7 @@ /** * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format. 
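The renamed KafkaJsonTableSink variants delegate the actual formatting to JsonRowSerializationSchema, and the tests read the produced bytes back with JsonRowDeserializationSchema. A rough round trip through the two schemas, with made-up field names and values, looks like this:

    import org.apache.flink.api.table.Row;
    import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
    import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;

    import java.io.IOException;

    public class JsonRowRoundTrip {

        public static void main(String[] args) throws IOException {
            String[] fieldNames = {"id", "name"};
            Class[] fieldTypes = {Integer.class, String.class};

            Row row = new Row(2);
            row.setField(0, 1);
            row.setField(1, "str");

            // Row -> JSON bytes, keyed by the configured field names, roughly {"id":1,"name":"str"}.
            byte[] json = new JsonRowSerializationSchema(fieldNames).serialize(row);

            // JSON bytes -> Row, parsed back with the same field names and type classes.
            Row copy = new JsonRowDeserializationSchema(fieldNames, fieldTypes).deserialize(json);
        }
    }

The deserializer is also what the table-sink tests plug into the consuming side of the Kafka topic to verify what the sink wrote.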
*/ -public class Kafka09JsonTableSink extends KafkaJsonTableSinkBase { +public class Kafka09JsonTableSink extends KafkaJsonTableSink { /** * Creates {@link KafkaTableSink} for Kafka 0.9 * diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java index 493b154e04fae..a291063f828f7 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -17,6 +17,9 @@ */ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; import org.junit.Test; public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase { @@ -34,4 +37,9 @@ protected KafkaTableSink createTableSink() { FIELD_NAMES, FIELD_TYPES); } + + protected DeserializationSchema createRowDeserializationSchema() { + return new JsonRowDeserializationSchema( + FIELD_NAMES, FIELD_TYPES); + } } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSinkBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java similarity index 80% rename from flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSinkBase.java rename to flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java index 5a8587b80cb8a..8667a0c4ecbf0 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSinkBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java @@ -27,10 +27,10 @@ /** * Base class for {@link KafkaTableSink} that serializes data in JSON format */ -public abstract class KafkaJsonTableSinkBase extends KafkaTableSink { +public abstract class KafkaJsonTableSink extends KafkaTableSink { /** - * Creates KafkaJsonTableSinkBase + * Creates KafkaJsonTableSink * * @param topic topic in Kafka * @param properties properties to connect to Kafka @@ -38,12 +38,12 @@ public abstract class KafkaJsonTableSinkBase extends KafkaTableSink { * @param fieldNames row field names * @param fieldTypes row field types */ - public KafkaJsonTableSinkBase(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, Class[] fieldTypes) { + public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, Class[] fieldTypes) { super(topic, properties, new JsonRowSerializationSchema(fieldNames), partitioner, fieldNames, fieldTypes); } /** - * Creates KafkaJsonTableSinkBase + * Creates KafkaJsonTableSink * * @param topic topic in Kafka * @param properties properties to connect to Kafka @@ -51,7 +51,7 @@ public KafkaJsonTableSinkBase(String topic, Properties 
properties, KafkaPartitio * @param fieldNames row field names * @param fieldTypes row field types */ - public KafkaJsonTableSinkBase(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, TypeInformation[] fieldTypes) { + public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, TypeInformation[] fieldTypes) { super(topic, properties, new JsonRowSerializationSchema(fieldNames), partitioner, fieldNames, fieldTypes); } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index d8ea1eafba72f..4235f5354f1ad 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.table.Row; import org.apache.flink.api.table.sinks.StreamTableSink; import org.apache.flink.api.table.typeutils.RowTypeInfo; @@ -29,6 +28,8 @@ import java.util.Properties; +import static org.apache.flink.streaming.util.TypeUtil.toTypeInfo; + /** * A version-agnostic Kafka {@link StreamTableSink}. * @@ -116,15 +117,4 @@ public String[] getFieldNames() { public TypeInformation[] getFieldTypes() { return fieldTypes; } - - /** - * Creates TypeInformation array for an array of Classes. - */ - private static TypeInformation[] toTypeInfo(Class[] fieldTypes) { - TypeInformation[] typeInfos = new TypeInformation[fieldTypes.length]; - for (int i = 0; i < fieldTypes.length; i++) { - typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]); - } - return typeInfos; - } } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index c6904fe8f6de0..5b61416d8e041 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.table.Row; import org.apache.flink.api.table.sources.StreamTableSource; import org.apache.flink.api.table.typeutils.RowTypeInfo; @@ -30,6 +29,8 @@ import java.util.Properties; +import static org.apache.flink.streaming.util.TypeUtil.toTypeInfo; + /** * A version-agnostic Kafka {@link StreamTableSource}. * @@ -147,16 +148,4 @@ abstract FlinkKafkaConsumerBase getKafkaConsumer( protected DeserializationSchema getDeserializationSchema() { return deserializationSchema; } - - /** - * Creates TypeInformation array for an array of Classes. 
- */ - private static TypeInformation[] toTypeInfo(Class[] fieldTypes) { - TypeInformation[] typeInfos = new TypeInformation[fieldTypes.length]; - for (int i = 0; i < fieldTypes.length; i++) { - typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]); - } - return typeInfos; - } - } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/TypeUtil.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/TypeUtil.java new file mode 100644 index 0000000000000..56e906f053718 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/TypeUtil.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +public class TypeUtil { + private TypeUtil() {} + + /** + * Creates TypeInformation array for an array of Classes. 
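The extracted helper just maps plain classes to Flink TypeInformation, so callers no longer use TypeExtractor directly; typical use is a one-liner, shown here as a fragment with illustrative types (a later commit in this series moves the class to the connectors.kafka.internals package):

    // e.g. the field types of a two-column (INT, STRING) row
    TypeInformation[] fieldTypes =
        TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});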
+ * @param fieldTypes classes to extract type information from + * @return type information + */ + public static TypeInformation[] toTypeInfo(Class[] fieldTypes) { + TypeInformation[] typeInfos = new TypeInformation[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]); + } + return typeInfos; + } +} diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index 85c6e7dc65044..6ed1310f77f09 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.test.util.SuccessException; import java.io.Serializable; @@ -62,36 +62,8 @@ private StreamExecutionEnvironment createEnvironment() { return env; } - private void createConsumingTopology(StreamExecutionEnvironment env) { - JsonRowDeserializationSchema jsonDeserializationSchema = new JsonRowDeserializationSchema( - FIELD_NAMES, FIELD_TYPES); - - FlinkKafkaConsumerBase source = kafkaServer.getConsumer(TOPIC, jsonDeserializationSchema, standardProps); - - env.addSource(source).setParallelism(PARALLELISM) - .map(new RichMapFunction() { - @Override - public Integer map(Row value) { - return (Integer) value.getField(0); - } - }).setParallelism(PARALLELISM) - - .addSink(new SinkFunction() { - HashSet ids = new HashSet<>(); - @Override - public void invoke(Integer value) throws Exception { - ids.add(value); - - if (ids.size() == 100) { - throw new SuccessException(); - } - } - }).setParallelism(1); - } - private void createProducingTopology(StreamExecutionEnvironment env) { DataStream stream = env.addSource(new SourceFunction() { - private boolean running = true; @Override @@ -118,6 +90,32 @@ public void cancel() { kafkaTableSinkBase.emitDataStream(stream); } + private void createConsumingTopology(StreamExecutionEnvironment env) { + DeserializationSchema deserializationSchema = createRowDeserializationSchema(); + + FlinkKafkaConsumerBase source = kafkaServer.getConsumer(TOPIC, deserializationSchema, standardProps); + + env.addSource(source).setParallelism(PARALLELISM) + .map(new RichMapFunction() { + @Override + public Integer map(Row value) { + return (Integer) value.getField(0); + } + }).setParallelism(PARALLELISM) + + .addSink(new SinkFunction() { + HashSet ids = new HashSet<>(); + @Override + public void invoke(Integer value) throws Exception { + ids.add(value); + + if (ids.size() == 100) { + throw new SuccessException(); + } + } + }).setParallelism(1); + } + protected KafkaPartitioner createPartitioner() { return new CustomPartitioner(); } @@ -128,6 +126,9 @@ protected Properties createSinkProperties() { protected abstract KafkaTableSink createTableSink(); + protected abstract 
DeserializationSchema createRowDeserializationSchema(); + + public static class CustomPartitioner extends KafkaPartitioner implements Serializable { @Override public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { From 7b2d287ba47d2ca3dbbf0b18f548aeb5ba163819 Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Fri, 22 Jul 2016 23:15:15 +0100 Subject: [PATCH 05/14] [FLINK-3874] Fix build --- .../apache/flink/streaming/connectors/kafka/KafkaTableSink.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index 4235f5354f1ad..c55d7550a585e 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -106,7 +106,7 @@ abstract protected FlinkKafkaProducerBase createKafkaProducer( @Override public TypeInformation getOutputType() { - return new RowTypeInfo(getFieldTypes(), getFieldNames()); + return new RowTypeInfo(getFieldTypes()); } public String[] getFieldNames() { From 0595cd53fa12e554eb2949effc4423542dcf96d3 Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Sat, 23 Jul 2016 11:08:18 +0100 Subject: [PATCH 06/14] [FLINK-3874] Fix build --- .../org/apache/flink/api/table/sink/CsvTableSinkTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala index 68583074c3b10..594f37ac3de18 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala @@ -47,7 +47,7 @@ class CsvTableSinkTest { val sink = new CsvTableSink(file.getAbsolutePath) writeToCsv(env, ds, sink) - val lines = Files.readAllLines(file.toPath) + val lines = Files.readAllLines(file.toPath, Charset.defaultCharset()) assertEquals(Collections.singletonList("1,str,false"), lines) } From d837a01cbd41eea4f02b6ca1c8b14aed9a6b106c Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Fri, 12 Aug 2016 21:14:01 +0100 Subject: [PATCH 07/14] Fixed according to PR review --- .../org/apache/flink/api/table/Row.scala | 4 --- .../api/table/sink/CsvTableSinkTest.scala | 4 +-- .../kafka/Kafka08JsonTableSink.java | 2 +- .../connectors/kafka/KafkaTableSink.java | 21 +++++++++++----- .../connectors/kafka/KafkaTableSource.java | 2 +- .../kafka/internals}/TypeUtil.java | 2 +- .../JsonRowDeserializationSchema.java | 4 +-- .../JsonRowSerializationSchema.java | 25 ++++++++++++++++--- .../KeyedSerializationSchemaWrapper.java | 4 +-- .../kafka/JsonRowSerializationSchemaTest.java | 22 +++++++++++++--- .../kafka/KafkaTableSinkTestBase.java | 2 +- 11 files changed, 64 insertions(+), 28 deletions(-) rename flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/{util => connectors/kafka/internals}/TypeUtil.java (95%) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala index 91cde9d44643d..e3baab3f9a3ca 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala @@ -31,10 +31,6 @@ class Row(arity: Int) extends Product { def setField(i: Int, value: Any): Unit = fields(i) = value - def getField(i: Int): Any = fields(i) - - def getFieldNumber(): Int = fields.length - def canEqual(that: Any) = false override def toString = fields.mkString(",") diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala index 594f37ac3de18..a3732d5344ed5 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala @@ -39,7 +39,7 @@ class CsvTableSinkTest { def folder = _folder @Test - def saveDataSetToCsvFileWithDefaultDelimiter(): Unit = { + def testSaveDataSetToCsvFileWithDefaultDelimiter(): Unit = { val file = new File(folder.getRoot, "test.csv") val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val ds = createDataset(env) @@ -52,7 +52,7 @@ class CsvTableSinkTest { } @Test - def saveDataSetToCsvFileWithCustomDelimiter(): Unit = { + def testSaveDataSetToCsvFileWithCustomDelimiter(): Unit = { val file = new File(folder.getRoot, "test.csv") val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val ds = createDataset(env) diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java index 02b931a32df33..57f3de3393213 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -42,7 +42,7 @@ public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitione } /** - * Creates {@link KafkaTableSink} for Kafka 0.9 + * Creates {@link KafkaTableSink} for Kafka 0.8 * * @param topic topic in Kafka * @param properties properties to connect to Kafka diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index c55d7550a585e..299f2ba36e136 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -28,7 +28,7 @@ import java.util.Properties; -import static org.apache.flink.streaming.util.TypeUtil.toTypeInfo; +import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo; /** * A version-agnostic Kafka {@link StreamTableSink}. 
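The hunk below turns createKafkaProducer() into the documented hook that the version-specific sinks implement. Those implementations are not part of this diff; one plausible shape for the 0.9 connector, assuming the FlinkKafkaProducer09 constructor that accepts a KeyedSerializationSchema, is:

    @Override
    protected FlinkKafkaProducerBase<Row> createKafkaProducer(
            String topic,
            Properties properties,
            SerializationSchema<Row> serializationSchema,
            KafkaPartitioner<Row> partitioner) {
        // Adapt the value-only schema to the keyed interface the producer expects.
        return new FlinkKafkaProducer09<>(
            topic,
            new KeyedSerializationSchemaWrapper<>(serializationSchema),
            properties,
            partitioner);
    }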
@@ -93,17 +93,26 @@ public KafkaTableSink( "Number of provided field names and types does not match."); } + /** + * Returns the version-specifid Kafka producer. + * + * @param topic Kafka topic to produce to. + * @param properties Properties for the Kafka producer. + * @param serializationSchema Serialization schema to use to create Kafka records. + * @param partitioner Partitioner to select Kafka partition. + * @return The version-specific Kafka producer + */ + protected abstract FlinkKafkaProducerBase createKafkaProducer( + String topic, Properties properties, + SerializationSchema serializationSchema, + KafkaPartitioner partitioner); + @Override public void emitDataStream(DataStream dataStream) { FlinkKafkaProducerBase kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner); dataStream.addSink(kafkaProducer); } - abstract protected FlinkKafkaProducerBase createKafkaProducer( - String topic, Properties properties, - SerializationSchema serializationSchema, - KafkaPartitioner partitioner); - @Override public TypeInformation getOutputType() { return new RowTypeInfo(getFieldTypes()); diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index 5b61416d8e041..fc6bf44c94224 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -29,7 +29,7 @@ import java.util.Properties; -import static org.apache.flink.streaming.util.TypeUtil.toTypeInfo; +import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo; /** * A version-agnostic Kafka {@link StreamTableSource}. diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/TypeUtil.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java similarity index 95% rename from flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/TypeUtil.java rename to flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java index 56e906f053718..7a41ade5732bf 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/TypeUtil.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.streaming.util; +package org.apache.flink.streaming.connectors.kafka.internals; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java index 970c73ebf867e..434481095cc7c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java @@ -51,7 +51,7 @@ public class JsonRowDeserializationSchema implements DeserializationSchema private boolean failOnMissingField; /** - * Creates a JSON deserializtion schema for the given fields and type classes. + * Creates a JSON deserialization schema for the given fields and type classes. * * @param fieldNames Names of JSON fields to parse. * @param fieldTypes Type classes to parse JSON fields as. @@ -69,7 +69,7 @@ public JsonRowDeserializationSchema(String[] fieldNames, Class[] fieldTypes) } /** - * Creates a JSON deserializtion schema for the given fields and types. + * Creates a JSON deserialization schema for the given fields and types. * * @param fieldNames Names of JSON fields to parse. * @param fieldTypes Types to parse JSON fields as. diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java index 112ea4f1e1512..167666cac3e9c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java @@ -20,26 +20,43 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.api.table.Row; +import org.apache.flink.util.Preconditions; /** * Serialization schema that serializes an object into a JSON bytes. - *

+ *
+ * <p>Serializes the input {@link Row} object into a JSON string and
+ * converts it into byte[].
+ *
+ * <p>
Result byte[] messages can be deserialized using + * {@link JsonRowDeserializationSchema}. */ public class JsonRowSerializationSchema implements SerializationSchema { + /** Fields names in the input Row object */ private final String[] fieldNames; + /** Object mapper that is used to create output JSON objects */ private static ObjectMapper mapper = new ObjectMapper(); + /** + * Creates a JSON serialization schema for the given fields and types. + * + * @param fieldNames Names of JSON fields to parse. + */ public JsonRowSerializationSchema(String[] fieldNames) { - this.fieldNames = fieldNames; + this.fieldNames = Preconditions.checkNotNull(fieldNames); } @Override public byte[] serialize(Row row) { + if (row.productArity() != fieldNames.length) { + throw new IllegalStateException(String.format( + "Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length)); + } ObjectNode objectNode = mapper.createObjectNode(); - for (int i = 0; i < row.getFieldNumber(); i++) { - JsonNode node = mapper.valueToTree(row.getField(i)); + for (int i = 0; i < row.productArity(); i++) { + JsonNode node = mapper.valueToTree(row.productElement(i)); objectNode.set(fieldNames[i], node); } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java index 4399b60f43514..1b3e4860fbfac 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java @@ -16,14 +16,12 @@ */ package org.apache.flink.streaming.util.serialization; -import java.io.Serializable; - /** * A simple wrapper for using the SerializationSchema with the KeyedDeserializationSchema * interface * @param The type to serialize */ -public class KeyedSerializationSchemaWrapper implements KeyedSerializationSchema, Serializable { +public class KeyedSerializationSchemaWrapper implements KeyedSerializationSchema { private static final long serialVersionUID = 1351665280744549933L; diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java index 151396b785bb1..9497cc0a399be 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java @@ -43,10 +43,26 @@ public void testRowSerialization() throws IOException { Row resultRow = deserializationSchema.deserialize(bytes); assertEquals("Deserialized row should have expected number of fields", - row.getFieldNumber(), resultRow.getFieldNumber()); - for (int i = 0; i < row.getFieldNumber(); i++) { + row.productArity(), resultRow.productArity()); + for (int i = 0; i < row.productArity(); i++) { assertEquals(String.format("Field number %d should be as in the original row", i), - 
row.getField(i), resultRow.getField(i)); + row.productElement(i), resultRow.productElement(i)); } } + + @Test(expected = NullPointerException.class) + public void testInputValidation() { + new JsonRowSerializationSchema(null); + } + + @Test(expected = IllegalStateException.class) + public void testSerializeRowWithInvalidNumberOfFields() { + String[] fieldNames = new String[] {"f1", "f2", "f3"}; + Class[] fieldTypes = new Class[] {Integer.class}; + Row row = new Row(1); + row.setField(0, 1); + + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); + serializationSchema.serialize(row); + } } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index 6ed1310f77f09..16eb74adcd33c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -99,7 +99,7 @@ private void createConsumingTopology(StreamExecutionEnvironment env) { .map(new RichMapFunction() { @Override public Integer map(Row value) { - return (Integer) value.getField(0); + return (Integer) value.productElement(0); } }).setParallelism(PARALLELISM) From 7dc770ac23790acfb585eb1b66bbb01000b1adaf Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Fri, 12 Aug 2016 23:48:14 +0100 Subject: [PATCH 08/14] [FLINK-3874] Fix exception message --- .../util/serialization/JsonRowSerializationSchema.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java index 167666cac3e9c..3139114fc1c3a 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java @@ -63,7 +63,7 @@ public byte[] serialize(Row row) { try { return mapper.writeValueAsBytes(objectNode); } catch (Exception e) { - throw new RuntimeException(e); + throw new RuntimeException("Failed to serialize row", e); } } } From 24efe1897b9c174158366fd7db2380e938d74fbc Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Sun, 14 Aug 2016 16:12:15 +0100 Subject: [PATCH 09/14] [FLINK-3703] Fixed according to PR review --- .../flink/api/table/sinks/TableSink.scala | 4 ++++ .../flink/api/table/sinks/TableSinkBase.scala | 5 ++--- .../org/apache/flink/api/table/table.scala | 2 +- .../kafka/Kafka08JsonTableSink.java | 4 ++-- .../kafka/Kafka09JsonTableSink.java | 4 ++-- .../connectors/kafka/KafkaJsonTableSink.java | 4 ++-- .../connectors/kafka/KafkaTableSink.java | 19 +++++++++++++------ 7 files changed, 26 insertions(+), 16 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala index 
f50e1185dfd21..588dcde63a2e8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala @@ -43,4 +43,8 @@ trait TableSink[T] { /** Returns the types of the table fields. */ def getFieldTypes: Array[TypeInformation[_]] + + + def configure(fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]]): TableSink[T] } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala index 9d7976da61d81..612ee0ad160be 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala @@ -55,9 +55,8 @@ trait TableSinkBase[T] extends TableSink[T] { * @return A copy of this [[TableSink]] configured with the field names and types of the * [[org.apache.flink.api.table.Table]] to emit. */ - private[flink] final def configure( - fieldNames: Array[String], - fieldTypes: Array[TypeInformation[_]]): TableSink[T] = { + final def configure(fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]]): TableSink[T] = { val configuredSink = this.copy configuredSink.fieldNames = Some(fieldNames) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 4b919d88f031d..613203f630770 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -592,7 +592,7 @@ class Table( .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray // configure the table sink - val configuredSink = sink.asInstanceOf[TableSinkBase[T]].configure(fieldNames, fieldTypes) + val configuredSink = sink.asInstanceOf[TableSink[T]].configure(fieldNames, fieldTypes) // emit the table to the configured table sink tableEnv.writeToSink(this, configuredSink) diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java index 57f3de3393213..d1dc42e3904ed 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -33,7 +33,7 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink { * * @param topic topic in Kafka * @param properties properties to connect to Kafka - * @param partitioner Kafra partitioner + * @param partitioner Kafka partitioner * @param fieldNames row field names * @param fieldTypes row field types */ @@ -46,7 +46,7 @@ public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitione * * @param topic topic in Kafka * @param properties properties to connect to Kafka - * @param partitioner Kafra partitioner + * @param partitioner Kafka partitioner * @param fieldNames row field names * @param fieldTypes row field types */ diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java index fc1974f2673f9..93873cccc0683 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -33,7 +33,7 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink { * * @param topic topic in Kafka * @param properties properties to connect to Kafka - * @param partitioner Kafra partitioner + * @param partitioner Kafka partitioner * @param fieldNames row field names * @param fieldTypes row field types */ @@ -46,7 +46,7 @@ public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitione * * @param topic topic in Kafka * @param properties properties to connect to Kafka - * @param partitioner Kafra partitioner + * @param partitioner Kafka partitioner * @param fieldNames row field names * @param fieldTypes row field types */ diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java index 8667a0c4ecbf0..fac3f63f72dad 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java @@ -34,7 +34,7 @@ public abstract class KafkaJsonTableSink extends KafkaTableSink { * * @param topic topic in Kafka * @param properties properties to connect to Kafka - * @param partitioner Kafra partitioner + * @param partitioner Kafka partitioner * @param fieldNames row field names * @param fieldTypes row field types */ @@ -47,7 +47,7 @@ public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner< * * @param topic topic in Kafka * @param properties properties to connect to Kafka - * @param partitioner Kafra partitioner + * @param partitioner Kafka partitioner * @param fieldNames row field names * @param fieldTypes row field types */ diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index 299f2ba36e136..1191fbe7babd4 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -42,8 +42,8 @@ public abstract class KafkaTableSink implements StreamTableSink { private final Properties properties; private final SerializationSchema serializationSchema; private final KafkaPartitioner partitioner; - private final String[] fieldNames; - private final TypeInformation[] fieldTypes; + private String[] fieldNames; + private TypeInformation[] fieldTypes; /** * Creates KafkaTableSink @@ -87,10 
+87,7 @@ public KafkaTableSink( this.properties = Preconditions.checkNotNull(properties, "properties"); this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema"); this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner"); - this.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames"); - this.fieldTypes = Preconditions.checkNotNull(fieldTypes); - Preconditions.checkArgument(fieldNames.length == fieldTypes.length, - "Number of provided field names and types does not match."); + configure(fieldNames, fieldTypes); } /** @@ -126,4 +123,14 @@ public String[] getFieldNames() { public TypeInformation[] getFieldTypes() { return fieldTypes; } + + @Override + public KafkaTableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { + this.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames"); + this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes"); + Preconditions.checkArgument(fieldNames.length == fieldTypes.length, + "Number of provided field names and types does not match."); + + return this; + } } From b572f3acb436524c0b14a4737cc6c88ec58b762f Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Sun, 14 Aug 2016 18:17:36 +0100 Subject: [PATCH 10/14] [FLINK-3874] Fixed according to PR review --- .../org/apache/flink/api/table/table.scala | 2 +- .../kafka/Kafka08JsonTableSink.java | 20 +--------- .../kafka/Kafka08JsonTableSinkTest.java | 8 ++-- .../kafka/Kafka09JsonTableSink.java | 20 +--------- .../kafka/Kafka09JsonTableSinkTest.java | 8 ++-- .../connectors/kafka/KafkaJsonTableSink.java | 23 +++-------- .../connectors/kafka/KafkaTableSink.java | 40 ++++--------------- .../kafka/KafkaTableSinkTestBase.java | 4 +- 8 files changed, 29 insertions(+), 96 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 613203f630770..37e107732c946 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -592,7 +592,7 @@ class Table( .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray // configure the table sink - val configuredSink = sink.asInstanceOf[TableSink[T]].configure(fieldNames, fieldTypes) + val configuredSink = sink.configure(fieldNames, fieldTypes) // emit the table to the configured table sink tableEnv.writeToSink(this, configuredSink) diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java index d1dc42e3904ed..83f5585a77d63 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -17,7 +17,6 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.table.Row; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.SerializationSchema; @@ -34,24 +33,9 @@ public class Kafka08JsonTableSink 
extends KafkaJsonTableSink { * @param topic topic in Kafka * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner - * @param fieldNames row field names - * @param fieldTypes row field types */ - public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, Class[] fieldTypes) { - super(topic, properties, partitioner, fieldNames, fieldTypes); - } - - /** - * Creates {@link KafkaTableSink} for Kafka 0.8 - * - * @param topic topic in Kafka - * @param properties properties to connect to Kafka - * @param partitioner Kafka partitioner - * @param fieldNames row field names - * @param fieldTypes row field types - */ - public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, TypeInformation[] fieldTypes) { - super(topic, properties, partitioner, fieldNames, fieldTypes); + public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner) { + super(topic, properties, partitioner); } @Override diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java index 96661b30af5eb..87304455919b9 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -30,12 +30,12 @@ public void kafka08JsonTableSinkTest() throws Exception { @Override protected KafkaTableSink createTableSink() { - return new Kafka08JsonTableSink( + Kafka08JsonTableSink sink = new Kafka08JsonTableSink( TOPIC, createSinkProperties(), - createPartitioner(), - FIELD_NAMES, - FIELD_TYPES); + createPartitioner()); + sink.configure(FIELD_NAMES, FIELD_TYPES); + return sink; } protected DeserializationSchema createRowDeserializationSchema() { diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java index 93873cccc0683..6b603aaeed96a 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -17,7 +17,6 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.table.Row; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.SerializationSchema; @@ -34,24 +33,9 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink { * @param topic topic in Kafka * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner - * @param fieldNames row field names - * @param fieldTypes row field types */ - public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, Class[] fieldTypes) { - super(topic, 
properties, partitioner, fieldNames, fieldTypes); - } - - /** - * Creates {@link KafkaTableSink} for Kafka 0.9 - * - * @param topic topic in Kafka - * @param properties properties to connect to Kafka - * @param partitioner Kafka partitioner - * @param fieldNames row field names - * @param fieldTypes row field types - */ - public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, TypeInformation[] fieldTypes) { - super(topic, properties, partitioner, fieldNames, fieldTypes); + public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner) { + super(topic, properties, partitioner); } @Override diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java index a291063f828f7..1e3e54bfd0fa1 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -30,12 +30,12 @@ public void kafka09JsonTableSinkTest() throws Exception { @Override protected KafkaTableSink createTableSink() { - return new Kafka09JsonTableSink( + Kafka09JsonTableSink sink = new Kafka09JsonTableSink( TOPIC, createSinkProperties(), - createPartitioner(), - FIELD_NAMES, - FIELD_TYPES); + createPartitioner()); + sink.configure(FIELD_NAMES, FIELD_TYPES); + return sink; } protected DeserializationSchema createRowDeserializationSchema() { diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java index fac3f63f72dad..9f147fb44cf70 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java @@ -17,10 +17,10 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.table.Row; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; +import org.apache.flink.streaming.util.serialization.SerializationSchema; import java.util.Properties; @@ -35,24 +35,13 @@ public abstract class KafkaJsonTableSink extends KafkaTableSink { * @param topic topic in Kafka * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner - * @param fieldNames row field names - * @param fieldTypes row field types */ - public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, Class[] fieldTypes) { - super(topic, properties, new JsonRowSerializationSchema(fieldNames), partitioner, fieldNames, fieldTypes); + public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner) { + super(topic, properties, partitioner); } - /** - * Creates KafkaJsonTableSink - * - 
* @param topic topic in Kafka - * @param properties properties to connect to Kafka - * @param partitioner Kafka partitioner - * @param fieldNames row field names - * @param fieldTypes row field types - */ - public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner, String[] fieldNames, TypeInformation[] fieldTypes) { - super(topic, properties, new JsonRowSerializationSchema(fieldNames), partitioner, fieldNames, fieldTypes); + @Override + protected SerializationSchema createSerializationSchema(String[] fieldNames) { + return new JsonRowSerializationSchema(fieldNames); } - } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index 1191fbe7babd4..428cd034bbca1 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -28,8 +28,6 @@ import java.util.Properties; -import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo; - /** * A version-agnostic Kafka {@link StreamTableSink}. * @@ -40,54 +38,25 @@ public abstract class KafkaTableSink implements StreamTableSink { private final String topic; private final Properties properties; - private final SerializationSchema serializationSchema; + private SerializationSchema serializationSchema; private final KafkaPartitioner partitioner; private String[] fieldNames; private TypeInformation[] fieldTypes; - /** * Creates KafkaTableSink * * @param topic Kafka topic to consume. * @param properties Properties for the Kafka consumer. - * @param serializationSchema Serialization schema for emitted items * @param partitioner Partitioner to select Kafka partition for each item - * @param fieldNames Row field names. - * @param fieldTypes Row field types. */ public KafkaTableSink( String topic, Properties properties, - SerializationSchema serializationSchema, - KafkaPartitioner partitioner, - String[] fieldNames, - Class[] fieldTypes) { - this(topic, properties, serializationSchema, partitioner, fieldNames, toTypeInfo(fieldTypes)); - } - - /** - * Creates KafkaTableSink - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param serializationSchema Serialization schema for emitted items - * @param partitioner Partitioner to select Kafka partition for each item - * @param fieldNames Row field names. - * @param fieldTypes Row field types. 
- */ - public KafkaTableSink( - String topic, - Properties properties, - SerializationSchema serializationSchema, - KafkaPartitioner partitioner, - String[] fieldNames, - TypeInformation[] fieldTypes) { + KafkaPartitioner partitioner) { this.topic = Preconditions.checkNotNull(topic, "topic"); this.properties = Preconditions.checkNotNull(properties, "properties"); - this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema"); this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner"); - configure(fieldNames, fieldTypes); } /** @@ -104,6 +73,8 @@ protected abstract FlinkKafkaProducerBase createKafkaProducer( SerializationSchema serializationSchema, KafkaPartitioner partitioner); + protected abstract SerializationSchema createSerializationSchema(String[] fieldNames); + @Override public void emitDataStream(DataStream dataStream) { FlinkKafkaProducerBase kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner); @@ -130,7 +101,10 @@ public KafkaTableSink configure(String[] fieldNames, TypeInformation[] fieldT this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes"); Preconditions.checkArgument(fieldNames.length == fieldTypes.length, "Number of provided field names and types does not match."); + this.serializationSchema = createSerializationSchema(fieldNames); return this; } + + } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index 16eb74adcd33c..3b026856bc394 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -19,11 +19,13 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.table.Row; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.test.util.SuccessException; @@ -39,7 +41,7 @@ public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Se protected final static String TOPIC = "customPartitioningTestTopic"; protected final static int PARALLELISM = 1; protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"}; - protected final static Class[] FIELD_TYPES = new Class[] {Integer.class, String.class}; + protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class}); public void testKafkaTableSink() throws Exception { LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()"); From 50724c6d5d8ad205a209e8dd4d4f40e5c4fea922 Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: 
Thu, 18 Aug 2016 21:43:33 +0100 Subject: [PATCH 11/14] [FLINK-3874] Remove CsvTableSinkTest.scala --- .../api/table/sink/CsvTableSinkTest.scala | 79 ------------------- 1 file changed, 79 deletions(-) delete mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala deleted file mode 100644 index a3732d5344ed5..0000000000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.sink - -import java.io.File -import java.nio.charset.Charset -import java.nio.file.Files -import java.util.Collections - -import org.apache.flink.api.java.ExecutionEnvironment -import org.apache.flink.api.java.operators.DataSource -import org.apache.flink.api.table.Row -import org.apache.flink.api.table.sinks.CsvTableSink -import org.junit.rules.TemporaryFolder -import org.junit.{Rule, Test} -import org.junit.Assert._ - -class CsvTableSinkTest { - - val _folder = new TemporaryFolder - - @Rule - def folder = _folder - - @Test - def testSaveDataSetToCsvFileWithDefaultDelimiter(): Unit = { - val file = new File(folder.getRoot, "test.csv") - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds = createDataset(env) - - val sink = new CsvTableSink(file.getAbsolutePath) - writeToCsv(env, ds, sink) - - val lines = Files.readAllLines(file.toPath, Charset.defaultCharset()) - assertEquals(Collections.singletonList("1,str,false"), lines) - } - - @Test - def testSaveDataSetToCsvFileWithCustomDelimiter(): Unit = { - val file = new File(folder.getRoot, "test.csv") - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds = createDataset(env) - - val sink = new CsvTableSink(file.getAbsolutePath, "|") - writeToCsv(env, ds, sink) - - val lines = Files.readAllLines(file.toPath, Charset.defaultCharset()) - assertEquals(Collections.singletonList("1|str|false"), lines) - } - - def writeToCsv(env: ExecutionEnvironment, ds: DataSource[Row], sink: CsvTableSink): Unit = { - sink.emitDataSet(ds) - env.execute("job") - } - - def createDataset(env: ExecutionEnvironment): DataSource[Row] = { - val row = new Row(3) - row.setField(0, 1) - row.setField(1, "str") - row.setField(2, false) - env.fromCollection(Collections.singletonList(row)) - } -} From 29022de0c8f2614688c0283b6dc7bbd915139ef7 Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Wed, 24 Aug 2016 00:07:07 +0100 Subject: [PATCH 12/14] [FLINK-3874] Fix 
according to PR review --- .../flink/api/table/sinks/TableSink.scala | 10 +++- .../org/apache/flink/api/table/table.scala | 2 - .../kafka/Kafka08JsonTableSink.java | 9 +++- .../kafka/Kafka08JsonTableSinkTest.java | 8 +--- .../kafka/Kafka09JsonTableSink.java | 9 +++- .../kafka/Kafka09JsonTableSinkTest.java | 8 +--- .../connectors/kafka/KafkaJsonTableSink.java | 2 +- .../connectors/kafka/KafkaTableSink.java | 27 ++++++----- .../JsonRowSerializationSchema.java | 1 + .../kafka/JsonRowSerializationSchemaTest.java | 48 +++++++++++++++---- .../kafka/KafkaTableSinkTestBase.java | 2 + 11 files changed, 83 insertions(+), 43 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala index 588dcde63a2e8..3dfc6f14887ec 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala @@ -44,7 +44,15 @@ trait TableSink[T] { /** Returns the types of the table fields. */ def getFieldTypes: Array[TypeInformation[_]] - + /** + * Return a copy of this [[TableSink]] configured with the field names and types of the + * [[org.apache.flink.api.table.Table]] to emit. + * + * @param fieldNames The field names of the table to emit. + * @param fieldTypes The field types of the table to emit. + * @return A copy of this [[TableSink]] configured with the field names and types of the + * [[org.apache.flink.api.table.Table]] to emit. + */ def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[T] } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 37e107732c946..cbb9a077b27f3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -26,8 +26,6 @@ import org.apache.flink.api.table.plan.logical._ import org.apache.flink.api.table.sinks.TableSink import scala.collection.JavaConverters._ -import org.apache.flink.api.table.sinks.{TableSink, TableSinkBase} -import org.apache.flink.api.table.typeutils.TypeConverter /** * A Table is the core component of the Table API. 
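The configure() javadoc added above describes a copy-on-configure contract: the sink handed to the Table API is never mutated, the framework instead receives a configured copy. A minimal sketch of that contract follows; it is illustrative only and not part of this patch. The topic name, broker address, and field layout are placeholders, and FixedPartitioner is assumed to be an available KafkaPartitioner implementation.

	// Hedged sketch of the copy-on-configure contract; not code from the patch.
	import java.util.Properties;

	import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
	import org.apache.flink.api.common.typeinfo.TypeInformation;
	import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSink;
	import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
	import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;

	public class ConfigureContractExample {

		public static void main(String[] args) {
			Properties props = new Properties();
			props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

			KafkaTableSink sink = new Kafka09JsonTableSink(
				"query-results",           // hypothetical topic
				props,
				new FixedPartitioner());   // any KafkaPartitioner implementation

			// configure() does not mutate `sink`: it creates a copy via createCopy(),
			// sets the field names and types on the copy, and builds the JSON
			// serialization schema from the field names.
			KafkaTableSink configured = sink.configure(
				new String[] {"id", "name"},
				new TypeInformation[] {BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO});

			// `sink` stays unconfigured; `configured` is the instance whose
			// emitDataStream() the Table API eventually invokes.
		}
	}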
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java index 83f5585a77d63..5f869ec35fad6 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -30,7 +30,7 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink { /** * Creates {@link KafkaTableSink} for Kafka 0.8 * - * @param topic topic in Kafka + * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner */ @@ -40,7 +40,12 @@ public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitione @Override protected FlinkKafkaProducerBase createKafkaProducer(String topic, Properties properties, SerializationSchema serializationSchema, KafkaPartitioner partitioner) { - return new FlinkKafkaProducer08(topic,serializationSchema, properties, partitioner); + return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner); + } + + @Override + protected Kafka08JsonTableSink createCopy() { + return new Kafka08JsonTableSink(topic, properties, partitioner); } } diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java index 87304455919b9..f51b49915f31a 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -20,13 +20,8 @@ import org.apache.flink.api.table.Row; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; -import org.junit.Test; public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase { - @Test - public void kafka08JsonTableSinkTest() throws Exception { - testKafkaTableSink(); - } @Override protected KafkaTableSink createTableSink() { @@ -34,8 +29,7 @@ protected KafkaTableSink createTableSink() { TOPIC, createSinkProperties(), createPartitioner()); - sink.configure(FIELD_NAMES, FIELD_TYPES); - return sink; + return sink.configure(FIELD_NAMES, FIELD_TYPES); } protected DeserializationSchema createRowDeserializationSchema() { diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java index 6b603aaeed96a..38ea47c237d28 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -30,7 +30,7 @@ public 
class Kafka09JsonTableSink extends KafkaJsonTableSink { /** * Creates {@link KafkaTableSink} for Kafka 0.9 * - * @param topic topic in Kafka + * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner */ @@ -40,6 +40,11 @@ public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitione @Override protected FlinkKafkaProducerBase createKafkaProducer(String topic, Properties properties, SerializationSchema serializationSchema, KafkaPartitioner partitioner) { - return new FlinkKafkaProducer09(topic,serializationSchema, properties, partitioner); + return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner); + } + + @Override + protected Kafka09JsonTableSink createCopy() { + return new Kafka09JsonTableSink(topic, properties, partitioner); } } diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java index 1e3e54bfd0fa1..8b4e5c827c9ea 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -20,13 +20,8 @@ import org.apache.flink.api.table.Row; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; -import org.junit.Test; public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase { - @Test - public void kafka09JsonTableSinkTest() throws Exception { - testKafkaTableSink(); - } @Override protected KafkaTableSink createTableSink() { @@ -34,8 +29,7 @@ protected KafkaTableSink createTableSink() { TOPIC, createSinkProperties(), createPartitioner()); - sink.configure(FIELD_NAMES, FIELD_TYPES); - return sink; + return sink.configure(FIELD_NAMES, FIELD_TYPES); } protected DeserializationSchema createRowDeserializationSchema() { diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java index 9f147fb44cf70..ee987838c143e 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java @@ -32,7 +32,7 @@ public abstract class KafkaJsonTableSink extends KafkaTableSink { /** * Creates KafkaJsonTableSink * - * @param topic topic in Kafka + * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner */ diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index 428cd034bbca1..931435a30d826 100644 --- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -36,16 +36,16 @@ */ public abstract class KafkaTableSink implements StreamTableSink { - private final String topic; - private final Properties properties; - private SerializationSchema serializationSchema; - private final KafkaPartitioner partitioner; - private String[] fieldNames; - private TypeInformation[] fieldTypes; + protected final String topic; + protected final Properties properties; + protected SerializationSchema serializationSchema; + protected final KafkaPartitioner partitioner; + protected String[] fieldNames; + protected TypeInformation[] fieldTypes; /** * Creates KafkaTableSink * - * @param topic Kafka topic to consume. + * @param topic Kafka topic to write to. * @param properties Properties for the Kafka consumer. * @param partitioner Partitioner to select Kafka partition for each item */ @@ -75,6 +75,8 @@ protected abstract FlinkKafkaProducerBase createKafkaProducer( protected abstract SerializationSchema createSerializationSchema(String[] fieldNames); + protected abstract KafkaTableSink createCopy(); + @Override public void emitDataStream(DataStream dataStream) { FlinkKafkaProducerBase kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner); @@ -97,14 +99,17 @@ public TypeInformation[] getFieldTypes() { @Override public KafkaTableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { - this.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames"); - this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes"); + KafkaTableSink copy = createCopy(); + copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames"); + copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes"); Preconditions.checkArgument(fieldNames.length == fieldTypes.length, "Number of provided field names and types does not match."); - this.serializationSchema = createSerializationSchema(fieldNames); + copy.serializationSchema = createSerializationSchema(fieldNames); - return this; + return copy; } + + } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java index 3139114fc1c3a..6aba565154896 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java @@ -55,6 +55,7 @@ public byte[] serialize(Row row) { } ObjectNode objectNode = mapper.createObjectNode(); + for (int i = 0; i < row.productArity(); i++) { JsonNode node = mapper.valueToTree(row.productElement(i)); objectNode.set(fieldNames[i], node); diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java index 9497cc0a399be..388860a571e40 100644 --- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java @@ -35,19 +35,29 @@ public void testRowSerialization() throws IOException { row.setField(1, true); row.setField(2, "str"); + Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, row); + assertEqualRows(row, resultRow); + } - JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); - JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes); + @Test + public void testSerializationOfTwoRows() throws IOException { + String[] fieldNames1 = new String[] {"f1", "f2", "f3"}; + Class[] fieldTypes1 = new Class[] {Integer.class, Boolean.class, String.class}; + Row row1 = new Row(3); + row1.setField(0, 1); + row1.setField(1, true); + row1.setField(2, "str"); - byte[] bytes = serializationSchema.serialize(row); - Row resultRow = deserializationSchema.deserialize(bytes); + Row resultRow = serializeAndDeserialize(fieldNames1, fieldTypes1, row1); + assertEqualRows(row1, resultRow); - assertEquals("Deserialized row should have expected number of fields", - row.productArity(), resultRow.productArity()); - for (int i = 0; i < row.productArity(); i++) { - assertEquals(String.format("Field number %d should be as in the original row", i), - row.productElement(i), resultRow.productElement(i)); - } + String[] fieldNames2 = new String[] {"f1"}; + Class[] fieldTypes2 = new Class[] {Integer.class}; + Row row2 = new Row(1); + row2.setField(0, 1); + + resultRow = serializeAndDeserialize(fieldNames2, fieldTypes2, row2); + assertEqualRows(row2, resultRow); } @Test(expected = NullPointerException.class) @@ -65,4 +75,22 @@ public void testSerializeRowWithInvalidNumberOfFields() { JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); serializationSchema.serialize(row); } + + private Row serializeAndDeserialize(String[] fieldNames, Class[] fieldTypes, Row row) throws IOException { + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes); + + byte[] bytes = serializationSchema.serialize(row); + return deserializationSchema.deserialize(bytes); + } + + private void assertEqualRows(Row expectedRow, Row resultRow) { + assertEquals("Deserialized row should have expected number of fields", + expectedRow.productArity(), resultRow.productArity()); + for (int i = 0; i < expectedRow.productArity(); i++) { + assertEquals(String.format("Field number %d should be as in the original row", i), + expectedRow.productElement(i), resultRow.productElement(i)); + } + } + } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index 3b026856bc394..5e55b0ab46b64 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -29,6 +29,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.test.util.SuccessException; +import org.junit.Test; import java.io.Serializable; import java.util.HashSet; @@ -43,6 +44,7 @@ public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Se protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"}; protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class}); + @Test public void testKafkaTableSink() throws Exception { LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()"); From 44addb6c844250c98207e456bab35f1052d6590e Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Wed, 24 Aug 2016 21:31:36 +0100 Subject: [PATCH 13/14] [FLINK-3874] Fix according to PR review --- .../connectors/kafka/KafkaTableSink.java | 11 +++++++++ .../JsonRowSerializationSchema.java | 2 +- .../kafka/JsonRowSerializationSchemaTest.java | 24 ++++++++++--------- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index 931435a30d826..8f5e8110e05ec 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -73,8 +73,19 @@ protected abstract FlinkKafkaProducerBase createKafkaProducer( SerializationSchema serializationSchema, KafkaPartitioner partitioner); + /** + * Create serialization schema for converting table rows into bytes. + * + * @param fieldNames Field names in table rows. + * @return Instance of serialization schema + */ protected abstract SerializationSchema createSerializationSchema(String[] fieldNames); + /** + * Create a deep copy of this sink. + * + * @return Deep copy of this sink + */ protected abstract KafkaTableSink createCopy(); @Override diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java index 6aba565154896..077ff132642aa 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java @@ -29,7 +29,7 @@ *
<p>Serializes the input {@link Row} object into a JSON string and * converts it into byte[]. * - * <p> Result byte[] messages can be deserialized using + * <p>
Result byte[] messages can be deserialized using * {@link JsonRowDeserializationSchema}. */ public class JsonRowSerializationSchema implements SerializationSchema { diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java index 388860a571e40..92af15de2336c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java @@ -41,23 +41,26 @@ public void testRowSerialization() throws IOException { @Test public void testSerializationOfTwoRows() throws IOException { - String[] fieldNames1 = new String[] {"f1", "f2", "f3"}; - Class[] fieldTypes1 = new Class[] {Integer.class, Boolean.class, String.class}; + String[] fieldNames = new String[] {"f1", "f2", "f3"}; + Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class}; Row row1 = new Row(3); row1.setField(0, 1); row1.setField(1, true); row1.setField(2, "str"); - Row resultRow = serializeAndDeserialize(fieldNames1, fieldTypes1, row1); - assertEqualRows(row1, resultRow); + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes); + + byte[] bytes = serializationSchema.serialize(row1); + assertEqualRows(row1, deserializationSchema.deserialize(bytes)); - String[] fieldNames2 = new String[] {"f1"}; - Class[] fieldTypes2 = new Class[] {Integer.class}; - Row row2 = new Row(1); - row2.setField(0, 1); + Row row2 = new Row(3); + row2.setField(0, 10); + row2.setField(1, false); + row2.setField(2, "newStr"); - resultRow = serializeAndDeserialize(fieldNames2, fieldTypes2, row2); - assertEqualRows(row2, resultRow); + bytes = serializationSchema.serialize(row2); + assertEqualRows(row2, deserializationSchema.deserialize(bytes)); } @Test(expected = NullPointerException.class) @@ -68,7 +71,6 @@ public void testInputValidation() { @Test(expected = IllegalStateException.class) public void testSerializeRowWithInvalidNumberOfFields() { String[] fieldNames = new String[] {"f1", "f2", "f3"}; - Class[] fieldTypes = new Class[] {Integer.class}; Row row = new Row(1); row.setField(0, 1); From 82757328b371b1ba4232b97afdedce9219a570ef Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Thu, 25 Aug 2016 18:50:20 +0100 Subject: [PATCH 14/14] [FLINK-3874] Fix according to PR review --- ...08JsonTableSinkTest.java => Kafka08JsonTableSinkITCase.java} | 2 +- ...09JsonTableSinkTest.java => Kafka09JsonTableSinkITCase.java} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/{Kafka08JsonTableSinkTest.java => Kafka08JsonTableSinkITCase.java} (95%) rename flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/{Kafka09JsonTableSinkTest.java => Kafka09JsonTableSinkITCase.java} (95%) diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java similarity index 95% rename from flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java rename to flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java index f51b49915f31a..f870adf565e74 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java @@ -21,7 +21,7 @@ import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; -public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase { +public class Kafka08JsonTableSinkITCase extends KafkaTableSinkTestBase { @Override protected KafkaTableSink createTableSink() { diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java similarity index 95% rename from flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java rename to flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java index 8b4e5c827c9ea..74415f84c4297 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java @@ -21,7 +21,7 @@ import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; -public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase { +public class Kafka09JsonTableSinkITCase extends KafkaTableSinkTestBase { @Override protected KafkaTableSink createTableSink() {
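Taken together, the patches yield the usage pattern sketched below. This is a hedged example, not code from the series: it assumes an existing Table named result (for instance obtained from a StreamTableEnvironment) and that Table.writeToSink(TableSink) is available in this Flink version; topic, broker, and partitioner choice are placeholders.

	// Hedged end-to-end sketch: write a Table as JSON rows to Kafka 0.9.
	Properties props = new Properties();
	props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

	KafkaTableSink sink = new Kafka09JsonTableSink(
		"orders-json",             // hypothetical topic
		props,
		new FixedPartitioner());   // assumed partitioner implementation

	// writeToSink() derives the field names and types of `result`, calls
	// sink.configure(fieldNames, fieldTypes), and emits the stream through the
	// configured copy returned by configure().
	result.writeToSink(sink);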