From 4b867019d1131d362ea50d09e5eaa9a895b98dca Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Fri, 28 Oct 2016 11:31:44 +0200 Subject: [PATCH] [FLINK-4894] [network] Fix hasData() and correctly clear buffer state --- .../network/api/serialization/SpanningRecordSerializer.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index 155111f060006..482774676d594 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -172,6 +172,8 @@ public Buffer getCurrentBuffer() { @Override public void clearCurrentBuffer() { targetBuffer = null; + position = 0; + limit = 0; } @Override @@ -188,7 +190,7 @@ public void clear() { @Override public boolean hasData() { // either data in current target buffer or intermediate buffers - return (this.position > 0 && this.position < this.limit) || (this.lengthBuffer.hasRemaining() || this.dataBuffer.hasRemaining()); + return this.position > 0 || (this.lengthBuffer.hasRemaining() || this.dataBuffer.hasRemaining()); } @Override