Skip to content

Commit

Permalink
Refactor UnderStoreBlockInStream to use composition
Browse files Browse the repository at this point in the history
  • Loading branch information
aaudiber committed Aug 2, 2016
1 parent 62fa1d2 commit ea4be92
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 505 deletions.

This file was deleted.

This file was deleted.

Expand Up @@ -11,9 +11,11 @@


package alluxio.client.block; package alluxio.client.block;


import alluxio.Configuration;
import alluxio.Constants; import alluxio.Constants;
import alluxio.exception.ExceptionMessage; import alluxio.exception.ExceptionMessage;
import alluxio.exception.PreconditionMessage;

import com.google.common.base.Preconditions;


import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
Expand All @@ -28,15 +30,13 @@
* storage client. * storage client.
*/ */
@NotThreadSafe @NotThreadSafe
public abstract class UnderStoreBlockInStream extends BlockInStream { public final class UnderStoreBlockInStream extends BlockInStream {
/** /**
* The block size of the file. See {@link #getLength()} for more length information. * The block size of the file. See {@link #getLength()} for more length information.
*/ */
private final long mFileBlockSize; private final long mFileBlockSize;
/** The start of this block. This is the absolute position within the UFS file. */ /** The start of this block. This is the absolute position within the UFS file. */
protected final long mInitPos; protected final long mInitPos;
/** The UFS path for this block. */
protected final String mUfsPath;
/** /**
* The length of this current block. This may be {@link Constants#UNKNOWN_SIZE}, and may be * The length of this current block. This may be {@link Constants#UNKNOWN_SIZE}, and may be
* updated to a valid length. See {@link #getLength()} for more length information. * updated to a valid length. See {@link #getLength()} for more length information.
Expand All @@ -50,19 +50,33 @@ public abstract class UnderStoreBlockInStream extends BlockInStream {
/** The current under store stream. */ /** The current under store stream. */
protected InputStream mUnderStoreStream; protected InputStream mUnderStoreStream;


private UnderStoreStreamFactory mUnderStoreStreamFactory;

/**
* A factory which can create an input stream to under storage.
*/
public interface UnderStoreStreamFactory extends AutoCloseable {
InputStream create() throws IOException;

void close() throws IOException;
}

/** /**
* Creates a new under storage file input stream. * Creates a new under storage file input stream.
* *
* @param initPos the initial position * @param initPos the initial position
* @param length the length of this current block (allowed to be {@link Constants#UNKNOWN_SIZE}) * @param length the length of this current block (allowed to be {@link Constants#UNKNOWN_SIZE})
* @param fileBlockSize the block size for the file * @param fileBlockSize the block size for the file
* @param ufsPath the under file system path * @param underStoreStreamFactory a factory for getting input streams from the under storage
* @throws IOException
*/ */
protected UnderStoreBlockInStream(long initPos, long length, long fileBlockSize, String ufsPath) { protected UnderStoreBlockInStream(long initPos, long length, long fileBlockSize,
UnderStoreStreamFactory underStoreStreamFactory) throws IOException {
mInitPos = initPos; mInitPos = initPos;
mLength = length; mLength = length;
mFileBlockSize = fileBlockSize; mFileBlockSize = fileBlockSize;
mUfsPath = ufsPath; mUnderStoreStreamFactory = underStoreStreamFactory;
setUnderStoreStream(0);
} }


/** /**
Expand All @@ -81,25 +95,19 @@ private Factory() {} // prevent instantiation
* @param blockStart the start position of the block stream relative to the entire file * @param blockStart the start position of the block stream relative to the entire file
* @param length length of this block * @param length length of this block
* @param blockSize the block size of the file * @param blockSize the block size of the file
* @param path the path of the file in the under storage * @param factory a factory which can be used to create input streams from under storage
* @return a stream which can access data from blockStart to blockStart + length * @return a stream which can access data from blockStart to blockStart + length
* @throws IOException if an error occurs creating the stream * @throws IOException if an error occurs creating the stream
*/ */
public static UnderStoreBlockInStream create(long blockStart, long length, long blockSize, public static UnderStoreBlockInStream create(long blockStart, long length, long blockSize,
String path) throws IOException { UnderStoreStreamFactory factory) throws IOException {
UnderStoreBlockInStream stream; return new UnderStoreBlockInStream(blockStart, length, blockSize, factory);
if (Configuration.getBoolean(Constants.USER_UFS_DELEGATION_ENABLED)) {
stream = new DelegatedUnderStoreBlockInStream(blockStart, length, blockSize, path);
} else {
stream = new DirectUnderStoreBlockInStream(blockStart, length, blockSize, path);
}
stream.setUnderStoreStream(0);
return stream;
} }
} }


