Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35215][core] Fix the bug when Kryo serialize length is 0 #24717

Merged
merged 2 commits into from
May 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,17 @@ protected int require(int required) throws KryoException {
position = 0;
int bytesRead = 0;
int count;
while (bytesRead < required) {
while (true) {
count = fill(buffer, bytesRead, required - bytesRead);

if (count == -1) {
throw new KryoException(new EOFException("No more bytes left."));
}

bytesRead += count;
if (bytesRead == required) {
break;
}
}
limit = required;
return required;
Expand Down Expand Up @@ -114,18 +117,26 @@ public void readBytes(byte[] bytes, int offset, int count) throws KryoException
throw new IllegalArgumentException("bytes cannot be null.");
}

if (count == 0) {
return;
}

Comment on lines +120 to +123
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if (count == 0) { is alternative change for FLINK-34954, I ran the demo that provided in FLINK-34954, it works well. So I think the change is fine.

Also, I ran the benchmark, the performance of serializerKryo and serializerKryoWithoutRegistration are recovered.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @dannycranmer , this issue is the blocker of Flink 1.20, and you are the reviewer of #24586, would you mind reviewing this PR as well? thanks in advance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@1996fanrui Can you add the demo from FLINK-34954 as a test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @uce for the reminder!

Let's follow it up at #24837 .

try {
int bytesRead = 0;
int c;

while (bytesRead < count) {
while (true) {
c = inputStream.read(bytes, offset + bytesRead, count - bytesRead);

if (c == -1) {
throw new KryoException(new EOFException("No more bytes left."));
}

bytesRead += c;

if (bytesRead == count) {
Comment on lines -121 to +137
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not to block this, just curious do you know why FLINK-34954 affects performance?

Hi @reswqa , I'm not very sure the reason. But I guess:

  • For non-empty data, let us assume inputStream.read will be called once.
    • Before FLINK-34954, we only check bytesRead == count once.
    • After FLINK-34954, we call bytesRead < count twice.
  • Also, I'm not sure whether FLINK-34954 breaks any JIT optimization.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Yes, I don't know the details also, but it's possible that JIT optimizations were affected.

break;
}
}
} catch (IOException ex) {
throw new KryoException(ex);
Expand Down