Skip to content

Commit

Permalink
[hotfix][runtime] Drop one of the two clear methods in RecordSerializer
Browse files Browse the repository at this point in the history
This simplifies an API a little bit
  • Loading branch information
pnowojski committed Feb 19, 2018
1 parent 058c0ed commit 8f59e7b
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 79 deletions.
Expand Up @@ -98,30 +98,12 @@ public boolean isFullBuffer() {
Buffer getCurrentBuffer();

/**
* Resets the target buffer to <tt>null</tt>.
*
* <p><strong>NOTE:</strong> After calling this method, <strong>a new target
* buffer is required to continue writing</strong> (see
* {@link #setNextBufferBuilder(BufferBuilder)}).</p>
*/
void clearCurrentBuffer();

/**
* Resets the target buffer to <tt>null</tt> and resets internal state set
* up for the record to serialize.
*
* <p><strong>NOTE:</strong> After calling this method, a <strong>new record
* and a new target buffer is required to start writing again</strong>
* (see {@link #setNextBufferBuilder(BufferBuilder)}). If you want to continue
* with the current record, use {@link #clearCurrentBuffer()} instead.</p>
* Clear and release internal state.
*/
void clear();

/**
* Determines whether data is left, either in the current target buffer or
* in any internal state set up for the record to serialize.
*
* @return <tt>true</tt> if some data is present
* @return <tt>true</tt> if has some serialized data pending copying to the result {@link BufferBuilder}.
*/
boolean hasData();
boolean hasSerializedData();
}
Expand Up @@ -156,23 +156,13 @@ public Buffer getCurrentBuffer() {
return result;
}

@Override
public void clearCurrentBuffer() {
targetBuffer = null;
}

@Override
public void clear() {
targetBuffer = null;

// ensure clear state with hasRemaining false (for correct setNextBufferBuilder logic)
dataBuffer.position(dataBuffer.limit());
lengthBuffer.position(4);
}

@Override
public boolean hasData() {
// either data in current target buffer or intermediate buffers
return (targetBuffer != null && !targetBuffer.isEmpty()) || lengthBuffer.hasRemaining() || dataBuffer.hasRemaining();
public boolean hasSerializedData() {
return lengthBuffer.hasRemaining() || dataBuffer.hasRemaining();
}
}
Expand Up @@ -34,6 +34,7 @@
import java.util.Random;

import static org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
import static org.apache.flink.util.Preconditions.checkState;

/**
* A record-oriented runtime result writer.
Expand Down Expand Up @@ -115,12 +116,7 @@ private void sendToTarget(T record, int targetChannel) throws IOException, Inter
SerializationResult result = serializer.addRecord(record);

while (result.isFullBuffer()) {
Buffer buffer = serializer.getCurrentBuffer();

if (buffer != null) {
numBytesOut.inc(buffer.getSizeUnsafe());
writeAndClearBuffer(buffer, targetChannel, serializer);

if (tryWriteAndClearBuffer(targetChannel, serializer)) {
// If this was a full record, we are done. Not breaking
// out of the loop at this point will lead to another
// buffer request before breaking out (that would not be
Expand All @@ -135,6 +131,7 @@ private void sendToTarget(T record, int targetChannel) throws IOException, Inter
result = serializer.setNextBufferBuilder(bufferBuilder);
}
}
checkState(!serializer.hasSerializedData(), "All data should be written at once");
}
}

Expand All @@ -145,14 +142,7 @@ public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedE
RecordSerializer<T> serializer = serializers[targetChannel];

synchronized (serializer) {
Buffer buffer = serializer.getCurrentBuffer();
if (buffer != null) {
numBytesOut.inc(buffer.getSizeUnsafe());
writeAndClearBuffer(buffer, targetChannel, serializer);
} else if (serializer.hasData()) {
// sanity check
throw new IllegalStateException("No buffer, but serializer has buffered data.");
}
tryWriteAndClearBuffer(targetChannel, serializer);

// retain the buffer so that it can be recycled by each channel of targetPartition
targetPartition.writeBuffer(eventBuffer.readOnlySlice().retainBuffer(), targetChannel);
Expand All @@ -170,16 +160,7 @@ public void flush() throws IOException {
RecordSerializer<T> serializer = serializers[targetChannel];

synchronized (serializer) {
try {
Buffer buffer = serializer.getCurrentBuffer();

if (buffer != null) {
numBytesOut.inc(buffer.getSizeUnsafe());
targetPartition.writeBuffer(buffer, targetChannel);
}
} finally {
serializer.clear();
}
tryWriteAndClearBuffer(targetChannel, serializer);
}
}
}
Expand Down Expand Up @@ -213,18 +194,24 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
* buffer from the serializer state.
*
* <p><b>Needs to be synchronized on the serializer!</b>
*
* @return true if some data were written
*/
private void writeAndClearBuffer(
Buffer buffer,
private boolean tryWriteAndClearBuffer(
int targetChannel,
RecordSerializer<T> serializer) throws IOException {

Buffer buffer = serializer.getCurrentBuffer();
if (buffer == null) {
return false;
}

numBytesOut.inc(buffer.getSizeUnsafe());
try {
targetPartition.writeBuffer(buffer, targetChannel);
}
finally {
serializer.clearCurrentBuffer();
return true;
} finally {
serializer.clear();
}
}

}
Expand Up @@ -146,8 +146,6 @@ private void test(Util.MockRecords records, int segmentSize,
// deserialize left over records
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), (numBytes % segmentSize));

