Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Mar 10, 2017
1 parent 66d222c commit d0e28e8
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 42 deletions.
Expand Up @@ -35,6 +35,7 @@
import alluxio.wire.LockBlockResult;
import alluxio.wire.ThriftUtils;
import alluxio.wire.WorkerNetAddress;
import alluxio.worker.block.BlockLockIdUtil;

import com.codahale.metrics.Counter;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -248,17 +249,15 @@ public LockBlockResource lockUfsBlock(final long blockId, final LockBlockOptions
int retryInterval = Constants.SECOND_MS;
RetryPolicy retryPolicy = new TimeoutRetry(Configuration
.getLong(PropertyKey.USER_UFS_BLOCK_OPEN_TIMEOUT_MS), retryInterval);
UfsBlockAccessTokenUnavailableException exception;
do {
try {
return lockBlock(blockId, options);
} catch (UfsBlockAccessTokenUnavailableException e) {
LockBlockResource resource = lockBlock(blockId, options);
if (BlockLockIdUtil.isUfsBlockReadTokenUnavailable(resource.getResult().getLockId())) {
LOG.debug("Failed to acquire a UFS read token because of contention for block {} with "
+ "LockBlockOptions {}", blockId, options);
exception = e;
}
} while (retryPolicy.attemptRetry());
throw exception;
throw new UfsBlockAccessTokenUnavailableException(
ExceptionMessage.UFS_BLOCK_ACCESS_TOKEN_UNAVAILABLE, blockId, options.getUfsPath());
}

@Override
Expand Down
Expand Up @@ -100,8 +100,7 @@ public enum ExceptionMessage {
UFS_BLOCK_ALREADY_EXISTS_FOR_SESSION(
"UFS block {0,number,#} from UFS file {1} exists for session {2,number,#}"),
UFS_BLOCK_ACCESS_TOKEN_UNAVAILABLE(
"Failed to acquire a access token ({0,number,#} active) for the UFS block {1,number,#} "
+ "(filename: {2})."),
"Failed to acquire an access token for the UFS block {1,number,#} (filename: {2})."),
UFS_BLOCK_DOES_NOT_EXIST_FOR_SESSION(
"UFS block {0,number,#} does not exist for session {1,number,#}"),

Expand Down
4 changes: 3 additions & 1 deletion core/common/src/main/java/alluxio/wire/LockBlockResult.java
Expand Up @@ -11,6 +11,8 @@

package alluxio.wire;

import alluxio.worker.block.BlockLockIdUtil;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;

Expand Down Expand Up @@ -84,7 +86,7 @@ public LockBlockResult setBlockPath(String blockPath) {
* @return true if the block is in cached in Alluxio
*/
public boolean blockCachedInAlluxio() {
return getLockId() >= 0;
return BlockLockIdUtil.isAlluxioBlockLockId(getLockId());
}

/**
Expand Down
Expand Up @@ -46,8 +46,6 @@
public final class BlockLockManager {
private static final Logger LOG = LoggerFactory.getLogger(BlockLockManager.class);

/** Invalid lock ID that indicates a failure to lock a block. */
public static final long INVALID_LOCK_ID = -1;
/** The unique id of each lock. */
private static final AtomicLong LOCK_ID_GEN = new AtomicLong(0);

Expand Down
Expand Up @@ -42,12 +42,12 @@ interface BlockStore {

/**
* Locks an existing block and guards subsequent reads on this block. If the lock fails, return
* {@link BlockLockManager#INVALID_LOCK_ID}.
* {@link BlockLockIdUtil#INVALID_LOCK_ID}.
*
* @param sessionId the id of the session to lock this block
* @param blockId the id of the block to lock
* @return the lock id (non-negative) that uniquely identifies the lock obtained or
* {@link BlockLockManager#INVALID_LOCK_ID} if it failed to lock
* {@link BlockLockIdUtil#INVALID_LOCK_ID} if it failed to lock
*/
long lockBlockNoException(long sessionId, long blockId);

Expand Down
Expand Up @@ -58,9 +58,8 @@ public interface BlockWorker extends Worker {
* @throws InvalidWorkerStateException if blockId does not belong to sessionId
* @throws IOException if temporary block cannot be deleted
*/
void abortBlock(long sessionId, long blockId)
throws BlockAlreadyExistsException, BlockDoesNotExistException, InvalidWorkerStateException,
IOException;
void abortBlock(long sessionId, long blockId) throws BlockAlreadyExistsException,
BlockDoesNotExistException, InvalidWorkerStateException, IOException;

/**
* Access the block for a given session. This should be called to update the evictor when
Expand Down Expand Up @@ -229,12 +228,12 @@ BlockMeta getBlockMeta(long sessionId, long blockId, long lockId)

/**
* Obtains a read lock the block without throwing an exception. If the lock fails, return
* {@link BlockLockManager#INVALID_LOCK_ID}.
* {@link BlockLockIdUtil#INVALID_LOCK_ID}.
*
* @param sessionId the id of the client
* @param blockId the id of the block to be locked
* @return the lock id that uniquely identifies the lock obtained or
* {@link BlockLockManager#INVALID_LOCK_ID} if it failed to lock
* {@link BlockLockIdUtil#INVALID_LOCK_ID} if it failed to lock
*/
long lockBlockNoException(long sessionId, long blockId);

Expand Down Expand Up @@ -373,7 +372,9 @@ void requestSpace(long sessionId, long blockId, long additionalBytes)
FileInfo getFileInfo(long fileId) throws IOException;

/**
* Opens a UFS block.
* Opens a UFS block. It registers the block metadata information to the UFS block store. It
* throws an {@link UfsBlockAccessTokenUnavailableException} if the number of concurrent readers
* on this block exceeds a threshold.
*
* @param sessionId the session ID
* @param blockId the block ID
Expand All @@ -394,7 +395,8 @@ void openUfsBlock(long sessionId, long blockId, OpenUfsBlockOptions options)
* @param blockId the block ID
* @throws BlockAlreadyExistsException if it fails to commit the block to Alluxio block store
* because the block exists in the Alluxio block store
* @throws BlockDoesNotExistException if the UFS block does not exist in the {@link UfsBlockStore}
* @throws BlockDoesNotExistException if the UFS block does not exist in the
* {@link UnderFileSystemBlockStore}
* @throws InvalidWorkerStateException the worker is not in a valid state
* @throws IOException if any I/O related errors occur
* @throws WorkerOutOfSpaceException the the worker does not have enough space to commit the block
Expand Down
Expand Up @@ -20,6 +20,7 @@
import alluxio.WorkerStorageTierAssoc;
import alluxio.exception.AlluxioException;
import alluxio.exception.BlockDoesNotExistException;
import alluxio.exception.UfsBlockAccessTokenUnavailableException;
import alluxio.exception.UnexpectedAlluxioException;
import alluxio.exception.WorkerOutOfSpaceException;
import alluxio.thrift.AlluxioTException;
Expand All @@ -29,6 +30,7 @@
import alluxio.thrift.ThriftIOException;
import alluxio.worker.block.options.OpenUfsBlockOptions;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -156,16 +158,22 @@ public LockBlockResult lockBlock(final long blockId, final long sessionId,
public LockBlockResult call() throws AlluxioException {
if (!options.isSetUfsPath() || options.getUfsPath().isEmpty()) {
long lockId = mWorker.lockBlock(sessionId, blockId);
Preconditions.checkState(BlockLockIdUtil.isAlluxioBlockLockId(lockId));
return new LockBlockResult(lockId, mWorker.readBlock(sessionId, blockId, lockId));
}

long lockId = mWorker.lockBlockNoException(sessionId, blockId);
if (lockId != BlockLockManager.INVALID_LOCK_ID) {
if (BlockLockIdUtil.isAlluxioBlockLockId(lockId)) {
return new LockBlockResult(lockId, mWorker.readBlock(sessionId, blockId, lockId));
}
// When the block does not exist in Alluxio but exists in UFS, try to open the UFS
// block.
mWorker.openUfsBlock(sessionId, blockId, new OpenUfsBlockOptions(options));
try {
mWorker.openUfsBlock(sessionId, blockId, new OpenUfsBlockOptions(options));
lockId = BlockLockIdUtil.UFS_BLOCK_LOCK_ID;
} catch (UfsBlockAccessTokenUnavailableException e) {
lockId = BlockLockIdUtil.UFS_BLOCK_READ_TOKEN_UNAVAILABLE;
}
return new LockBlockResult(lockId, "");
}

Expand Down
Expand Up @@ -158,7 +158,7 @@ public long lockBlockNoException(long sessionId, long blockId) {
}

mLockManager.unlockBlockNoException(lockId);
return BlockLockManager.INVALID_LOCK_ID;
return BlockLockIdUtil.INVALID_LOCK_ID;
}

@Override
Expand Down
Expand Up @@ -14,7 +14,6 @@
import alluxio.exception.BlockAlreadyExistsException;
import alluxio.exception.BlockDoesNotExistException;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.UfsBlockAccessTokenUnavailableException;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import alluxio.worker.block.meta.UnderFileSystemBlockMeta;
Expand Down Expand Up @@ -81,21 +80,17 @@ public UnderFileSystemBlockStore(BlockStore localBlockStore) {
* @param sessionId the session ID
* @param blockId maximum concurrency
* @param options the options
* @return whether an access token is acquired
* @throws BlockAlreadyExistsException if the block already exists for a session ID
* @throws UfsBlockAccessTokenUnavailableException if there are too many concurrent sessions
* accessing the block
*/
// TODO(peis): Avoid throwing UfsBlockAccessTokenUnavailableException by returning a status.
public void acquireAccess(long sessionId, long blockId, OpenUfsBlockOptions options)
throws BlockAlreadyExistsException, UfsBlockAccessTokenUnavailableException {
public boolean acquireAccess(long sessionId, long blockId, OpenUfsBlockOptions options)
throws BlockAlreadyExistsException {
UnderFileSystemBlockMeta blockMeta = new UnderFileSystemBlockMeta(sessionId, blockId, options);
mLock.lock();
try {
Set<Long> sessionIds = mBlockIdToSessionIds.get(blockId);
if (sessionIds != null && sessionIds.size() >= options.getMaxUfsReadConcurrency()) {
throw new UfsBlockAccessTokenUnavailableException(
ExceptionMessage.UFS_BLOCK_ACCESS_TOKEN_UNAVAILABLE, sessionIds.size(), blockId,
blockMeta.getUnderFileSystemPath());
return false;
}
if (sessionIds == null) {
sessionIds = new HashSet<>();
Expand All @@ -116,6 +111,7 @@ public void acquireAccess(long sessionId, long blockId, OpenUfsBlockOptions opti
throw new BlockAlreadyExistsException(ExceptionMessage.UFS_BLOCK_ALREADY_EXISTS_FOR_SESSION,
blockId, blockMeta.getUnderFileSystemPath(), sessionId);
}
return true;
}

/**
Expand Down
Expand Up @@ -11,7 +11,6 @@

package alluxio.worker.block;

import alluxio.exception.UfsBlockAccessTokenUnavailableException;
import alluxio.thrift.LockBlockTOptions;
import alluxio.worker.block.options.OpenUfsBlockOptions;

Expand Down Expand Up @@ -48,25 +47,20 @@ public void before() throws Exception {
public void acquireAccess() throws Exception {
UnderFileSystemBlockStore blockStore = new UnderFileSystemBlockStore(mAlluxioBlockStore);
for (int i = 0; i < 5; i++) {
blockStore.acquireAccess(i + 1, BLOCK_ID, mOpenUfsBlockOptions);
Assert.assertTrue(blockStore.acquireAccess(i + 1, BLOCK_ID, mOpenUfsBlockOptions));
}

try {
blockStore.acquireAccess(6, BLOCK_ID, mOpenUfsBlockOptions);
Assert.fail();
} catch (UfsBlockAccessTokenUnavailableException e) {
// expected
}
Assert.assertFalse(blockStore.acquireAccess(6, BLOCK_ID, mOpenUfsBlockOptions));
}

@Test
public void releaseAccess() throws Exception {
UnderFileSystemBlockStore blockStore = new UnderFileSystemBlockStore(mAlluxioBlockStore);
for (int i = 0; i < 5; i++) {
blockStore.acquireAccess(i + 1, BLOCK_ID, mOpenUfsBlockOptions);
Assert.assertTrue(blockStore.acquireAccess(i + 1, BLOCK_ID, mOpenUfsBlockOptions));
blockStore.releaseAccess(i + 1, BLOCK_ID);
}

blockStore.acquireAccess(6, BLOCK_ID, mOpenUfsBlockOptions);
Assert.assertTrue(blockStore.acquireAccess(6, BLOCK_ID, mOpenUfsBlockOptions));
}
}

0 comments on commit d0e28e8

Please sign in to comment.