Skip to content

Commit

Permalink
Address comments. Create LockBlockResource
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Mar 10, 2017
1 parent c0c63d7 commit f2c3c11
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 94 deletions.
Expand Up @@ -43,7 +43,7 @@ public static UnderFileSystemBlockReader create(FileSystemContext context) {
} }


/** /**
* Reads a block from UFS with a offset and length. * Reads a block from UFS with an offset and length.
* *
* @param address the {@link InetSocketAddress} of the data server * @param address the {@link InetSocketAddress} of the data server
* @param blockId the id of the block trying to read * @param blockId the id of the block trying to read
Expand Down
Expand Up @@ -14,6 +14,7 @@
import alluxio.client.block.options.LockBlockOptions; import alluxio.client.block.options.LockBlockOptions;
import alluxio.client.file.FileSystemContext; import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions; import alluxio.client.file.options.InStreamOptions;
import alluxio.client.resource.LockBlockResource;
import alluxio.exception.AlluxioException; import alluxio.exception.AlluxioException;
import alluxio.metrics.MetricsSystem; import alluxio.metrics.MetricsSystem;
import alluxio.util.CommonUtils; import alluxio.util.CommonUtils;
Expand All @@ -25,7 +26,6 @@
import com.codahale.metrics.Counter; import com.codahale.metrics.Counter;
import com.google.common.io.Closer; import com.google.common.io.Closer;


import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;


Expand Down Expand Up @@ -62,15 +62,17 @@ public static LocalBlockInStream create(long blockId, long blockSize,
Closer closer = Closer.create(); Closer closer = Closer.create();
BlockWorkerClient client; BlockWorkerClient client;
LockBlockResult result; LockBlockResult result;
LocalFileBlockReader reader;
try { try {
client = closer.register(context.createBlockWorkerClient(workerNetAddress)); client = closer.register(context.createBlockWorkerClient(workerNetAddress));
result = client.lockBlock(blockId, LockBlockOptions.defaults()); result = client.lockBlock(blockId, LockBlockOptions.defaults());
closer.register(new LockBlockResource(client, blockId));
reader = new LocalFileBlockReader(result.getBlockPath());
} catch (AlluxioException | IOException e) { } catch (AlluxioException | IOException e) {
CommonUtils.closeCloserIgnoreException(closer); CommonUtils.closeCloserIgnoreException(closer);
throw CommonUtils.castToIOException(e); throw CommonUtils.castToIOException(e);
} }
return new LocalBlockInStream(client, blockId, blockSize, result.getBlockPath(), return new LocalBlockInStream(client, blockId, blockSize, reader, options);
options);
} }


/** /**
Expand All @@ -79,14 +81,14 @@ public static LocalBlockInStream create(long blockId, long blockSize,
* @param client the block worker client * @param client the block worker client
* @param blockId the block id * @param blockId the block id
* @param blockSize the size of the block * @param blockSize the size of the block
* @param path the block file path * @param reader the local block file reader, the {@link LocalBlockInStream} created should
* close this reader
* @param options the instream options * @param options the instream options
* @return the {@link LocalBlockInStream} instance * @return the {@link LocalBlockInStream} instance
* @throws IOException if I/O error occurs
*/ */
public static LocalBlockInStream createWithBlockLocked(BlockWorkerClient client, long blockId, public static LocalBlockInStream createWithLockedBlock(BlockWorkerClient client, long blockId,
long blockSize, String path, InStreamOptions options) throws IOException { long blockSize, LocalFileBlockReader reader, InStreamOptions options) {
return new LocalBlockInStream(client, blockId, blockSize, path, options); return new LocalBlockInStream(client, blockId, blockSize, reader, options);
} }


