Skip to content

Commit

Permalink
[FLINK-8178][network] Introduce not threadsafe write only BufferBuilder
Browse files Browse the repository at this point in the history
While Buffer class is used in multithreaded context it requires synchronisation.
Previously it was miss-leading and unclear, suggesting that RecordSerializer should
take into account synchronisation of the Buffer that's holding. With NotThreadSafe
BufferBuilder there is now clear separation between single-threaded writing/creating
a BufferBuilder and multithreaded Buffer handling/retaining/recycling.

This increases throughput of network stack by factor of 2, because previously
method getMemorySegment() was called twice per record and it is a synchronized
method on recycleLock, while RecordSerializer is sole owner of the Buffer at
this point, so synchronisation is not needed.
  • Loading branch information
pnowojski authored and StefanRRichter committed Jan 8, 2018
1 parent 0888bb6 commit c6945c2
Show file tree
Hide file tree
Showing 16 changed files with 352 additions and 114 deletions.
Expand Up @@ -19,11 +19,12 @@


package org.apache.flink.runtime.io.network.api.serialization; package org.apache.flink.runtime.io.network.api.serialization;


import java.io.IOException;

import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;

import java.io.IOException;


/** /**
* Interface for turning records into sequences of memory segments. * Interface for turning records into sequences of memory segments.
Expand Down Expand Up @@ -79,19 +80,19 @@ public boolean isFullBuffer() {
* Sets a (next) target buffer to use and continues writing remaining data * Sets a (next) target buffer to use and continues writing remaining data
* to it until it is full. * to it until it is full.
* *
* @param buffer the new target buffer to use * @param bufferBuilder the new target buffer to use
* @return how much information was written to the target buffer and * @return how much information was written to the target buffer and
* whether this buffer is full * whether this buffer is full
* @throws IOException * @throws IOException
*/ */
SerializationResult setNextBuffer(Buffer buffer) throws IOException; SerializationResult setNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException;