@Override @Override
public void close() throws IOException { public void close() throws IOException {
mUnderStoreStreamFactory.close();
mUnderStoreStream.close(); mUnderStoreStream.close();
} }


Expand Down Expand Up @@ -185,7 +193,23 @@ public long skip(long n) throws IOException {
* @param pos the position within this block * @param pos the position within this block
* @throws IOException if the stream from the position cannot be created * @throws IOException if the stream from the position cannot be created
*/ */
protected abstract void setUnderStoreStream(long pos) throws IOException; private void setUnderStoreStream(long pos) throws IOException {
if (mUnderStoreStream != null) {
mUnderStoreStream.close();
}
Preconditions.checkArgument(pos >= 0, PreconditionMessage.ERR_SEEK_NEGATIVE.toString(), pos);
Preconditions.checkArgument(pos <= mLength,
PreconditionMessage.ERR_SEEK_PAST_END_OF_BLOCK.toString(), pos);
mUnderStoreStream = mUnderStoreStreamFactory.create();
long streamStart = mInitPos + pos;
// The stream is at the beginning of the file, so skip to the correct absolute position.
if (streamStart != 0 && streamStart != mUnderStoreStream.skip(streamStart)) {
mUnderStoreStream.close();
throw new IOException(ExceptionMessage.FAILED_SKIP.getMessage(pos));
}
// Set the current block position to the specified block position.
mPos = pos;
}


/** /**
* Returns the length of the current UFS block. This method handles the situation when the UFS * Returns the length of the current UFS block. This method handles the situation when the UFS
Expand Down
@@ -0,0 +1,55 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.file;

import alluxio.AlluxioURI;
import alluxio.client.block.UnderStoreBlockInStream.UnderStoreStreamFactory;
import alluxio.client.file.options.CloseUfsFileOptions;
import alluxio.client.file.options.OpenUfsFileOptions;
import alluxio.exception.AlluxioException;

import java.io.IOException;
import java.io.InputStream;

/**
*
*/
public class DelegatedUnderStoreStreamFactory implements UnderStoreStreamFactory {
private final FileSystemWorkerClient mClient;
private final long mFileId;

public DelegatedUnderStoreStreamFactory(FileSystemContext context, String path) throws IOException {
mClient = FileSystemContext.INSTANCE.createWorkerClient();
try {
mFileId = mClient.openUfsFile(new AlluxioURI(path), OpenUfsFileOptions.defaults());
} catch (AlluxioException | IOException e) {
mClient.close();
throw new IOException(e);
}
}

@Override
public InputStream create() {
return new UnderFileSystemFileInStream(mClient.getWorkerDataServerAddress(), mFileId);
}

@Override
public void close() throws IOException {
try {
mClient.closeUfsFile(mFileId, CloseUfsFileOptions.defaults());
} catch (AlluxioException e) {
throw new IOException(e);
} finally {
mClient.close();
}
}
}
@@ -0,0 +1,42 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.file;

import alluxio.client.block.UnderStoreBlockInStream.UnderStoreStreamFactory;
import alluxio.underfs.UnderFileSystem;

import java.io.IOException;
import java.io.InputStream;

/**
*
*/
public class DirectUnderStoreStreamFactory implements UnderStoreStreamFactory {
private final String mPath;

public DirectUnderStoreStreamFactory(String path) {
mPath = path;
}

@Override
public InputStream create() throws IOException {
return UnderFileSystem.get(mPath).open(mPath);
}

/* (non-Javadoc)
* @see alluxio.client.block.UnderStoreBlockInStream.UnderStoreStreamFactory#close()
*/
@Override
public void close() throws IOException {
// Nothing needs to be closed.
}
}

0 comments on commit ea4be92

Please sign in to comment.