Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Mar 10, 2017
1 parent 8b39cb7 commit 6114bde
Show file tree
Hide file tree
Showing 35 changed files with 604 additions and 380 deletions.
Expand Up @@ -12,7 +12,7 @@
package alluxio.client; package alluxio.client;


import alluxio.client.file.FileSystemContext; import alluxio.client.file.FileSystemContext;
import alluxio.client.netty.NettyUfsBlockReader; import alluxio.client.netty.NettyUnderFileSystemBlockReader;


import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
Expand All @@ -22,32 +22,32 @@
/** /**
* The interface to read an under file system file through a worker's data server. * The interface to read an under file system file through a worker's data server.
*/ */
public interface UfsBlockReader extends Closeable { public interface UnderFileSystemBlockReader extends Closeable {


/** /**
* The factory for the {@link UfsBlockReader}. * The factory for the {@link UnderFileSystemBlockReader}.
*/ */
class Factory { class Factory {


private Factory() {} // prevent instantiation private Factory() {} // prevent instantiation


/** /**
* Factory for {@link UfsBlockReader}. * Factory for {@link UnderFileSystemBlockReader}.
* *
* @param context the file system context * @param context the file system context
* @return a new instance of {@link UfsBlockReader} * @return a new instance of {@link UnderFileSystemBlockReader}
*/ */
public static UfsBlockReader create(FileSystemContext context) { public static UnderFileSystemBlockReader create(FileSystemContext context) {
return new NettyUfsBlockReader(context); return new NettyUnderFileSystemBlockReader(context);
} }
} }


/** /**
* Reads a UFS block with a offset and length. * Reads a block from UFS with a offset and length.
* *
* @param address the {@link InetSocketAddress} of the data server * @param address the {@link InetSocketAddress} of the data server
* @param blockId the id of the block trying to read * @param blockId the id of the block trying to read
* @param offset the offset of the block * @param offset the offset within the block (not file)
* @param length the length the client wants to read * @param length the length the client wants to read
* @param sessionId the session id of the client * @param sessionId the session id of the client
* @param noCache do not cache the data read from UFS in the Alluxio worker if set * @param noCache do not cache the data read from UFS in the Alluxio worker if set
Expand Down
Expand Up @@ -98,13 +98,27 @@ public static BlockWorkerClient create(BlockWorkerThriftClientPool clientPool,
* *
* @param blockId the ID of the block * @param blockId the ID of the block
* @param options the lock block options * @param options the lock block options
* @return the path of the block file locked * @return the lock block result
* @throws IOException if a non-Alluxio exception occurs * @throws IOException if a non-Alluxio exception occurs
* @throws AlluxioException if an Alluxio error occurs * @throws AlluxioException if an Alluxio error occurs
*/ */
LockBlockResult lockBlock(final long blockId, final LockBlockOptions options) LockBlockResult lockBlock(final long blockId, final LockBlockOptions options)
throws IOException, AlluxioException; throws IOException, AlluxioException;


/**
* A wrapper over {@link BlockWorkerClient#lockBlock(long, LockBlockOptions)} to lock a block
* that is not in Alluxio but in UFS. It retries if it fails to lock because of contention for
* the block on the worker.
*
* @param blockId the block ID
* @param options the lock block options
* @return the lock block result
* @throws IOException if a non-Alluxio exception occurs
* @throws AlluxioException if an Alluxio error occurs
*/
LockBlockResult lockUfsBlock(final long blockId, final LockBlockOptions options)
throws IOException, AlluxioException;

/** /**
* Promotes block back to the top StorageTier. * Promotes block back to the top StorageTier.
* *
Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.codahale.metrics.Counter; import com.codahale.metrics.Counter;
import com.google.common.io.Closer; import com.google.common.io.Closer;


import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;


Expand All @@ -40,8 +41,6 @@ public final class LocalBlockInStream extends BufferedBlockInStream {
private final Closer mCloser; private final Closer mCloser;
/** Client to communicate with the local worker. */ /** Client to communicate with the local worker. */
private final BlockWorkerClient mBlockWorkerClient; private final BlockWorkerClient mBlockWorkerClient;
/** The block store context which provides block worker clients. */
private final FileSystemContext mContext;
/** The file reader to read a local block. */ /** The file reader to read a local block. */
private final LocalFileBlockReader mReader; private final LocalFileBlockReader mReader;


Expand All @@ -53,23 +52,80 @@ public final class LocalBlockInStream extends BufferedBlockInStream {
* @param workerNetAddress the address of the local worker * @param workerNetAddress the address of the local worker
* @param context the file system context * @param context the file system context
* @param options the instream options * @param options the instream options
* @return the {@link LocalBlockInStream} instance
* @throws IOException if I/O error occurs * @throws IOException if I/O error occurs
*/ */
public LocalBlockInStream(long blockId, long blockSize, WorkerNetAddress workerNetAddress, public static LocalBlockInStream create(long blockId, long blockSize,
FileSystemContext context, InStreamOptions options) throws IOException { WorkerNetAddress workerNetAddress, FileSystemContext context, InStreamOptions options)
throws IOException {
Closer closer = Closer.create();
BlockWorkerClient client;
LockBlockResult result;
try {
client = closer.register(context.createBlockWorkerClient(workerNetAddress));
result = client.lockBlock(blockId, LockBlockOptions.defaults());
} catch (AlluxioException e) {
closer.close();
throw new IOException(e);
} catch (IOException e) {
try {
closer.close();
} catch (IOException ee) {
// Ignore.
}
throw e;
}
return new LocalBlockInStream(client, blockId, blockSize, result.getBlockPath(),
options);
}

/**
* Creates a local block input stream. It requires the block to be locked before calling this.
*
* @param client the block worker client
* @param blockId the block id
* @param blockSize the size of the block
* @param path the block file path
* @param options the instream options
* @return the {@link LocalBlockInStream} instance
* @throws IOException if I/O error occurs
*/
public static LocalBlockInStream createWithBlockLocked(BlockWorkerClient client, long blockId,
long blockSize, String path, InStreamOptions options) throws IOException {
return new LocalBlockInStream(client, blockId, blockSize, path, options);
}

/**
* Creates a local block input stream. It requires the block to be locked before calling this.
*
* @param client the block worker client
* @param blockId the block id
* @param blockSize the size of the block
* @param path the block file path
* @param options the instream options
* @throws IOException if I/O error occurs
*/
private LocalBlockInStream(BlockWorkerClient client, long blockId,
long blockSize, String path, InStreamOptions options) throws IOException {
super(blockId, blockSize); super(blockId, blockSize);
mContext = context; mBlockWorkerClient = client;


mCloser = Closer.create(); mCloser = Closer.create();
mCloser.register(client);
mCloser.register(new Closeable() {
@Override
public void close() throws IOException {
mBlockWorkerClient.unlockBlock(mBlockId);
}
});
try { try {
mBlockWorkerClient = mCloser.register(mContext.createBlockWorkerClient(workerNetAddress)); mReader = mCloser.register(new LocalFileBlockReader(path));
LockBlockResult result = mBlockWorkerClient.lockBlock(blockId, LockBlockOptions.defaults());
mReader = mCloser.register(new LocalFileBlockReader(result.getBlockPath()));
} catch (AlluxioException e) {
mCloser.close();
throw new IOException(e);
} catch (IOException e) { } catch (IOException e) {
mCloser.close(); try {
mCloser.close();
} catch (IOException ee) {
// Ignore.
}
throw e; throw e;
} }
} }
Expand All @@ -90,7 +146,7 @@ public void close() throws IOException {
mBlockWorkerClient.accessBlock(mBlockId); mBlockWorkerClient.accessBlock(mBlockId);
Metrics.BLOCKS_READ_LOCAL.inc(); Metrics.BLOCKS_READ_LOCAL.inc();
} }
mBlockWorkerClient.unlockBlock(mBlockId); // Note that the block is unlocked by closing mCloser.
} catch (Throwable e) { // must catch Throwable } catch (Throwable e) { // must catch Throwable
throw mCloser.rethrow(e); // IOException will be thrown as-is throw mCloser.rethrow(e); // IOException will be thrown as-is
} finally { } finally {
Expand Down
Expand Up @@ -23,8 +23,8 @@
import com.codahale.metrics.Counter; import com.codahale.metrics.Counter;
import com.google.common.io.Closer; import com.google.common.io.Closer;


import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;


import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.NotThreadSafe;
Expand All @@ -38,10 +38,6 @@
public final class RemoteBlockInStream extends BufferedBlockInStream { public final class RemoteBlockInStream extends BufferedBlockInStream {
/** Used to manage closeable resources. */ /** Used to manage closeable resources. */
private final Closer mCloser; private final Closer mCloser;
/** The address of the worker to read the data from. */
private final WorkerNetAddress mWorkerNetAddress;
/** mWorkerNetAddress converted to an InetSocketAddress. */
private final InetSocketAddress mWorkerInetSocketAddress;
/** The returned lock id after acquiring the block lock. */ /** The returned lock id after acquiring the block lock. */
private final Long mLockId; private final Long mLockId;


Expand All @@ -60,29 +56,72 @@ public final class RemoteBlockInStream extends BufferedBlockInStream {
* @param workerNetAddress the worker address * @param workerNetAddress the worker address
* @param context the file system context to use for acquiring worker and master clients * @param context the file system context to use for acquiring worker and master clients
* @param options the instream options * @param options the instream options
* @return the {@link RemoteBlockInStream} created
* @throws IOException if the block is not available on the remote worker * @throws IOException if the block is not available on the remote worker
*/ */
public RemoteBlockInStream(long blockId, long blockSize, WorkerNetAddress workerNetAddress, public static RemoteBlockInStream create(long blockId, long blockSize,
FileSystemContext context, InStreamOptions options) throws IOException { WorkerNetAddress workerNetAddress, FileSystemContext context, InStreamOptions options)
super(blockId, blockSize); throws IOException {
mWorkerNetAddress = workerNetAddress; Closer closer = Closer.create();
mWorkerInetSocketAddress = BlockWorkerClient client;
new InetSocketAddress(workerNetAddress.getHost(), workerNetAddress.getDataPort()); LockBlockResult result;

mContext = context;
mCloser = Closer.create();

try { try {
mBlockWorkerClient = mCloser.register(mContext.createBlockWorkerClient(workerNetAddress)); client = closer.register(context.createBlockWorkerClient(workerNetAddress));
LockBlockResult result = mBlockWorkerClient.lockBlock(blockId, LockBlockOptions.defaults()); result = client.lockBlock(blockId, LockBlockOptions.defaults());
mLockId = result.getLockId();
} catch (AlluxioException e) { } catch (AlluxioException e) {
mCloser.close(); closer.close();
throw new IOException(e); throw new IOException(e);
} catch (IOException e) { } catch (IOException e) {
mCloser.close(); closer.close();
throw e; throw e;
} }

return new RemoteBlockInStream(context, client, blockId, blockSize, result.getLockId(),
options);
}

/**
* Creates a new remote block input stream with the blocked being locked beforehand.
*
* @param context the file system context
* @param client the block worker client
* @param blockId the block ID
* @param blockSize the block size
* @param lockId the lock ID
* @param options the input stream options
* @return the {@link RemoteBlockInStream} created
*/
public static RemoteBlockInStream createWithBlockLocked(FileSystemContext context,
BlockWorkerClient client, long blockId, long blockSize, long lockId,
InStreamOptions options) {
return new RemoteBlockInStream(context, client, blockId, blockSize, lockId, options);
}

/**
* Creates a new remote block input stream with the blocked being locked beforehand.
*
* @param context the file system context
* @param client the block worker client
* @param blockId the block ID
* @param blockSize the block size
* @param lockId the lock ID
* @param options the input stream options
*/
private RemoteBlockInStream(FileSystemContext context, BlockWorkerClient client, long blockId,
long blockSize, long lockId, InStreamOptions options) {
super(blockId, blockSize);
mContext = context;
mBlockWorkerClient = client;
mLockId = lockId;

mCloser = Closer.create();
mCloser.register(mBlockWorkerClient);
mCloser.register(new Closeable() {
@Override
public void close() throws IOException {
mBlockWorkerClient.unlockBlock(mBlockId);
}
});
} }


@Override @Override
Expand All @@ -97,15 +136,16 @@ public void close() throws IOException {
return; return;
} }


if (mBlockIsRead) {
Metrics.BLOCKS_READ_REMOTE.inc();
}
try { try {
mBlockWorkerClient.unlockBlock(mBlockId); if (mBlockIsRead) {
} catch (Throwable e) { // must catch Throwable mBlockWorkerClient.accessBlock(mBlockId);
throw mCloser.rethrow(e); // IOException will be thrown as-is Metrics.BLOCKS_READ_REMOTE.inc();
}
} catch (Throwable e) {
mCloser.rethrow(e);
} finally { } finally {
mClosed = true; mClosed = true;
// The block is unlocked by this.
mCloser.close(); mCloser.close();
} }
} }
Expand Down Expand Up @@ -136,7 +176,7 @@ protected void incrementBytesReadMetric(int bytes) {
* @return the {@link WorkerNetAddress} from which this RemoteBlockInStream is reading * @return the {@link WorkerNetAddress} from which this RemoteBlockInStream is reading
*/ */
public WorkerNetAddress getWorkerNetAddress() { public WorkerNetAddress getWorkerNetAddress() {
return mWorkerNetAddress; return mBlockWorkerClient.getWorkerNetAddress();
} }


/** /**
Expand All @@ -159,10 +199,12 @@ private int readFromRemote(byte[] b, int off, int len) throws IOException {
} }


while (bytesLeft > 0) { while (bytesLeft > 0) {
ByteBuffer data = mReader.readRemoteBlock(mWorkerInetSocketAddress, mBlockId, getPosition(), ByteBuffer data = mReader
bytesLeft, mLockId, mBlockWorkerClient.getSessionId()); .readRemoteBlock(mBlockWorkerClient.getDataServerAddress(), mBlockId, getPosition(),
bytesLeft, mLockId, mBlockWorkerClient.getSessionId());
int bytesRead = data.remaining(); int bytesRead = data.remaining();
data.get(b, off, bytesRead); data.get(b, off, bytesRead);
off += bytesRead;
bytesLeft -= bytesRead; bytesLeft -= bytesRead;
} }


Expand Down

0 comments on commit 6114bde

Please sign in to comment.