Skip to content

Commit

Permalink
Update BlockLockManager to use ClientRWLock
Browse files Browse the repository at this point in the history
  • Loading branch information
apc999 authored and calvinjia committed Jun 27, 2015
1 parent 99c55cc commit cd702ef
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 56 deletions.
135 changes: 79 additions & 56 deletions servers/src/main/java/tachyon/worker/block/BlockLockManager.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,19 +15,22 @@


package tachyon.worker.block; package tachyon.worker.block;


import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;


import com.google.common.collect.Sets;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import com.google.common.base.Optional; import com.google.common.base.Optional;


import tachyon.Constants; import tachyon.Constants;
import tachyon.Pair; import tachyon.worker.ClientRWLock;


/** /**
* Handle all block locks. * Handle all block locks.
Expand All @@ -36,55 +39,80 @@
*/ */
public class BlockLockManager { public class BlockLockManager {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
private static final int NUM_LOCKS = 100;
/** The unique id of each lock **/ /** The unique id of each lock **/
private static final AtomicLong LOCK_ID_GEN = new AtomicLong(0); private static final AtomicLong LOCK_ID_GEN = new AtomicLong(0);


/** A map from a block ID to its lock */ /** A map from a block ID to its lock */
private final Map<Long, BlockLock> mBlockIdToLockMap = new HashMap<Long, BlockLock>(); private final List<ClientRWLock> mLockArray = new ArrayList<ClientRWLock>(NUM_LOCKS);
/** A map from a user ID to all the locks hold by this user */ /** A map from a user ID to all the locks hold by this user */
private final Map<Long, Set<Long>> mUserIdToLockIdsMap = new HashMap<Long, Set<Long>>(); private final Map<Long, Set<Long>> mUserIdToLockIdsMap = new HashMap<Long, Set<Long>>();
/** A map from a lock ID to the user ID holding this lock */ /** A map from a lock ID to the user ID holding this lock */
private final Map<Long, Pair<Long, Lock>> mLockIdToUserIdAndLockMap = private final Map<Long, LockRecord> mLockIdToRecordMap = new HashMap<Long, LockRecord>();
new HashMap<Long, Pair<Long, Lock>>();
private class LockRecord {
private final long mUserId;
private final long mBlockId;
private final Lock mLock;

LockRecord(long userId, long blockId, Lock lock) {
mUserId = userId;
mBlockId = blockId;
mLock = lock;
}

long userId() {
return mUserId;
}

long blockId() {
return mBlockId;
}

Lock lock() {
return mLock;
}
}


public BlockLockManager() {} public BlockLockManager() {}


public synchronized Optional<Long> lockBlock(long userId, long blockId, public synchronized Optional<Long> lockBlock(long userId, long blockId,
BlockLock.BlockLockType blockLockType) { BlockLockType blockLockType) {
if (!mBlockIdToLockMap.containsKey(blockId)) { int hashValue = (int) blockId % NUM_LOCKS;
LOG.error("Cannot get lock for block {}: not exists", blockId); ClientRWLock blockLock = mLockArray.get(hashValue);
return Optional.absent();
}
BlockLock blockLock = mBlockIdToLockMap.get(blockId);
Lock lock = null; Lock lock = null;
if (blockLockType == BlockLock.BlockLockType.READ) { if (blockLockType == BlockLockType.READ) {
lock = blockLock.readLock(); lock = blockLock.readLock();
} else if (blockLockType == BlockLock.BlockLockType.WRITE) { } else if (blockLockType == BlockLockType.WRITE) {
lock = blockLock.writeLock(); lock = blockLock.writeLock();
} }
lock.lock(); lock.lock();
long lockId = createLockId(userId, lock);
return Optional.of(lockId);
}

public synchronized boolean unlockBlock(long lockId) {
// TODO: implement me
// do unlock

cleanupLockId(lockId);
return true;
}

private synchronized long createLockId(long userId, Lock lock) {
// TODO: implement me
long lockId = LOCK_ID_GEN.getAndIncrement(); long lockId = LOCK_ID_GEN.getAndIncrement();
// mUserIdToAcquiredLockIdsMap.put(userId, lockID); mLockIdToRecordMap.put(lockId, new LockRecord(userId, blockId, lock));
return lockId; Set<Long> userLockIds = mUserIdToLockIdsMap.get(userId);
if (null == userLockIds) {
mUserIdToLockIdsMap.put(userId, Sets.newHashSet(lockId));
} else {
userLockIds.add(lockId);
}
return Optional.of(lockId);
} }


private synchronized boolean cleanupLockId(long lockId) { public boolean unlockBlock(long lockId) {
// TODO: implement me LockRecord record = mLockIdToRecordMap.get(lockId);
// mUserIdToAcquiredLockIdsMap.put(userId, lockID); if (null == record) {
return false;
}
long userId = record.userId();
Lock lock = record.lock();

mLockIdToRecordMap.remove(lockId);
Set<Long> userLockIds = mUserIdToLockIdsMap.get(userId);
userLockIds.remove(lockId);
if (userLockIds.isEmpty()) {
mUserIdToLockIdsMap.remove(userId);
}
lock.unlock();
return true; return true;
} }


Expand All @@ -96,38 +124,33 @@ private synchronized boolean cleanupLockId(long lockId) {
* @param lockId The ID of the lock * @param lockId The ID of the lock
* @return true if validation succeeds, false otherwise * @return true if validation succeeds, false otherwise
*/ */
public synchronized boolean validateLockId(long userId, long blockId, long lockId) { public boolean validateLockId(long userId, long blockId, long lockId) {
// TODO: implement me LockRecord record = mLockIdToRecordMap.get(lockId);
return true; if (null == record) {
}

/**
* Get the lock for the given block id. If there is no such a lock yet, create one.
*
* @param blockId The id of the block
* @return true if success, false otherwise
*/
public synchronized boolean addBlockLock(long blockId) {
if (mBlockIdToLockMap.containsKey(blockId)) {
LOG.error("Cannot add lock for block {}: already exists", blockId);
return false; return false;
} }
mBlockIdToLockMap.put(blockId, new BlockLock(blockId)); return userId == record.userId() && blockId == record.blockId();
return true;
} }


/** /**
* Remove a lock for the given block id. * Cleans up the locks currently hold by a specific user
* *
* @param blockId The id of the block * @param userId the ID of the user to cleanup
* @return true if success, false otherwise
*/ */
public synchronized boolean removeBlockLock(long blockId) { public void cleanupUser(long userId) {
if (!mBlockIdToLockMap.containsKey(blockId)) { Set<Long> userLockIds = mUserIdToLockIdsMap.get(userId);
LOG.error("Cannot remove lock for block {}: not exists", blockId); if (null == userLockIds) {
return false; return;
} }
mBlockIdToLockMap.remove(blockId); for (long lockId : userLockIds) {
return true; LockRecord record = mLockIdToRecordMap.get(lockId);
if (null == record) {
return;
}
Lock lock = record.lock();
lock.unlock();
mLockIdToRecordMap.remove(lockId);
}
mUserIdToLockIdsMap.remove(userId);
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ public boolean freeSpace(long userId, long size, BlockStoreLocation location) th
public boolean cleanupUser(long userId) { public boolean cleanupUser(long userId) {
mEvictionLock.writeLock().lock(); mEvictionLock.writeLock().lock();
mMetaManager.cleanupUser(userId); mMetaManager.cleanupUser(userId);
mLockManager.cleanupUser(userId);
mEvictionLock.writeLock().unlock(); mEvictionLock.writeLock().unlock();
return false; return false;
} }
Expand Down

0 comments on commit cd702ef

Please sign in to comment.