Skip to content

Commit

Permalink
Update a bunch to address comments from calvinjia
Browse files Browse the repository at this point in the history
  • Loading branch information
apc999 committed Jul 1, 2015
1 parent bca8118 commit e3dfa73
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 62 deletions.
42 changes: 21 additions & 21 deletions servers/src/main/java/tachyon/worker/block/BlockStore.java
Expand Up @@ -39,7 +39,7 @@ public interface BlockStore {
* *
* @param userId the ID of the user to lock this block * @param userId the ID of the user to lock this block
* @param blockId the ID of the block to lock * @param blockId the ID of the block to lock
* @return the lock ID if the lock is acquired successfully, {@link Optional#absent()} otherwise * @return the lock ID if the lock is acquired successfully
* @throws IOException if blockId is invalid * @throws IOException if blockId is invalid
*/ */
long lockBlock(long userId, long blockId) throws IOException; long lockBlock(long userId, long blockId) throws IOException;
Expand All @@ -53,7 +53,7 @@ public interface BlockStore {
void unlockBlock(long lockId) throws IOException; void unlockBlock(long lockId) throws IOException;


/** /**
* NOTE: debug only, will be removed after changing client side code. * NOTE: temporary, will be removed after changing client side code.
*/ */
void unlockBlock(long userId, long blockId) throws IOException; void unlockBlock(long userId, long blockId) throws IOException;


Expand All @@ -72,78 +72,78 @@ public interface BlockStore {
* @param location location to create this block * @param location location to create this block
* @param initialBlockSize initial size of this block in bytes * @param initialBlockSize initial size of this block in bytes
* @return metadata of the temp block created * @return metadata of the temp block created
* @throws IOException * @throws IOException if blockId or location is invalid, or this Store has no space
*/ */
TempBlockMeta createBlockMeta(long userId, long blockId, BlockStoreLocation location, TempBlockMeta createBlockMeta(long userId, long blockId, BlockStoreLocation location,
long initialBlockSize) throws IOException; long initialBlockSize) throws IOException;


/** /**
* Gets the meta data of a specific block from local storage. * Gets the meta data of a specific block from local storage.
* <p> * <p>
* This method requires the lock ID returned by a proceeding {@link #lockBlock}. * This method requires the lock ID returned by a previously acquired {@link #lockBlock}.
* *
* @param userId the ID of the user to get this file * @param userId the ID of the user to get this file
* @param blockId the ID of the block * @param blockId the ID of the block
* @param lockId the ID of the lock * @param lockId the ID of the lock
* @return metadata of the block * @return metadata of the block
* @throws IOException if the block can not be found. * @throws IOException if the block can not be found
*/ */
BlockMeta getBlockMeta(long userId, long blockId, long lockId) throws IOException; BlockMeta getBlockMeta(long userId, long blockId, long lockId) throws IOException;


/** /**
* Commits a temporary block to the local store. After commit, the block will be available in this * Commits a temporary block to the local store. After commit, the block will be available in this
* block store for all clients to access. Since a temp block is "private" to the writer, this * block store for all clients to access. Since a temp block is "private" to the writer, this
* method requires no proceeding lock acquired. * method requires no previously acquired lock.
* *
* @param userId the ID of the user * @param userId the ID of the user
* @param blockId the ID of a temp block * @param blockId the ID of a temp block
* @throws IOException * @throws IOException if the block can not be found or moved
*/ */
void commitBlock(long userId, long blockId) throws IOException; void commitBlock(long userId, long blockId) throws IOException;


/** /**
* Aborts a temporary block. The meta data of this block will not be added, its data will be * Aborts a temporary block. The meta data of this block will not be added, its data will be
* deleted and the space will be reclaimed. Since a temp block is "private" to the writer, this * deleted and the space will be reclaimed. Since a temp block is "private" to the writer, this
* requires no proceeding lock acquired. * requires no previously acquired lock.
* *
* @param userId the ID of the user * @param userId the ID of the user
* @param blockId the ID of a temp block * @param blockId the ID of a temp block
* @throws IOException * @throws IOException if the block is invalid or committed or can not be removed
*/ */
void abortBlock(long userId, long blockId) throws IOException; void abortBlock(long userId, long blockId) throws IOException;


/** /**
* Requests to increase the size of a temp block. Since a temp block is "private" to the writer * Requests to increase the size of a temp block. Since a temp block is "private" to the writer
* client, this operation requires no proceeding lock acquired. * client, this operation requires no previously acquired lock.
* *
* @param userId the ID of the user to request space * @param userId the ID of the user to request space
* @param blockId the ID of the temp block * @param blockId the ID of the temp block
* @param moreBytes the amount of more space to request in bytes * @param additionalBytes the amount of more space to request in bytes
* @throws IOException * @throws IOException if there is not enough space or other blocks fail to be moved/evicted
*/ */
void requestSpace(long userId, long blockId, long moreBytes) throws IOException; void requestSpace(long userId, long blockId, long additionalBytes) throws IOException;


/** /**
* Creates a writer to write data to a temp block. Since the temp block is "private" to the * Creates a writer to write data to a temp block. Since the temp block is "private" to the
* writer, this operation requires no proceeding lock acquired. * writer, this operation requires no previously acquired lock.
* *
* @param userId the ID of the user to get the writer * @param userId the ID of the user to get the writer
* @param blockId the ID of the temp block * @param blockId the ID of the temp block
* @return a {@link BlockWriter} instance on this block * @return a {@link BlockWriter} instance on this block
* @throws IOException * @throws IOException if the blockId is invalid, or block can not be created
*/ */
BlockWriter getBlockWriter(long userId, long blockId) throws IOException; BlockWriter getBlockWriter(long userId, long blockId) throws IOException;


/** /**
* Creates a reader of an existing block to read data from this block. * Creates a reader of an existing block to read data from this block.
* <p> * <p>
* This operation requires the lock ID returned by a proceeding {@link #lockBlock}. * This operation requires the lock ID returned by a previously acquired {@link #lockBlock}.
* *
* @param userId the ID of the user to get the reader * @param userId the ID of the user to get the reader
* @param blockId the ID of an existing block * @param blockId the ID of an existing block
* @param lockId the ID of the lock returned by {@link #lockBlock} * @param lockId the ID of the lock returned by {@link #lockBlock}
* @return a {@link BlockReader} instance on this block * @return a {@link BlockReader} instance on this block
* @throws IOException * @throws IOException if the blockId or lockId is invalid, or block can not be read
*/ */
BlockReader getBlockReader(long userId, long blockId, long lockId) throws IOException; BlockReader getBlockReader(long userId, long blockId, long lockId) throws IOException;


Expand All @@ -153,7 +153,7 @@ TempBlockMeta createBlockMeta(long userId, long blockId, BlockStoreLocation loca
* @param userId the ID of the user to remove a block * @param userId the ID of the user to remove a block
* @param blockId the ID of an existing block * @param blockId the ID of an existing block
* @param newLocation the location of the destination * @param newLocation the location of the destination
* @throws IOException * @throws IOException if blockId or newLocation is invalid, or block cannot be moved
*/ */
void moveBlock(long userId, long blockId, BlockStoreLocation newLocation) throws IOException; void moveBlock(long userId, long blockId, BlockStoreLocation newLocation) throws IOException;


Expand All @@ -162,7 +162,7 @@ TempBlockMeta createBlockMeta(long userId, long blockId, BlockStoreLocation loca
* *
* @param userId the ID of the user to remove a block * @param userId the ID of the user to remove a block
* @param blockId the ID of an existing block * @param blockId the ID of an existing block
* @throws IOException * @throws IOException if blockId is invalid, or block cannot be removed
*/ */
void removeBlock(long userId, long blockId) throws IOException; void removeBlock(long userId, long blockId) throws IOException;


Expand All @@ -187,7 +187,7 @@ TempBlockMeta createBlockMeta(long userId, long blockId, BlockStoreLocation loca
* unreleased locks by this user, reclaim space of temp blocks created by this user. * unreleased locks by this user, reclaim space of temp blocks created by this user.
* *
* @param userId the user ID * @param userId the user ID
* @throws IOException if failed to clean up (e.g., cannot delete file) * @throws IOException if block can not be deleted or locks can not be released
*/ */
void cleanupUser(long userId) throws IOException; void cleanupUser(long userId) throws IOException;


Expand All @@ -197,7 +197,7 @@ TempBlockMeta createBlockMeta(long userId, long blockId, BlockStoreLocation loca
* @param userId the user ID * @param userId the user ID
* @param availableBytes the amount of free space in bytes * @param availableBytes the amount of free space in bytes
* @param location the location to free space * @param location the location to free space
* @throws IOException * @throws IOException if location is invalid, or there is not enough space, or eviction failed
*/ */
void freeSpace(long userId, long availableBytes, BlockStoreLocation location) throws IOException; void freeSpace(long userId, long availableBytes, BlockStoreLocation location) throws IOException;


Expand Down
Expand Up @@ -16,7 +16,8 @@
package tachyon.worker.block; package tachyon.worker.block;


/** /**
* A listener interface for receiving meta data mutation events of {@link BlockStore}. * A listener interface for receiving meta data mutation events of {@link BlockStore}. All the
* callback methods are triggered only after the actual event has been completed successfully.
* <p> * <p>
* All methods may be called concurrently, thus listener implementation needs to ensure * All methods may be called concurrently, thus listener implementation needs to ensure
* thread-safety. * thread-safety.
Expand All @@ -31,6 +32,14 @@ public interface BlockStoreEventListener {
*/ */
void onAccessBlock(long userId, long blockId); void onAccessBlock(long userId, long blockId);


/**
* Actions when aborting a temporary block.
*
* @param userId the ID of the user to abort on this block
* @param blockId the ID of the block where the mutation to abort
*/
void onAbortBlock(long userId, long blockId);

/** /**
* Actions when committing a temporary block to a {@link BlockStoreLocation}. * Actions when committing a temporary block to a {@link BlockStoreLocation}.
* *
Expand All @@ -40,14 +49,6 @@ public interface BlockStoreEventListener {
*/ */
void onCommitBlock(long userId, long blockId, BlockStoreLocation location); void onCommitBlock(long userId, long blockId, BlockStoreLocation location);


/**
* Actions when aborting a temporary block.
*
* @param userId the ID of the user to abort on this block
* @param blockId the ID of the block where the mutation to abort
*/
void onAbortBlock(long userId, long blockId);

/** /**
* Actions when moving a block by a client from a {@link BlockStoreLocation} to another. * Actions when moving a block by a client from a {@link BlockStoreLocation} to another.
* *
Expand All @@ -59,14 +60,6 @@ public interface BlockStoreEventListener {
void onMoveBlockByClient(long userId, long blockId, BlockStoreLocation oldLocation, void onMoveBlockByClient(long userId, long blockId, BlockStoreLocation oldLocation,
BlockStoreLocation newLocation); BlockStoreLocation newLocation);


/**
* Actions when removing an existing block.
*
* @param userId the ID of the user to remove this block
* @param blockId the ID of the block to be removed
*/
void onRemoveBlockByClient(long userId, long blockId);

/** /**
* Actions when moving a block by a worker from a {@link BlockStoreLocation} to another. * Actions when moving a block by a worker from a {@link BlockStoreLocation} to another.
* *
Expand All @@ -78,6 +71,14 @@ void onMoveBlockByClient(long userId, long blockId, BlockStoreLocation oldLocati
void onMoveBlockByWorker(long userId, long blockId, BlockStoreLocation oldLocation, void onMoveBlockByWorker(long userId, long blockId, BlockStoreLocation oldLocation,
BlockStoreLocation newLocation); BlockStoreLocation newLocation);


/**
* Actions when removing an existing block.
*
* @param userId the ID of the user to remove this block
* @param blockId the ID of the block to be removed
*/
void onRemoveBlockByClient(long userId, long blockId);

/** /**
* Actions when removing an existing block by worker. * Actions when removing an existing block by worker.
* *
Expand Down
Expand Up @@ -25,22 +25,23 @@ public class BlockStoreEventListenerBase implements BlockStoreEventListener {
public void onAccessBlock(long userId, long blockId) {} public void onAccessBlock(long userId, long blockId) {}


@Override @Override
public void onCommitBlock(long userId, long blockId, BlockStoreLocation location) {} public void onAbortBlock(long userId, long blockId) {}


@Override @Override
public void onAbortBlock(long userId, long blockId) {} public void onCommitBlock(long userId, long blockId, BlockStoreLocation location) {}


@Override @Override
public void onMoveBlockByClient(long userId, long blockId, BlockStoreLocation oldLocation, public void onMoveBlockByClient(long userId, long blockId, BlockStoreLocation oldLocation,
BlockStoreLocation newLocation) {} BlockStoreLocation newLocation) {}


@Override
public void onRemoveBlockByClient(long userId, long blockId) {}

@Override @Override
public void onMoveBlockByWorker(long userId, long blockId, BlockStoreLocation oldLocation, public void onMoveBlockByWorker(long userId, long blockId, BlockStoreLocation oldLocation,
BlockStoreLocation newLocation) {} BlockStoreLocation newLocation) {}


@Override
public void onRemoveBlockByClient(long userId, long blockId) {}


@Override @Override
public void onRemoveBlockByWorker(long userId, long blockId) {} public void onRemoveBlockByWorker(long userId, long blockId) {}


Expand Down
Expand Up @@ -63,7 +63,7 @@ public int dir() {
} }


/** /**
* Whether this location belongs to the specific location. * Returns whether this location belongs to the specific location.
* *
* location A belongs to B either when A.equals(B) or tier and dir of A are all in the range of B. * location A belongs to B either when A.equals(B) or tier and dir of A are all in the range of B.
* *
Expand Down Expand Up @@ -97,6 +97,12 @@ public String toString() {
return result.toString(); return result.toString();
} }


/**
* Compares to a specific object.
*
* @param object the object to compare
* @return true if object is also {@link BlockStoreLocation} and represents the same tier and dir.
*/
@Override @Override
public boolean equals(Object object) { public boolean equals(Object object) {
if (object instanceof BlockStoreLocation if (object instanceof BlockStoreLocation
Expand Down
Expand Up @@ -32,10 +32,13 @@
* TODO: use proto buf to represent this information * TODO: use proto buf to represent this information
*/ */
public class BlockStoreMeta { public class BlockStoreMeta {
// TODO: the following two fields don't need to be computed on the creation of each
// {@link BlockStoreMeta} instance.
private final List<Long> mCapacityBytesOnTiers = new ArrayList<Long>(); private final List<Long> mCapacityBytesOnTiers = new ArrayList<Long>();
private final Map<Long, Long> mCapacityBytesOnDirs = new HashMap<Long, Long>();

private final List<Long> mUsedBytesOnTiers = new ArrayList<Long>(); private final List<Long> mUsedBytesOnTiers = new ArrayList<Long>();
private final Map<Long, List<Long>> mBlockIdsOnDirs = new HashMap<Long, List<Long>>(); private final Map<Long, List<Long>> mBlockIdsOnDirs = new HashMap<Long, List<Long>>();
private final Map<Long, Long> mCapacityBytesOnDirs = new HashMap<Long, Long>();
private final Map<Long, Long> mUsedBytesOnDirs = new HashMap<Long, Long>(); private final Map<Long, Long> mUsedBytesOnDirs = new HashMap<Long, Long>();
private final Map<Long, String> mDirPaths = new LinkedHashMap<Long, String>(); private final Map<Long, String> mDirPaths = new LinkedHashMap<Long, String>();


Expand Down
35 changes: 18 additions & 17 deletions servers/src/main/java/tachyon/worker/block/TieredBlockStore.java
Expand Up @@ -130,14 +130,12 @@ public BlockReader getBlockReader(long userId, long blockId, long lockId) throws
@Override @Override
public TempBlockMeta createBlockMeta(long userId, long blockId, BlockStoreLocation location, public TempBlockMeta createBlockMeta(long userId, long blockId, BlockStoreLocation location,
long initialBlockSize) throws IOException { long initialBlockSize) throws IOException {
TempBlockMeta tempBlock;
mEvictionLock.readLock().lock(); mEvictionLock.readLock().lock();
try { try {
tempBlock = createBlockMetaNoLock(userId, blockId, location, initialBlockSize); return createBlockMetaNoLock(userId, blockId, location, initialBlockSize);
} finally { } finally {
mEvictionLock.readLock().unlock(); mEvictionLock.readLock().unlock();
} }
return tempBlock;
} }


@Override @Override
Expand All @@ -149,9 +147,10 @@ public BlockMeta getBlockMeta(long userId, long blockId, long lockId) throws IOE
@Override @Override
public void commitBlock(long userId, long blockId) throws IOException { public void commitBlock(long userId, long blockId) throws IOException {
mEvictionLock.readLock().lock(); mEvictionLock.readLock().lock();
TempBlockMeta tempBlockMeta = mMetaManager.getTempBlockMeta(blockId);
try { try {
TempBlockMeta tempBlockMeta = mMetaManager.getTempBlockMeta(blockId);
commitBlockNoLock(userId, blockId, tempBlockMeta); commitBlockNoLock(userId, blockId, tempBlockMeta);
// TODO: move listeners outside of the lock.
for (BlockStoreEventListener listener : mBlockStoreEventListeners) { for (BlockStoreEventListener listener : mBlockStoreEventListeners) {
listener.onCommitBlock(userId, blockId, tempBlockMeta.getBlockLocation()); listener.onCommitBlock(userId, blockId, tempBlockMeta.getBlockLocation());
} }
Expand All @@ -174,15 +173,15 @@ public void abortBlock(long userId, long blockId) throws IOException {
} }


@Override @Override
public void requestSpace(long userId, long blockId, long moreBytes) throws IOException { public void requestSpace(long userId, long blockId, long additionalBytes) throws IOException {
// TODO: Change the lock to read lock and only upgrade to write lock if necessary // TODO: Change the lock to read lock and only upgrade to write lock if necessary
mEvictionLock.writeLock().lock(); mEvictionLock.writeLock().lock();
try { try {
TempBlockMeta tempBlockMeta = mMetaManager.getTempBlockMeta(blockId); TempBlockMeta tempBlockMeta = mMetaManager.getTempBlockMeta(blockId);
BlockStoreLocation location = tempBlockMeta.getBlockLocation(); freeSpaceInternal(userId, additionalBytes, tempBlockMeta.getBlockLocation());
freeSpaceInternal(userId, moreBytes, location);
// Increase the size of this temp block // Increase the size of this temp block
mMetaManager.resizeTempBlockMeta(tempBlockMeta, tempBlockMeta.getBlockSize() + moreBytes); mMetaManager.resizeTempBlockMeta(tempBlockMeta, tempBlockMeta.getBlockSize()
+ additionalBytes);
} finally { } finally {
mEvictionLock.writeLock().unlock(); mEvictionLock.writeLock().unlock();
} }
Expand All @@ -194,9 +193,9 @@ public void moveBlock(long userId, long blockId, BlockStoreLocation newLocation)
mEvictionLock.readLock().lock(); mEvictionLock.readLock().lock();
try { try {
long lockId = mLockManager.lockBlock(userId, blockId, BlockLockType.WRITE); long lockId = mLockManager.lockBlock(userId, blockId, BlockLockType.WRITE);
BlockMeta blockMeta = mMetaManager.getBlockMeta(blockId);
BlockStoreLocation oldLocation = blockMeta.getBlockLocation();
try { try {
BlockMeta blockMeta = mMetaManager.getBlockMeta(blockId);
BlockStoreLocation oldLocation = blockMeta.getBlockLocation();
moveBlockNoLock(blockId, newLocation); moveBlockNoLock(blockId, newLocation);
blockMeta = mMetaManager.getBlockMeta(blockId); blockMeta = mMetaManager.getBlockMeta(blockId);
BlockStoreLocation actualNewLocation = blockMeta.getBlockLocation(); BlockStoreLocation actualNewLocation = blockMeta.getBlockLocation();
Expand Down Expand Up @@ -255,25 +254,25 @@ public void cleanupUser(long userId) throws IOException {
List<TempBlockMeta> tempBlocksToRemove = mMetaManager.cleanupUser(userId); List<TempBlockMeta> tempBlocksToRemove = mMetaManager.cleanupUser(userId);
mLockManager.cleanupUser(userId); mLockManager.cleanupUser(userId);
mEvictionLock.readLock().unlock(); mEvictionLock.readLock().unlock();
Set<String> dirs = new HashSet<String>();
// TODO: fix the block removing below,
Set<String> dirs = new HashSet<String>(tempBlocksToRemove.size());
for (TempBlockMeta tempBlockMeta : tempBlocksToRemove) { for (TempBlockMeta tempBlockMeta : tempBlocksToRemove) {
String fileName = tempBlockMeta.getPath(); String fileName = tempBlockMeta.getPath();
try { try {
String dirName = CommonUtils.getParent(fileName); String dirName = CommonUtils.getParent(fileName);
dirs.add(dirName); dirs.add(dirName);
} catch (InvalidPathException e) { } catch (InvalidPathException e) {
LOG.error("Cannot parse parent dir of {}", fileName); LOG.error("Error in cleanup userId {}: cannot parse parent dir of {}", userId, fileName);
} }
if (!new File(fileName).delete()) { if (!new File(fileName).delete()) {
throw new IOException("Failed to cleanup userId " + userId + ": cannot delete file " LOG.error("Error in cleanup userId {}: cannot delete file {}", userId, fileName);
+ fileName);
} }
} }
// Cleanup the user folder // Cleanup the user folder
for (String dirName : dirs) { for (String dirName : dirs) {
if (!new File(dirName).delete()) { if (!new File(dirName).delete()) {
throw new IOException("Failed to cleanup userId " + userId + ": cannot delete directory " LOG.error("Error in cleanup userId {}: cannot delete directory ", userId, dirName);
+ dirName);
} }
} }
} }
Expand All @@ -285,7 +284,9 @@ public BlockStoreMeta getBlockStoreMeta() {


@Override @Override
public void registerBlockStoreEventListener(BlockStoreEventListener listener) { public void registerBlockStoreEventListener(BlockStoreEventListener listener) {
mBlockStoreEventListeners.add(listener); synchronized (mBlockStoreEventListeners) {
mBlockStoreEventListeners.add(listener);
}
} }


// Create a temp block meta. This method requires {@link mEvictionLock} is acquired in read mode. // Create a temp block meta. This method requires {@link mEvictionLock} is acquired in read mode.
Expand Down

0 comments on commit e3dfa73

Please sign in to comment.