Skip to content

Commit

Permalink
HDFS-12636. Ozone: OzoneFileSystem: Implement seek functionality for …
Browse files Browse the repository at this point in the history
…rpc client. Contributed by Lokesh Jain.
  • Loading branch information
mukul1987 authored and omalley committed Apr 26, 2018
1 parent 377b31f commit 9272e10
Show file tree
Hide file tree
Showing 14 changed files with 559 additions and 475 deletions.
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.ozone.client.io;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
Expand All @@ -27,26 +29,35 @@
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.storage.ChunkInputStream;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Maintaining a list of ChunkInputStream. Read based on offset.
*/
public class ChunkGroupInputStream extends InputStream {
public class ChunkGroupInputStream extends InputStream implements Seekable {

private static final Logger LOG =
LoggerFactory.getLogger(ChunkGroupInputStream.class);

private static final int EOF = -1;

private final ArrayList<ChunkInputStreamEntry> streamEntries;
// streamOffset[i] stores the offset at which chunkInputStream i stores
// data in the key
private long[] streamOffset = null;
private int currentStreamIndex;
private long length = 0;
private boolean closed = false;
private String key;

public ChunkGroupInputStream() {
streamEntries = new ArrayList<>();
Expand All @@ -66,19 +77,21 @@ public long getRemainingOfIndex(int index) {
/**
* Append another stream to the end of the list.
*
* @param stream the stream instance.
* @param length the max number of bytes that should be written to this
* stream.
* @param stream the stream instance.
* @param streamLength the max number of bytes that should be written to this
* stream.
*/
public synchronized void addStream(InputStream stream, long length) {
streamEntries.add(new ChunkInputStreamEntry(stream, length));
public synchronized void addStream(ChunkInputStream stream,
long streamLength) {
streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
}


@Override
public synchronized int read() throws IOException {
checkNotClosed();
if (streamEntries.size() <= currentStreamIndex) {
throw new IndexOutOfBoundsException();
return EOF;
}
ChunkInputStreamEntry entry = streamEntries.get(currentStreamIndex);
int data = entry.read();
Expand All @@ -87,6 +100,7 @@ public synchronized int read() throws IOException {

@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
checkNotClosed();
if (b == null) {
throw new NullPointerException();
}
Expand Down Expand Up @@ -122,15 +136,82 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
return totalReadLen;
}

private static class ChunkInputStreamEntry extends InputStream {
@Override
public void seek(long pos) throws IOException {
checkNotClosed();
if (pos < 0 || pos >= length) {
if (pos == 0) {
// It is possible for length and pos to be zero in which case
// seek should return instead of throwing exception
return;
}
throw new EOFException(
"EOF encountered at pos: " + pos + " for key: " + key);
}
Preconditions.assertTrue(currentStreamIndex >= 0);
if (currentStreamIndex >= streamEntries.size()) {
currentStreamIndex = Arrays.binarySearch(streamOffset, pos);
} else if (pos < streamOffset[currentStreamIndex]) {
currentStreamIndex =
Arrays.binarySearch(streamOffset, 0, currentStreamIndex, pos);
} else if (pos >= streamOffset[currentStreamIndex] + streamEntries
.get(currentStreamIndex).length) {
currentStreamIndex = Arrays
.binarySearch(streamOffset, currentStreamIndex + 1,
streamEntries.size(), pos);
}
if (currentStreamIndex < 0) {
// Binary search returns -insertionPoint - 1 if element is not present
// in the array. insertionPoint is the point at which element would be
// inserted in the sorted array. We need to adjust the currentStreamIndex
// accordingly so that currentStreamIndex = insertionPoint - 1
currentStreamIndex = -currentStreamIndex - 2;
}
// seek to the proper offset in the ChunkInputStream
streamEntries.get(currentStreamIndex)
.seek(pos - streamOffset[currentStreamIndex]);
}

@Override
public long getPos() throws IOException {
return length == 0 ? 0 :
streamOffset[currentStreamIndex] + streamEntries.get(currentStreamIndex)
.getPos();
}

@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}

@Override
public int available() throws IOException {
checkNotClosed();
long remaining = length - getPos();
return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE;
}

@Override
public void close() throws IOException {
closed = true;
for (int i = 0; i < streamEntries.size(); i++) {
streamEntries.get(i).close();
}
}

/**
* Encapsulates ChunkInputStream.
*/
public static class ChunkInputStreamEntry extends InputStream
implements Seekable {

private final InputStream inputStream;
private final ChunkInputStream chunkInputStream;
private final long length;
private long currentPosition;


ChunkInputStreamEntry(InputStream chunkInputStream, long length) {
this.inputStream = chunkInputStream;
public ChunkInputStreamEntry(ChunkInputStream chunkInputStream,
long length) {
this.chunkInputStream = chunkInputStream;
this.length = length;
this.currentPosition = 0;
}
Expand All @@ -142,21 +223,36 @@ synchronized long getRemaining() {
@Override
public synchronized int read(byte[] b, int off, int len)
throws IOException {
int readLen = inputStream.read(b, off, len);
int readLen = chunkInputStream.read(b, off, len);
currentPosition += readLen;
return readLen;
}

@Override
public synchronized int read() throws IOException {
int data = inputStream.read();
int data = chunkInputStream.read();
currentPosition += 1;
return data;
}

@Override
public synchronized void close() throws IOException {
inputStream.close();
chunkInputStream.close();
}

@Override
public void seek(long pos) throws IOException {
chunkInputStream.seek(pos);
}

@Override
public long getPos() throws IOException {
return chunkInputStream.getPos();
}

@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
}

Expand All @@ -168,8 +264,12 @@ public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
long length = 0;
String containerKey;
ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream();
for (KsmKeyLocationInfo ksmKeyLocationInfo :
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly()) {
groupInputStream.key = keyInfo.getKeyName();
List<KsmKeyLocationInfo> keyLocationInfos =
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
groupInputStream.streamOffset = new long[keyLocationInfos.size()];
for (int i = 0; i < keyLocationInfos.size(); i++) {
KsmKeyLocationInfo ksmKeyLocationInfo = keyLocationInfos.get(i);
String containerName = ksmKeyLocationInfo.getContainerName();
Pipeline pipeline =
storageContainerLocationClient.getContainer(containerName);
Expand All @@ -180,6 +280,7 @@ public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
try {
LOG.debug("get key accessing {} {}",
xceiverClient.getPipeline().getContainerName(), containerKey);
groupInputStream.streamOffset[i] = length;
ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation
.containerKeyDataForRead(
xceiverClient.getPipeline().getContainerName(), containerKey);
Expand All @@ -202,6 +303,19 @@ public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
}
}
}
groupInputStream.length = length;
return new LengthInputStream(groupInputStream, length);
}

/**
* Verify that the input stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
*/
private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException(
": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + key);
}
}
}
Expand Up @@ -19,6 +19,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class ChunkGroupOutputStream extends OutputStream {
private final XceiverClientManager xceiverClientManager;
private final int chunkSize;
private final String requestID;
private boolean closed;

/**
* A constructor for testing purpose only.
Expand All @@ -86,6 +88,7 @@ public ChunkGroupOutputStream() {
xceiverClientManager = null;
chunkSize = 0;
requestID = null;
closed = false;
}

/**
Expand Down Expand Up @@ -196,6 +199,8 @@ public long getByteOffset() {

@Override
public synchronized void write(int b) throws IOException {
checkNotClosed();

if (streamEntries.size() <= currentStreamIndex) {
Preconditions.checkNotNull(ksmClient);
// allocate a new block, if a exception happens, log an error and
Expand Down Expand Up @@ -230,6 +235,8 @@ public synchronized void write(int b) throws IOException {
@Override
public synchronized void write(byte[] b, int off, int len)
throws IOException {
checkNotClosed();

if (b == null) {
throw new NullPointerException();
}
Expand Down Expand Up @@ -286,6 +293,7 @@ private void allocateNewBlock(int index) throws IOException {

@Override
public synchronized void flush() throws IOException {
checkNotClosed();
for (int i = 0; i <= currentStreamIndex; i++) {
streamEntries.get(i).flush();
}
Expand All @@ -298,6 +306,10 @@ public synchronized void flush() throws IOException {
*/
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
closed = true;
for (ChunkOutputStreamEntry entry : streamEntries) {
if (entry != null) {
entry.close();
Expand Down Expand Up @@ -464,4 +476,17 @@ public void close() throws IOException {
}
}
}

/**
* Verify that the output stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
*/
private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException(
": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + keyArgs
.getKeyName());
}
}
}
Expand Up @@ -49,4 +49,12 @@ public synchronized void close() throws IOException {
inputStream.close();
}

@Override
public int available() throws IOException {
return inputStream.available();
}

public InputStream getInputStream() {
return inputStream;
}
}
Expand Up @@ -57,4 +57,8 @@ public synchronized void close() throws IOException {
//commitKey can be done here, if needed.
outputStream.close();
}

public OutputStream getOutputStream() {
return outputStream;
}
}

0 comments on commit 9272e10

Please sign in to comment.