From 20336c5186cb75870a8e6d4b8234a983368cda56 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 17 Nov 2016 13:57:18 +0100 Subject: [PATCH 1/4] [hotfix] add javadocs to SpanningRecordSerializer and RecordSerializer --- .../api/serialization/RecordSerializer.java | 69 +++++++++++++++++-- .../SpanningRecordSerializer.java | 16 +++++ 2 files changed, 79 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java index e8179dc300094..25b25739bb74a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java @@ -43,30 +43,87 @@ private SerializationResult(boolean isFullRecord, boolean isFullBuffer) { this.isFullRecord = isFullRecord; this.isFullBuffer = isFullBuffer; } - + + /** + * Whether the full record was serialized and completely written to + * a target buffer. + * + * @return true if the complete record was written + */ public boolean isFullRecord() { return this.isFullRecord; } - + + /** + * Whether the target buffer is full after the serialization process. + * + * @return true if the target buffer is full + */ public boolean isFullBuffer() { return this.isFullBuffer; } } - + + /** + * Starts serializing and copying the given record to the target buffer + * (if available). + * + * @param record the record to serialize + * @return how much information was written to the target buffer and + * whether this buffer is full + * @throws IOException + */ SerializationResult addRecord(T record) throws IOException; + /** + * Sets a (next) target buffer to use and continues writing remaining data + * to it until it is full. 
+ * + * @param buffer the new target buffer to use + * @return how much information was written to the target buffer and + * whether this buffer is full + * @throws IOException + */ SerializationResult setNextBuffer(Buffer buffer) throws IOException; + /** + * Retrieves the current target buffer and sets its size to the actual + * number of written bytes. + * + * After calling this method, a new target buffer is required to continue + * writing (see {@link #setNextBuffer(Buffer)}). + * + * @return the target buffer that was used + */ Buffer getCurrentBuffer(); + /** + * Resets the target buffer to null. + * + * After calling this method, a new target buffer is required to continue + * writing (see {@link #setNextBuffer(Buffer)}). + */ void clearCurrentBuffer(); - + + /** + * Resets the target buffer to null and resets internal state set + * up for the record to serialize. + * + * After calling this method, a new record and a new target buffer is + * required to start writing again (see {@link #setNextBuffer(Buffer)}). + */ void clear(); - + + /** + * Determines whether data is left, either in the current target buffer or + * in any internal state set up for the record to serialize. + * + * @return true if some data is present + */ boolean hasData(); /** - * Insantiates all metrics. + * Instantiates all metrics. 
* * @param metrics metric group */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index a8fe3fee07919..335d12e9c69ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -29,6 +29,13 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.DataOutputSerializer; +/** + * Record serializer which serializes the complete record to an intermediate + * data serialization buffer and copies this buffer to target buffers + * one-by-one using {@link #setNextBuffer(Buffer)}. + * + * @param + */ public class SpanningRecordSerializer implements RecordSerializer { /** Flag to enable/disable checks, if buffer not set/full or pending serialization */ @@ -65,6 +72,15 @@ public SpanningRecordSerializer() { this.lengthBuffer.position(4); } + /** + * Serializes the complete record to an intermediate data serialization + * buffer and starts copying it to the target buffer (if available). 
+ * + * @param record the record to serialize + * @return how much information was written to the target buffer and + * whether this buffer is full + * @throws IOException + */ @Override public SerializationResult addRecord(T record) throws IOException { if (CHECKED) { From 28b7ae5ea82ac7afdb6e34b8cadd3b54735419b1 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 17 Nov 2016 14:01:52 +0100 Subject: [PATCH 2/4] [hotfix] no need to clear the serializer twice in RecordWriter#flush() --- .../flink/runtime/io/network/api/writer/RecordWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index 799187d98129f..e789f9a2c2ee3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -171,7 +171,7 @@ public void flush() throws IOException { Buffer buffer = serializer.getCurrentBuffer(); if (buffer != null) { - writeAndClearBuffer(buffer, targetChannel, serializer); + targetPartition.writeBuffer(buffer, targetChannel); } } finally { serializer.clear(); From d01fccb6fe49220d6fa338d6d894971fa45d93c1 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 17 Nov 2016 17:52:09 +0100 Subject: [PATCH 3/4] [hotfix] further javadoc tweaks in RecordWriter --- .../flink/runtime/io/network/api/writer/RecordWriter.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index e789f9a2c2ee3..dfa2f3d59af55 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -208,9 +208,10 @@ public void setMetricGroup(TaskIOMetricGroup metrics) { } /** - * Writes the buffer to the {@link ResultPartitionWriter}. + * Writes the buffer to the {@link ResultPartitionWriter} and removes the + * buffer from the serializer state. * - *

The buffer is cleared from the serializer state after a call to this method. + * Needs to be synchronized on the serializer! */ private void writeAndClearBuffer( Buffer buffer, From 67d33af9290963eb3ed6436a9ba2ebe849a29d33 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 8 Dec 2016 11:53:54 +0100 Subject: [PATCH 4/4] [hotfix] emphasize the difference between clear() and clearCurrentBuffer() in the new RecordSerializer comments --- .../network/api/serialization/RecordSerializer.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java index 25b25739bb74a..5fe56c43d0be0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java @@ -100,8 +100,9 @@ public boolean isFullBuffer() { /** * Resets the target buffer to null. * - * After calling this method, a new target buffer is required to continue - * writing (see {@link #setNextBuffer(Buffer)}). + *

NOTE: After calling this method, a new target + * buffer is required to continue writing (see + * {@link #setNextBuffer(Buffer)}).

*/ void clearCurrentBuffer(); @@ -109,8 +110,10 @@ public boolean isFullBuffer() { * Resets the target buffer to null and resets internal state set * up for the record to serialize. * - * After calling this method, a new record and a new target buffer is - * required to start writing again (see {@link #setNextBuffer(Buffer)}). + *

NOTE: After calling this method, a new record + and a new target buffer are required to start writing again + (see {@link #setNextBuffer(Buffer)}). If you want to continue + with the current record, use {@link #clearCurrentBuffer()} instead.

*/ void clear();