Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-2703. OzoneFSInputStream to support ByteBufferReadable #345

Merged
merged 8 commits into from Dec 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -20,9 +20,11 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.Seekable;

Expand All @@ -34,7 +36,8 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class OzoneFSInputStream extends FSInputStream {
public final class OzoneFSInputStream extends FSInputStream
implements ByteBufferReadable {

private final InputStream inputStream;

Expand Down Expand Up @@ -76,4 +79,23 @@ public boolean seekToNewSource(long targetPos) throws IOException {
public int available() throws IOException {
return inputStream.available();
}

/**
* @param buf the ByteBuffer to receive the results of the read operation.
* @return the number of bytes read, possibly zero, or -1 if
* reach end-of-stream
* @throws IOException if there is some error performing the read
*/
@Override
public int read(ByteBuffer buf) throws IOException {

int bufInitPos = buf.position();
int readLen = Math.min(buf.remaining(), inputStream.available());

byte[] readData = new byte[readLen];
int bytesRead = inputStream.read(readData, bufInitPos, readLen);
buf.put(readData);

return bytesRead;
}
}
Expand Up @@ -20,6 +20,9 @@

import java.io.IOException;

import java.nio.ByteBuffer;


import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
Expand Down Expand Up @@ -133,4 +136,20 @@ public void testO3FSMultiByteRead() throws IOException {
Assert.assertArrayEquals(value, data);
}
}

@Test
public void testO3FSByteBufferRead() throws IOException {
try (FSDataInputStream inputStream = fs.open(filePath)) {

ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
int byteRead = inputStream.read(buffer);

Assert.assertEquals(byteRead, 1024 * 1024);

byte[] value = new byte[1024 * 1024];
System.arraycopy(data, 0, value, 0, value.length);

Assert.assertArrayEquals(value, buffer.array());
}
}
}