Skip to content

Commit

Permalink
Change open to return UnderFileInputStream
Browse files Browse the repository at this point in the history
  • Loading branch information
madanadit committed Dec 13, 2016
1 parent 970333d commit 08ad769
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 47 deletions.
Expand Up @@ -15,13 +15,13 @@
import alluxio.exception.ExceptionMessage; import alluxio.exception.ExceptionMessage;
import alluxio.exception.PreconditionMessage; import alluxio.exception.PreconditionMessage;
import alluxio.metrics.MetricsSystem; import alluxio.metrics.MetricsSystem;
import alluxio.underfs.UnderFileInputStream;
import alluxio.underfs.options.OpenOptions; import alluxio.underfs.options.OpenOptions;


import com.codahale.metrics.Counter; import com.codahale.metrics.Counter;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;


import java.io.IOException; import java.io.IOException;
import java.io.InputStream;


import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -53,7 +53,7 @@ public final class UnderStoreBlockInStream extends BlockInStream {
*/ */
private long mPos; private long mPos;
/** The current under store stream. */ /** The current under store stream. */
private InputStream mUnderStoreStream; private UnderFileInputStream mUnderStoreStream;


/** /**
* A factory which can create an input stream to under storage. * A factory which can create an input stream to under storage.
Expand All @@ -64,7 +64,7 @@ public interface UnderStoreStreamFactory extends AutoCloseable {
* @return an input stream to under storage * @return an input stream to under storage
* @throws IOException if an IO exception occurs * @throws IOException if an IO exception occurs
*/ */
InputStream create(OpenOptions options) throws IOException; UnderFileInputStream create(OpenOptions options) throws IOException;


/** /**
* Closes the factory, releasing any resources it was holding. * Closes the factory, releasing any resources it was holding.
Expand Down Expand Up @@ -153,14 +153,7 @@ public long remaining() {


@Override @Override
public void seek(long pos) throws IOException { public void seek(long pos) throws IOException {
if (pos < mPos) { mUnderStoreStream.seek(pos);
setUnderStoreStream(pos);
} else {
long toSkip = pos - mPos;
if (skip(toSkip) != toSkip) {
throw new IOException(ExceptionMessage.FAILED_SEEK.getMessage(pos));
}
}
} }


@Override @Override
Expand Down
Expand Up @@ -17,6 +17,7 @@
import alluxio.client.file.options.OpenUfsFileOptions; import alluxio.client.file.options.OpenUfsFileOptions;
import alluxio.client.UnderFileSystemFileReader; import alluxio.client.UnderFileSystemFileReader;
import alluxio.exception.AlluxioException; import alluxio.exception.AlluxioException;
import alluxio.underfs.UnderFileInputStream;
import alluxio.underfs.options.OpenOptions; import alluxio.underfs.options.OpenOptions;


import java.io.IOException; import java.io.IOException;
Expand Down Expand Up @@ -47,7 +48,7 @@ public DelegatedUnderStoreStreamFactory(FileSystemContext context, String path)
} }


@Override @Override
public InputStream create(OpenOptions options) { public UnderFileInputStream create(OpenOptions options) {
return new UnderFileSystemFileInStream(mClient.getWorkerDataServerAddress(), return new UnderFileSystemFileInStream(mClient.getWorkerDataServerAddress(),
options.getOffset(), mFileId, UnderFileSystemFileReader.Factory.create()); options.getOffset(), mFileId, UnderFileSystemFileReader.Factory.create());
} }
Expand Down
Expand Up @@ -12,11 +12,11 @@
package alluxio.client.file; package alluxio.client.file;


import alluxio.client.block.UnderStoreBlockInStream.UnderStoreStreamFactory; import alluxio.client.block.UnderStoreBlockInStream.UnderStoreStreamFactory;
import alluxio.underfs.UnderFileInputStream;
import alluxio.underfs.UnderFileSystem; import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.OpenOptions; import alluxio.underfs.options.OpenOptions;


import java.io.IOException; import java.io.IOException;
import java.io.InputStream;


/** /**
* Factory which creates input streams to a specified path in under storage. The streams are created * Factory which creates input streams to a specified path in under storage. The streams are created
Expand All @@ -33,7 +33,7 @@ public DirectUnderStoreStreamFactory(String path) {
} }


@Override @Override
public InputStream create(OpenOptions options) throws IOException { public UnderFileInputStream create(OpenOptions options) throws IOException {
return UnderFileSystem.Factory.get(mPath).open(mPath, options); return UnderFileSystem.Factory.get(mPath).open(mPath, options);
} }


Expand Down
Expand Up @@ -15,12 +15,12 @@
import alluxio.PropertyKey; import alluxio.PropertyKey;
import alluxio.client.UnderFileSystemFileReader; import alluxio.client.UnderFileSystemFileReader;
import alluxio.exception.PreconditionMessage; import alluxio.exception.PreconditionMessage;
import alluxio.underfs.UnderFileInputStream;
import alluxio.util.io.BufferUtils; import alluxio.util.io.BufferUtils;


import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;


import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;


Expand All @@ -32,7 +32,7 @@
*/ */
// TODO(calvin): See if common logic in this class and buffered block in stream can be abstracted // TODO(calvin): See if common logic in this class and buffered block in stream can be abstracted
@NotThreadSafe @NotThreadSafe
public final class UnderFileSystemFileInStream extends InputStream { public final class UnderFileSystemFileInStream extends UnderFileInputStream {
/** Current position of the stream, relative to the start of the block. */ /** Current position of the stream, relative to the start of the block. */
private long mPos; private long mPos;
/** If the bytes in the internal buffer are valid. */ /** If the bytes in the internal buffer are valid. */
Expand Down Expand Up @@ -136,6 +136,17 @@ public int read(byte[] b, int off, int len) throws IOException {
return toRead; return toRead;
} }


@Override
public void seek(long pos) throws IOException {
if (pos < 0) {
throw new IOException(String.format("Unable to seek to negative position %f", pos));
}
if (pos != mPos) {
mIsBufferValid = false;
mPos = pos;
}
}

@Override @Override
public long skip(long n) throws IOException { public long skip(long n) throws IOException {
checkIfClosed(); checkIfClosed();
Expand Down
Expand Up @@ -14,6 +14,7 @@
import alluxio.ConfigurationTestUtils; import alluxio.ConfigurationTestUtils;
import alluxio.client.block.UnderStoreBlockInStream.UnderStoreStreamFactory; import alluxio.client.block.UnderStoreBlockInStream.UnderStoreStreamFactory;
import alluxio.client.util.ClientTestUtils; import alluxio.client.util.ClientTestUtils;
import alluxio.underfs.UnderFileInputStream;
import alluxio.underfs.options.OpenOptions; import alluxio.underfs.options.OpenOptions;
import alluxio.util.io.BufferUtils; import alluxio.util.io.BufferUtils;


Expand Down Expand Up @@ -299,7 +300,7 @@ private FileUnderStoreStreamFactory(File file) {
} }


@Override @Override
public InputStream create(OpenOptions options) throws IOException { public UnderFileInputStream create(OpenOptions options) throws IOException {
try { try {
InputStream inputStream = new FileInputStream(mFile); InputStream inputStream = new FileInputStream(mFile);
if (options.getOffset() > 0) { if (options.getOffset() > 0) {
Expand Down
Expand Up @@ -23,7 +23,6 @@
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;


import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
Expand Down Expand Up @@ -120,7 +119,7 @@ public UnderFileStatus[] listStatus(String path, ListOptions options) throws IOE
} }


@Override @Override
public InputStream open(String path) throws IOException { public UnderFileInputStream open(String path) throws IOException {
return open(path, OpenOptions.defaults()); return open(path, OpenOptions.defaults());
} }


Expand Down
@@ -0,0 +1,32 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.underfs;

import java.io.IOException;
import java.io.InputStream;

import javax.annotation.concurrent.NotThreadSafe;

/**
* A wrapper over an {@link InputStream} from an {@link UnderFileSystem}.
*/
@NotThreadSafe
public abstract class UnderFileInputStream extends InputStream {
/**
* Seek to the given offset from the start of the file. The next read() will be from that
* location.Can't seek past the end of the file.
*
* @param pos offset from the start of the file in bytes
* @throws IOException if pos is negative, or if a non-Alluxio error occurs
*/
abstract public void seek(long pos) throws IOException;
}
Expand Up @@ -23,7 +23,6 @@
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;


import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
Expand Down Expand Up @@ -488,23 +487,23 @@ public int getValue() {
boolean mkdirs(String path, MkdirsOptions options) throws IOException; boolean mkdirs(String path, MkdirsOptions options) throws IOException;


/** /**
* Opens an {@link InputStream} at the indicated path. * Opens an {@link UnderFileInputStream} at the indicated path.
* *
* @param path the file name * @param path the file name
* @return The {@code InputStream} object * @return The {@code InputStream} object
* @throws IOException if a non-Alluxio error occurs * @throws IOException if a non-Alluxio error occurs
*/ */
InputStream open(String path) throws IOException; UnderFileInputStream open(String path) throws IOException;


/** /**
* Opens an {@link InputStream} at the indicated path. * Opens an {@link UnderFileInputStream} at the indicated path.
* *
* @param path the file name * @param path the file name
* @param options to open input stream * @param options to open input stream
* @return The {@code InputStream} object * @return The {@code InputStream} object
* @throws IOException if a non-Alluxio error occurs * @throws IOException if a non-Alluxio error occurs
*/ */
InputStream open(String path, OpenOptions options) throws IOException; UnderFileInputStream open(String path, OpenOptions options) throws IOException;


/** /**
* Renames a directory from {@code src} to {@code dst} in under file system. * Renames a directory from {@code src} to {@code dst} in under file system.
Expand Down
Expand Up @@ -20,14 +20,14 @@
import alluxio.exception.FileDoesNotExistException; import alluxio.exception.FileDoesNotExistException;
import alluxio.exception.PreconditionMessage; import alluxio.exception.PreconditionMessage;
import alluxio.security.authorization.Permission; import alluxio.security.authorization.Permission;
import alluxio.underfs.UnderFileInputStream;
import alluxio.underfs.UnderFileSystem; import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.CreateOptions; import alluxio.underfs.options.CreateOptions;
import alluxio.underfs.options.OpenOptions; import alluxio.underfs.options.OpenOptions;
import alluxio.util.IdUtils; import alluxio.util.IdUtils;
import alluxio.util.network.NetworkAddressUtils; import alluxio.util.network.NetworkAddressUtils;


import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.io.CountingInputStream;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -124,10 +124,8 @@ private final class InputStreamAgent {
/** The string form of the uri to the file in the under file system. */ /** The string form of the uri to the file in the under file system. */
private final String mUri; private final String mUri;


/** The initial position of the stream, only valid if mStream != null. */
private long mInitPos;
/** The underlying stream to read data from. */ /** The underlying stream to read data from. */
private CountingInputStream mStream; private UnderFileInputStream mStream;


/** /**
* Constructor for an input stream agent for a UFS file. The file must exist when this is * Constructor for an input stream agent for a UFS file. The file must exist when this is
Expand Down Expand Up @@ -184,25 +182,12 @@ private InputStream openAtPosition(long position) throws IOException {
return null; return null;
} }


// If no stream has been created or if we need to go backward, make a new stream and cache it. // If no stream has been created, make a new stream and cache it.
if (mStream == null || mInitPos + mStream.getCount() > position) { if (mStream == null) {
if (mStream != null) { // Close the existing stream if needed
mStream.close();
}
UnderFileSystem ufs = UnderFileSystem.Factory.get(mUri); UnderFileSystem ufs = UnderFileSystem.Factory.get(mUri);
mStream = mStream = ufs.open(mUri, OpenOptions.defaults().setOffset(position));
new CountingInputStream(ufs.open(mUri, OpenOptions.defaults().setOffset(position))); } else {
mInitPos = position; mStream.seek(position);
}

// We are guaranteed mStream has been created and the initial position has been set.
// Guaranteed by the previous code block that currentPos <= position.
long currentPos = mInitPos + mStream.getCount();
if (position > currentPos) { // Can skip to next position with the same stream
long toSkip = position - currentPos;
if (toSkip != mStream.skip(toSkip)) {
throw new IOException(ExceptionMessage.FAILED_SKIP.getMessage(toSkip));
}
} }
return mStream; return mStream;
} }
Expand Down
Expand Up @@ -21,6 +21,7 @@
import alluxio.underfs.AtomicFileOutputStream; import alluxio.underfs.AtomicFileOutputStream;
import alluxio.underfs.AtomicFileOutputStreamCallback; import alluxio.underfs.AtomicFileOutputStreamCallback;
import alluxio.underfs.BaseUnderFileSystem; import alluxio.underfs.BaseUnderFileSystem;
import alluxio.underfs.UnderFileInputStream;
import alluxio.underfs.UnderFileStatus; import alluxio.underfs.UnderFileStatus;
import alluxio.underfs.UnderFileSystem; import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.CreateOptions; import alluxio.underfs.options.CreateOptions;
Expand Down Expand Up @@ -369,7 +370,7 @@ public boolean mkdirs(String path, MkdirsOptions options) throws IOException {
} }


@Override @Override
public FSDataInputStream open(String path, OpenOptions options) throws IOException { public UnderFileInputStream open(String path, OpenOptions options) throws IOException {
IOException te = null; IOException te = null;
RetryPolicy retryPolicy = new CountingRetry(MAX_TRY); RetryPolicy retryPolicy = new CountingRetry(MAX_TRY);
while (retryPolicy.attemptRetry()) { while (retryPolicy.attemptRetry()) {
Expand Down

0 comments on commit 08ad769

Please sign in to comment.