From ac8d42a0cc6828c22dc27b5e2f9d5728b62a3610 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Thu, 2 Jun 2016 22:38:23 +0200 Subject: [PATCH 1/5] [FLINK-3872] [table, connector-kafka] Add JsonRowDeserializationSchema - Adds a deserialization schema from byte[] to Row to be used in conjunction with the Table API. --- .../flink-connector-kafka-base/pom.xml | 12 +- .../JsonRowDeserializationSchema.java | 116 ++++++++++++++++++ .../JsonRowDeserializationSchemaTest.java | 116 ++++++++++++++++++ 3 files changed, 242 insertions(+), 2 deletions(-) create mode 100644 flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java create mode 100644 flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml index e430ff4eee9e8..1cbb554a9293d 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml +++ b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml @@ -49,6 +49,16 @@ under the License. provided + + org.apache.flink + flink-table_2.10 + ${project.version} + provided + + true + + org.apache.kafka kafka_${scala.binary.version} @@ -101,7 +111,6 @@ under the License. jar - org.apache.curator curator-test @@ -124,7 +133,6 @@ under the License. 
test-jar test - diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java new file mode 100644 index 0000000000000..82eb08bdf2a6c --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.serialization; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * Deserialization schema from JSON to {@link Row}. + * + *

Deserializes the byte[] messages as a JSON object and reads + * the specified fields. + * + *

