Skip to content

Commit

Permalink
[FLINK-20158][connectors/kafka] Add ResultTypeQueryable interface to …
Browse files Browse the repository at this point in the history
…KafkaSource
  • Loading branch information
zodo committed Nov 18, 2020
1 parent 6221e3b commit d015071
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
Expand Up @@ -18,13 +18,15 @@

package org.apache.flink.connector.kafka.source;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
Expand Down Expand Up @@ -66,7 +68,7 @@
*
* @param <OUT> the output type of the source.
*/
public class KafkaSource<OUT> implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState> {
public class KafkaSource<OUT> implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>, ResultTypeQueryable<OUT> {
private static final long serialVersionUID = -8755372893283732098L;
// Users can choose only one of the following ways to specify the topics to consume from.
private final KafkaSubscriber subscriber;
Expand Down Expand Up @@ -161,6 +163,11 @@ public SimpleVersionedSerializer<KafkaSourceEnumState> getEnumeratorCheckpointSe
return new KafkaSourceEnumStateSerializer();
}

@Override
public TypeInformation<OUT> getProducedType() {
return deserializationSchema.getProducedType();
}

// ----------- private helper methods ---------------

private Configuration toConfiguration(Properties props) {
Expand Down
Expand Up @@ -82,8 +82,7 @@ public void testBasicRead() throws Exception {
DataStream<PartitionAndValue> stream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"testBasicRead",
TypeInformation.of(PartitionAndValue.class));
"testBasicRead");
executeAndVerify(env, stream);
}

Expand Down

0 comments on commit d015071

Please sign in to comment.