Skip to content

Commit

Permalink
Remove commit pending completely
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Mar 10, 2017
1 parent 130cf93 commit 0036457
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 93 deletions.
Expand Up @@ -244,18 +244,32 @@ public StorageDir getDir(BlockStoreLocation location) {
* Gets the metadata of a temp block.
*
* @param blockId the id of the temp block
* @return metadata of the block or null
* @return metadata of the block
* @throws BlockDoesNotExistException when block id can not be found
*/
public TempBlockMeta getTempBlockMeta(long blockId) throws BlockDoesNotExistException {
TempBlockMeta blockMeta = getTempBlockMetaOrNull(blockId);
if (blockMeta == null) {
throw new BlockDoesNotExistException(ExceptionMessage.TEMP_BLOCK_META_NOT_FOUND, blockId);
}
return blockMeta;
}

/**
* Gets the metadata of a temp block.
*
* @param blockId the id of the temp block
* @return metadata of the block or null
*/
public TempBlockMeta getTempBlockMetaOrNull(long blockId) {
for (StorageTier tier : mTiers) {
for (StorageDir dir : tier.getStorageDirs()) {
if (dir.hasTempBlockMeta(blockId)) {
return dir.getTempBlockMeta(blockId);
}
}
}
throw new BlockDoesNotExistException(ExceptionMessage.TEMP_BLOCK_META_NOT_FOUND, blockId);
return null;
}

/**
Expand Down

This file was deleted.

Expand Up @@ -122,6 +122,8 @@ TempBlockMeta createBlock(long sessionId, long blockId, BlockStoreLocation locat
BlockMeta getBlockMeta(long sessionId, long blockId, long lockId)
throws BlockDoesNotExistException, InvalidWorkerStateException;

TempBlockMeta getTempBlockMeta(long sessionId, long blockId);

/**
* Commits a temporary block to the local store. After commit, the block will be available in this
* block store for all clients to access. Since a temp block is "private" to the writer, this
Expand Down
Expand Up @@ -447,10 +447,16 @@ public boolean openUfsBlock(long sessionId, long blockId, OpenUfsBlockOptions op

@Override
public void closeUfsBlock(long sessionId, long blockId)
throws BlockAlreadyExistsException, BlockDoesNotExistException, InvalidWorkerStateException,
IOException, WorkerOutOfSpaceException {
if (mUnderFileSystemBlockStore.cleanup(sessionId, blockId)) {
commitBlock(sessionId, blockId);
throws BlockAlreadyExistsException, InvalidWorkerStateException, IOException,
WorkerOutOfSpaceException {
mUnderFileSystemBlockStore.closeReaderOrWriter(sessionId, blockId);
if (mBlockStore.getTempBlockMeta(sessionId, blockId) != null) {
try {
commitBlock(sessionId, blockId);
} catch (BlockDoesNotExistException e) {
// This can only happen if the session is expired. Ignore this exception if that happens.
LOG.warn("Block {} does not exist while being committed.", blockId);
}
}
mUnderFileSystemBlockStore.releaseAccess(sessionId, blockId);
}
Expand Down
Expand Up @@ -178,8 +178,7 @@ public BlockWriter getBlockWriter(long sessionId, long blockId)
// block lock here since no sharing
// TODO(bin): Handle the case where multiple writers compete for the same block.
try (LockResource r = new LockResource(mMetadataReadLock)) {
TempBlockMeta tempBlockMeta = mMetaManager.getTempBlockMeta(blockId);
return new LocalFileBlockWriter(tempBlockMeta.getPath());
TempBlockMeta tempBlockMeta = mMetaManager.getTempBlockMeta(blockId); return new LocalFileBlockWriter(tempBlockMeta.getPath());
}
}

Expand Down Expand Up @@ -234,6 +233,13 @@ public BlockMeta getBlockMeta(long sessionId, long blockId, long lockId)
}
}

@Override
public TempBlockMeta getTempBlockMeta(long sessionId, long blockId) {
try (LockResource r = new LockResource(mMetadataReadLock)) {
return mMetaManager.getTempBlockMetaOrNull(blockId);
}
}

@Override
public void commitBlock(long sessionId, long blockId) throws BlockAlreadyExistsException,
InvalidWorkerStateException, BlockDoesNotExistException, IOException {
Expand Down
Expand Up @@ -23,7 +23,9 @@
import alluxio.exception.WorkerOutOfSpaceException;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.OpenOptions;
import alluxio.util.CommonUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.LocalFileBlockWriter;
import alluxio.worker.block.meta.UnderFileSystemBlockMeta;

Expand All @@ -41,11 +43,11 @@
import javax.annotation.concurrent.NotThreadSafe;

/**
* This class implements a {@link BlockReaderWithCache} to read a block directly from UFS, and
* This class implements a {@link BlockReader} to read a block directly from UFS, and
* optionally cache the block to the Alluxio worker if the whole block it is read.
*/
@NotThreadSafe
public final class UnderFileSystemBlockReader implements BlockReaderWithCache {
public final class UnderFileSystemBlockReader implements BlockReader {
private static final Logger LOG = LoggerFactory.getLogger(UnderFileSystemBlockReader.class);

/** An object storing the mapping of tier aliases to ordinals. */
Expand All @@ -64,13 +66,8 @@ public final class UnderFileSystemBlockReader implements BlockReaderWithCache {
private InputStream mUnderFileSystemInputStream;
/** The block writer to write the block to Alluxio. */
private LocalFileBlockWriter mBlockWriter;
/**
* If set, the reader is closed and should not be used afterwards except isCommitPending
* method.
*/
/** If set, the reader is closed and should not be used afterwards. */
private boolean mClosed;
/** If set, this block is pending to be committed to Alluxio. */
private boolean mCommitPending;

/**
* The position of mUnderFileSystemInputStream (if not null) is blockStart + mInStreamPos.
Expand Down Expand Up @@ -242,8 +239,6 @@ public void close() throws IOException {
// This aborts the block if the block is not fully read.
updateBlockWriter(mBlockMeta.getBlockSize());

// We need to check whether the block is cached before closing the block writer.
boolean isBlockCached = isBlockCached();
Closer closer = Closer.create();
if (mBlockWriter != null) {
closer.register(mBlockWriter);
Expand All @@ -252,32 +247,16 @@ public void close() throws IOException {
closer.register(mUnderFileSystemInputStream);
}
closer.close();

if (isBlockCached) {
mCommitPending = true;
}
} finally {
mClosed = true;
}
}

@Override
public boolean isCommitPending() {
return mCommitPending;
}

@Override
public boolean isClosed() {
return mClosed;
}

/**
* @return true if the whole block is read and cached to the temporary block location
*/
private boolean isBlockCached() {
return mBlockWriter != null && mBlockWriter.getPosition() == mBlockMeta.getBlockSize();
}

/**
* Updates the UFS input stream given an offset to read.
*
Expand Down Expand Up @@ -305,22 +284,30 @@ private void updateUnderFileSystemInputStream(long offset) throws IOException {
*
* @param offset the read offset
*/
private void updateBlockWriter(long offset) {
private void updateBlockWriter(long offset) throws IOException {
try {
if (mBlockWriter != null && offset > mBlockWriter.getPosition()) {
mBlockWriter.close();
mBlockWriter = null;
mLocalBlockStore.abortBlock(mBlockMeta.getSessionId(), mBlockMeta.getBlockId());
}
} catch (BlockDoesNotExistException e) {
// This can only happen when the session is expired.
LOG.warn("Block {} does not exist when being aborted.", mBlockMeta.getBlockId());
} catch (BlockAlreadyExistsException | InvalidWorkerStateException | IOException e) {
// We cannot skip the exception here because we need to make sure that the user of this
// reader does not commit the block if it fails to abort the block.
throw CommonUtils.castToIOException(e);
}
try {
if (mBlockWriter == null && offset == 0 && !mNoCache) {
BlockStoreLocation loc = BlockStoreLocation.anyDirInTier(mStorageTierAssoc.getAlias(0));
String blockPath = mLocalBlockStore
.createBlock(mBlockMeta.getSessionId(), mBlockMeta.getBlockId(), loc,
mInitialBlockSize).getPath();
mBlockWriter = new LocalFileBlockWriter(blockPath);
}
} catch (IOException | BlockAlreadyExistsException | BlockDoesNotExistException
| InvalidWorkerStateException | WorkerOutOfSpaceException e) {
} catch (IOException | BlockAlreadyExistsException | WorkerOutOfSpaceException e) {
// This can happen when there are concurrent UFS readers who are all trying to cache to block.
LOG.debug(
"Failed to update block writer for UFS block [blockId: {}, ufsPath: {}, offset: {}]",
Expand Down
Expand Up @@ -130,23 +130,22 @@ public boolean acquireAccess(long sessionId, long blockId, OpenUfsBlockOptions o
*
* @param sessionId the session ID
* @param blockId the block ID
* @return true if block is to be committed into Local block store
* @throws IOException if it fails to clean up
*/
public boolean cleanup(long sessionId, long blockId) throws IOException {
public void closeReaderOrWriter(long sessionId, long blockId) throws IOException {
BlockInfo blockInfo;
mLock.lock();
try {
blockInfo = mBlocks.get(new Key(sessionId, blockId));
if (blockInfo == null) {
LOG.warn("Key (block ID: {}, session ID {}) is not found when cleaning up the UFS block.",
blockId, sessionId);
return false;
return;
}
} finally {
mLock.unlock();
}
return blockInfo.closeReaderOrWriter();
blockInfo.closeReaderOrWriter();
}

/**
Expand Down Expand Up @@ -200,7 +199,7 @@ public void cleanupSession(long sessionId) {
// Note that we don't need to explicitly call abortBlock to cleanup the temp block
// in Local block store because they will be cleanup by the session cleaner in the
// Local block store.
cleanup(sessionId, blockId);
closeReaderOrWriter(sessionId, blockId);
releaseAccess(sessionId, blockId);
} catch (Exception e) {
LOG.warn("Failed to cleanup UFS block {}, session {}.", blockId, sessionId);
Expand Down Expand Up @@ -234,7 +233,7 @@ public BlockReader getBlockReader(final long sessionId, long blockId, long offse
} finally {
mLock.unlock();
}
BlockReaderWithCache reader =
BlockReader reader =
UnderFileSystemBlockReader.create(blockInfo.getMeta(), offset, noCache, mLocalBlockStore);
blockInfo.setBlockReader(reader);
return reader;
Expand Down Expand Up @@ -319,7 +318,7 @@ private static class BlockInfo {
// A correct client implementation should never access the following reader/writer
// concurrently. But just to avoid crashing the server thread with runtime exception when
// the client is mis-behaving, we access them with locks acquired.
private BlockReaderWithCache mBlockReader;
private BlockReader mBlockReader;
private BlockWriter mBlockWriter;

/**
Expand Down Expand Up @@ -351,7 +350,7 @@ public synchronized BlockReader getBlockReader() {
/**
* @param blockReader the block reader to be set
*/
public synchronized void setBlockReader(BlockReaderWithCache blockReader) {
public synchronized void setBlockReader(BlockReader blockReader) {
mBlockReader = blockReader;
}

Expand All @@ -372,21 +371,17 @@ public synchronized void setBlockWriter(BlockWriter blockWriter) {
/**
* Closes the block reader or writer.
*
* @return true if the block is pending to be committed
* @throws IOException if it fails to close block reader or writer
*/
public synchronized boolean closeReaderOrWriter() throws IOException {
boolean commitPending = false;
public synchronized void closeReaderOrWriter() throws IOException {
if (mBlockReader != null) {
mBlockReader.close();
commitPending = mBlockReader.isCommitPending();
mBlockReader = null;
}
if (mBlockWriter != null) {
mBlockWriter.close();
mBlockWriter = null;
}
return commitPending;
}
}
}
Expand Up @@ -253,14 +253,9 @@ public BlockMeta getBlockMeta(long blockId) throws BlockDoesNotExistException {
*
* @param blockId the block id
* @return {@link TempBlockMeta} of the given block or null
* @throws BlockDoesNotExistException if no temp block is found
*/
public TempBlockMeta getTempBlockMeta(long blockId) throws BlockDoesNotExistException {
TempBlockMeta tempBlockMeta = mBlockIdToTempBlockMap.get(blockId);
if (tempBlockMeta == null) {
throw new BlockDoesNotExistException(ExceptionMessage.TEMP_BLOCK_META_NOT_FOUND, blockId);
}
return tempBlockMeta;
public TempBlockMeta getTempBlockMeta(long blockId) {
return mBlockIdToTempBlockMap.get(blockId);
}

/**
Expand Down
Expand Up @@ -90,7 +90,6 @@ public void readFullBlock() throws Exception {
.equalIncreasingByteBuffer((int) TEST_BLOCK_SIZE, (int) TEST_BLOCK_SIZE, buffer));

mReader.close();
Assert.assertTrue(mReader.isCommitPending());
}

@Test
Expand All @@ -103,7 +102,6 @@ public void readPartialBlock() throws Exception {
.equalIncreasingByteBuffer((int) TEST_BLOCK_SIZE, (int) TEST_BLOCK_SIZE - 1, buffer));

mReader.close();
Assert.assertFalse(mReader.isCommitPending());
}

@Test
Expand All @@ -116,7 +114,6 @@ public void offset() throws Exception {
.equalIncreasingByteBuffer((int) TEST_BLOCK_SIZE + 2, (int) TEST_BLOCK_SIZE - 2, buffer));

mReader.close();
Assert.assertFalse(mReader.isCommitPending());
}

@Test
Expand All @@ -136,7 +133,6 @@ public void readOverlap() throws Exception {
.equalIncreasingByteBuffer((int) TEST_BLOCK_SIZE + 3, (int) TEST_BLOCK_SIZE - 3, buffer));

mReader.close();
Assert.assertTrue(mReader.isCommitPending());
}

@Test
Expand All @@ -147,7 +143,6 @@ public void noCache() throws Exception {
Assert.assertTrue(BufferUtils
.equalIncreasingByteBuffer((int) TEST_BLOCK_SIZE, (int) TEST_BLOCK_SIZE, buffer));
mReader.close();
Assert.assertFalse(mReader.isCommitPending());
}

@Test
Expand All @@ -163,7 +158,6 @@ public void transferFullBlock() throws Exception {
.equalIncreasingByteBuffer((int) TEST_BLOCK_SIZE, (int) TEST_BLOCK_SIZE,
buf.nioBuffer()));
mReader.close();
Assert.assertTrue(mReader.isCommitPending());
} finally {
buf.release();
}
Expand All @@ -182,7 +176,6 @@ public void transferPartialBlock() throws Exception {
.equalIncreasingByteBuffer((int) TEST_BLOCK_SIZE, (int) TEST_BLOCK_SIZE / 2,
buf.nioBuffer()));
mReader.close();
Assert.assertFalse(mReader.isCommitPending());
} finally {
buf.release();
}
Expand Down

0 comments on commit 0036457

Please sign in to comment.