serializer.clear();

while (!serializedRecords.isEmpty()) {
SerializationTestType expected = serializedRecords.poll();

Expand All @@ -161,7 +159,7 @@ private void test(Util.MockRecords records, int segmentSize,

// assert that all records have been serialized and deserialized
Assert.assertEquals(0, numRecords);
Assert.assertFalse(serializer.hasData());
Assert.assertFalse(serializer.hasSerializedData());
Assert.assertFalse(deserializer.hasUnfinishedData());
}
}
Expand Up @@ -39,30 +39,28 @@
public class SpanningRecordSerializerTest {

@Test
public void testHasData() throws IOException {
public void testHasSerializedData() throws IOException {
final int segmentSize = 16;

final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);

Assert.assertFalse(serializer.hasData());
Assert.assertFalse(serializer.hasSerializedData());

serializer.addRecord(randomIntRecord);
Assert.assertTrue(serializer.hasData());
Assert.assertTrue(serializer.hasSerializedData());

serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
Assert.assertTrue(serializer.hasData());
Assert.assertFalse(serializer.hasSerializedData());

serializer.clear();
Assert.assertFalse(serializer.hasData());

serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
serializer.setNextBufferBuilder(createBufferBuilder(8));

serializer.addRecord(randomIntRecord);
Assert.assertTrue(serializer.hasData());
Assert.assertFalse(serializer.hasSerializedData());

serializer.addRecord(randomIntRecord);
Assert.assertTrue(serializer.hasData());
// Buffer builder full!
Assert.assertTrue(serializer.hasSerializedData());
}

@Test
Expand Down Expand Up @@ -189,7 +187,6 @@ private void test(Util.MockRecords records, int segmentSize) throws Exception {

while (result.isFullBuffer()) {
numBytes -= segmentSize;

result = serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
}
}
Expand Down
Expand Up @@ -128,7 +128,7 @@ public void testHandleMixedLargeRecords() {
}

// might be that the last big records has not yet been fully moved, and a small one is missing
assertFalse(serializer.hasData());
assertFalse(serializer.hasSerializedData());
assertFalse(deserializer.hasUnfinishedData());
}
catch (Exception e) {
Expand Down Expand Up @@ -226,7 +226,7 @@ public void testHandleMixedLargeRecordsSpillingAdaptiveSerializer() {
}

// might be that the last big records has not yet been fully moved, and a small one is missing
assertFalse(serializer.hasData());
assertFalse(serializer.hasSerializedData());
assertFalse(deserializer.hasUnfinishedData());
}
catch (Exception e) {
Expand Down

0 comments on commit 8f59e7b

Please sign in to comment.