diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index 155111f060006..482774676d594 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -172,6 +172,8 @@ public Buffer getCurrentBuffer() { @Override public void clearCurrentBuffer() { targetBuffer = null; + position = 0; + limit = 0; } @Override @@ -188,7 +190,7 @@ public void clear() { @Override public boolean hasData() { // either data in current target buffer or intermediate buffers - return (this.position > 0 && this.position < this.limit) || (this.lengthBuffer.hasRemaining() || this.dataBuffer.hasRemaining()); + return this.position > 0 || (this.lengthBuffer.hasRemaining() || this.dataBuffer.hasRemaining()); } @Override