/** /**
Expand All @@ -95,29 +97,19 @@ public static LocalBlockInStream createWithBlockLocked(BlockWorkerClient client,
* @param client the block worker client * @param client the block worker client
* @param blockId the block id * @param blockId the block id
* @param blockSize the size of the block * @param blockSize the size of the block
* @param path the block file path * @param reader the local block file reader which should be closed by this class
* @param options the instream options * @param options the instream options
* @throws IOException if I/O error occurs
*/ */
private LocalBlockInStream(BlockWorkerClient client, long blockId, private LocalBlockInStream(BlockWorkerClient client, long blockId,
long blockSize, String path, InStreamOptions options) throws IOException { long blockSize, LocalFileBlockReader reader, InStreamOptions options) {
super(blockId, blockSize); super(blockId, blockSize);
mBlockWorkerClient = client; mBlockWorkerClient = client;
mReader = reader;


mCloser = Closer.create(); mCloser = Closer.create();
mCloser.register(client); mCloser.register(client);
mCloser.register(new Closeable() { mCloser.register(new LockBlockResource(mBlockWorkerClient, mBlockId));
@Override mCloser.register(reader);
public void close() throws IOException {
mBlockWorkerClient.unlockBlock(mBlockId);
}
});
try {
mReader = mCloser.register(new LocalFileBlockReader(path));
} catch (IOException e) {
CommonUtils.closeCloserIgnoreException(mCloser);
throw e;
}
} }


@Override @Override
Expand Down
Expand Up @@ -15,6 +15,7 @@
import alluxio.client.block.options.LockBlockOptions; import alluxio.client.block.options.LockBlockOptions;
import alluxio.client.file.FileSystemContext; import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions; import alluxio.client.file.options.InStreamOptions;
import alluxio.client.resource.LockBlockResource;
import alluxio.exception.AlluxioException; import alluxio.exception.AlluxioException;
import alluxio.metrics.MetricsSystem; import alluxio.metrics.MetricsSystem;
import alluxio.util.CommonUtils; import alluxio.util.CommonUtils;
Expand All @@ -24,7 +25,6 @@
import com.codahale.metrics.Counter; import com.codahale.metrics.Counter;
import com.google.common.io.Closer; import com.google.common.io.Closer;


import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;


Expand Down Expand Up @@ -89,7 +89,7 @@ public static RemoteBlockInStream create(long blockId, long blockSize,
* @param options the input stream options * @param options the input stream options
* @return the {@link RemoteBlockInStream} created * @return the {@link RemoteBlockInStream} created
*/ */
public static RemoteBlockInStream createWithBlockLocked(FileSystemContext context, public static RemoteBlockInStream createWithLockedBlock(FileSystemContext context,
BlockWorkerClient client, long blockId, long blockSize, long lockId, BlockWorkerClient client, long blockId, long blockSize, long lockId,
InStreamOptions options) { InStreamOptions options) {
return new RemoteBlockInStream(context, client, blockId, blockSize, lockId, options); return new RemoteBlockInStream(context, client, blockId, blockSize, lockId, options);
Expand All @@ -114,12 +114,7 @@ private RemoteBlockInStream(FileSystemContext context, BlockWorkerClient client,


mCloser = Closer.create(); mCloser = Closer.create();
mCloser.register(mBlockWorkerClient); mCloser.register(mBlockWorkerClient);
mCloser.register(new Closeable() { mCloser.register(new LockBlockResource(mBlockWorkerClient, mBlockId));
@Override
public void close() throws IOException {
mBlockWorkerClient.unlockBlock(mBlockId);
}
});
} }


@Override @Override
Expand All @@ -139,8 +134,8 @@ public void close() throws IOException {
mBlockWorkerClient.accessBlock(mBlockId); mBlockWorkerClient.accessBlock(mBlockId);
Metrics.BLOCKS_READ_REMOTE.inc(); Metrics.BLOCKS_READ_REMOTE.inc();
} }
} catch (Throwable e) { } catch (Throwable e) { // must catch Throwable
mCloser.rethrow(e); mCloser.rethrow(e); // IOException will be thrown as-is
} finally { } finally {
mClosed = true; mClosed = true;
// The block is unlocked by this. // The block is unlocked by this.
Expand Down
Expand Up @@ -168,6 +168,7 @@ public void close() {
@Override @Override
public void run() { public void run() {
mHeartbeat.cancel(true); mHeartbeat.cancel(true);
mHeartbeat = null;
NUM_ACTIVE_SESSIONS.decrementAndGet(); NUM_ACTIVE_SESSIONS.decrementAndGet();
} }
}); });
Expand Down Expand Up @@ -252,9 +253,8 @@ public LockBlockResult lockUfsBlock(final long blockId, final LockBlockOptions o
if (System.currentTimeMillis() >= timeout) { if (System.currentTimeMillis() >= timeout) {
throw e; throw e;
} }
LOG.debug( LOG.debug("Failed to acquire a UFS read token because of contention for block {} with "
"Failed to acquire a UFS read token because of contention for block {} in file {}", + "LockBlockOptions {}", blockId, options);
blockId, blockId);
CommonUtils.sleepMs(retryInterval); CommonUtils.sleepMs(retryInterval);
} }
} }
Expand Down
Expand Up @@ -123,9 +123,9 @@ public static InputStream createRemoteBlockInStream(FileSystemContext context, l
} }


