-
Notifications
You must be signed in to change notification settings - Fork 334
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SAMZA-2587: IntermediateMessageSerde exception handling #1426
SAMZA-2587: IntermediateMessageSerde exception handling #1426
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please prepare some documentation that a user would be able to use if they are upgrading from a version older than 0.13.1?
When we are writing the release notes for the next version of Samza, then we will include your user documentation in there.
You can include this documentation as part of the PR description as an "Upgrade instructions" section, as described in https://cwiki.apache.org/confluence/display/SAMZA/SEP-25%3A+PR+Title+And+Description+Guidelines.
samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
Outdated
Show resolved
Hide resolved
samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
Outdated
Show resolved
Hide resolved
...c/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
Outdated
Show resolved
Hide resolved
...c/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
Show resolved
Hide resolved
samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
Outdated
Show resolved
Hide resolved
samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
Show resolved
Hide resolved
...c/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
Outdated
Show resolved
Hide resolved
Upgrade Instructions: For users that are upgrading directly from samza version 0.13.0 or older versions: A message type of intermediate messages was introduced in samza 0.13.1. For samza 0.13.0 or older versions, the first byte of MessageType doesn't exist in the bytes. Thus, upgrading from those versions will fail. There are three ways to fix this issue: a) Reset checkpoint to consume from newest message in the intermediate stream b) Clean all existing messages in the intermediate stream c) Run the application in any version between 0.13.1 and 1.5 until all old messages in intermediate stream has reached retention time. Co-authored-by: Yixing Zhang <yixzhang@linkedin.com>
Symptom: The user provided serde failed to deserialize a message. Then, IntermediateMessageSerde tried to deserialize the message for the second time, which caused OOM and container died.
Direct cause: The user provided serde would construct an array based on the encoded array size. Given wrong size, the serde constructed a huge array and caused OOM. The exception was hidden by OOM and it was hard to debug.
Root cause: In samza 0.13.1, we added a byte to the head of the payload. The byte represents the message type (event|watermark|EOS). During deserialization, IntermediateMessageSerde will read the first byte, then deserialize the message according to the message byte. For compatibility, if it fails to read the message type, it will try to deserialize again with all bytes (including the first byte). More details in this PR: #207
Changes: In the case when the type byte exists but user serde fails to deserialize the message, we shouldn't pass invalid data to user serde and try to deserialize again. The second deserialization may cause unpredictable results. Considering 0.13 is a very old version and intermediate streams usually have retention time, we should be safe to remove the second try. This will make upgrades from 0.13 to master to fail. Workaround is upgrading to 1.4/1.5 instead or resetting the checkpoint of intermediate topic to newest.
Tests: Added unit test: TestIntermediateMessageSerde.testUserMessageSerdeException()
Upgrade Instructions: For users that are upgrading directly from samza version 0.13.0 or older versions: A message type of intermediate messages was introduced in samza 0.13.1. For samza 0.13.0 or older versions, the first byte of MessageType doesn't exist in the bytes. Thus, upgrading from those versions will fail. There are three ways to fix this issue:
a) Reset checkpoint to consume from newest message in the intermediate stream
b) Clean all existing messages in the intermediate stream
c) Run the application in any version between 0.13.1 and 1.5 until all old messages in intermediate stream has reached retention time.