Skip to content

Commit b31aaa6

Browse files
author
kevin.cyj
committed
[FLINK-25796][network] Avoid record copy for result partition of sort-shuffle if there are enough buffers for better performance
1 parent 9d162dd commit b31aaa6

File tree

8 files changed

+383
-255
lines changed

8 files changed

+383
-255
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,8 @@ public class BufferBuilder implements AutoCloseable {
4444
private boolean bufferConsumerCreated = false;
4545

4646
public BufferBuilder(MemorySegment memorySegment, BufferRecycler recycler) {
47-
this(memorySegment, Buffer.DataType.DATA_BUFFER, recycler);
48-
}
49-
50-
public BufferBuilder(
51-
MemorySegment memorySegment, Buffer.DataType dataType, BufferRecycler recycler) {
5247
this.memorySegment = checkNotNull(memorySegment);
53-
this.buffer = new NetworkBuffer(memorySegment, recycler, dataType);
48+
this.buffer = new NetworkBuffer(memorySegment, recycler);
5449
this.maxCapacity = buffer.getMaxCapacity();
5550
}
5651

Lines changed: 122 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,29 @@
2424
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
2525
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
2626
import org.apache.flink.runtime.io.network.buffer.BufferPool;
27+
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
28+
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
2729

2830
import javax.annotation.Nullable;
2931

3032
import java.io.IOException;
3133
import java.nio.ByteBuffer;
3234
import java.util.ArrayDeque;
33-
import java.util.Queue;
3435

3536
import static org.apache.flink.util.Preconditions.checkArgument;
3637
import static org.apache.flink.util.Preconditions.checkNotNull;
3738
import static org.apache.flink.util.Preconditions.checkState;
3839

39-
/** Placeholder. */
40-
public class SparsePartitionSortedBuffer implements SortBuffer {
40+
/**
41+
* A {@link SortBuffer} implementation which sorts all appended records only by subpartition
42+
* index. Records of the same subpartition keep the appended order.
43+
*
44+
* <p>Different from the {@link SortBasedPartitionSortedBuffer}, in this {@link SortBuffer}
45+
* implementation, the memory segment boundary serves as the natural data boundary of different
46+
* subpartitions, which means that one memory segment can never contain data from different
47+
* subpartitions.
48+
*/
49+
public class HashBasedPartitionSortedBuffer implements SortBuffer {
4150

4251
/** A buffer pool to request memory segments from. */
4352
private final BufferPool bufferPool;
@@ -46,40 +55,51 @@ public class SparsePartitionSortedBuffer implements SortBuffer {
4655
private final int numGuaranteedBuffers;
4756

4857
/** Buffers containing data for all subpartitions. */
49-
private final ArrayDeque<BufferBuilder>[] buffers;
58+
private final ArrayDeque<BufferConsumer>[] buffers;
5059

5160
// ---------------------------------------------------------------------------------------------
5261
// Statistics and states
5362
// ---------------------------------------------------------------------------------------------
5463

55-
private int numTotalBuffers;
56-
5764
/** Total number of bytes already appended to this sort buffer. */
5865
private long numTotalBytes;
5966

6067
/** Total number of records already appended to this sort buffer. */
6168
private long numTotalRecords;
6269

70+
/** Whether this sort buffer is full and ready to read data from. */
71+
private boolean isFull;
72+
6373
/** Whether this sort buffer is finished. One can only read a finished sort buffer. */
6474
private boolean isFinished;
6575

6676
/** Whether this sort buffer is released. A released sort buffer can not be used. */
6777
private boolean isReleased;
6878

79+
// ---------------------------------------------------------------------------------------------
80+
// For writing
81+
// ---------------------------------------------------------------------------------------------
82+
83+
/** Partially filled buffers that new data is appended to, one per channel. */
84+
private final BufferBuilder[] builders;
85+
86+
/** Number of network buffers currently occupied by this sort buffer. */
87+
private int numBuffersOccupied;
88+
6989
// ---------------------------------------------------------------------------------------------
7090
// For reading
7191
// ---------------------------------------------------------------------------------------------
7292

93+
/** Index into the subpartition read order of the channel currently being read from. */
94+
private int readOrderIndex;
95+
7396
/** Data of different subpartitions in this sort buffer will be read in this order. */
7497
private final int[] subpartitionReadOrder;
7598

7699
/** Total number of bytes already read from this sort buffer. */
77100
private long numTotalBytesRead;
78101

79-
/** Used to index the current available channel to read data from. */
80-
private int readOrderIndex;
81-
82-
public SparsePartitionSortedBuffer(
102+
public HashBasedPartitionSortedBuffer(
83103
BufferPool bufferPool,
84104
int numSubpartitions,
85105
int numGuaranteedBuffers,
@@ -89,6 +109,7 @@ public SparsePartitionSortedBuffer(
89109
this.bufferPool = checkNotNull(bufferPool);
90110
this.numGuaranteedBuffers = numGuaranteedBuffers;
91111

112+
this.builders = new BufferBuilder[numSubpartitions];
92113
this.buffers = new ArrayDeque[numSubpartitions];
93114
for (int channel = 0; channel < numSubpartitions; ++channel) {
94115
this.buffers[channel] = new ArrayDeque<>();
@@ -109,133 +130,114 @@ public SparsePartitionSortedBuffer(
109130
public boolean append(ByteBuffer source, int targetChannel, Buffer.DataType dataType)
110131
throws IOException {
111132
checkArgument(source.hasRemaining(), "Cannot append empty data.");
133+
checkState(!isFull, "Sort buffer is already full.");
112134
checkState(!isFinished, "Sort buffer is already finished.");
113135
checkState(!isReleased, "Sort buffer is already released.");
114136

115137
int totalBytes = source.remaining();
116138
if (dataType.isBuffer()) {
117-
// return false directly if it can not allocate enough buffers for the given record
118-
Queue<BufferBuilder> freeSegments = allocateBuffersForRecord(totalBytes, targetChannel);
119-
if (freeSegments == null) {
120-
return false;
121-
}
122-
writeRecord(source, targetChannel, freeSegments);
139+
writeRecord(source, targetChannel);
123140
} else {
124141
writeEvent(source, targetChannel, dataType);
125142
}
126143

127-
++numTotalRecords;
128-
numTotalBytes += totalBytes;
129-
return true;
144+
isFull = source.hasRemaining();
145+
if (!isFull) {
146+
++numTotalRecords;
147+
}
148+
numTotalBytes += totalBytes - source.remaining();
149+
return isFull;
130150
}
131151

132152
private void writeEvent(ByteBuffer source, int targetChannel, Buffer.DataType dataType) {
133-
ArrayDeque<BufferBuilder> channelBuffers = buffers[targetChannel];
134-
if (!channelBuffers.isEmpty()) {
135-
channelBuffers.peekLast().finish();
153+
BufferBuilder builder = builders[targetChannel];
154+
if (builder != null) {
155+
builder.finish();
156+
buffers[targetChannel].add(builder.createBufferConsumerFromBeginning());
157+
builder.close();
158+
builders[targetChannel] = null;
136159
}
137160

138-
MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(source.remaining());
139-
BufferBuilder builder = new BufferBuilder(segment, dataType, ignored -> {});
140-
builder.append(source);
141-
channelBuffers.add(builder);
161+
MemorySegment segment =
162+
MemorySegmentFactory.allocateUnpooledOffHeapMemory(source.remaining());
163+
segment.put(0, source, segment.size());
164+
BufferConsumer consumer =
165+
new BufferConsumer(
166+
new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, dataType),
167+
segment.size());
168+
buffers[targetChannel].add(consumer);
142169
}
143170

144-
private void writeRecord(
145-
ByteBuffer source, int targetChannel, Queue<BufferBuilder> freeSegments) {
146-
int remainingFreeSpace = 0;
147-
ArrayDeque<BufferBuilder> channelBuffers = buffers[targetChannel];
148-
149-
BufferBuilder prevBuffer = null;
150-
if (!channelBuffers.isEmpty()) {
151-
prevBuffer = channelBuffers.peekLast();
152-
remainingFreeSpace = prevBuffer.getWritableBytes();
153-
}
154-
155-
if (remainingFreeSpace > 0) {
156-
prevBuffer.append(source);
157-
}
158-
159-
while (source.hasRemaining()) {
160-
BufferBuilder targetBuffer = freeSegments.poll();
161-
targetBuffer.append(source);
162-
if (prevBuffer != null) {
163-
prevBuffer.finish();
171+
private void writeRecord(ByteBuffer source, int targetChannel) throws IOException {
172+
do {
173+
BufferBuilder builder = builders[targetChannel];
174+
if (builder == null) {
175+
builder = requestBufferFromPool();
176+
if (builder == null) {
177+
break;
178+
}
179+
++numBuffersOccupied;
180+
builders[targetChannel] = builder;
164181
}
165-
channelBuffers.add(targetBuffer);
166-
prevBuffer = targetBuffer;
167-
}
168-
}
169-
170-
private ArrayDeque<BufferBuilder> allocateBuffersForRecord(
171-
int numRecordBytes, int targetChannel) throws IOException {
172-
int remainingFreeSpace = 0;
173-
ArrayDeque<BufferBuilder> channelBuffers = buffers[targetChannel];
174-
if (!channelBuffers.isEmpty()) {
175-
remainingFreeSpace = channelBuffers.peekLast().getWritableBytes();
176-
}
177182

178-
ArrayDeque<BufferBuilder> freeBuffers = new ArrayDeque<>();
179-
while (remainingFreeSpace < numRecordBytes) {
180-
BufferBuilder buffer = requestBufferFromPool();
181-
if (buffer == null) {
182-
// return null if we can not allocate enough buffers for the appended record
183-
numTotalBuffers -= freeBuffers.size();
184-
freeBuffers.forEach(BufferBuilder::close);
185-
freeBuffers.clear();
186-
return null;
183+
builder.append(source);
184+
if (builder.isFull()) {
185+
builder.finish();
186+
buffers[targetChannel].add(builder.createBufferConsumerFromBeginning());
187+
builder.close();
188+
builders[targetChannel] = null;
187189
}
188-
189-
++numTotalBuffers;
190-
freeBuffers.add(buffer);
191-
remainingFreeSpace += buffer.getMaxCapacity();
192-
}
193-
return freeBuffers;
190+
} while (source.hasRemaining());
194191
}
195192

196193
private BufferBuilder requestBufferFromPool() throws IOException {
197194
try {
198195
// request a buffer in blocking mode while fewer than the guaranteed number of buffers are occupied
199-
if (numTotalBuffers < numGuaranteedBuffers) {
196+
if (numBuffersOccupied < numGuaranteedBuffers) {
200197
return bufferPool.requestBufferBuilderBlocking();
201198
}
202199
} catch (InterruptedException e) {
203-
throw new IOException("Interrupted while requesting buffer.");
200+
throw new IOException("Interrupted while requesting buffer.", e);
204201
}
205202

206203
return bufferPool.requestBufferBuilder();
207204
}
208205

209206
@Override
210207
public BufferWithChannel copyIntoSegment(MemorySegment target) {
211-
checkState(hasRemaining(), "No data remaining.");
212-
checkState(isFinished, "Should finish the sort buffer first before coping any data.");
208+
checkState(isFull, "Sort buffer is not ready to be read.");
213209
checkState(!isReleased, "Sort buffer is already released.");
214210

215211
BufferWithChannel buffer = null;
212+
if (!hasRemaining() || readOrderIndex >= subpartitionReadOrder.length) {
213+
return null;
214+
}
215+
216216
int targetChannel = subpartitionReadOrder[readOrderIndex];
217217
while (buffer == null) {
218-
try (BufferBuilder builder = buffers[targetChannel].poll()) {
219-
if (builder != null) {
220-
try (BufferConsumer consumer = builder.createBufferConsumerFromBeginning()) {
221-
buffer = new BufferWithChannel(consumer.build(), targetChannel);
222-
numTotalBytesRead += buffer.getBuffer().readableBytes();
223-
}
224-
} else {
225-
targetChannel = subpartitionReadOrder[++readOrderIndex];
218+
BufferConsumer consumer = buffers[targetChannel].poll();
219+
if (consumer != null) {
220+
buffer = new BufferWithChannel(consumer.build(), targetChannel);
221+
numBuffersOccupied -= buffer.getBuffer().isBuffer() ? 1 : 0;
222+
numTotalBytesRead += buffer.getBuffer().readableBytes();
223+
consumer.close();
224+
} else {
225+
if (++readOrderIndex >= subpartitionReadOrder.length) {
226+
break;
226227
}
228+
targetChannel = subpartitionReadOrder[readOrderIndex];
227229
}
228230
}
229231
return buffer;
230232
}
231233

232234
@Override
233-
public long numRecords() {
235+
public long numTotalRecords() {
234236
return numTotalRecords;
235237
}
236238

237239
@Override
238-
public long numBytes() {
240+
public long numTotalBytes() {
239241
return numTotalBytes;
240242
}
241243

@@ -246,12 +248,18 @@ public boolean hasRemaining() {
246248

247249
@Override
248250
public void finish() {
251+
checkState(!isFull, "SortBuffer must not be full.");
249252
checkState(!isFinished, "SortBuffer is already finished.");
250253

254+
isFull = true;
251255
isFinished = true;
252-
for (ArrayDeque<BufferBuilder> channelBuffers : buffers) {
253-
if (!channelBuffers.isEmpty()) {
254-
channelBuffers.peekLast().finish();
256+
for (int channel = 0; channel < builders.length; ++channel) {
257+
BufferBuilder builder = builders[channel];
258+
if (builder != null) {
259+
builder.finish();
260+
buffers[channel].add(builder.createBufferConsumerFromBeginning());
261+
builder.close();
262+
builders[channel] = null;
255263
}
256264
}
257265
}
@@ -263,13 +271,26 @@ public boolean isFinished() {
263271

264272
@Override
265273
public void release() {
266-
// the sort buffer can be released by other threads
267274
if (isReleased) {
268275
return;
269276
}
270277
isReleased = true;
271278

272-
clear();
279+
for (int channel = 0; channel < builders.length; ++channel) {
280+
BufferBuilder builder = builders[channel];
281+
if (builder != null) {
282+
builder.close();
283+
builders[channel] = null;
284+
}
285+
}
286+
287+
for (ArrayDeque<BufferConsumer> buffer : buffers) {
288+
BufferConsumer consumer = buffer.poll();
289+
while (consumer != null) {
290+
consumer.close();
291+
consumer = buffer.poll();
292+
}
293+
}
273294
}
274295

275296
@Override
@@ -278,18 +299,16 @@ public boolean isReleased() {
278299
}
279300

280301
@Override
281-
public void clear() {
282-
for (ArrayDeque<BufferBuilder> channelBuffers : buffers) {
283-
while (!channelBuffers.isEmpty()) {
284-
channelBuffers.poll().close();
285-
}
286-
}
302+
public void reset() {
303+
checkState(!isFinished, "Sort buffer has been finished.");
304+
checkState(!isReleased, "Sort buffer has been released.");
287305

288-
numTotalBuffers = 0;
289-
numTotalBytes = 0;
290-
numTotalRecords = 0;
291-
isFinished = false;
292-
numTotalBytesRead = 0;
306+
isFull = false;
293307
readOrderIndex = 0;
294308
}
309+
310+
@Override
311+
public BufferPool getBufferPool() {
312+
return bufferPool;
313+
}
295314
}

0 commit comments

Comments
 (0)