Skip to content

Commit

Permalink
[FLINK-35045][state] Support ByteBufferReadable for HadoopDataInputSt…
Browse files Browse the repository at this point in the history
…ream
  • Loading branch information
masteryhx committed Apr 18, 2024
1 parent a147684 commit a312a3b
Showing 1 changed file with 57 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,23 @@

package org.apache.flink.runtime.fs.hdfs;

import org.apache.flink.core.fs.ByteBufferReadable;
import org.apache.flink.core.fs.FSDataInputStream;

import org.apache.hadoop.fs.StreamCapabilities;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.nio.ByteBuffer;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Concrete implementation of the {@link FSDataInputStream} for Hadoop's input streams. This
* supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
*/
public final class HadoopDataInputStream extends FSDataInputStream {
public final class HadoopDataInputStream extends FSDataInputStream implements ByteBufferReadable {

/**
* Minimum amount of bytes to skip forward before we issue a seek instead of discarding read.
Expand Down Expand Up @@ -140,4 +144,56 @@ public void skipFully(long bytes) throws IOException {
bytes -= fsDataInputStream.skip(bytes);
}
}

@Override
public int read(ByteBuffer byteBuffer) throws IOException {
// Not all internal stream supports ByteBufferReadable
if (fsDataInputStream.hasCapability(StreamCapabilities.READBYTEBUFFER)) {
return fsDataInputStream.read(byteBuffer);
} else {
if (byteBuffer.hasArray()) {
int len = byteBuffer.remaining();
fsDataInputStream.readFully(byteBuffer.array(), byteBuffer.arrayOffset(), len);
return len;
} else {
// Fallback to read byte then put
int c = read();
if (c == -1) {
return -1;
}
byteBuffer.put((byte) c);

int n = 1, len = byteBuffer.remaining() + 1;
for (; n < len; n++) {
c = read();
if (c == -1) {
break;
}
byteBuffer.put((byte) c);
}
return n;
}
}
}

@Override
public int read(long position, ByteBuffer byteBuffer) throws IOException {
// Not all internal stream supports ByteBufferPositionedReadable
if (fsDataInputStream.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) {
return fsDataInputStream.read(position, byteBuffer);
} else {
if (byteBuffer.hasArray()) {
int len = byteBuffer.remaining();
fsDataInputStream.readFully(
position, byteBuffer.array(), byteBuffer.arrayOffset(), len);
return len;
} else {
// Fallback to positionable read bytes then put
byte[] tmp = new byte[byteBuffer.remaining()];
fsDataInputStream.readFully(position, tmp, 0, tmp.length);
byteBuffer.put(tmp);
return tmp.length;
}
}
}
}

0 comments on commit a312a3b

Please sign in to comment.