Skip to content

Commit

Permalink
[FLINK-8207][network-tests] Unify TestInfiniteBufferProvider and Test…
Browse files Browse the repository at this point in the history
…PooledBufferProvider
  • Loading branch information
pnowojski authored and StefanRRichter committed Jan 8, 2018
1 parent 1f60a1d commit 91c72b9
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 101 deletions.
Expand Up @@ -36,7 +36,7 @@
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.testutils.DiscardingRecycler;
import org.apache.flink.types.IntValue;
Expand Down Expand Up @@ -421,8 +421,7 @@ public void testBroadcastEventBufferReferenceCounting() throws Exception {
new ArrayDeque[]{new ArrayDeque(), new ArrayDeque()};

ResultPartitionWriter partition =
createCollectingPartitionWriter(queues,
new TestInfiniteBufferProvider());
createCollectingPartitionWriter(queues, new TestPooledBufferProvider(Integer.MAX_VALUE));
RecordWriter<?> writer = new RecordWriter<>(partition);

writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
Expand Down
Expand Up @@ -24,8 +24,8 @@
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;

Expand All @@ -52,9 +52,6 @@ public class SpilledSubpartitionViewTest {

private static final IOManager IO_MANAGER = new IOManagerAsync();

private static final TestInfiniteBufferProvider writerBufferPool =
new TestInfiniteBufferProvider();

@AfterClass
public static void shutdown() {
IO_MANAGER.shutdown();
Expand All @@ -66,7 +63,7 @@ public void testWriteConsume() throws Exception {
final int numberOfBuffersToWrite = 512;

// Setup
final BufferFileWriter writer = createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, numberOfBuffersToWrite);
final BufferFileWriter writer = createWriterAndWriteBuffers(numberOfBuffersToWrite);

writer.close();

Expand Down Expand Up @@ -94,7 +91,7 @@ public void testConsumeWithFewBuffers() throws Exception {
final int numberOfBuffersToWrite = 512;

// Setup
final BufferFileWriter writer = createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, numberOfBuffersToWrite);
final BufferFileWriter writer = createWriterAndWriteBuffers(numberOfBuffersToWrite);

writer.close();

Expand Down Expand Up @@ -134,8 +131,8 @@ public void testReadMultipleFilesWithSingleBufferPool() throws Exception {

// Setup
writers = new BufferFileWriter[]{
createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512),
createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512)
createWriterAndWriteBuffers(512),
createWriterAndWriteBuffers(512)
};

readers = new ResultSubpartitionView[writers.length];
Expand Down Expand Up @@ -211,15 +208,12 @@ public void testReadMultipleFilesWithSingleBufferPool() throws Exception {
*
* <p> Call {@link BufferFileWriter#close()} to ensure that all buffers have been written.
*/
static BufferFileWriter createWriterAndWriteBuffers(
IOManager ioManager,
BufferProvider bufferProvider,
int numberOfBuffers) throws IOException {
private static BufferFileWriter createWriterAndWriteBuffers(int numberOfBuffers) throws IOException {

final BufferFileWriter writer = ioManager.createBufferFileWriter(ioManager.createChannel());
final BufferFileWriter writer = IO_MANAGER.createBufferFileWriter(IO_MANAGER.createChannel());

for (int i = 0; i < numberOfBuffers; i++) {
writer.writeBlock(bufferProvider.requestBuffer());
writer.writeBlock(TestBufferFactory.createBuffer());
}

writer.writeBlock(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE));
Expand Down

This file was deleted.

Expand Up @@ -28,16 +28,17 @@

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingDeque;

import static org.apache.flink.util.Preconditions.checkArgument;

public class TestPooledBufferProvider implements BufferProvider {

private final Object bufferCreationLock = new Object();

private final ArrayBlockingQueue<Buffer> buffers;
private final BlockingQueue<Buffer> buffers = new LinkedBlockingDeque<>();

private final TestBufferFactory bufferFactory;

Expand All @@ -49,7 +50,6 @@ public TestPooledBufferProvider(int poolSize) {
checkArgument(poolSize > 0);
this.poolSize = poolSize;

this.buffers = new ArrayBlockingQueue<Buffer>(poolSize);
this.bufferRecycler = new PooledBufferProviderRecycler(buffers);
this.bufferFactory = new TestBufferFactory(32 * 1024, bufferRecycler);
}
Expand Down Expand Up @@ -109,6 +109,10 @@ public int getNumberOfAvailableBuffers() {
return buffers.size();
}

public int getNumberOfCreatedBuffers() {
return bufferFactory.getNumberOfCreatedBuffers();
}

private static class PooledBufferProviderRecycler implements BufferRecycler {

private final Object listenerRegistrationLock = new Object();
Expand Down

0 comments on commit 91c72b9

Please sign in to comment.