Skip to content

Commit

Permalink
[Distributed runtime] Rename read method of BufferReader to indicate …
Browse files Browse the repository at this point in the history
…blocking behaviour
  • Loading branch information
uce committed Jan 19, 2015
1 parent d1cc30d commit c9709a8
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 11 deletions.
Expand Up @@ -75,7 +75,7 @@ protected boolean getNextRecord(T target) throws IOException, InterruptedExcepti
} }
} }


final Buffer nextBuffer = reader.getNextBuffer(); final Buffer nextBuffer = reader.getNextBufferBlocking();
final int channelIndex = reader.getChannelIndexOfLastBuffer(); final int channelIndex = reader.getChannelIndexOfLastBuffer();


if (nextBuffer == null) { if (nextBuffer == null) {
Expand Down
Expand Up @@ -230,7 +230,7 @@ public void requestPartitionsOnce() throws IOException {
} }


@Override @Override
public Buffer getNextBuffer() throws IOException, InterruptedException { public Buffer getNextBufferBlocking() throws IOException, InterruptedException {
requestPartitionsOnce(); requestPartitionsOnce();


while (true) { while (true) {
Expand Down
Expand Up @@ -30,9 +30,9 @@
* <p> * <p>
* {@link BufferReaderBase} is the runtime API for consuming results. Events * {@link BufferReaderBase} is the runtime API for consuming results. Events
* are handled by the reader and users can query for buffers with * are handled by the reader and users can query for buffers with
* {@link #getNextBuffer()} or {@link #getNextBuffer(Buffer)}. * {@link #getNextBufferBlocking()} or {@link #getNextBuffer(Buffer)}.
* <p> * <p>
* <strong>Important</strong>: If {@link #getNextBuffer()} is used, it is * <strong>Important</strong>: If {@link #getNextBufferBlocking()} is used, it is
* necessary to release the returned buffers with {@link Buffer#recycle()} * necessary to release the returned buffers with {@link Buffer#recycle()}
* after they are consumed. * after they are consumed.
*/ */
Expand All @@ -50,10 +50,10 @@ public interface BufferReaderBase extends ReaderBase {
* *
* @see #getChannelIndexOfLastBuffer() * @see #getChannelIndexOfLastBuffer()
*/ */
Buffer getNextBuffer() throws IOException, InterruptedException; Buffer getNextBufferBlocking() throws IOException, InterruptedException;


/** /**
* {@link #getNextBuffer()} requires the user to quickly recycle the * {@link #getNextBufferBlocking()} requires the user to quickly recycle the
* returned buffer. For a fully buffer-oriented runtime, we need to * returned buffer. For a fully buffer-oriented runtime, we need to
* support a variant of this method, which allows buffers to be exchanged * support a variant of this method, which allows buffers to be exchanged
* in order to save unnecessary memory copies between buffer pools. * in order to save unnecessary memory copies between buffer pools.
Expand All @@ -66,7 +66,7 @@ public interface BufferReaderBase extends ReaderBase {


/** /**
* Returns a channel index for the last {@link Buffer} instance returned by * Returns a channel index for the last {@link Buffer} instance returned by
* {@link #getNextBuffer()} or {@link #getNextBuffer(Buffer)}. * {@link #getNextBufferBlocking()} or {@link #getNextBuffer(Buffer)}.
* <p> * <p>
* The returned index is guaranteed to be the same for all buffers read by * The returned index is guaranteed to be the same for all buffers read by
* the same {@link RemoteInputChannel} instance. This is useful when data spans * the same {@link RemoteInputChannel} instance. This is useful when data spans
Expand Down
Expand Up @@ -101,7 +101,7 @@ public void requestPartitionsOnce() throws IOException {




@Override @Override
public Buffer getNextBuffer() throws IOException, InterruptedException { public Buffer getNextBufferBlocking() throws IOException, InterruptedException {
requestPartitionsOnce(); requestPartitionsOnce();


do { do {
Expand Down Expand Up @@ -135,7 +135,7 @@ else if (isIterative && remainingReaders.isEmpty()) {
} }
} }


Buffer buffer = currentReader.getNextBuffer(); Buffer buffer = currentReader.getNextBufferBlocking();
channelIndexOfLastReadBuffer = currentReaderChannelIndexOffset + currentReader.getChannelIndexOfLastBuffer(); channelIndexOfLastReadBuffer = currentReaderChannelIndexOffset + currentReader.getChannelIndexOfLastBuffer();


isTaskEvent = false; isTaskEvent = false;
Expand Down
Expand Up @@ -109,7 +109,7 @@ public void testExceptionEndOfSuperstepEventWithNonIterativeReader() throws IOEx
final BufferReader reader = mockReader.getMock(); final BufferReader reader = mockReader.getMock();


// Should throw Exception, because it's a non-iterative reader // Should throw Exception, because it's a non-iterative reader
reader.getNextBuffer(); reader.getNextBufferBlocking();
} }


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand All @@ -128,7 +128,7 @@ static void consumeAndVerify(BufferReaderBase reader, int expectedNumReadBuffers


while (true) { while (true) {
Buffer buffer; Buffer buffer;
while ((buffer = reader.getNextBuffer()) != null) { while ((buffer = reader.getNextBufferBlocking()) != null) {
buffer.recycle(); buffer.recycle();


numReadBuffers++; numReadBuffers++;
Expand Down

0 comments on commit c9709a8

Please sign in to comment.