Failures during deserialization are forwarded as wrapped IOExceptions. + */ +public class JsonRowDeserializationSchema implements DeserializationSchema { + + /** Field names to parse. Indices match fieldTypes indices. */ + private final String[] fieldNames; + + /** Types to parse fields as. Indices match fieldNames indices. */ + private final TypeInformation[] fieldTypes; + + /** Object mapper for parsing the JSON. */ + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Creates a JSON deserialization schema for the given fields and type classes. + * + * @param fieldNames Names of JSON fields to parse. + * @param fieldTypes Type classes to parse JSON fields as. + */ + public JsonRowDeserializationSchema(String[] fieldNames, Class[] fieldTypes) { + this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names"); + + this.fieldTypes = new TypeInformation[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + this.fieldTypes[i] = TypeExtractor.getForClass(fieldTypes[i]); + } + + Preconditions.checkArgument(fieldNames.length == fieldTypes.length, + "Number of provided field names and types does not match."); + } + + /** + * Creates a JSON deserialization schema for the given fields and types. + * + * @param fieldNames Names of JSON fields to parse. + * @param fieldTypes Types to parse JSON fields as. 
+ */ + public JsonRowDeserializationSchema(String[] fieldNames, TypeInformation[] fieldTypes) { + this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names"); + this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types"); + + Preconditions.checkArgument(fieldNames.length == fieldTypes.length, + "Number of provided field names and types does not match."); + } + + @Override + public Row deserialize(byte[] message) throws IOException { + try { + JsonNode root = objectMapper.readTree(message); + + Row row = new Row(fieldNames.length); + for (int i = 0; i < fieldNames.length; i++) { + JsonNode node = root.get(fieldNames[i]); + + if (node == null) { + throw new IllegalStateException("Field '" + fieldNames[i] + "' not found."); + } + + // Read the value as specified type + Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass()); + row.setField(i, value); + } + + return row; + } catch (Throwable t) { + throw new IOException("Failed to deserialize JSON object.", t); + } + } + + @Override + public boolean isEndOfStream(Row nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return new RowTypeInfo(fieldTypes, fieldNames); + } + +} diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java new file mode 100644 index 0000000000000..88b34027fac1d --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class JsonRowDeserializationSchemaTest { + + /** + * Tests simple deserialization. 
+ */ + @Test + public void testDeserialization() throws Exception { + long id = 1238123899121L; + String name = "asdlkjasjkdla998y1122"; + byte[] bytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(bytes); + + ObjectMapper objectMapper = new ObjectMapper(); + + // Root + ObjectNode root = objectMapper.createObjectNode(); + root.put("id", id); + root.put("name", name); + root.put("bytes", bytes); + + byte[] serializedJson = objectMapper.writeValueAsBytes(root); + + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema( + new String[] { "id", "name", "bytes" }, + new Class[] { Long.class, String.class, byte[].class }); + + Row deserialized = deserializationSchema.deserialize(serializedJson); + + assertEquals(3, deserialized.productArity()); + assertEquals(id, deserialized.productElement(0)); + assertEquals(name, deserialized.productElement(1)); + assertArrayEquals(bytes, (byte[]) deserialized.productElement(2)); + } + + /** + * Tests deserialization with non-existing field name. + */ + @Test + public void testMissingNode() throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + + // Root + ObjectNode root = objectMapper.createObjectNode(); + root.put("id", 123123123); + byte[] serializedJson = objectMapper.writeValueAsBytes(root); + + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema( + new String[] { "name" }, + new Class[] { String.class }); + + try { + deserializationSchema.deserialize(serializedJson); + fail("Did not throw expected Exception"); + } catch (IOException ignored) { + assertTrue(ignored.getCause() instanceof IllegalStateException); + } + } + + /** + * Tests that number of field names and types has to match. 
+ */ + @Test + public void testNumberOfFieldNamesAndTypesMismatch() throws Exception { + try { + new JsonRowDeserializationSchema( + new String[] { "one", "two", "three" }, + new Class[] { Long.class }); + fail("Did not throw expected Exception"); + } catch (IllegalArgumentException ignored) { + // Expected + } + + try { + new JsonRowDeserializationSchema( + new String[] { "one" }, + new Class[] { Long.class, String.class }); + fail("Did not throw expected Exception"); + } catch (IllegalArgumentException ignored) { + // Expected + } + } +} From 1a80b1195f545d0e2d0a0d609868d2f5f5587f3e Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Fri, 3 Jun 2016 15:24:22 +0200 Subject: [PATCH 2/5] [FLINK-3872] [table, connector-kafka] Add KafkaTableSource --- .../flink-connector-kafka-0.8/pom.xml | 11 +- .../kafka/Kafka08JsonTableSource.java | 71 ++++++++ .../connectors/kafka/Kafka08TableSource.java | 75 +++++++++ .../connectors/kafka/Kafka08ITCase.java | 27 ++- .../flink-connector-kafka-0.9/pom.xml | 10 +- .../kafka/Kafka09JsonTableSource.java | 71 ++++++++ .../connectors/kafka/Kafka09TableSource.java | 75 +++++++++ .../connectors/kafka/Kafka09ITCase.java | 27 +++ .../kafka/KafkaJsonTableSource.java | 85 ++++++++++ .../connectors/kafka/KafkaTableSource.java | 156 ++++++++++++++++++ .../kafka/KafkaConsumerTestBase.java | 130 ++++++++++++++- 11 files changed, 730 insertions(+), 8 deletions(-) create mode 100644 flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java create mode 100644 flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java create mode 100644 flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java create mode 100644 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java create mode 100644 flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java create mode 100644 flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml index b2701c137288d..3557f1cf9c27b 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml @@ -55,7 +55,6 @@ under the License. test - org.apache.flink flink-streaming-java_2.10 @@ -63,6 +62,16 @@ under the License. provided + + org.apache.flink + flink-table_2.10 + ${project.version} + provided + + true + + org.apache.kafka kafka_${scala.binary.version} diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java new file mode 100644 index 0000000000000..63bb57e121010 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sources.StreamTableSource; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import java.util.Properties; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.8. + */ +public class Kafka08JsonTableSource extends KafkaJsonTableSource { + + /** + * Creates a Kafka 0.8 JSON {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + public Kafka08JsonTableSource( + String topic, + Properties properties, + String[] fieldNames, + TypeInformation[] fieldTypes) { + + super(topic, properties, fieldNames, fieldTypes); + } + + /** + * Creates a Kafka 0.8 JSON {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. 
+ */ + public Kafka08JsonTableSource( + String topic, + Properties properties, + String[] fieldNames, + Class[] fieldTypes) { + + super(topic, properties, fieldNames, fieldTypes); + } + + @Override + FlinkKafkaConsumerBase getKafkaConsumer(String topic, Properties properties, DeserializationSchema deserializationSchema) { + return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties); + } +} diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java new file mode 100644 index 0000000000000..8f51237fbb680 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sources.StreamTableSource; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import java.util.Properties; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.8. + */ +public class Kafka08TableSource extends KafkaTableSource { + + /** + * Creates a Kafka 0.8 {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + public Kafka08TableSource( + String topic, + Properties properties, + DeserializationSchema deserializationSchema, + String[] fieldNames, + TypeInformation[] fieldTypes) { + + super(topic, properties, deserializationSchema, fieldNames, fieldTypes); + } + + /** + * Creates a Kafka 0.8 {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. 
+ */ + public Kafka08TableSource( + String topic, + Properties properties, + DeserializationSchema deserializationSchema, + String[] fieldNames, + Class[] fieldTypes) { + + super(topic, properties, deserializationSchema, fieldNames, fieldTypes); + } + + @Override + FlinkKafkaConsumerBase getKafkaConsumer(String topic, Properties properties, DeserializationSchema deserializationSchema) { + return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties); + } +} diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java index 530c032bd1e56..7a1c8d18be5fc 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -18,8 +18,9 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.curator.framework.CuratorFramework; - import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -31,6 +32,7 @@ import org.junit.Test; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; @@ -360,4 +362,27 @@ public void run() { curatorFramework.close(); } + + @Test + public void testJsonTableSource() throws Exception { + String topic = UUID.randomUUID().toString(); + + // Names and types are determined 
in the actual test method of the + // base test class. + Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource( + topic, + standardProps, + new String[] { + "long", + "string", + "boolean", + "double" }, + new TypeInformation[] { + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO }); + + runJsonTableSource(topic, tableSource); + } } diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml index e45a1d01a0c28..8feadb570f629 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml @@ -70,7 +70,6 @@ under the License. - org.apache.kafka kafka-clients ${kafka.version} @@ -99,6 +98,15 @@ under the License. provided + + org.apache.flink + flink-table_2.10 + ${project.version} + provided + + true + org.apache.flink diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java new file mode 100644 index 0000000000000..975ef5800a07f --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sources.StreamTableSource; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import java.util.Properties; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.9. + */ +public class Kafka09JsonTableSource extends KafkaJsonTableSource { + + /** + * Creates a Kafka 0.9 JSON {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + public Kafka09JsonTableSource( + String topic, + Properties properties, + String[] fieldNames, + TypeInformation[] fieldTypes) { + + super(topic, properties, fieldNames, fieldTypes); + } + + /** + * Creates a Kafka 0.9 JSON {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. 
+ */ + public Kafka09JsonTableSource( + String topic, + Properties properties, + String[] fieldNames, + Class[] fieldTypes) { + + super(topic, properties, fieldNames, fieldTypes); + } + + @Override + FlinkKafkaConsumerBase getKafkaConsumer(String topic, Properties properties, DeserializationSchema deserializationSchema) { + return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties); + } +} diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java new file mode 100644 index 0000000000000..03b504024645f --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sources.StreamTableSource; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import java.util.Properties; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.9. + */ +public class Kafka09TableSource extends KafkaTableSource { + + /** + * Creates a Kafka 0.9 {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + public Kafka09TableSource( + String topic, + Properties properties, + DeserializationSchema deserializationSchema, + String[] fieldNames, + TypeInformation[] fieldTypes) { + + super(topic, properties, deserializationSchema, fieldNames, fieldTypes); + } + + /** + * Creates a Kafka 0.9 {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. 
+ */ + public Kafka09TableSource( + String topic, + Properties properties, + DeserializationSchema deserializationSchema, + String[] fieldNames, + Class[] fieldTypes) { + + super(topic, properties, deserializationSchema, fieldNames, fieldTypes); + } + + @Override + FlinkKafkaConsumerBase getKafkaConsumer(String topic, Properties properties, DeserializationSchema deserializationSchema) { + return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties); + } +} diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java index afb00565262a2..001b4be50f0cc 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java @@ -17,8 +17,12 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.junit.Test; +import java.util.UUID; + public class Kafka09ITCase extends KafkaConsumerTestBase { @@ -115,4 +119,27 @@ public void testMetricsAndEndOfStream() throws Exception { runMetricsAndEndOfStreamTest(); } + @Test + public void testJsonTableSource() throws Exception { + String topic = UUID.randomUUID().toString(); + + // Names and types are determined in the actual test method of the + // base test class. 
+ Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource( + topic, + standardProps, + new String[] { + "long", + "string", + "boolean", + "double" }, + new TypeInformation[] { + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO }); + + runJsonTableSource(topic, tableSource); + } + } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java new file mode 100644 index 0000000000000..10fe3ac71e8a2 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.sources.StreamTableSource; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; + +import java.util.Properties; + +/** + * A version-agnostic Kafka JSON {@link StreamTableSource}. + * + *

The version-specific Kafka table sources need to extend this class and + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}. + * + *

The field names and types are used to parse the fields of each JSON record. + */ +public abstract class KafkaJsonTableSource extends KafkaTableSource { + + /** + * Creates a generic Kafka JSON {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + KafkaJsonTableSource( + String topic, + Properties properties, + String[] fieldNames, + Class[] fieldTypes) { + + super(topic, properties, getDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes); + } + + /** + * Creates a generic Kafka JSON {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + KafkaJsonTableSource( + String topic, + Properties properties, + String[] fieldNames, + TypeInformation[] fieldTypes) { + + super(topic, properties, getDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes); + } + + private static JsonRowDeserializationSchema getDeserializationSchema( + String[] fieldNames, + TypeInformation[] fieldTypes) { + + return new JsonRowDeserializationSchema(fieldNames, fieldTypes); + } + + private static JsonRowDeserializationSchema getDeserializationSchema( + String[] fieldNames, + Class[] fieldTypes) { + + return new JsonRowDeserializationSchema(fieldNames, fieldTypes); + } +} diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java new file mode 100644 index 0000000000000..a1907340c8d4a --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ 
-0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sources.StreamTableSource; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.util.Preconditions; + +import java.util.Properties; + +/** + * A version-agnostic Kafka {@link StreamTableSource}. + * + *

The version-specific Kafka consumers need to extend this class and + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}. + */ +abstract class KafkaTableSource implements StreamTableSource { + + /** The Kafka topic to consume. */ + private final String topic; + + /** Properties for the Kafka consumer. */ + private final Properties properties; + + /** Deserialization schema to use for Kafka records. */ + private final DeserializationSchema deserializationSchema; + + /** Row field names. */ + private final String[] fieldNames; + + /** Row field types. */ + private final TypeInformation[] fieldTypes; + + /** + * Creates a generic Kafka {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + KafkaTableSource( + String topic, + Properties properties, + DeserializationSchema deserializationSchema, + String[] fieldNames, + Class[] fieldTypes) { + + this(topic, properties, deserializationSchema, fieldNames, toTypeInfo(fieldTypes)); + } + + /** + * Creates a generic Kafka {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. 
+ */ + KafkaTableSource( + String topic, + Properties properties, + DeserializationSchema deserializationSchema, + String[] fieldNames, + TypeInformation[] fieldTypes) { + + this.topic = Preconditions.checkNotNull(topic, "Topic"); + this.properties = Preconditions.checkNotNull(properties, "Properties"); + this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema"); + this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names"); + this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types"); + + Preconditions.checkArgument(fieldNames.length == fieldNames.length, + "Number of provided field names and types does not match."); + } + + @Override + public DataStream getDataStream(StreamExecutionEnvironment env) { + // Version-specific Kafka consumer + FlinkKafkaConsumerBase kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema); + DataStream kafkaSource = env.addSource(kafkaConsumer); + return kafkaSource; + } + + @Override + public int getNumberOfFields() { + return fieldNames.length; + } + + @Override + public String[] getFieldsNames() { + return fieldNames; + } + + @Override + public TypeInformation[] getFieldTypes() { + return fieldTypes; + } + + @Override + public TypeInformation getReturnType() { + return new RowTypeInfo(fieldTypes, fieldNames); + } + + /** + * Returns the version-specific Kafka consumer. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @return The version-specific Kafka consumer + */ + abstract FlinkKafkaConsumerBase getKafkaConsumer( + String topic, + Properties properties, + DeserializationSchema deserializationSchema); + + /** + * Creates TypeInformation array for an array of Classes. 
+ * + * @param fieldTypes + * @return + */ + private static TypeInformation[] toTypeInfo(Class[] fieldTypes) { + TypeInformation[] typeInfos = new TypeInformation[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]); + } + return typeInfos; + } + +} diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 660f24c885990..a5b8e8234987b 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -18,6 +18,8 @@ package org.apache.flink.streaming.connectors.kafka; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; @@ -25,7 +27,6 @@ import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import kafka.server.KafkaServer; - import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; @@ -38,9 +39,12 @@ import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.table.StreamTableEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.api.table.Row; 
+import org.apache.flink.api.table.Table; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.DataInputView; @@ -74,23 +78,21 @@ import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; import org.apache.flink.test.util.SuccessException; import org.apache.flink.testutils.junit.RetryOnException; import org.apache.flink.testutils.junit.RetryRule; import org.apache.flink.util.Collector; - import org.apache.flink.util.StringUtils; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.errors.TimeoutException; import org.junit.Assert; - import org.junit.Before; import org.junit.Rule; @@ -107,6 +109,7 @@ import java.util.Properties; import java.util.Random; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.test.util.TestUtils.tryExecute; @@ -736,6 +739,123 @@ public void flatMap(Tuple3 value, Collector o } } + /** + * 
Runs a table source test with JSON data. + * + * The table source needs to parse the following JSON fields: + * - "long" -> number + * - "string" -> "string" + * - "boolean" -> true|false + * - "double" -> fraction + */ + public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception { + final ObjectMapper mapper = new ObjectMapper(); + + final int numElements = 1024; + final long[] longs = new long[numElements]; + final String[] strings = new String[numElements]; + final boolean[] booleans = new boolean[numElements]; + final double[] doubles = new double[numElements]; + + final byte[][] serializedJson = new byte[numElements][]; + + ThreadLocalRandom random = ThreadLocalRandom.current(); + + for (int i = 0; i < numElements; i++) { + longs[i] = random.nextLong(); + strings[i] = Integer.toHexString(random.nextInt()); + booleans[i] = random.nextBoolean(); + doubles[i] = random.nextDouble(); + + ObjectNode entry = mapper.createObjectNode(); + entry.put("long", longs[i]); + entry.put("string", strings[i]); + entry.put("boolean", booleans[i]); + entry.put("double", doubles[i]); + + serializedJson[i] = mapper.writeValueAsBytes(entry); + } + + // Produce serialized JSON data + createTestTopic(topic, 1, 1); + + StreamExecutionEnvironment env = StreamExecutionEnvironment + .createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + + env.addSource(new SourceFunction() { + @Override + public void run(SourceContext ctx) throws Exception { + for (int i = 0; i < numElements; i++) { + ctx.collect(serializedJson[i]); + } + } + + @Override + public void cancel() { + } + }).addSink(kafkaServer.getProducer( + topic, + new ByteArraySerializationSchema(), + standardProps, + null)); + + // Execute blocks + env.execute(); + + // Register as table source + StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env); + tableEnvironment.registerTableSource("kafka", kafkaTableSource); + + Table 
result = tableEnvironment.ingest("kafka"); + + tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction() { + + int i = 0; + + @Override + public void invoke(Row value) throws Exception { + if (i > numElements) { + throw new IllegalStateException("Received too many rows."); + } + + assertEquals(longs[i], value.productElement(0)); + assertEquals(strings[i], value.productElement(1)); + assertEquals(booleans[i], value.productElement(2)); + assertEquals(doubles[i], value.productElement(3)); + + if (i == numElements-1) { + throw new SuccessException(); + } else { + i++; + } + } + }); + + tryExecutePropagateExceptions(env, "KafkaTableSource"); + } + + /** + * Serialization scheme forwarding byte[] records. + */ + private static class ByteArraySerializationSchema implements KeyedSerializationSchema { + + @Override + public byte[] serializeKey(byte[] element) { + return null; + } + + @Override + public byte[] serializeValue(byte[] element) { + return element; + } + + @Override + public String getTargetTopic(byte[] element) { + return null; + } + } + private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema>, KeyedSerializationSchema> { From c119e44d1ca393250f469bccb5cd24ad95851073 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Fri, 10 Jun 2016 14:46:45 +0200 Subject: [PATCH 3/5] Address Fabians comments --- .../streaming/connectors/kafka/KafkaTableSource.java | 2 +- .../serialization/JsonRowDeserializationSchema.java | 10 +++++----- .../kafka/JsonRowDeserializationSchemaTest.java | 11 +++++------ .../connectors/kafka/KafkaConsumerTestBase.java | 4 ---- 4 files changed, 11 insertions(+), 16 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index a1907340c8d4a..9a578f6e42696 100644 --- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -94,7 +94,7 @@ abstract class KafkaTableSource implements StreamTableSource { this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names"); this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types"); - Preconditions.checkArgument(fieldNames.length == fieldNames.length, + Preconditions.checkArgument(fieldNames.length == fieldTypes.length, "Number of provided field names and types does not match."); } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java index 82eb08bdf2a6c..658316925aed0 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java @@ -89,12 +89,12 @@ public Row deserialize(byte[] message) throws IOException { JsonNode node = root.get(fieldNames[i]); if (node == null) { - throw new IllegalStateException("Field '" + fieldNames[i] + "' not found."); + row.setField(i, null); + } else { + // Read the value as specified type + Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass()); + row.setField(i, value); } - - // Read the value as specified type - Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass()); - row.setField(i, value); } return row; diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java index 88b34027fac1d..ae40346b49e02 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java @@ -29,6 +29,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -82,12 +83,10 @@ public void testMissingNode() throws Exception { new String[] { "name" }, new Class[] { String.class }); - try { - deserializationSchema.deserialize(serializedJson); - fail("Did not throw expected Exception"); - } catch (IOException ignored) { - assertTrue(ignored.getCause() instanceof IllegalStateException); - } + Row row = deserializationSchema.deserialize(serializedJson); + + assertEquals(1, row.productArity()); + assertNull("Missing field not null", row.productElement(0)); } /** diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index a5b8e8234987b..873111a342e41 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -815,10 +815,6 @@ public void cancel() { @Override public void invoke(Row value) throws Exception { - if (i > numElements) { - throw new IllegalStateException("Received too many rows."); - } - assertEquals(longs[i], value.productElement(0)); assertEquals(strings[i], value.productElement(1)); assertEquals(booleans[i], value.productElement(2)); From f95ba6142fdfb0390697deaa4fac2cc8fe9b0df2 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Tue, 21 Jun 2016 18:32:38 +0200 Subject: [PATCH 4/5] Make missing field failure behaviour configurable --- .../connectors/kafka/Kafka08ITCase.java | 43 ++++++++++++++++- .../connectors/kafka/Kafka09ITCase.java | 46 ++++++++++++++++++- .../kafka/KafkaJsonTableSource.java | 20 ++++++-- .../connectors/kafka/KafkaTableSource.java | 12 +++-- .../JsonRowDeserializationSchema.java | 21 ++++++++- .../JsonRowDeserializationSchemaTest.java | 9 ++++ .../kafka/KafkaConsumerTestBase.java | 2 + 7 files changed, 141 insertions(+), 12 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java index 7a1c8d18be5fc..b393e5b5176aa 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -376,13 +376,52 @@ public void testJsonTableSource() throws Exception { "long", "string", "boolean", - "double" }, + "double", + "missing-field"}, new TypeInformation[] { BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO, - 
BasicTypeInfo.DOUBLE_TYPE_INFO }); + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO }); + + // Don't fail on missing field, but set to null (default) + tableSource.setFailOnMissingField(false); runJsonTableSource(topic, tableSource); } + + @Test + public void testJsonTableSourceWithFailOnMissingField() throws Exception { + String topic = UUID.randomUUID().toString(); + + // Names and types are determined in the actual test method of the + // base test class. + Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource( + topic, + standardProps, + new String[] { + "long", + "string", + "boolean", + "double", + "missing-field"}, + new TypeInformation[] { + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO }); + + // Don't fail on missing field, but set to null (default) + tableSource.setFailOnMissingField(true); + + try { + runJsonTableSource(topic, tableSource); + fail("Did not throw expected Exception"); + } catch (Exception e) { + Throwable rootCause = e.getCause().getCause().getCause(); + assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException); + } + } } diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java index 001b4be50f0cc..ef64171301945 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java @@ -23,6 +23,9 @@ import java.util.UUID; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class Kafka09ITCase extends 
KafkaConsumerTestBase { @@ -132,14 +135,53 @@ public void testJsonTableSource() throws Exception { "long", "string", "boolean", "double" }, + "double", + "missing-field"}, new TypeInformation[] { BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO }); + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO }); + + // Don't fail on missing field, but set to null (default) + tableSource.setFailOnMissingField(false); runJsonTableSource(topic, tableSource); } + @Test + public void testJsonTableSourceWithFailOnMissingField() throws Exception { + String topic = UUID.randomUUID().toString(); + + // Names and types are determined in the actual test method of the + // base test class. + Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource( + topic, + standardProps, + new String[] { + "long", + "string", + "boolean", + "double", + "missing-field"}, + new TypeInformation[] { + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO }); + + // Fail on missing field instead of setting it to null + tableSource.setFailOnMissingField(true); + + try { + runJsonTableSource(topic, tableSource); + fail("Did not throw expected Exception"); + } catch (Exception e) { + Throwable rootCause = e.getCause().getCause().getCause(); + assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException); + } + } + } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java index 10fe3ac71e8a2..f145509bab2ff 100644 --- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java @@ -49,7 +49,7 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource { String[] fieldNames, Class[] fieldTypes) { - super(topic, properties, getDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes); + super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes); } /** @@ -66,17 +66,29 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource { String[] fieldNames, TypeInformation[] fieldTypes) { - super(topic, properties, getDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes); + super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes); } - private static JsonRowDeserializationSchema getDeserializationSchema( + /** + * Configures the failure behaviour if a JSON field is missing. + * + *

