Skip to content

Commit

Permalink
Update CoreWorker with new BlockStore interface.
Browse files Browse the repository at this point in the history
  • Loading branch information
calvinjia committed Jun 27, 2015
1 parent 7d2d89e commit 749b702
Showing 1 changed file with 15 additions and 13 deletions.
28 changes: 15 additions & 13 deletions servers/src/main/java/tachyon/worker/CoreWorker.java
Expand Up @@ -27,6 +27,7 @@
import tachyon.worker.block.io.BlockReader; import tachyon.worker.block.io.BlockReader;
import tachyon.worker.block.io.BlockWriter; import tachyon.worker.block.io.BlockWriter;
import tachyon.worker.block.meta.BlockMeta; import tachyon.worker.block.meta.BlockMeta;
import tachyon.worker.block.meta.TempBlockMeta;


import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
Expand All @@ -53,11 +54,12 @@ public boolean cancelBlock(long userId, long blockId) {
} }


public String createBlock(long userId, long blockId, int location, long initialBytes) public String createBlock(long userId, long blockId, int location, long initialBytes)
throws OutOfSpaceException { throws IOException, OutOfSpaceException {
BlockStoreLocation loc = BlockStoreLocation.anyDirInTier(location); BlockStoreLocation loc = BlockStoreLocation.anyDirInTier(location);
Optional<BlockMeta> optBlock = mBlockStore.createBlockMeta(userId, blockId, loc, initialBytes); Optional<TempBlockMeta> optBlock =
mBlockStore.createBlockMeta(userId, blockId, loc, initialBytes);
if (optBlock.isPresent()) { if (optBlock.isPresent()) {
return optBlock.get().getTmpPath(); return optBlock.get().getPath();
} }
// Failed to allocate initial bytes // Failed to allocate initial bytes
throw new OutOfSpaceException("Failed to allocate " + initialBytes + " for user " + userId); throw new OutOfSpaceException("Failed to allocate " + initialBytes + " for user " + userId);
Expand All @@ -66,25 +68,26 @@ public String createBlock(long userId, long blockId, int location, long initialB
public BlockWriter createBlockRemote(long userId, long blockId, int location, long initialBytes) public BlockWriter createBlockRemote(long userId, long blockId, int location, long initialBytes)
throws FileDoesNotExistException, IOException { throws FileDoesNotExistException, IOException {
BlockStoreLocation loc = BlockStoreLocation.anyDirInTier(location); BlockStoreLocation loc = BlockStoreLocation.anyDirInTier(location);
Optional<BlockMeta> optBlock = mBlockStore.createBlockMeta(userId, blockId, loc, initialBytes); Optional<TempBlockMeta> optBlock =
mBlockStore.createBlockMeta(userId, blockId, loc, initialBytes);
if (optBlock.isPresent()) { if (optBlock.isPresent()) {
Optional<BlockWriter> optWriter = mBlockStore.getBlockWriter(userId, blockId); Optional<BlockWriter> optWriter = mBlockStore.getBlockWriter(userId, blockId);
if (optWriter.isPresent()) { if (optWriter.isPresent()) {
return optWriter.get(); return optWriter.get();
} }
// TODO: Throw a better exception // TODO: Throw a better exception
throw new IOException("Failed to obtain block writer"); throw new IOException("Failed to obtain block writer.");
} }
throw new FileDoesNotExistException("Block " + blockId + " does not exist on this worker."); throw new FileDoesNotExistException("Block " + blockId + " does not exist on this worker.");
} }


public boolean freeBlock(long userId, long blockId) { public boolean freeBlock(long userId, long blockId) throws IOException {
Optional<Long> optLock = mBlockStore.lockBlock(userId, blockId, BlockLock.BlockLockType.WRITE); Optional<Long> optLock = mBlockStore.lockBlock(userId, blockId, BlockLock.BlockLockType.WRITE);
if (!optLock.isPresent()) { if (!optLock.isPresent()) {
return false; return false;
} }
Long lockId = optLock.get(); Long lockId = optLock.get();
mBlockStore.removeBlock(userId, blockId, lockId, BlockStoreLocation.anyTier()); mBlockStore.removeBlock(userId, blockId, lockId);
mBlockStore.unlockBlock(lockId); mBlockStore.unlockBlock(lockId);
return true; return true;
} }
Expand Down Expand Up @@ -113,7 +116,7 @@ public long lockBlock(long userId, long blockId, int type) {
return -1; return -1;
} }


public boolean persistBlock(long userId, long blockId) { public boolean persistBlock(long userId, long blockId) throws IOException {
return mBlockStore.commitBlock(userId, blockId); return mBlockStore.commitBlock(userId, blockId);
} }


Expand All @@ -127,15 +130,15 @@ public String readBlock(long userId, long blockId, long lockId) throws FileDoesN
} }


public BlockReader readBlockRemote(long userId, long blockId, long lockId) public BlockReader readBlockRemote(long userId, long blockId, long lockId)
throws FileDoesNotExistException { throws FileDoesNotExistException, IOException {
Optional<BlockReader> optReader = mBlockStore.getBlockReader(userId, blockId, lockId); Optional<BlockReader> optReader = mBlockStore.getBlockReader(userId, blockId, lockId);
if (optReader.isPresent()) { if (optReader.isPresent()) {
return optReader.get(); return optReader.get();
} }
throw new FileDoesNotExistException("Block " + blockId + " does not exist on this worker."); throw new FileDoesNotExistException("Block " + blockId + " does not exist on this worker.");
} }


public boolean relocateBlock(long userId, long blockId, int destination) { public boolean relocateBlock(long userId, long blockId, int destination) throws IOException {
Optional<Long> optLock = mBlockStore.lockBlock(userId, blockId, BlockLock.BlockLockType.WRITE); Optional<Long> optLock = mBlockStore.lockBlock(userId, blockId, BlockLock.BlockLockType.WRITE);
// TODO: Define this behavior // TODO: Define this behavior
if (!optLock.isPresent()) { if (!optLock.isPresent()) {
Expand All @@ -150,11 +153,10 @@ public boolean relocateBlock(long userId, long blockId, int destination) {
return false; return false;
} }
// TODO: Add this to the BlockMeta API // TODO: Add this to the BlockMeta API
BlockStoreLocation oldLoc = optMeta.get().getLocation();
BlockStoreLocation newLoc = BlockStoreLocation.anyDirInTier(destination); BlockStoreLocation newLoc = BlockStoreLocation.anyDirInTier(destination);
if (mBlockStore.copyBlock(userId, blockId, lockId, newLoc)) { if (mBlockStore.copyBlock(userId, blockId, lockId, newLoc)) {
// TODO: What if this fails? // TODO: What if this fails?
mBlockStore.removeBlock(userId, blockId, lockId, oldLoc); mBlockStore.removeBlock(userId, blockId, lockId);
mBlockStore.unlockBlock(lockId); mBlockStore.unlockBlock(lockId);
return true; return true;
} else { } else {
Expand All @@ -163,7 +165,7 @@ public boolean relocateBlock(long userId, long blockId, int destination) {
} }
} }


public boolean requestSpace(long userId, long blockId, long bytesRequested) { public boolean requestSpace(long userId, long blockId, long bytesRequested) throws IOException {
return mBlockStore.requestSpace(userId, blockId, bytesRequested); return mBlockStore.requestSpace(userId, blockId, bytesRequested);
} }


Expand Down

0 comments on commit 749b702

Please sign in to comment.