From d015071071cc426c82961f748cbc06b2191d26a1 Mon Sep 17 00:00:00 2001
From: zodo
Date: Wed, 18 Nov 2020 17:27:45 +0700
Subject: [PATCH] [FLINK-20158][connectors/kafka] Add ResultTypeQueryable
 interface to KafkaSource

---
 .../apache/flink/connector/kafka/source/KafkaSource.java | 9 ++++++++-
 .../flink/connector/kafka/source/KafkaSourceITCase.java  | 3 +--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index 325408070b69c..9d1bde2918986 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.kafka.source;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
@@ -25,6 +26,7 @@
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
@@ -66,7 +68,7 @@
  *
  * @param <OUT> the output type of the source.
  */
-public class KafkaSource<OUT> implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState> {
+public class KafkaSource<OUT> implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>, ResultTypeQueryable<OUT> {
 	private static final long serialVersionUID = -8755372893283732098L;
 	// Users can choose only one of the following ways to specify the topics to consume from.
 	private final KafkaSubscriber subscriber;
@@ -161,6 +163,11 @@ public SimpleVersionedSerializer<KafkaSourceEnumState> getEnumeratorCheckpointSe
 		return new KafkaSourceEnumStateSerializer();
 	}
 
+	@Override
+	public TypeInformation<OUT> getProducedType() {
+		return deserializationSchema.getProducedType();
+	}
+
 	// ----------- private helper methods ---------------
 
 	private Configuration toConfiguration(Properties props) {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index fee6f52f909d1..b4c47875fb643 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -82,8 +82,7 @@ public void testBasicRead() throws Exception {
 		DataStream<PartitionAndValue> stream = env.fromSource(
 				source,
 				WatermarkStrategy.noWatermarks(),
-				"testBasicRead",
-				TypeInformation.of(PartitionAndValue.class));
+				"testBasicRead");
 		executeAndVerify(env, stream);
 	}
 
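
For context, a minimal sketch of what this change enables on the caller side; it is not part of the patch. Once KafkaSource implements ResultTypeQueryable<OUT>, the three-argument StreamExecutionEnvironment#fromSource overload can derive the produced TypeInformation from the source itself, which is why the explicit TypeInformation argument could be dropped from testBasicRead above. The class name KafkaSourceTypeInfoExample and the buildStringSource() helper below are hypothetical placeholders; the concrete KafkaSourceBuilder configuration is intentionally left out because it may differ between Flink versions.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceTypeInfoExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical helper: assume it returns a fully configured KafkaSource<String>
        // (bootstrap servers, topics, deserializer, ...) built via KafkaSource.builder().
        KafkaSource<String> source = buildStringSource();

        // With KafkaSource implementing ResultTypeQueryable<OUT>, the three-argument
        // fromSource overload suffices: the produced TypeInformation is taken from the
        // source's deserialization schema instead of being passed as a fourth argument.
        DataStream<String> lines =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");

        lines.print();
        env.execute("kafka-source-typeinfo-example");
    }

    // Placeholder so the sketch is self-contained; a real implementation would
    // configure and build KafkaSource.builder() here.
    private static KafkaSource<String> buildStringSource() {
        throw new UnsupportedOperationException("configure KafkaSource.builder() here");
    }
}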