
[FLINK-11820][serialization] SimpleStringSchema handle message record which value is null #8583

Closed
wants to merge 1 commit

Conversation

lamberken
Member

My branch was deleted unexpectedly, so I opened this one. For more detail, please see #7987.

What is the purpose of the change

When a Kafka message queue contains records whose value is null, SimpleStringSchema can't process them.

For example, a message queue like the one below:

msg null msg msg msg

Normally, you would use SimpleStringSchema to process the message queue data:

env.addSource(new FlinkKafkaConsumer010("topic", new SimpleStringSchema(), properties));

but this will throw a NullPointerException:

java.lang.NullPointerException
	at java.lang.String.<init>(String.java:515)
	at org.apache.flink.api.common.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:75)
	at org.apache.flink.api.common.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:36)
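For reference, the failing call is the String constructor inside SimpleStringSchema#deserialize. Based on the stack trace above, the method is essentially (a sketch of the Flink 1.6.x code path):

public String deserialize(byte[] message) {
	// new String(byte[], Charset) dereferences the array, so a null
	// Kafka record value throws the NPE shown in the stack trace above
	return new String(message, charset);
}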

@flinkbot
Collaborator

flinkbot commented May 31, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit bebe5fb (Fri Sep 06 09:08:42 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@aljoscha
Contributor

aljoscha commented Jun 7, 2019

Yes, I think this should work. +1

@aljoscha
Contributor

aljoscha commented Jun 7, 2019

Scratch that, I don't think we can do this. Our Kafka consumer silently swallows null values:

// if the record is null, simply just update the offset state for partition
synchronized (checkpointLock) {
	partitionState.setOffset(offset);
}

Plus, I think our serializers in general don't always support null values. The fact that StringSerializer does is more of an anomaly. (Also thanks to @GJL for pointing this out to me 😃)

@lamberken
Member Author

lamberken commented Jun 7, 2019

Hi, thanks for your comment @aljoscha. Here is the detailed stack trace (Flink version: 1.6.3):

Caused by: java.lang.NullPointerException
	at java.lang.String.<init>(String.java:515)
	at org.apache.flink.api.common.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:75)
	at org.apache.flink.api.common.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:36)
	at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:142)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)

@lamberken
Member Author

Scratch that, I don't think we can do this. Our Kafka consumer silently swallows null values:

// if the record is null, simply just update the offset state for partition
synchronized (checkpointLock) {
	partitionState.setOffset(offset);
}

Plus, I think our serializers in general don't always support null values. The fact that StringSerializer does is more of an anomaly. (Also thanks to @GJL for pointing this out to me 😃)

From the NPE stack trace, we can see that the NPE happens before AbstractFetcher#emitRecord.

@lamberken
Member Author

Hi @aljoscha @GJL, from Kafka09Fetcher#runFetchLoop, we can see that it deserializes the Kafka value first, and calls the emitRecord method afterwards:

while (running) {
	// this blocks until we get the next records
	// it automatically re-throws exceptions encountered in the consumer thread
	final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

	// get the records for each topic partition
	for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

		List<ConsumerRecord<byte[], byte[]>> partitionRecords =
				records.records(partition.getKafkaPartitionHandle());

		for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {

			final T value = deserializer.deserialize(record);

			if (deserializer.isEndOfStream(value)) {
				// end of stream signaled
				running = false;
				break;
			}

			// emit the actual record. this also updates offset state atomically
			// and deals with timestamps and watermark generation
			emitRecord(value, partition, record.offset(), record);
		}
	}
}

@GJL
Member

GJL commented Jun 11, 2019

@lamber-ken

The contract of deserialize() says:

 * @return The deserialized message as an object (null if the message cannot be deserialized).
 */

That is, returning null from the deserialization schema denotes an erroneous condition. Null records in Kafka can have valid semantics. However, if we apply your patch, null records will be discarded by the Kafka connector (i.e., they won't be consumed by downstream operators). Let me know if you see a solution to this problem.

@lamberken
Member Author

@lamber-ken

The contract of deserialize() says:

 * @return The deserialized message as an object (null if the message cannot be deserialized).
 */

That is, returning null from the deserialization schema denotes an erroneous condition. Null records in Kafka can have valid semantics. However, if we apply your patch, null records will be discarded by the Kafka connector (i.e., they won't be consumed by downstream operators). Let me know if you see a solution to this problem.

Hi @GJL, I don't fully understand your comment. From my side, in a message queue like Kafka, consumers and producers are decoupled. Consumers don't know whether producers put valid data into the queue; a producer may put a null into the message queue, so the Flink Kafka consumer needs to handle this case.

The Flink job will go into an infinite restart loop because of the NPE, so this PR just aims to fix the NPE. It does not address whether nulls need to be sent to downstream operators; if downstream operators need to consume null values, AbstractFetcher#emitRecord may need to be modified.

@aljoscha
Contributor

I wrote this message to the ML: https://lists.apache.org/thread.html/2991b6b3c520380a9172588bc1f6d7e6d632c3d421458a1b44c71c01@%3Cdev.flink.apache.org%3E

Regarding this PR and Jira issue, I think it is wrong to return NULL values which are then silently dropped by the Kafka consumer. The solution is for users to have their custom schema that can handle null values and encodes them in a custom user type.
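A minimal sketch of that custom-schema approach, assuming a hypothetical user-defined class (NullableStringSchema is not part of Flink): it wraps the payload in Optional so a Kafka tombstone (null value) reaches downstream operators as an explicit record instead of being dropped or causing an NPE.

import java.nio.charset.StandardCharsets;
import java.util.Optional;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class NullableStringSchema implements DeserializationSchema<Optional<String>> {

	private static final long serialVersionUID = 1L;

	@Override
	public Optional<String> deserialize(byte[] message) {
		// encode "no value" explicitly so downstream operators can react
		// to DELETE-style messages instead of silently losing them
		return message == null
				? Optional.empty()
				: Optional.of(new String(message, StandardCharsets.UTF_8));
	}

	@Override
	public boolean isEndOfStream(Optional<String> nextElement) {
		return false;
	}

	@Override
	public TypeInformation<Optional<String>> getProducedType() {
		return TypeInformation.of(new TypeHint<Optional<String>>() {});
	}
}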

@lamberken
Member Author

@aljoscha Thanks for bringing up the discussion, I think it's meaningful.

@lamberken
Member Author

lamberken commented Jun 18, 2019

Hi @aljoscha, your point is right. I have two cents:

1. If I understand correctly, users can use TypeInformationKeyValueSerializationSchema to handle DELETE-semantic messages; it returns new Tuple2<>(key, null) when the value is null (see the sketch after this list).

2. But SimpleStringSchema is just a simple deserializer, which doesn't have DELETE semantics.
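A quick sketch of point 1, assuming the topic was produced with Flink's matching type serializers (which is what TypeInformationKeyValueSerializationSchema expects); topic name and properties are placeholders:

// a tombstone then arrives as Tuple2<>(key, null) instead of
// hitting the NPE in SimpleStringSchema
TypeInformationKeyValueSerializationSchema<String, String> schema =
		new TypeInformationKeyValueSerializationSchema<>(
				String.class, String.class, env.getConfig());

env.addSource(new FlinkKafkaConsumer010<>("topic", schema, properties));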


DISCUSS Connectors and NULL handling

In light of the last one I’d like to look again at the first two. What they introduced is that when the deserialisation schema returns NULL, the Kafka consumer (and maybe also the Kinesis consumer) silently drops the record. In Kafka NULL values have semantic meaning, i.e. they usually encode a DELETE for the key of the message. If SimpleStringSchema returned that null, our consumer would silently drop it and we would lose that DELETE message. That doesn’t seem right.

@aljoscha
Contributor

I think SimpleStringSchema should not by default silently swallow null messages because this is a somewhat unexpected behaviour that could obscure problems in the topic or in the job. If users want that behaviour they can write their own version of SimpleStringSchema that silently ignores null values.
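For illustration, such a user-written variant might look like this (a hypothetical class, not proposed for Flink itself): it returns null for null Kafka values, which the Flink Kafka consumer then drops silently, as shown in the AbstractFetcher snippet quoted earlier.

public class NullIgnoringStringSchema extends SimpleStringSchema {

	@Override
	public String deserialize(byte[] message) {
		// returning null makes the Flink Kafka consumer skip the record
		return message == null ? null : super.deserialize(message);
	}
}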

@lamberken
Member Author

@ajaybhat thanks for your comment. I think this PR aims to fix the NPE only; SimpleStringSchema will not swallow null messages, they will be sent to downstream operators.

@lamberken
Member Author

Scratch that, I don't think we can do this. Our Kafka consumer silently swallows null values:

// if the record is null, simply just update the offset state for partition
synchronized (checkpointLock) {
	partitionState.setOffset(offset);
}

Plus, I think our serializers in general don't always support null values. The fact that StringSerializer does is more of an anomaly. (Also thanks to @GJL for pointing this out to me 😃)

As your comment says, it is the Kafka consumer that silently swallows null values, not SimpleStringSchema.

@aljoscha
Contributor

Yes, the Kafka consumer swallows them silently, but if you change SimpleStringSchema, it will create the nulls in the first place. The problem is in the interaction of the different parts, so we cannot change SimpleStringSchema in isolation because it would lead to silent data dropping.

@lamberken
Member Author

lamberken commented Jun 25, 2019

Yes, the Kafka consumer swallows them silently, but if you change SimpleStringSchema, it will create the nulls in the first place. The problem is in the interaction of the different parts, so we cannot change SimpleStringSchema in isolation because it would lead to silent data dropping.

Thanks for your comment. But I have a slightly different view. Users may not care about null values in many scenarios, and if the message queue contains a null value, the consumer cannot consume any further records because of the NPE, and the Kafka offset can't be committed anymore.

Users can use TypeInformationKeyValueSerializationSchema to handle null values for now.

@GJL
Member

GJL commented Sep 3, 2019

Users may not care about the null values in many scenarios

Users may or may not care about nulls. If nulls have a meaning for a use case, then dropping null records can be fatal or lead to unexpected behavior in the worst case. According to the ML thread, Flink currently does not consistently handle null records – changing that would probably be a bigger effort. All in all, I think it is better to accept that SimpleStringSchema currently cannot be used if there are null records because:

  • There are workarounds, e.g., one does not have to use SimpleStringSchema
  • An NPE, which will likely happen while testing the job, is better than silent data dropping in production

I will close this issue. Feel free to re-open if you think otherwise.

@GJL GJL closed this Sep 3, 2019
@lamberken
Member Author

Users may not care about the null values in many scenarios

Users may or may not care about nulls. If nulls have a meaning for a use case, then dropping null records can be fatal or lead to unexpected behavior in the worst case. According to the ML thread, Flink currently does not consistently handle null records – changing that would probably be a bigger effort. All in all, I think it is better to accept that SimpleStringSchema currently cannot be used if there are null records because:

  • There are workarounds, e.g., one does not have to use SimpleStringSchema
  • An NPE, which will likely happen while testing the job, is better than silent data dropping in production

I will close this issue. Feel free to re-open if you think otherwise.

OK, no problem. BTW, we have already modified it in our production environment.
