Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
* Implementers of this interface provide a positioned read API that writes to a
* {@link ByteBuffer} rather than a {@code byte[]}.
*
* @see PositionedReadable
* @see ByteBufferReadable
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface ByteBufferPositionedReadable {
/**
* Reads up to {@code buf.remaining()} bytes into buf from a given position
* in the file and returns the number of bytes read. Callers should use
* {@code buf.limit(...)} to control the size of the desired read and
* {@code buf.position(...)} to control the offset into the buffer the data
* should be written to.
* <p>
* After a successful call, {@code buf.position()} will be advanced by the
* number of bytes read and {@code buf.limit()} will be unchanged.
* <p>
* In the case of an exception, the state of the buffer (the contents of the
* buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
* undefined, and callers should be prepared to recover from this
* eventuality.
* <p>
* Callers should use {@link StreamCapabilities#hasCapability(String)} with
* {@link StreamCapabilities#PREADBYTEBUFFER} to check if the underlying
* stream supports this interface, otherwise they might get a
* {@link UnsupportedOperationException}.
* <p>
* Implementations should treat 0-length requests as legitimate, and must not
* signal an error upon their receipt.
*
* @param position position within file
* @param buf the ByteBuffer to receive the results of the read operation.
* @return the number of bytes read, possibly zero, or -1 if reached
* end-of-stream
* @throws IOException if there is some error performing the read
*/
int read(long position, ByteBuffer buf) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

Expand All @@ -33,16 +34,18 @@ public interface ByteBufferReadable {
* Reads up to buf.remaining() bytes into buf. Callers should use
* buf.limit(..) to control the size of the desired read.
* <p>
* After a successful call, buf.position() will be advanced by the number
* of bytes read and buf.limit() should be unchanged.
* After a successful call, {@code buf.position()} will be advanced by the
* number of bytes read and {@code buf.limit()} will be unchanged.
* <p>
* In the case of an exception, the values of buf.position() and buf.limit()
* are undefined, and callers should be prepared to recover from this
* In the case of an exception, the state of the buffer (the contents of the
* buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
* undefined, and callers should be prepared to recover from this
* eventuality.
* <p>
* Many implementations will throw {@link UnsupportedOperationException}, so
* callers that are not confident in support for this method from the
* underlying filesystem should be prepared to handle that exception.
* Callers should use {@link StreamCapabilities#hasCapability(String)} with
* {@link StreamCapabilities#READBYTEBUFFER} to check if the underlying
* stream supports this interface, otherwise they might get a
* {@link UnsupportedOperationException}.
* <p>
* Implementations should treat 0-length requests as legitimate, and must not
* signal an error upon their receipt.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
ByteBufferPositionedReadable {
/**
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
* objects
Expand Down Expand Up @@ -148,7 +149,8 @@ public int read(ByteBuffer buf) throws IOException {
return ((ByteBufferReadable)in).read(buf);
}

throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
throw new UnsupportedOperationException("Byte-buffer read unsupported " +
"by input stream");
}

@Override
Expand Down Expand Up @@ -247,4 +249,13 @@ public boolean hasCapability(String capability) {
public String toString() {
return super.toString() + ": " + in;
}

@Override
public int read(long position, ByteBuffer buf) throws IOException {
if (in instanceof ByteBufferPositionedReadable) {
return ((ByteBufferPositionedReadable) in).read(position, buf);
}
throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
"by input stream");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ public interface StreamCapabilities {
*/
String READBYTEBUFFER = "in:readbytebuffer";

/**
* Stream read(long, ByteBuffer) capability implemented by
* {@link ByteBufferPositionedReadable#read(long, java.nio.ByteBuffer)}.
*/
String PREADBYTEBUFFER = "in:preadbytebuffer";

/**
* Capabilities that a stream can support and be queried for.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.EnumSet;
import java.util.Random;

import org.apache.hadoop.fs.ByteBufferPositionedReadable;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand Down Expand Up @@ -129,6 +130,32 @@ private void preadCheck(PositionedReadable in) throws Exception {
Assert.assertArrayEquals(result, expectedData);
}

private int byteBufferPreadAll(ByteBufferPositionedReadable in,
ByteBuffer buf) throws IOException {
int n = 0;
int total = 0;
while (n != -1) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If n is 0, will we get stuck in a dead loop ? , try to use if(n > 0 )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC the HDFS read APIs make the same guarantees as InputStream#read(byte[], which returns -1 if there is no more data to read.

total += n;
if (!buf.hasRemaining()) {
break;
}
n = in.read(total, buf);
}

return total;
}

private void byteBufferPreadCheck(ByteBufferPositionedReadable in)
throws Exception {
ByteBuffer result = ByteBuffer.allocate(dataLen);
int n = byteBufferPreadAll(in, result);

Assert.assertEquals(dataLen, n);
ByteBuffer expectedData = ByteBuffer.allocate(n);
expectedData.put(data, 0, n);
Assert.assertArrayEquals(result.array(), expectedData.array());
}

protected OutputStream getOutputStream(int bufferSize) throws IOException {
return getOutputStream(bufferSize, key, iv);
}
Expand Down Expand Up @@ -288,20 +315,36 @@ private int readAll(InputStream in, long pos, byte[] b, int off, int len)

return total;
}

private int readAll(InputStream in, long pos, ByteBuffer buf)
throws IOException {
int n = 0;
int total = 0;
while (n != -1) {
total += n;
if (!buf.hasRemaining()) {
break;
}
n = ((ByteBufferPositionedReadable) in).read(pos + total, buf);
}

return total;
}

/** Test positioned read. */
@Test(timeout=120000)
public void testPositionedRead() throws Exception {
OutputStream out = getOutputStream(defaultBufferSize);
writeData(out);
try (OutputStream out = getOutputStream(defaultBufferSize)) {
writeData(out);
}

InputStream in = getInputStream(defaultBufferSize);
// Pos: 1/3 dataLen
positionedReadCheck(in , dataLen / 3);
try (InputStream in = getInputStream(defaultBufferSize)) {
// Pos: 1/3 dataLen
positionedReadCheck(in, dataLen / 3);

// Pos: 1/2 dataLen
positionedReadCheck(in, dataLen / 2);
in.close();
// Pos: 1/2 dataLen
positionedReadCheck(in, dataLen / 2);
}
}

private void positionedReadCheck(InputStream in, int pos) throws Exception {
Expand All @@ -315,6 +358,35 @@ private void positionedReadCheck(InputStream in, int pos) throws Exception {
System.arraycopy(data, pos, expectedData, 0, n);
Assert.assertArrayEquals(readData, expectedData);
}

/** Test positioned read with ByteBuffers. */
@Test(timeout=120000)
public void testPositionedReadWithByteBuffer() throws Exception {
try (OutputStream out = getOutputStream(defaultBufferSize)) {
writeData(out);
}

try (InputStream in = getInputStream(defaultBufferSize)) {
// Pos: 1/3 dataLen
positionedReadCheckWithByteBuffer(in, dataLen / 3);

// Pos: 1/2 dataLen
positionedReadCheckWithByteBuffer(in, dataLen / 2);
}
}

private void positionedReadCheckWithByteBuffer(InputStream in, int pos)
throws Exception {
ByteBuffer result = ByteBuffer.allocate(dataLen);
int n = readAll(in, pos, result);

Assert.assertEquals(dataLen, n + pos);
byte[] readData = new byte[n];
System.arraycopy(result.array(), 0, readData, 0, n);
byte[] expectedData = new byte[n];
System.arraycopy(data, pos, expectedData, 0, n);
Assert.assertArrayEquals(readData, expectedData);
}

/** Test read fully */
@Test(timeout=120000)
Expand Down Expand Up @@ -505,12 +577,40 @@ private void byteBufferReadCheck(InputStream in, ByteBuffer buf,
System.arraycopy(data, 0, expectedData, 0, n);
Assert.assertArrayEquals(readData, expectedData);
}

private void byteBufferPreadCheck(InputStream in, ByteBuffer buf,
int bufPos) throws Exception {
// Test reading from position 0
buf.position(bufPos);
int n = ((ByteBufferPositionedReadable) in).read(0, buf);
Assert.assertEquals(bufPos + n, buf.position());
byte[] readData = new byte[n];
buf.rewind();
buf.position(bufPos);
buf.get(readData);
byte[] expectedData = new byte[n];
System.arraycopy(data, 0, expectedData, 0, n);
Assert.assertArrayEquals(readData, expectedData);

// Test reading from half way through the data
buf.position(bufPos);
n = ((ByteBufferPositionedReadable) in).read(dataLen / 2, buf);
Assert.assertEquals(bufPos + n, buf.position());
readData = new byte[n];
buf.rewind();
buf.position(bufPos);
buf.get(readData);
expectedData = new byte[n];
System.arraycopy(data, dataLen / 2, expectedData, 0, n);
Assert.assertArrayEquals(readData, expectedData);
}

/** Test byte buffer read with different buffer size. */
@Test(timeout=120000)
public void testByteBufferRead() throws Exception {
OutputStream out = getOutputStream(defaultBufferSize);
writeData(out);
try (OutputStream out = getOutputStream(defaultBufferSize)) {
writeData(out);
}

// Default buffer size, initial buffer position is 0
InputStream in = getInputStream(defaultBufferSize);
Expand Down Expand Up @@ -560,6 +660,53 @@ public void testByteBufferRead() throws Exception {
byteBufferReadCheck(in, buf, 11);
in.close();
}

/** Test byte buffer pread with different buffer size. */
@Test(timeout=120000)
public void testByteBufferPread() throws Exception {
try (OutputStream out = getOutputStream(defaultBufferSize)) {
writeData(out);
}

try (InputStream defaultBuf = getInputStream(defaultBufferSize);
InputStream smallBuf = getInputStream(smallBufferSize)) {

ByteBuffer buf = ByteBuffer.allocate(dataLen + 100);

// Default buffer size, initial buffer position is 0
byteBufferPreadCheck(defaultBuf, buf, 0);

// Default buffer size, initial buffer position is not 0
buf.clear();
byteBufferPreadCheck(defaultBuf, buf, 11);

// Small buffer size, initial buffer position is 0
buf.clear();
byteBufferPreadCheck(smallBuf, buf, 0);

// Small buffer size, initial buffer position is not 0
buf.clear();
byteBufferPreadCheck(smallBuf, buf, 11);

// Test with direct ByteBuffer
buf = ByteBuffer.allocateDirect(dataLen + 100);

// Direct buffer, default buffer size, initial buffer position is 0
byteBufferPreadCheck(defaultBuf, buf, 0);

// Direct buffer, default buffer size, initial buffer position is not 0
buf.clear();
byteBufferPreadCheck(defaultBuf, buf, 11);

// Direct buffer, small buffer size, initial buffer position is 0
buf.clear();
byteBufferPreadCheck(smallBuf, buf, 0);

// Direct buffer, small buffer size, initial buffer position is not 0
buf.clear();
byteBufferPreadCheck(smallBuf, buf, 11);
}
}

@Test(timeout=120000)
public void testCombinedOp() throws Exception {
Expand Down Expand Up @@ -797,5 +944,23 @@ public void testUnbuffer() throws Exception {
// The close will be called when exiting this try-with-resource block
}
}

// Test ByteBuffer pread
try (InputStream in = getInputStream(smallBufferSize)) {
if (in instanceof ByteBufferPositionedReadable) {
ByteBufferPositionedReadable bbpin = (ByteBufferPositionedReadable) in;

// Test unbuffer after pread
byteBufferPreadCheck(bbpin);
((CanUnbuffer) in).unbuffer();

// Test pread again after unbuffer
byteBufferPreadCheck(bbpin);

// Test close after unbuffer
((CanUnbuffer) in).unbuffer();
// The close will be called when exiting this try-with-resource block
}
}
}
}
Loading