/** /**
* Creates an {@link InputStream} read a block from UFS if that block is in UFS but not in * Creates an {@link InputStream} to read a block from UFS if that block is in UFS but not in
* Alluxio. If the block is cached to Alluxio while it attempts to create the {@link InputStream} * Alluxio. If the block is cached to Alluxio while it attempts to create the {@link InputStream}
* that reads from UFS, it returns a {@link InputStream} that reads from Aluxio instead. * that reads from UFS, it returns an {@link InputStream} that reads from Aluxio instead.
* *
* @param context the file system context * @param context the file system context
* @param ufsPath the UFS path * @param ufsPath the UFS path
Expand Down
Expand Up @@ -16,16 +16,18 @@
import alluxio.client.block.options.LockBlockOptions; import alluxio.client.block.options.LockBlockOptions;
import alluxio.client.file.FileSystemContext; import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions; import alluxio.client.file.options.InStreamOptions;
import alluxio.client.resource.LockBlockResource;
import alluxio.exception.AlluxioException; import alluxio.exception.AlluxioException;
import alluxio.metrics.MetricsSystem; import alluxio.metrics.MetricsSystem;
import alluxio.util.CommonUtils;
import alluxio.util.network.NetworkAddressUtils; import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.LockBlockResult; import alluxio.wire.LockBlockResult;
import alluxio.wire.WorkerNetAddress; import alluxio.wire.WorkerNetAddress;
import alluxio.worker.block.io.LocalFileBlockReader;


import com.codahale.metrics.Counter; import com.codahale.metrics.Counter;
import com.google.common.io.Closer; import com.google.common.io.Closer;


import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
Expand Down Expand Up @@ -71,43 +73,36 @@ public final class UnderFileSystemBlockInStream extends BufferedBlockInStream im
* @throws IOException if it fails to create {@link UnderFileSystemBlockInStream} * @throws IOException if it fails to create {@link UnderFileSystemBlockInStream}
*/ */
public static BufferedBlockInStream create(FileSystemContext context, String ufsPath, public static BufferedBlockInStream create(FileSystemContext context, String ufsPath,
final long blockId, long blockSize, long blockStart, WorkerNetAddress workerNetAddress, long blockId, long blockSize, long blockStart, WorkerNetAddress workerNetAddress,
InStreamOptions options) throws IOException { InStreamOptions options) throws IOException {
Closer closer = Closer.create(); Closer closer = Closer.create();
try { try {
final BlockWorkerClient blockWorkerClient = BlockWorkerClient blockWorkerClient =
closer.register(context.createBlockWorkerClient(workerNetAddress)); closer.register(context.createBlockWorkerClient(workerNetAddress));
LockBlockOptions lockBlockOptions = LockBlockOptions lockBlockOptions =
LockBlockOptions.defaults().setUfsPath(ufsPath).setOffset(blockStart) LockBlockOptions.defaults().setUfsPath(ufsPath).setOffset(blockStart)
.setBlockSize(blockSize).setMaxUfsReadConcurrency(options.getMaxUfsReadConcurrency()); .setBlockSize(blockSize).setMaxUfsReadConcurrency(options.getMaxUfsReadConcurrency());
LockBlockResult result = blockWorkerClient.lockUfsBlock(blockId, lockBlockOptions); LockBlockResult result = blockWorkerClient.lockUfsBlock(blockId, lockBlockOptions);
closer.register(new Closeable() { closer.register(new LockBlockResource(blockWorkerClient, blockId));
@Override
public void close() throws IOException {
blockWorkerClient.unlockBlock(blockId);
}
});
if (LockBlockResult.isBlockCachedInAlluxio(result)) { if (LockBlockResult.isBlockCachedInAlluxio(result)) {
boolean local = blockWorkerClient.getDataServerAddress().getHostName() boolean local = blockWorkerClient.getDataServerAddress().getHostName()
.equals(NetworkAddressUtils.getLocalHostName()); .equals(NetworkAddressUtils.getLocalHostName());
if (local) { if (local) {
LocalFileBlockReader reader =
closer.register(new LocalFileBlockReader(result.getBlockPath()));
return LocalBlockInStream return LocalBlockInStream
.createWithBlockLocked(blockWorkerClient, blockId, blockSize, result.getBlockPath(), .createWithLockedBlock(blockWorkerClient, blockId, blockSize, reader, options);
options);
} else { } else {
return RemoteBlockInStream return RemoteBlockInStream
.createWithBlockLocked(context, blockWorkerClient, blockId, blockSize, .createWithLockedBlock(context, blockWorkerClient, blockId, blockSize,
result.getLockId(), options); result.getLockId(), options);
} }
} }
return new UnderFileSystemBlockInStream(context, blockId, blockSize, blockWorkerClient, return new UnderFileSystemBlockInStream(context, blockId, blockSize, blockWorkerClient,
options); options);
} catch (AlluxioException e) { } catch (AlluxioException | IOException e) {
closer.close(); CommonUtils.closeCloserIgnoreException(closer);
throw new IOException(e); throw CommonUtils.castToIOException(e);
} catch (IOException e) {
closer.close();
throw e;
} }
} }


