Skip to content

Commit

Permalink
[FLINK-20379][connector/kafka] Add a convenient method setValueOnlyDe…
Browse files Browse the repository at this point in the history
…serializer(DeserializationSchema) to KafkaSourceBuilder.
  • Loading branch information
becketqin committed Dec 6, 2020
1 parent 8ad8a24 commit 2d0641f
Showing 1 changed file with 15 additions and 0 deletions.
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.kafka.source;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
Expand Down Expand Up @@ -329,6 +330,20 @@ public KafkaSourceBuilder<OUT> setDeserializer(KafkaRecordDeserializationSchema<
return this;
}

/**
* Sets the {@link KafkaRecordDeserializationSchema deserializer} of the
* {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource.
* The given {@link DeserializationSchema} will be used to deserialize the ConsumerRecord from
* its value. The other information in a ConsumerRecord will be ignored.
*
* @param deserializationSchema the {@link DeserializationSchema} to use for deserialization.
* @return this KafkaSourceBuilder.
*/
public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(DeserializationSchema<OUT> deserializationSchema) {
this.deserializationSchema = KafkaRecordDeserializationSchema.valueOnly(deserializationSchema);
return this;
}

/**
* Sets the client id prefix of this KafkaSource.
*
Expand Down

0 comments on commit 2d0641f

Please sign in to comment.