/** /**
* Retrieves the current target buffer and sets its size to the actual * Retrieves the current target buffer and sets its size to the actual
* number of written bytes. * number of written bytes.
* *
* After calling this method, a new target buffer is required to continue * After calling this method, a new target buffer is required to continue
* writing (see {@link #setNextBuffer(Buffer)}). * writing (see {@link #setNextBufferBuilder(BufferBuilder)}).
* *
* @return the target buffer that was used * @return the target buffer that was used
*/ */
Expand All @@ -102,7 +103,7 @@ public boolean isFullBuffer() {
* *
* <p><strong>NOTE:</strong> After calling this method, <strong>a new target * <p><strong>NOTE:</strong> After calling this method, <strong>a new target
* buffer is required to continue writing</strong> (see * buffer is required to continue writing</strong> (see
* {@link #setNextBuffer(Buffer)}).</p> * {@link #setNextBufferBuilder(BufferBuilder)}).</p>
*/ */
void clearCurrentBuffer(); void clearCurrentBuffer();


Expand All @@ -112,7 +113,7 @@ public boolean isFullBuffer() {
* *
* <p><strong>NOTE:</strong> After calling this method, a <strong>new record * <p><strong>NOTE:</strong> After calling this method, a <strong>new record
* and a new target buffer is required to start writing again</strong> * and a new target buffer is required to start writing again</strong>
* (see {@link #setNextBuffer(Buffer)}). If you want to continue * (see {@link #setNextBufferBuilder(BufferBuilder)}). If you want to continue
* with the current record, use {@link #clearCurrentBuffer()} instead.</p> * with the current record, use {@link #clearCurrentBuffer()} instead.</p>
*/ */
void clear(); void clear();
Expand Down
Expand Up @@ -18,20 +18,23 @@


package org.apache.flink.runtime.io.network.api.serialization; package org.apache.flink.runtime.io.network.api.serialization;


import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;

import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;


/** /**
* Record serializer which serializes the complete record to an intermediate * Record serializer which serializes the complete record to an intermediate
* data serialization buffer and copies this buffer to target buffers * data serialization buffer and copies this buffer to target buffers
* one-by-one using {@link #setNextBuffer(Buffer)}. * one-by-one using {@link #setNextBufferBuilder(BufferBuilder)}.
* *
* @param <T> * @param <T>
*/ */
Expand All @@ -50,21 +53,16 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
private final ByteBuffer lengthBuffer; private final ByteBuffer lengthBuffer;


/** Current target {@link Buffer} of the serializer */ /** Current target {@link Buffer} of the serializer */
private Buffer targetBuffer; @Nullable

private BufferBuilder targetBuffer;
/** Position in current {@link MemorySegment} of target buffer */
private int position;

/** Limit of current {@link MemorySegment} of target buffer */
private int limit;


public SpanningRecordSerializer() { public SpanningRecordSerializer() {
serializationBuffer = new DataOutputSerializer(128); serializationBuffer = new DataOutputSerializer(128);


lengthBuffer = ByteBuffer.allocate(4); lengthBuffer = ByteBuffer.allocate(4);
lengthBuffer.order(ByteOrder.BIG_ENDIAN); lengthBuffer.order(ByteOrder.BIG_ENDIAN);


// ensure initial state with hasRemaining false (for correct setNextBuffer logic) // ensure initial state with hasRemaining false (for correct setNextBufferBuilder logic)
dataBuffer = serializationBuffer.wrapAsByteBuffer(); dataBuffer = serializationBuffer.wrapAsByteBuffer();
lengthBuffer.position(4); lengthBuffer.position(4);
} }
Expand Down Expand Up @@ -105,10 +103,8 @@ public SerializationResult addRecord(T record) throws IOException {
} }


@Override @Override
public SerializationResult setNextBuffer(Buffer buffer) throws IOException { public SerializationResult setNextBufferBuilder(BufferBuilder buffer) throws IOException {
targetBuffer = buffer; targetBuffer = buffer;
position = 0;
limit = buffer.getSize();


if (lengthBuffer.hasRemaining()) { if (lengthBuffer.hasRemaining()) {
copyToTargetBufferFrom(lengthBuffer); copyToTargetBufferFrom(lengthBuffer);
Expand Down Expand Up @@ -140,19 +136,12 @@ private void copyToTargetBufferFrom(ByteBuffer source) {
if (targetBuffer == null) { if (targetBuffer == null) {
return; return;
} }

targetBuffer.append(source);
int needed = source.remaining();
int available = limit - position;
int toCopy = Math.min(needed, available);

targetBuffer.getMemorySegment().put(position, source, toCopy);

position += toCopy;
} }


private SerializationResult getSerializationResult() { private SerializationResult getSerializationResult() {
if (!dataBuffer.hasRemaining() && !lengthBuffer.hasRemaining()) { if (!dataBuffer.hasRemaining() && !lengthBuffer.hasRemaining()) {
return (position < limit) return !targetBuffer.isFull()
? SerializationResult.FULL_RECORD ? SerializationResult.FULL_RECORD
: SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL; : SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL;
} }
Expand All @@ -165,33 +154,29 @@ public Buffer getCurrentBuffer() {
if (targetBuffer == null) { if (targetBuffer == null) {
return null; return null;
} }

Buffer result = targetBuffer.build();
targetBuffer.setSize(position); targetBuffer = null;
return targetBuffer; return result;
} }


@Override @Override
public void clearCurrentBuffer() { public void clearCurrentBuffer() {
targetBuffer = null; targetBuffer = null;
position = 0;
limit = 0;
} }


@Override @Override
public void clear() { public void clear() {
targetBuffer = null; targetBuffer = null;
position = 0;
limit = 0;


// ensure clear state with hasRemaining false (for correct setNextBuffer logic) // ensure clear state with hasRemaining false (for correct setNextBufferBuilder logic)
dataBuffer.position(dataBuffer.limit()); dataBuffer.position(dataBuffer.limit());
lengthBuffer.position(4); lengthBuffer.position(4);
} }


@Override @Override
public boolean hasData() { public boolean hasData() {
// either data in current target buffer or intermediate buffers // either data in current target buffer or intermediate buffers
return position > 0 || (lengthBuffer.hasRemaining() || dataBuffer.hasRemaining()); return (targetBuffer != null && !targetBuffer.isEmpty()) || lengthBuffer.hasRemaining() || dataBuffer.hasRemaining();
} }


@Override @Override
Expand Down
Expand Up @@ -21,12 +21,13 @@
import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.XORShiftRandom; import org.apache.flink.util.XORShiftRandom;


import java.io.IOException; import java.io.IOException;
Expand Down Expand Up @@ -129,8 +130,9 @@ private void sendToTarget(T record, int targetChannel) throws IOException, Inter
break; break;
} }
} else { } else {
buffer = targetPartition.getBufferProvider().requestBufferBlocking(); BufferBuilder bufferBuilder =
result = serializer.setNextBuffer(buffer); targetPartition.getBufferProvider().requestBufferBuilderBlocking();
result = serializer.setNextBufferBuilder(bufferBuilder);
} }
} }
} }
Expand Down
Expand Up @@ -54,11 +54,14 @@ public Buffer(MemorySegment memorySegment, BufferRecycler recycler) {
} }


public Buffer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer) { public Buffer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer) {
this(memorySegment, recycler, isBuffer, memorySegment.size());
}

public Buffer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer, int size) {
this.memorySegment = checkNotNull(memorySegment); this.memorySegment = checkNotNull(memorySegment);
this.recycler = checkNotNull(recycler); this.recycler = checkNotNull(recycler);
this.isBuffer = isBuffer; this.isBuffer = isBuffer;

this.currentSize = size;
this.currentSize = memorySegment.size();
} }