Expand All @@ -128,12 +123,7 @@ private UnderFileSystemBlockInStream(FileSystemContext context, long blockId, lo
mCloser = Closer.create(); mCloser = Closer.create();
mBlockWorkerClient = blockWorkerClient; mBlockWorkerClient = blockWorkerClient;
mCloser.register(blockWorkerClient); mCloser.register(blockWorkerClient);
mCloser.register(new Closeable() { mCloser.register(new LockBlockResource(mBlockWorkerClient, mBlockId));
@Override
public void close() throws IOException {
mBlockWorkerClient.unlockBlock(mBlockId);
}
});
mNoCache = !options.getAlluxioStorageType().isStore(); mNoCache = !options.getAlluxioStorageType().isStore();
mLocal = blockWorkerClient.getDataServerAddress().getHostName() mLocal = blockWorkerClient.getDataServerAddress().getHostName()
.equals(NetworkAddressUtils.getLocalHostName()); .equals(NetworkAddressUtils.getLocalHostName());
Expand Down
Expand Up @@ -20,6 +20,7 @@
import alluxio.client.block.options.LockBlockOptions; import alluxio.client.block.options.LockBlockOptions;
import alluxio.client.file.FileSystemContext; import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions; import alluxio.client.file.options.InStreamOptions;
import alluxio.client.resource.LockBlockResource;
import alluxio.exception.AlluxioException; import alluxio.exception.AlluxioException;
import alluxio.proto.dataserver.Protocol; import alluxio.proto.dataserver.Protocol;
import alluxio.util.CommonUtils; import alluxio.util.CommonUtils;
Expand All @@ -29,7 +30,6 @@


import com.google.common.io.Closer; import com.google.common.io.Closer;


import java.io.Closeable;
import java.io.FilterInputStream; import java.io.FilterInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
Expand Down Expand Up @@ -70,7 +70,7 @@ public final class BlockInStream extends FilterInputStream implements BoundedStr
* @throws IOException if it fails to create an instance * @throws IOException if it fails to create an instance
* @return the {@link BlockInStream} created * @return the {@link BlockInStream} created
*/ */
public static BlockInStream createLocalBlockInStream(final long blockId, long blockSize, public static BlockInStream createLocalBlockInStream(long blockId, long blockSize,
WorkerNetAddress workerNetAddress, FileSystemContext context, InStreamOptions options) WorkerNetAddress workerNetAddress, FileSystemContext context, InStreamOptions options)
throws IOException { throws IOException {
Closer closer = Closer.create(); Closer closer = Closer.create();
Expand All @@ -79,12 +79,7 @@ public static BlockInStream createLocalBlockInStream(final long blockId, long bl
closer.register(context.createBlockWorkerClient(workerNetAddress)); closer.register(context.createBlockWorkerClient(workerNetAddress));
LockBlockResult lockBlockResult = LockBlockResult lockBlockResult =
blockWorkerClient.lockBlock(blockId, LockBlockOptions.defaults()); blockWorkerClient.lockBlock(blockId, LockBlockOptions.defaults());
closer.register(new Closeable() { closer.register(new LockBlockResource(blockWorkerClient, blockId));
@Override
public void close() throws IOException {
blockWorkerClient.unlockBlock(blockId);
}
});
PacketInStream inStream = closer.register(PacketInStream PacketInStream inStream = closer.register(PacketInStream
.createLocalPacketInstream(lockBlockResult.getBlockPath(), blockId, blockSize)); .createLocalPacketInstream(lockBlockResult.getBlockPath(), blockId, blockSize));
blockWorkerClient.accessBlock(blockId); blockWorkerClient.accessBlock(blockId);
Expand All @@ -106,7 +101,7 @@ public void close() throws IOException {
* @throws IOException if it fails to create an instance * @throws IOException if it fails to create an instance
* @return the {@link BlockInStream} created * @return the {@link BlockInStream} created
*/ */
public static BlockInStream createRemoteBlockInStream(final long blockId, long blockSize, public static BlockInStream createRemoteBlockInStream(long blockId, long blockSize,
WorkerNetAddress workerNetAddress, FileSystemContext context, InStreamOptions options) WorkerNetAddress workerNetAddress, FileSystemContext context, InStreamOptions options)
throws IOException { throws IOException {
Closer closer = Closer.create(); Closer closer = Closer.create();
Expand All @@ -115,12 +110,7 @@ public static BlockInStream createRemoteBlockInStream(final long blockId, long b
closer.register(context.createBlockWorkerClient(workerNetAddress)); closer.register(context.createBlockWorkerClient(workerNetAddress));
LockBlockResult lockBlockResult = LockBlockResult lockBlockResult =
blockWorkerClient.lockBlock(blockId, LockBlockOptions.defaults()); blockWorkerClient.lockBlock(blockId, LockBlockOptions.defaults());
closer.register(new Closeable() { closer.register(new LockBlockResource(blockWorkerClient, blockId));
@Override
public void close() throws IOException {
blockWorkerClient.unlockBlock(blockId);
}
});
PacketInStream inStream = closer.register(PacketInStream PacketInStream inStream = closer.register(PacketInStream
.createNettyPacketInStream(context, blockWorkerClient.getDataServerAddress(), blockId, .createNettyPacketInStream(context, blockWorkerClient.getDataServerAddress(), blockId,
lockBlockResult.getLockId(), blockWorkerClient.getSessionId(), blockSize, false, lockBlockResult.getLockId(), blockWorkerClient.getSessionId(), blockSize, false,
Expand All @@ -134,8 +124,8 @@ public void close() throws IOException {
} }


/** /**
* Creates an instance of remote {@link BlockInStream} that reads a block from UFS from a remote * Creates an instance of {@link BlockInStream} that reads a block from an Alluxio worker that
* worker. * indirectly reads from UFS.
* *
* @param context the file system context * @param context the file system context
* @param ufsPath the UFS path * @param ufsPath the UFS path
Expand All @@ -148,23 +138,18 @@ public void close() throws IOException {
* @return the {@link BlockInStream} created * @return the {@link BlockInStream} created
*/ */
public static BlockInStream createUfsBlockInStream(FileSystemContext context, String ufsPath, public static BlockInStream createUfsBlockInStream(FileSystemContext context, String ufsPath,
final long blockId, long blockSize, long blockStart, long blockId, long blockSize, long blockStart,
WorkerNetAddress workerNetAddress, InStreamOptions options) throws IOException { WorkerNetAddress workerNetAddress, InStreamOptions options) throws IOException {
Closer closer = Closer.create(); Closer closer = Closer.create();
try { try {
final BlockWorkerClient blockWorkerClient = BlockWorkerClient blockWorkerClient =
closer.register(context.createBlockWorkerClient(workerNetAddress)); closer.register(context.createBlockWorkerClient(workerNetAddress));
LockBlockOptions lockBlockOptions = LockBlockOptions lockBlockOptions =
LockBlockOptions.defaults().setUfsPath(ufsPath).setOffset(blockStart) LockBlockOptions.defaults().setUfsPath(ufsPath).setOffset(blockStart)
.setBlockSize(blockSize).setMaxUfsReadConcurrency(options.getMaxUfsReadConcurrency()); .setBlockSize(blockSize).setMaxUfsReadConcurrency(options.getMaxUfsReadConcurrency());


LockBlockResult lockBlockResult = blockWorkerClient.lockUfsBlock(blockId, lockBlockOptions); LockBlockResult lockBlockResult = blockWorkerClient.lockUfsBlock(blockId, lockBlockOptions);
closer.register(new Closeable() { closer.register(new LockBlockResource(blockWorkerClient, blockId));
@Override
public void close() throws IOException {
blockWorkerClient.unlockBlock(blockId);
}
});
PacketInStream inStream; PacketInStream inStream;
if (LockBlockResult.isBlockCachedInAlluxio(lockBlockResult)) { if (LockBlockResult.isBlockCachedInAlluxio(lockBlockResult)) {
boolean local = blockWorkerClient.getDataServerAddress().getHostName() boolean local = blockWorkerClient.getDataServerAddress().getHostName()
Expand Down Expand Up @@ -242,12 +227,7 @@ private BlockInStream(PacketInStream inputStream, long blockId,
mCloser = Closer.create(); mCloser = Closer.create();
// Closer closes the closeables in LIFO order. // Closer closes the closeables in LIFO order.
mCloser.register(mBlockWorkerClient); mCloser.register(mBlockWorkerClient);
mCloser.register(new Closeable() { mCloser.register(new LockBlockResource(mBlockWorkerClient, mBlockId));
@Override
public void close() throws IOException {
mBlockWorkerClient.unlockBlock(mBlockId);
}
});
mCloser.register(mInputStream); mCloser.register(mInputStream);
mLocal = blockWorkerClient.getDataServerAddress().getHostName() mLocal = blockWorkerClient.getDataServerAddress().getHostName()
.equals(NetworkAddressUtils.getClientHostName()); .equals(NetworkAddressUtils.getClientHostName());
Expand Down

0 comments on commit f2c3c11

Please sign in to comment.