Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Add more methods in BlockMetaEventListener interface
  • Loading branch information
apc999 authored and calvinjia committed Jun 27, 2015
1 parent dc62f88 commit 18d94e3
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 22 deletions.
Expand Up @@ -77,13 +77,13 @@ public void postCommitBlock(long userId, long blockId, BlockStoreLocation locati
}

@Override
public void preMoveBlock(long userId, long blockId, BlockStoreLocation oldLocation,
public void preMoveBlockByClient(long userId, long blockId, BlockStoreLocation oldLocation,
BlockStoreLocation newLocation) {
// Do nothing
}

@Override
public void postMoveBlock(long userId, long blockId, BlockStoreLocation oldLocation,
public void postMoveBlockByClient(long userId, long blockId, BlockStoreLocation oldLocation,
BlockStoreLocation newLocation) {
Long storageDirId = newLocation.getStorageDirId();
synchronized (mLock) {
Expand All @@ -99,22 +99,22 @@ public void postMoveBlock(long userId, long blockId, BlockStoreLocation oldLocat
}

@Override
public void preRemoveBlock(long userId, long blockId) {
public void preRemoveBlockByClient(long userId, long blockId) {
// Do nothing
}

@Override
public void postRemoveBlock(long userId, long blockId) {
public void postRemoveBlockByClient(long userId, long blockId) {
// Do nothing
}

@Override
public void preEvictBlock(long userId, long blockId) {
public void preRemoveBlockByWorker(long userId, long blockId) {
// Do nothing
}

@Override
public void postEvictBlock(long userId, long blockId) {
public void postRemoveBlockByWorker(long userId, long blockId) {
synchronized (mLock) {
// Remove the block from list of added blocks, in case it was added in this heartbeat period.
removeBlockFromAddedBlocks(blockId);
Expand All @@ -125,6 +125,29 @@ public void postEvictBlock(long userId, long blockId) {
}
}

@Override
public void preMoveBlockByWorker(long userId, long blockId, BlockStoreLocation oldLocation,
BlockStoreLocation newLocation) {
// Do nothing
}

@Override
public void postMoveBlockByWorker(long userId, long blockId, BlockStoreLocation oldLocation,
BlockStoreLocation newLocation) {
Long storageDirId = newLocation.getStorageDirId();
synchronized (mLock) {
// Add the block to the removed block list to remove the previous location
// TODO: We should have a better mechanism to indicate block movement
mRemovedBlocks.add(blockId);
// Remove the block from our list of added blocks in this heartbeat, if it was added to
// prevent adding the block twice.
removeBlockFromAddedBlocks(blockId);
// Add the block back with the new storagedir.
addBlockToAddedBlocks(blockId, storageDirId);
}
}


@Override
public void preAbortBlock(long userId, long blockId) {
// Do nothing
Expand Down
Expand Up @@ -26,21 +26,28 @@ public interface BlockMetaEventListener {

void postCommitBlock(long userId, long blockId, BlockStoreLocation location);

void preMoveBlock(long userId, long blockId, BlockStoreLocation oldLocation,
void preAbortBlock(long userId, long blockId);

void postAbortBlock(long userId, long blockId);

void preMoveBlockByClient(long userId, long blockId, BlockStoreLocation oldLocation,
BlockStoreLocation newLocation);

void postMoveBlock(long userId, long blockId, BlockStoreLocation oldLocation,
void postMoveBlockByClient(long userId, long blockId, BlockStoreLocation oldLocation,
BlockStoreLocation newLocation);

void preRemoveBlock(long userId, long blockId);
void preRemoveBlockByClient(long userId, long blockId);

void postRemoveBlock(long userId, long blockId);
void postRemoveBlockByClient(long userId, long blockId);

void preEvictBlock(long userId, long blockId);
void preMoveBlockByWorker(long userId, long blockId, BlockStoreLocation oldLocation,
BlockStoreLocation newLocation);

void postEvictBlock(long userId, long blockId);
void postMoveBlockByWorker(long userId, long blockId, BlockStoreLocation oldLocation,
BlockStoreLocation newLocation);

void preAbortBlock(long userId, long blockId);
void preRemoveBlockByWorker(long userId, long blockId);

void postRemoveBlockByWorker(long userId, long blockId);

void postAbortBlock(long userId, long blockId);
}
21 changes: 15 additions & 6 deletions servers/src/main/java/tachyon/worker/block/TieredBlockStore.java
Expand Up @@ -187,14 +187,14 @@ public void moveBlock(long userId, long blockId, BlockStoreLocation newLocation)
BlockMeta blockMeta = mMetaManager.getBlockMeta(blockId);
BlockStoreLocation oldLocation = blockMeta.getBlockLocation();
for (BlockMetaEventListener listener : mMetaEventListeners) {
listener.preMoveBlock(userId, blockId, oldLocation, newLocation);
listener.preMoveBlockByClient(userId, blockId, oldLocation, newLocation);
}
try {
moveBlockNoLock(blockId, newLocation);
blockMeta = mMetaManager.getBlockMeta(blockId);
BlockStoreLocation actualNewLocation = blockMeta.getBlockLocation();
for (BlockMetaEventListener listener : mMetaEventListeners) {
listener.postMoveBlock(userId, blockId, oldLocation, actualNewLocation);
listener.postMoveBlockByClient(userId, blockId, oldLocation, actualNewLocation);
}
} finally {
mLockManager.unlockBlock(lockId);
Expand All @@ -211,12 +211,12 @@ public void removeBlock(long userId, long blockId) throws IOException {
try {
long lockId = mLockManager.lockBlock(userId, blockId, BlockLockType.WRITE);
for (BlockMetaEventListener listener : mMetaEventListeners) {
listener.preRemoveBlock(userId, blockId);
listener.preRemoveBlockByClient(userId, blockId);
}
try {
removeBlockNoLock(userId, blockId);
for (BlockMetaEventListener listener : mMetaEventListeners) {
listener.postRemoveBlock(userId, blockId);
listener.postRemoveBlockByClient(userId, blockId);
}
} finally {
mLockManager.unlockBlock(lockId);
Expand Down Expand Up @@ -411,12 +411,12 @@ private void freeSpaceInternal(long userId, long availableBytes, BlockStoreLocat
for (long blockId : plan.toEvict()) {
long lockId = mLockManager.lockBlock(userId, blockId, BlockLockType.WRITE);
for (BlockMetaEventListener listener : mMetaEventListeners) {
listener.preEvictBlock(userId, blockId);
listener.preRemoveBlockByWorker(userId, blockId);
}
try {
removeBlockNoLock(userId, blockId);
for (BlockMetaEventListener listener : mMetaEventListeners) {
listener.postEvictBlock(userId, blockId);
listener.postRemoveBlockByWorker(userId, blockId);
}
} catch (IOException e) {
throw new IOException("Failed to free space: cannot evict block " + blockId);
Expand All @@ -428,9 +428,18 @@ private void freeSpaceInternal(long userId, long availableBytes, BlockStoreLocat
for (Pair<Long, BlockStoreLocation> entry : plan.toMove()) {
long blockId = entry.getFirst();
BlockStoreLocation newLocation = entry.getSecond();
BlockMeta blockMeta = mMetaManager.getBlockMeta(blockId);
BlockStoreLocation oldLocation = blockMeta.getBlockLocation();
long lockId = mLockManager.lockBlock(userId, blockId, BlockLockType.WRITE);

for (BlockMetaEventListener listener : mMetaEventListeners) {
listener.preMoveBlockByWorker(userId, blockId, oldLocation, newLocation);
}
try {
moveBlockNoLock(blockId, newLocation);
for (BlockMetaEventListener listener : mMetaEventListeners) {
listener.postMoveBlockByWorker(userId, blockId, oldLocation, newLocation);
}
} catch (IOException e) {
throw new IOException("Failed to free space: cannot move block " + blockId + " to "
+ newLocation);
Expand Down
Expand Up @@ -38,11 +38,11 @@ public final void before() {

private void moveBlock(long blockId, BlockStoreLocation newLocation) {
BlockStoreLocation unusedOldLocation = new BlockStoreLocation(1, 0);
mReporter.postMoveBlock(USER_ID, blockId, unusedOldLocation, newLocation);
mReporter.postMoveBlockByWorker(USER_ID, blockId, unusedOldLocation, newLocation);
}

private void removeBlock(long blockId) {
mReporter.postRemoveBlock(USER_ID, blockId);
mReporter.postRemoveBlockByWorker(USER_ID, blockId);
}

// Tests Empty Report
Expand Down

0 comments on commit 18d94e3

Please sign in to comment.