public boolean isBuffer() { public boolean isBuffer() {
Expand Down
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.buffer;

import org.apache.flink.core.memory.MemorySegment;

import javax.annotation.concurrent.NotThreadSafe;

import java.nio.ByteBuffer;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* Not thread safe class for filling in the initial content of the {@link Buffer}. Once writing to the builder
* is complete, {@link Buffer} instance can be built and shared across multiple threads.
*/
@NotThreadSafe
public class BufferBuilder {
private final MemorySegment memorySegment;

private final BufferRecycler recycler;

private int position = 0;

private boolean built = false;

public BufferBuilder(MemorySegment memorySegment, BufferRecycler recycler) {
this.memorySegment = checkNotNull(memorySegment);
this.recycler = checkNotNull(recycler);
}

/**
* @return number of copied bytes
*/
public int append(ByteBuffer source) {
checkState(!built);

int needed = source.remaining();
int available = limit() - position;
int toCopy = Math.min(needed, available);

memorySegment.put(position, source, toCopy);
position += toCopy;
return toCopy;
}

public boolean isFull() {
checkState(position <= limit());
return position == limit();
}

public Buffer build() {
checkState(!built);
built = true;
return new Buffer(memorySegment, recycler, true, position);
}

public boolean isEmpty() {
return position == 0;
}

private int limit() {
return memorySegment.size();
}
}
Expand Up @@ -43,6 +43,14 @@ public interface BufferProvider {
*/ */
Buffer requestBufferBlocking() throws IOException, InterruptedException; Buffer requestBufferBlocking() throws IOException, InterruptedException;


/**
* Returns a {@link BufferBuilder} instance from the buffer provider.
*
* <p>If there is no buffer available, the call will block until one becomes available again or the
* buffer provider has been destroyed.
*/
BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException;

/** /**
* Adds a buffer availability listener to the buffer provider. * Adds a buffer availability listener to the buffer provider.
* *
Expand Down
Expand Up @@ -179,7 +179,8 @@ public void setBufferPoolOwner(BufferPoolOwner owner) {
@Override @Override
public Buffer requestBuffer() throws IOException { public Buffer requestBuffer() throws IOException {
try { try {
return requestBuffer(false); BufferBuilder bufferBuilder = requestBufferBuilder(false);
return bufferBuilder != null ? bufferBuilder.build() : null;
} }
catch (InterruptedException e) { catch (InterruptedException e) {
throw new IOException(e); throw new IOException(e);
Expand All @@ -188,10 +189,16 @@ public Buffer requestBuffer() throws IOException {


@Override @Override
public Buffer requestBufferBlocking() throws IOException, InterruptedException { public Buffer requestBufferBlocking() throws IOException, InterruptedException {
return requestBuffer(true); BufferBuilder bufferBuilder = requestBufferBuilder(true);
return bufferBuilder != null ? bufferBuilder.build() : null;
} }


private Buffer requestBuffer(boolean isBlocking) throws InterruptedException, IOException { @Override
public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
return requestBufferBuilder(true);
}

private BufferBuilder requestBufferBuilder(boolean isBlocking) throws InterruptedException, IOException {
synchronized (availableMemorySegments) { synchronized (availableMemorySegments) {
returnExcessMemorySegments(); returnExcessMemorySegments();


Expand Down Expand Up @@ -226,7 +233,7 @@ private Buffer requestBuffer(boolean isBlocking) throws InterruptedException, IO
} }
} }


return new Buffer(availableMemorySegments.poll(), this); return new BufferBuilder(availableMemorySegments.poll(), this);
} }
} }


Expand Down

0 comments on commit c6945c2

Please sign in to comment.