specificStartupOffsets) {
+
+ return new Kafka09TableSource(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ fieldMapping,
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets
+ );
+ }
+}
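
For orientation, here is a sketch (not the PR's verbatim code) of the version-specific subclass that the call above constructs: it forwards the unified parameter list to the new `KafkaTableSource` base constructor and only contributes its consumer. The `createKafkaConsumer` body is an assumption based on the public `FlinkKafkaConsumer09(topic, deserializer, properties)` constructor; the constructor signature mirrors the call in the hunk above.

```java
package org.apache.flink.streaming.connectors.kafka;

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.types.Row;

public class Kafka09TableSource extends KafkaTableSource {

	public Kafka09TableSource(
			TableSchema schema,
			String proctimeAttribute,
			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
			Map<String, String> fieldMapping,
			String topic,
			Properties properties,
			DeserializationSchema<Row> deserializationSchema,
			StartupMode startupMode,
			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
		// The base class now validates and stores all common table source attributes.
		super(schema, proctimeAttribute, rowtimeAttributeDescriptors, fieldMapping,
			topic, properties, deserializationSchema, startupMode, specificStartupOffsets);
	}

	@Override
	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(
			String topic,
			Properties properties,
			DeserializationSchema<Row> deserializationSchema) {
		// Only the consumer construction remains version-specific.
		return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
	}
}
```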
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
index 8c8ce324ba997..3e9f2b03e344f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -21,16 +21,14 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
-import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.sources.DefinedFieldMapping;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import java.util.Map;
-import java.util.Objects;
import java.util.Properties;
/**
@@ -38,13 +36,15 @@
*
* The version-specific Kafka consumers need to extend this class and
* override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+@Deprecated
@Internal
-public abstract class KafkaAvroTableSource extends KafkaTableSource implements DefinedFieldMapping {
-
- private final Class<? extends SpecificRecordBase> avroRecordClass;
-
- private Map<String, String> fieldMapping;
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
/**
* Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
@@ -53,7 +53,12 @@ public abstract class KafkaAvroTableSource extends KafkaTableSource implements D
* @param properties Properties for the Kafka consumer.
* @param schema Schema of the produced table.
* @param avroRecordClass Class of the Avro record that is read from the Kafka topic.
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
protected KafkaAvroTableSource(
String topic,
Properties properties,
@@ -61,59 +66,15 @@ protected KafkaAvroTableSource(
Class<? extends SpecificRecordBase> avroRecordClass) {
super(
+ schema,
topic,
properties,
- schema,
- AvroSchemaConverter.convertToTypeInfo(avroRecordClass));
-
- this.avroRecordClass = avroRecordClass;
- }
-
- @Override
- public Map<String, String> getFieldMapping() {
- return fieldMapping;
+ new AvroRowDeserializationSchema(avroRecordClass));
}
@Override
public String explainSource() {
- return "KafkaAvroTableSource(" + this.avroRecordClass.getSimpleName() + ")";
- }
-
- @Override
- protected AvroRowDeserializationSchema getDeserializationSchema() {
- return new AvroRowDeserializationSchema(avroRecordClass);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof KafkaAvroTableSource)) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- final KafkaAvroTableSource that = (KafkaAvroTableSource) o;
- return Objects.equals(avroRecordClass, that.avroRecordClass) &&
- Objects.equals(fieldMapping, that.fieldMapping);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), avroRecordClass, fieldMapping);
- }
-
- //////// SETTERS FOR OPTIONAL PARAMETERS
-
- /**
- * Configures a field mapping for this TableSource.
- *
- * @param fieldMapping The field mapping.
- */
- protected void setFieldMapping(Map<String, String> fieldMapping) {
- this.fieldMapping = fieldMapping;
+ return "KafkaAvroTableSource";
}
//////// HELPER METHODS
@@ -124,7 +85,13 @@ protected void setFieldMapping(Map<String, String> fieldMapping) {
*
* @param <T> Type of the KafkaAvroTableSource produced by the builder.
* @param <B> Type of the KafkaAvroTableSource.Builder subclass.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
protected abstract static class Builder<T extends KafkaAvroTableSource, B extends KafkaAvroTableSource.Builder>
extends KafkaTableSource.Builder<T, B> {
@@ -137,7 +104,9 @@ protected abstract static class Builder<T extends KafkaAvroTableSource, B exten
+ * @deprecated Use table descriptors instead of implementation-specific builders.
 */
+ @Deprecated
public B forAvroRecordClass(Class<? extends SpecificRecordBase> avroClass) {
this.avroClass = avroClass;
return builder();
@@ -153,7 +122,9 @@ public B forAvroRecordClass(Class<? extends SpecificRecordBase> avroClass) {
*
* @param schemaToAvroMapping A mapping from schema fields to Avro fields.
* @return The builder.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public B withTableToAvroMapping(Map<String, String> schemaToAvroMapping) {
this.fieldMapping = schemaToAvroMapping;
return builder();
@@ -163,7 +134,9 @@ public B withTableToAvroMapping(Map<String, String> schemaToAvroMapping) {
* Returns the configured Avro class.
*
* @return The configured Avro class.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected Class<? extends SpecificRecordBase> getAvroRecordClass() {
return this.avroClass;
}
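
As a migration aid for the deprecation notes above, a minimal sketch of the descriptor-based definition that replaces the Avro builder. The `Kafka`, `Avro`, and `Schema` descriptor classes are the ones referenced in this diff, but the exact method names (`recordClass`, `field`, etc.) and the `User` record class are assumptions, not code from this PR:

```java
// Sketch only: replaces KafkaAvroTableSource.Builder. Connector, format, and
// schema are declared independently; `User` is a hypothetical SpecificRecord.
tableEnv
	.from(
		new Kafka()
			.version("0.9")
			.topic("avro-topic")
			.property("bootstrap.servers", "localhost:9092"))
	.withFormat(
		new Avro()
			.recordClass(User.class))   // replaces forAvroRecordClass(...)
	.withSchema(
		new Schema()
			.field("name", Types.STRING())
			.field("ts", Types.SQL_TIMESTAMP()));
```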
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
deleted file mode 100644
index 8ef7270ccae5a..0000000000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.AvroValidator;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.FormatDescriptorValidator;
-import org.apache.flink.table.descriptors.SchemaValidator;
-
-import org.apache.avro.specific.SpecificRecordBase;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * Factory for creating configured instances of {@link KafkaAvroTableSource}.
- */
-public abstract class KafkaAvroTableSourceFactory extends KafkaTableSourceFactory {
-
- @Override
- protected String formatType() {
- return AvroValidator.FORMAT_TYPE_VALUE;
- }
-
- @Override
- protected int formatPropertyVersion() {
- return 1;
- }
-
- @Override
- protected List<String> formatProperties() {
- return Collections.singletonList(AvroValidator.FORMAT_RECORD_CLASS);
- }
-
- @Override
- protected FormatDescriptorValidator formatValidator() {
- return new AvroValidator();
- }
-
- @Override
- protected KafkaTableSource.Builder createBuilderWithFormat(DescriptorProperties params) {
- KafkaAvroTableSource.Builder builder = createKafkaAvroBuilder();
-
- // Avro format schema
- final Class<? extends SpecificRecordBase> avroRecordClass =
- params.getClass(AvroValidator.FORMAT_RECORD_CLASS, SpecificRecordBase.class);
- builder.forAvroRecordClass(avroRecordClass);
- final TableSchema formatSchema = TableSchema.fromTypeInfo(AvroSchemaConverter.convertToTypeInfo(avroRecordClass));
-
- // field mapping
- final Map<String, String> mapping = SchemaValidator.deriveFieldMapping(params, Optional.of(formatSchema));
- builder.withTableToAvroMapping(mapping);
-
- return builder;
- }
-
- /**
- * Creates a version specific {@link KafkaAvroTableSource.Builder}.
- */
- protected abstract KafkaAvroTableSource.Builder createKafkaAvroBuilder();
-}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index b2bb8ff773bf4..bd0d0dedf22e7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -20,14 +20,12 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.sources.DefinedFieldMapping;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import java.util.Map;
-import java.util.Objects;
import java.util.Properties;
/**
@@ -37,15 +35,15 @@
* override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}.
*
* The field names are used to parse the JSON file and so are the types.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+@Deprecated
@Internal
-public abstract class KafkaJsonTableSource extends KafkaTableSource implements DefinedFieldMapping {
-
- private TableSchema jsonSchema;
-
- private Map<String, String> fieldMapping;
-
- private boolean failOnMissingField;
+public abstract class KafkaJsonTableSource extends KafkaTableSource {
/**
* Creates a generic Kafka JSON {@link StreamTableSource}.
@@ -54,7 +52,9 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource implements D
* @param properties Properties for the Kafka consumer.
* @param tableSchema The schema of the table.
* @param jsonSchema The schema of the JSON messages to decode from Kafka.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected KafkaJsonTableSource(
String topic,
Properties properties,
@@ -62,24 +62,10 @@ protected KafkaJsonTableSource(
TableSchema jsonSchema) {
super(
+ tableSchema,
topic,
properties,
- tableSchema,
- jsonSchemaToReturnType(jsonSchema));
-
- this.jsonSchema = jsonSchema;
- }
-
- @Override
- public Map<String, String> getFieldMapping() {
- return fieldMapping;
- }
-
- @Override
- protected JsonRowDeserializationSchema getDeserializationSchema() {
- JsonRowDeserializationSchema deserSchema = new JsonRowDeserializationSchema(jsonSchemaToReturnType(jsonSchema));
- deserSchema.setFailOnMissingField(failOnMissingField);
- return deserSchema;
+ new JsonRowDeserializationSchema(jsonSchema.toRowType()));
}
@Override
@@ -87,28 +73,6 @@ public String explainSource() {
return "KafkaJsonTableSource";
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof KafkaJsonTableSource)) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- KafkaJsonTableSource that = (KafkaJsonTableSource) o;
- return failOnMissingField == that.failOnMissingField &&
- Objects.equals(jsonSchema, that.jsonSchema) &&
- Objects.equals(fieldMapping, that.fieldMapping);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), jsonSchema, fieldMapping, failOnMissingField);
- }
-
//////// SETTERS FOR OPTIONAL PARAMETERS
/**
@@ -116,34 +80,27 @@ public int hashCode() {
* TableSource will fail for missing fields if set to true. If set to false, the missing field is set to null.
*
* @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected void setFailOnMissingField(boolean failOnMissingField) {
- this.failOnMissingField = failOnMissingField;
- }
-
- /**
- * Sets the mapping from table schema fields to JSON schema fields.
- *
- * @param fieldMapping The mapping from table schema fields to JSON schema fields.
- */
- protected void setFieldMapping(Map<String, String> fieldMapping) {
- this.fieldMapping = fieldMapping;
+ ((JsonRowDeserializationSchema) getDeserializationSchema()).setFailOnMissingField(failOnMissingField);
}
//////// HELPER METHODS
- /** Converts the JSON schema into the return type. */
- private static RowTypeInfo jsonSchemaToReturnType(TableSchema jsonSchema) {
- return new RowTypeInfo(jsonSchema.getTypes(), jsonSchema.getColumnNames());
- }
-
/**
* Abstract builder for a {@link KafkaJsonTableSource} to be extended by builders of subclasses of
* KafkaJsonTableSource.
*
* @param <T> Type of the KafkaJsonTableSource produced by the builder.
* @param <B> Type of the KafkaJsonTableSource.Builder subclass.
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
protected abstract static class Builder<T extends KafkaJsonTableSource, B extends KafkaJsonTableSource.Builder>
extends KafkaTableSource.Builder<T, B> {
@@ -159,7 +116,9 @@ protected abstract static class Builder<T extends KafkaJsonTableSource, B exten
+ * @deprecated Use table descriptors instead of implementation-specific builders.
 */
+ @Deprecated
public B withTableToJsonMapping(Map<String, String> tableToJsonMapping) {
this.fieldMapping = tableToJsonMapping;
return builder();
@@ -188,7 +149,9 @@ public B withTableToJsonMapping(Map<String, String> tableToJsonMapping) {
* field.
* If set to false, a missing field is set to null.
* @return The builder.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public B failOnMissingField(boolean failOnMissingField) {
this.failOnMissingField = failOnMissingField;
return builder();
@@ -199,7 +162,9 @@ public B failOnMissingField(boolean failOnMissingField) {
* is returned.
*
* @return The JSON schema for the TableSource.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected TableSchema getJsonSchema() {
if (jsonSchema != null) {
return this.jsonSchema;
@@ -208,7 +173,13 @@ protected TableSchema getJsonSchema() {
}
}
- @Override
+ /**
+ * Configures a TableSource with optional parameters.
+ *
+ * @param source The TableSource to configure.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
+ */
+ @Deprecated
protected void configureTableSource(T source) {
super.configureTableSource(source);
// configure field mapping
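
The JSON counterpart of the migration, again as a hedged sketch: the `Json` descriptor method names are assumptions modeled on the `JsonValidator` keys used by the factory deleted below (`format.json-schema`, `format.fail-on-missing-field`):

```java
// Sketch only: replaces KafkaJsonTableSource.Builder, including
// failOnMissingField(...), with a declarative format descriptor.
tableEnv
	.from(
		new Kafka()
			.version("0.8")
			.topic("json-topic")
			.property("bootstrap.servers", "localhost:9092"))
	.withFormat(
		new Json()
			.jsonSchema("{ \"type\": \"object\", \"properties\": { \"name\": { \"type\": \"string\" } } }")
			.failOnMissingField(true))
	.withSchema(
		new Schema()
			.field("name", Types.STRING()));
```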
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
deleted file mode 100644
index 98985694ab9a2..0000000000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.json.JsonSchemaConverter;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.FormatDescriptorValidator;
-import org.apache.flink.table.descriptors.JsonValidator;
-import org.apache.flink.table.descriptors.SchemaValidator;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * Factory for creating configured instances of {@link KafkaJsonTableSource}.
- */
-public abstract class KafkaJsonTableSourceFactory extends KafkaTableSourceFactory {
-
- @Override
- protected String formatType() {
- return JsonValidator.FORMAT_TYPE_VALUE;
- }
-
- @Override
- protected int formatPropertyVersion() {
- return 1;
- }
-
- @Override
- protected List<String> formatProperties() {
- return Arrays.asList(
- JsonValidator.FORMAT_JSON_SCHEMA,
- JsonValidator.FORMAT_SCHEMA,
- JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD,
- FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA());
- }
-
- @Override
- protected FormatDescriptorValidator formatValidator() {
- return new JsonValidator();
- }
-
- @Override
- protected KafkaTableSource.Builder createBuilderWithFormat(DescriptorProperties params) {
- final KafkaJsonTableSource.Builder builder = createKafkaJsonBuilder();
-
- // missing field
- params.getOptionalBoolean(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD)
- .ifPresent(builder::failOnMissingField);
-
- // json schema
- final TableSchema formatSchema;
- if (params.containsKey(JsonValidator.FORMAT_SCHEMA)) {
- final TypeInformation<?> info = params.getType(JsonValidator.FORMAT_SCHEMA);
- formatSchema = TableSchema.fromTypeInfo(info);
- } else if (params.containsKey(JsonValidator.FORMAT_JSON_SCHEMA)) {
- final TypeInformation<?> info = JsonSchemaConverter.convert(params.getString(JsonValidator.FORMAT_JSON_SCHEMA));
- formatSchema = TableSchema.fromTypeInfo(info);
- } else {
- formatSchema = SchemaValidator.deriveFormatFields(params);
- }
- builder.forJsonSchema(formatSchema);
-
- // field mapping
- final Map<String, String> mapping = SchemaValidator.deriveFieldMapping(params, Optional.of(formatSchema));
- builder.withTableToJsonMapping(mapping);
-
- return builder;
- }
-
- /**
- * Creates a version specific {@link KafkaJsonTableSource.Builder}.
- */
- protected abstract KafkaJsonTableSource.Builder createKafkaJsonBuilder();
-
-}
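
With both format-specific factories gone, the generic `KafkaTableSourceFactory` (next file) consumes one flat property map and discovers the format via `TableFormatFactoryService`. An illustrative sketch of such a map; the keys follow the validator constants visible in this diff, while the concrete values are made up:

```java
Map<String, String> properties = new HashMap<>();
properties.put("connector.type", "kafka");      // matched by requiredContext()
properties.put("connector.version", "0.11");
properties.put("connector.topic", "events");
// The format is no longer part of the factory's required context; it is
// matched against the "format.*" wildcard and resolved by format discovery.
properties.put("format.type", "json");
properties.put("format.fail-on-missing-field", "true");
properties.put("schema.0.name", "name");
properties.put("schema.0.type", "VARCHAR");
```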
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index 29654b061c790..873b521d8a5c3 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -28,6 +28,8 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
@@ -55,28 +57,36 @@
*/
@Internal
public abstract class KafkaTableSource
- implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
+ implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes, DefinedFieldMapping {
+
+ // common table source attributes
+ // TODO make all attributes final once we drop support for format-specific table sources
/** The schema of the table. */
private final TableSchema schema;
+ /** Field name of the processing time attribute, null if no processing time field is defined. */
+ private String proctimeAttribute;
+
+ /** Descriptor for a rowtime attribute. */
+ private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+
+ /** Mapping for the fields of the table schema to fields of the physical returned type or null. */
+ private Map<String, String> fieldMapping;
+
+ // Kafka-specific attributes
+
/** The Kafka topic to consume. */
private final String topic;
/** Properties for the Kafka consumer. */
private final Properties properties;
- /** Type information describing the result type. */
- private TypeInformation<Row> returnType;
-
- /** Field name of the processing time attribute, null if no processing time field is defined. */
- private String proctimeAttribute;
-
- /** Descriptor for a rowtime attribute. */
- private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+ /** Deserialization schema for decoding records from Kafka. */
+ private final DeserializationSchema<Row> deserializationSchema;
/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
- private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+ private StartupMode startupMode;
/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
private Map<KafkaTopicPartition, Long> specificStartupOffsets;
@@ -84,21 +94,64 @@ public abstract class KafkaTableSource
/**
* Creates a generic Kafka {@link StreamTableSource}.
*
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param schema Schema of the produced table.
- * @param returnType Type information of the produced physical DataStream.
+ * @param schema Schema of the produced table.
+ * @param proctimeAttribute Field name of the processing time attribute, null if no
+ * processing time field is defined.
+ * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
+ * @param fieldMapping Mapping for the fields of the table schema to
+ * fields of the physical returned type or null.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema for decoding records from Kafka.
+ * @param startupMode Startup mode for the contained consumer.
+ * @param specificStartupOffsets Specific startup offsets; only relevant when startup
+ * mode is {@link StartupMode#SPECIFIC_OFFSETS}.
*/
protected KafkaTableSource(
+ TableSchema schema,
+ String proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Map<String, String> fieldMapping,
String topic,
Properties properties,
- TableSchema schema,
- TypeInformation<Row> returnType) {
-
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+ this.schema = Preconditions.checkNotNull(schema, "Schema must not be null.");
+ this.proctimeAttribute = validateProctimeAttribute(proctimeAttribute);
+ this.rowtimeAttributeDescriptors = validateRowtimeAttributeDescriptors(rowtimeAttributeDescriptors);
+ this.fieldMapping = fieldMapping;
this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
- this.schema = Preconditions.checkNotNull(schema, "Schema must not be null.");
- this.returnType = Preconditions.checkNotNull(returnType, "Type information must not be null.");
+ this.deserializationSchema = Preconditions.checkNotNull(
+ deserializationSchema, "Deserialization schema must not be null.");
+ this.startupMode = Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
+ this.specificStartupOffsets = Preconditions.checkNotNull(
+ specificStartupOffsets, "Specific offsets must not be null.");
+ }
+
+ /**
+ * Creates a generic Kafka {@link StreamTableSource}.
+ *
+ * @param schema Schema of the produced table.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema for decoding records from Kafka.
+ */
+ protected KafkaTableSource(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema) {
+ this(
+ schema,
+ null,
+ Collections.emptyList(),
+ null,
+ topic,
+ properties,
+ deserializationSchema,
+ StartupMode.GROUP_OFFSETS,
+ Collections.emptyMap());
}
/**
@@ -116,7 +169,7 @@ public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
@Override
public TypeInformation<Row> getReturnType() {
- return returnType;
+ return deserializationSchema.getProducedType();
}
@Override
@@ -134,34 +187,60 @@ public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
return rowtimeAttributeDescriptors;
}
+ @Override
+ public Map<String, String> getFieldMapping() {
+ return fieldMapping;
+ }
+
@Override
public String explainSource() {
return TableConnectorUtil.generateRuntimeName(this.getClass(), schema.getColumnNames());
}
+ /**
+ * Returns the properties for the Kafka consumer.
+ *
+ * @return properties for the Kafka consumer.
+ */
+ public Properties getProperties() {
+ return properties;
+ }
+
+ /**
+ * Returns the deserialization schema.
+ *
+ * @return The deserialization schema
+ */
+ public DeserializationSchema<Row> getDeserializationSchema() {
+ return deserializationSchema;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
- if (!(o instanceof KafkaTableSource)) {
+ // TODO force classes to be equal once we drop support for format-specific table sources
+ // if (o == null || getClass() != o.getClass()) {
+ if (o == null || !(o instanceof KafkaTableSource)) {
return false;
}
- KafkaTableSource that = (KafkaTableSource) o;
+ final KafkaTableSource that = (KafkaTableSource) o;
return Objects.equals(schema, that.schema) &&
- Objects.equals(topic, that.topic) &&
- Objects.equals(properties, that.properties) &&
- Objects.equals(returnType, that.returnType) &&
Objects.equals(proctimeAttribute, that.proctimeAttribute) &&
Objects.equals(rowtimeAttributeDescriptors, that.rowtimeAttributeDescriptors) &&
+ Objects.equals(fieldMapping, that.fieldMapping) &&
+ Objects.equals(topic, that.topic) &&
+ Objects.equals(properties, that.properties) &&
+ Objects.equals(deserializationSchema, that.deserializationSchema) &&
startupMode == that.startupMode &&
Objects.equals(specificStartupOffsets, that.specificStartupOffsets);
}
@Override
public int hashCode() {
- return Objects.hash(schema, topic, properties, returnType,
- proctimeAttribute, rowtimeAttributeDescriptors, startupMode, specificStartupOffsets);
+ return Objects.hash(schema, proctimeAttribute, rowtimeAttributeDescriptors, fieldMapping,
+ topic, properties, deserializationSchema, startupMode, specificStartupOffsets);
}
/**
@@ -195,14 +274,14 @@ protected FlinkKafkaConsumerBase<Row> getKafkaConsumer(
return kafkaConsumer;
}
- //////// SETTERS FOR OPTIONAL PARAMETERS
+ //////// VALIDATION FOR PARAMETERS
/**
- * Declares a field of the schema to be the processing time attribute.
+ * Validates a field of the schema to be the processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
*/
- protected void setProctimeAttribute(String proctimeAttribute) {
+ private String validateProctimeAttribute(String proctimeAttribute) {
if (proctimeAttribute != null) {
// validate that field exists and is of correct type
Option<TypeInformation<?>> tpe = schema.getType(proctimeAttribute);
@@ -212,15 +291,16 @@ protected void setProctimeAttribute(String proctimeAttribute) {
throw new ValidationException("Processing time attribute '" + proctimeAttribute + "' is not of type SQL_TIMESTAMP.");
}
}
- this.proctimeAttribute = proctimeAttribute;
+ return proctimeAttribute;
}
/**
- * Declares a list of fields to be rowtime attributes.
+ * Validates a list of fields to be rowtime attributes.
*
* @param rowtimeAttributeDescriptors The descriptors of the rowtime attributes.
*/
- protected void setRowtimeAttributeDescriptors(List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors) {
+ private List<RowtimeAttributeDescriptor> validateRowtimeAttributeDescriptors(List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors) {
+ Preconditions.checkNotNull(rowtimeAttributeDescriptors, "List of rowtime attributes must not be null.");
// validate that all declared fields exist and are of correct type
for (RowtimeAttributeDescriptor desc : rowtimeAttributeDescriptors) {
String rowtimeAttribute = desc.getAttributeName();
@@ -231,25 +311,64 @@ protected void setRowtimeAttributeDescriptors(List<RowtimeAttributeDescriptor> r
throw new ValidationException("Rowtime attribute '" + rowtimeAttribute + "' is not of type SQL_TIMESTAMP.");
}
}
- this.rowtimeAttributeDescriptors = rowtimeAttributeDescriptors;
+ return rowtimeAttributeDescriptors;
+ }
+
+ //////// SETTERS FOR OPTIONAL PARAMETERS
+
+ /**
+ * Declares a field of the schema to be the processing time attribute.
+ *
+ * @param proctimeAttribute The name of the field that becomes the processing time field.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
+ */
+ @Deprecated
+ protected void setProctimeAttribute(String proctimeAttribute) {
+ this.proctimeAttribute = validateProctimeAttribute(proctimeAttribute);
+ }
+
+ /**
+ * Declares a list of fields to be rowtime attributes.
+ *
+ * @param rowtimeAttributeDescriptors The descriptors of the rowtime attributes.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
+ */
+ @Deprecated
+ protected void setRowtimeAttributeDescriptors(List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors) {
+ this.rowtimeAttributeDescriptors = validateRowtimeAttributeDescriptors(rowtimeAttributeDescriptors);
}
/**
* Sets the startup mode of the TableSource.
*
* @param startupMode The startup mode.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected void setStartupMode(StartupMode startupMode) {
- this.startupMode = startupMode;
+ this.startupMode = Preconditions.checkNotNull(startupMode);
}
/**
* Sets the startup offsets of the TableSource; only relevant when the startup mode is {@link StartupMode#SPECIFIC_OFFSETS}.
*
* @param specificStartupOffsets The startup offsets for different partitions.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected void setSpecificStartupOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
- this.specificStartupOffsets = specificStartupOffsets;
+ this.specificStartupOffsets = Preconditions.checkNotNull(specificStartupOffsets);
+ }
+
+ /**
+ * Sets the mapping from the fields of the table schema to fields of the physical returned type.
+ *
+ * @param fieldMapping The mapping from table schema fields to format schema fields.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
+ */
+ @Deprecated
+ protected void setFieldMapping(Map<String, String> fieldMapping) {
+ this.fieldMapping = fieldMapping;
}
//////// ABSTRACT METHODS FOR SUBCLASSES
@@ -267,20 +386,18 @@ protected abstract FlinkKafkaConsumerBase<Row> createKafkaConsumer(
Properties properties,
DeserializationSchema<Row> deserializationSchema);
- /**
- * Returns the deserialization schema.
- *
- * @return The deserialization schema
- */
- protected abstract DeserializationSchema<Row> getDeserializationSchema();
-
/**
* Abstract builder for a {@link KafkaTableSource} to be extended by builders of subclasses of
* KafkaTableSource.
*
* @param <T> Type of the KafkaTableSource produced by the builder.
* @param <B> Type of the KafkaTableSource.Builder subclass.
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
protected abstract static class Builder<T extends KafkaTableSource, B extends KafkaTableSource.Builder> {
private String topic;
@@ -299,13 +416,14 @@ protected abstract static class Builder<T extends KafkaTableSource, B extends K
private Map<KafkaTopicPartition, Long> specificStartupOffsets = null;
-
/**
* Sets the topic from which the table is read.
*
* @param topic The topic from which the table is read.
* @return The builder.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public B forTopic(String topic) {
Preconditions.checkNotNull(topic, "Topic must not be null.");
Preconditions.checkArgument(this.topic == null, "Topic has already been set.");
@@ -318,7 +436,9 @@ public B forTopic(String topic) {
*
* @param props The configuration properties for the Kafka consumer.
* @return The builder.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public B withKafkaProperties(Properties props) {
Preconditions.checkNotNull(props, "Properties must not be null.");
Preconditions.checkArgument(this.kafkaProps == null, "Properties have already been set.");
@@ -331,7 +451,9 @@ public B withKafkaProperties(Properties props) {
*
* @param schema The schema of the produced table.
* @return The builder.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public B withSchema(TableSchema schema) {
Preconditions.checkNotNull(schema, "Schema must not be null.");
Preconditions.checkArgument(this.schema == null, "Schema has already been set.");
@@ -345,7 +467,9 @@ public B withSchema(TableSchema schema) {
*
* @param proctimeAttribute The name of the processing time attribute in the table schema.
* @return The builder.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public B withProctimeAttribute(String proctimeAttribute) {
Preconditions.checkNotNull(proctimeAttribute, "Proctime attribute must not be null.");
Preconditions.checkArgument(!proctimeAttribute.isEmpty(), "Proctime attribute must not be empty.");
@@ -362,7 +486,9 @@ public B withProctimeAttribute(String proctimeAttribute) {
* @param timestampExtractor The {@link TimestampExtractor} to extract the rowtime attribute from the physical type.
* @param watermarkStrategy The {@link WatermarkStrategy} to generate watermarks for the rowtime attribute.
* @return The builder.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public B withRowtimeAttribute(
String rowtimeAttribute,
TimestampExtractor timestampExtractor,
@@ -389,7 +515,9 @@ public B withRowtimeAttribute(
* @param rowtimeAttribute The name of the rowtime attribute in the table schema.
* @param watermarkStrategy The {@link WatermarkStrategy} to generate watermarks for the rowtime attribute.
* @return The builder.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public B withKafkaTimestampAsRowtimeAttribute(
String rowtimeAttribute,
WatermarkStrategy watermarkStrategy) {
@@ -412,7 +540,9 @@ public B withKafkaTimestampAsRowtimeAttribute(
* Configures the TableSource to start reading from the earliest offset for all partitions.
*
* @see FlinkKafkaConsumerBase#setStartFromEarliest()
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public B fromEarliest() {
this.startupMode = StartupMode.EARLIEST;
this.specificStartupOffsets = null;
@@ -423,7 +553,9 @@ public B fromEarliest() {
* Configures the TableSource to start reading from the latest offset for all partitions.
*
* @see FlinkKafkaConsumerBase#setStartFromLatest()
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public B fromLatest() {
this.startupMode = StartupMode.LATEST;
this.specificStartupOffsets = null;
@@ -434,7 +566,9 @@ public B fromLatest() {
* Configures the TableSource to start reading from any committed group offsets found in Zookeeper / Kafka brokers.
*
* @see FlinkKafkaConsumerBase#setStartFromGroupOffsets()
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public B fromGroupOffsets() {
this.startupMode = StartupMode.GROUP_OFFSETS;
this.specificStartupOffsets = null;
@@ -446,7 +580,9 @@ public B fromGroupOffsets() {
*
* @param specificStartupOffsets the specified offsets for partitions
* @see FlinkKafkaConsumerBase#setStartFromSpecificOffsets(Map)
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public B fromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
this.startupMode = StartupMode.SPECIFIC_OFFSETS;
this.specificStartupOffsets = Preconditions.checkNotNull(specificStartupOffsets);
@@ -457,7 +593,9 @@ public B fromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffse
* Returns the configured topic.
*
* @return the configured topic.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected String getTopic() {
return this.topic;
}
@@ -466,7 +604,9 @@ protected String getTopic() {
* Returns the configured Kafka properties.
*
* @return the configured Kafka properties.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected Properties getKafkaProps() {
return this.kafkaProps;
}
@@ -475,7 +615,9 @@ protected Properties getKafkaProps() {
* Returns the configured table schema.
*
* @return the configured table schema.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected TableSchema getTableSchema() {
return this.schema;
}
@@ -484,14 +626,18 @@ protected TableSchema getTableSchema() {
* True if the KafkaSource supports Kafka timestamps, false otherwise.
*
* @return True if the KafkaSource supports Kafka timestamps, false otherwise.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected abstract boolean supportsKafkaTimestamps();
/**
* Configures a TableSource with optional parameters.
*
* @param tableSource The TableSource to configure.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected void configureTableSource(T tableSource) {
// configure processing time attributes
tableSource.setProctimeAttribute(proctimeAttribute);
@@ -516,13 +662,20 @@ protected void configureTableSource(T tableSource) {
/**
* Returns the builder.
* @return the builder.
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
protected abstract B builder();
/**
* Builds the configured {@link KafkaTableSource}.
* @return The configured {@link KafkaTableSource}.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected abstract KafkaTableSource build();
}
}
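
One behavioral consequence worth calling out: validation now happens eagerly in the unified constructor instead of in the setters. A sketch of the failure mode, where `MyKafkaTableSource` stands in for any concrete subclass (hypothetical name), the `TableSchema` array constructor is assumed available, and the exception message is quoted from `validateProctimeAttribute` above:

```java
// 'proc' is deliberately declared as STRING rather than SQL_TIMESTAMP.
TableSchema schema = new TableSchema(
	new String[]{"user", "proc"},
	new TypeInformation<?>[]{Types.STRING(), Types.STRING()});

DeserializationSchema<Row> deser =
	new JsonRowDeserializationSchema(schema.toRowType()); // any Row deserializer works here

try {
	new MyKafkaTableSource(
		schema, "proc", Collections.emptyList(), null,
		"topic", new Properties(), deser,
		StartupMode.GROUP_OFFSETS, Collections.emptyMap());
} catch (ValidationException e) {
	// "Processing time attribute 'proc' is not of type SQL_TIMESTAMP."
}
```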
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
index 6aaef9b8d2b45..5652f819bdedc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
@@ -18,13 +18,16 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import org.apache.flink.table.descriptors.KafkaValidator;
import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.formats.DeserializationSchemaFactory;
+import org.apache.flink.table.formats.TableFormatFactoryService;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceFactory;
@@ -35,13 +38,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
@@ -66,21 +69,16 @@
import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
/**
- * Factory for creating configured instances of {@link KafkaJsonTableSource}.
+ * Factory for creating configured instances of {@link KafkaTableSource}.
*/
-abstract class KafkaTableSourceFactory implements TableSourceFactory<Row> {
+public abstract class KafkaTableSourceFactory implements TableSourceFactory<Row> {
@Override
public Map<String, String> requiredContext() {
Map<String, String> context = new HashMap<>();
context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka
context.put(CONNECTOR_VERSION(), kafkaVersion()); // version
-
- context.put(FORMAT_TYPE(), formatType()); // format
-
context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility
- context.put(FORMAT_PROPERTY_VERSION(), String.valueOf(formatPropertyVersion()));
-
return context;
}
@@ -113,7 +111,8 @@ public List<String> supportedProperties() {
properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_SERIALIZED());
properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY());
- properties.addAll(formatProperties());
+ // format wildcard
+ properties.add(FORMAT() + ".*");
return properties;
}
@@ -124,110 +123,130 @@ public TableSource<Row> create(Map<String, String> properties) {
params.putProperties(properties);
// validate
- new SchemaValidator(true).validate(params);
+ // allow Kafka timestamps to be used, watermarks can not be received from source
+ new SchemaValidator(true, supportsKafkaTimestamps(), false).validate(params);
new KafkaValidator().validate(params);
- formatValidator().validate(params);
- // build
- final KafkaTableSource.Builder builder = createBuilderWithFormat(params);
+ // deserialization schema using format discovery
+ final DeserializationSchemaFactory<?> formatFactory = TableFormatFactoryService.find(
+ DeserializationSchemaFactory.class,
+ properties,
+ this.getClass().getClassLoader());
+ @SuppressWarnings("unchecked")
+ final DeserializationSchema<Row> deserializationSchema = (DeserializationSchema<Row>) formatFactory
+ .createDeserializationSchema(properties);
- // topic
- final String topic = params.getString(CONNECTOR_TOPIC);
- builder.forTopic(topic);
+ // schema
+ final TableSchema schema = params.getTableSchema(SCHEMA());
+
+ // proctime
+ final String proctimeAttribute = SchemaValidator.deriveProctimeAttribute(params).orElse(null);
+
+ // rowtime
+ final List<RowtimeAttributeDescriptor> rowtimeAttributes = SchemaValidator.deriveRowtimeAttributes(params);
+ if (rowtimeAttributes.size() > 1) {
+ throw new TableException("More than one rowtime attribute is not supported yet.");
+ }
+
+ // field mapping
+ final Map<String, String> fieldMapping = SchemaValidator.deriveFieldMapping(params, Optional.of(schema));
// properties
- final Properties props = new Properties();
+ final Properties kafkaProperties = new Properties();
final List