By default, a missing field is ignored and the field is set to null. + * + * @param failOnMissingField Flag indicating whether to fail or not on a missing field. + */ + public void setFailOnMissingField(boolean failOnMissingField) { + JsonRowDeserializationSchema deserializationSchema = (JsonRowDeserializationSchema) getDeserializationSchema(); + deserializationSchema.setFailOnMissingField(failOnMissingField); + } + + private static JsonRowDeserializationSchema createDeserializationSchema( String[] fieldNames, TypeInformation[] fieldTypes) { return new JsonRowDeserializationSchema(fieldNames, fieldTypes); } - private static JsonRowDeserializationSchema getDeserializationSchema( + private static JsonRowDeserializationSchema createDeserializationSchema( String[] fieldNames, Class[] fieldTypes) { diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index 9a578f6e42696..e43760b27c81b 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -140,10 +140,16 @@ abstract FlinkKafkaConsumerBase getKafkaConsumer( DeserializationSchema deserializationSchema); /** - * Creates TypeInformation array for an array of Classes. + * Returns the deserialization schema. * - * @param fieldTypes - * @return + * @return The deserialization schema + */ + protected DeserializationSchema getDeserializationSchema() { + return deserializationSchema; + } + + /** + * Creates TypeInformation array for an array of Classes. 
*/ private static TypeInformation[] toTypeInfo(Class[] fieldTypes) { TypeInformation[] typeInfos = new TypeInformation[fieldTypes.length]; diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java index 658316925aed0..c323b50bcb793 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java @@ -47,6 +47,9 @@ public class JsonRowDeserializationSchema implements DeserializationSchema /** Object mapper for parsing the JSON. */ private final ObjectMapper objectMapper = new ObjectMapper(); + /** Flag indicating whether to fail on a missing field. */ + private volatile boolean failOnMissingField; + /** * Creates a JSON deserializtion schema for the given fields and type classes. * @@ -89,7 +92,12 @@ public Row deserialize(byte[] message) throws IOException { JsonNode node = root.get(fieldNames[i]); if (node == null) { - row.setField(i, null); + if (failOnMissingField) { + throw new IllegalStateException("Failed to find field with name '" + + fieldNames[i] + "'."); + } else { + row.setField(i, null); + } } else { // Read the value as specified type Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass()); @@ -113,4 +121,15 @@ public TypeInformation getProducedType() { return new RowTypeInfo(fieldTypes, fieldNames); } + /** + * Configures the failure behaviour if a JSON field is missing. + * + *

By default, a missing field is ignored and the field is set to null. + * + * @param failOnMissingField Flag indicating whether to fail or not on a missing field. + */ + public void setFailOnMissingField(boolean failOnMissingField) { + this.failOnMissingField = failOnMissingField; + } + } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java index ae40346b49e02..68225e2ae461f 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java @@ -87,6 +87,15 @@ public void testMissingNode() throws Exception { assertEquals(1, row.productArity()); assertNull("Missing field not null", row.productElement(0)); + + deserializationSchema.setFailOnMissingField(true); + + try { + deserializationSchema.deserialize(serializedJson); + fail("Did not throw expected Exception"); + } catch (IOException e) { + assertTrue(e.getCause() instanceof IllegalStateException); + } } /** diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 873111a342e41..220f06133bdad 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -815,10 +815,12 @@ public void cancel() { @Override public void invoke(Row value) throws Exception { + assertEquals(5, value.productArity()); assertEquals(longs[i], value.productElement(0)); assertEquals(strings[i], value.productElement(1)); assertEquals(booleans[i], value.productElement(2)); assertEquals(doubles[i], value.productElement(3)); + assertNull(value.productElement(4)); if (i == numElements-1) { throw new SuccessException(); From c8766ddb72ee945ab55ea04fa25a7cd531933230 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Tue, 21 Jun 2016 19:09:52 +0200 Subject: [PATCH 5/5] Add docs --- docs/apis/table.md | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/docs/apis/table.md b/docs/apis/table.md index 35caa08499b4a..f0a65286e7863 100644 --- a/docs/apis/table.md +++ b/docs/apis/table.md @@ -207,6 +207,49 @@ A `TableSource` can provide access to data stored in various storage systems suc Currently, Flink only provides a `CsvTableSource` to read CSV files. A custom `TableSource` can be defined by implementing the `BatchTableSource` or `StreamTableSource` interface. +### Available Table Sources + +| **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description** +| `CsvTableSource` | `flink-table` | Y | Y | A simple source for CSV files with up to 25 fields. +| `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | A Kafka 0.8 source for JSON data. +| `Kafka09JsonTableSource` | `flink-connector-kafka-0.9` | N | Y | A Kafka 0.9 source for JSON data. + +All sources that come with the `flink-table` dependency can be directly used by your Table programs. For all other table sources, you have to add the respective dependency in addition to the `flink-table` dependency. 
+ +#### KafkaJsonTableSource + +To use the Kafka JSON source, you have to add the Kafka connector dependency to your project: + + - `flink-connector-kafka-0.8` for Kafka 0.8, and + - `flink-connector-kafka-0.9` for Kafka 0.9, respectively. + +You can then create the source as follows (example for Kafka 0.8): + +```java +// The JSON field names and types +String[] fieldNames = new String[] { "id", "name", "score"}; +Class[] fieldTypes = new Class[] { Integer.class, String.class, Double.class }; + +KafkaJsonTableSource kafkaTableSource = new Kafka08JsonTableSource( + kafkaTopic, + kafkaProperties, + fieldNames, + fieldTypes); +``` + +By default, a missing JSON field does not fail the source; the corresponding row field is set to `null`. You can configure this via: + +```java +// Fail on missing JSON field +kafkaTableSource.setFailOnMissingField(true); +``` + +You can work with the Table as explained in the rest of the Table API guide: + +```java +tableEnvironment.registerTableSource("kafka-source", kafkaTableSource); +Table result = tableEnvironment.ingest("kafka-source"); +``` Table API ----------