Skip to content

Commit

Permalink
Add UfsBlockStreTest
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Mar 10, 2017
1 parent 8f521cf commit 7b2a233
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 48 deletions.
Expand Up @@ -364,14 +364,14 @@ void requestSpace(long sessionId, long blockId, long additionalBytes)
/** /**
* Opens a UFS block. * Opens a UFS block.
* *
* @param ufsBlockMeta the UFS block meta data * @param ufsBlockMetaConst the UFS block constant meta data
* @param maxUfsReadConcurrency the maximum UFS block read concurrency * @param maxUfsReadConcurrency the maximum UFS block read concurrency
* @throws BlockAlreadyExistsException if the UFS block already exists in the * @throws BlockAlreadyExistsException if the UFS block already exists in the
* {@link UfsBlockStore} * {@link UfsBlockStore}
* @throws UfsBlockAccessTokenUnavailableException if there are too many clients accessing the * @throws UfsBlockAccessTokenUnavailableException if there are too many clients accessing the
* UFS block * UFS block
*/ */
void openUfsBlock(UfsBlockMeta ufsBlockMeta, int maxUfsReadConcurrency) void openUfsBlock(UfsBlockMeta.ConstMeta ufsBlockMetaConst, int maxUfsReadConcurrency)
throws BlockAlreadyExistsException, UfsBlockAccessTokenUnavailableException; throws BlockAlreadyExistsException, UfsBlockAccessTokenUnavailableException;


/** /**
Expand Down
Expand Up @@ -162,8 +162,7 @@ public LockBlockResult call() throws AlluxioException {
if (options.isSetUfsPath() && !options.getUfsPath().isEmpty()) { if (options.isSetUfsPath() && !options.getUfsPath().isEmpty()) {
// When the block does not exist in Alluxio but exists in UFS, try to open the UFS // When the block does not exist in Alluxio but exists in UFS, try to open the UFS
// block. // block.
mWorker.openUfsBlock( mWorker.openUfsBlock(new UfsBlockMeta.ConstMeta(sessionId, blockId, options),
UfsBlockMeta.fromLockBlockOptions(sessionId, blockId, options),
options.getMaxUfsReadConcurrency()); options.getMaxUfsReadConcurrency());
return new LockBlockResult(lockId, ""); return new LockBlockResult(lockId, "");
} else { } else {
Expand Down
Expand Up @@ -435,9 +435,9 @@ public FileInfo getFileInfo(long fileId) throws IOException {
} }


@Override @Override
public void openUfsBlock(UfsBlockMeta ufsBlockMeta, int maxConcurrency) public void openUfsBlock(UfsBlockMeta.ConstMeta ufsBlockMetaConst, int maxConcurrency)
throws BlockAlreadyExistsException, UfsBlockAccessTokenUnavailableException { throws BlockAlreadyExistsException, UfsBlockAccessTokenUnavailableException {
mUfsBlockStore.acquireAccess(ufsBlockMeta, maxConcurrency); mUfsBlockStore.acquireAccess(ufsBlockMetaConst, maxConcurrency);
} }


@Override @Override
Expand Down
Expand Up @@ -67,16 +67,17 @@ public UfsBlockStore(BlockStore alluxioBlockStore) {


/** /**
* Acquires access for a UFS block given a {@link UfsBlockMeta} and the limit on the maximum * Acquires access for a UFS block given a {@link UfsBlockMeta} and the limit on the maximum
* concurrent accessors on the block. * concurrency on the block.
* *
* @param blockMeta the block meta * @param blockMetaConst the constant block meta
* @param maxConcurrency the maximum concurrency * @param maxConcurrency the maximum concurrency
* @throws BlockAlreadyExistsException if the block already exists for a session ID * @throws BlockAlreadyExistsException if the block already exists for a session ID
* @throws UfsBlockAccessTokenUnavailableException if there are too many concurrent sessions * @throws UfsBlockAccessTokenUnavailableException if there are too many concurrent sessions
* accessing the block * accessing the block
*/ */
public void acquireAccess(UfsBlockMeta blockMeta, int maxConcurrency) throws public void acquireAccess(UfsBlockMeta.ConstMeta blockMetaConst, int maxConcurrency) throws
BlockAlreadyExistsException, UfsBlockAccessTokenUnavailableException { BlockAlreadyExistsException, UfsBlockAccessTokenUnavailableException {
UfsBlockMeta blockMeta = new UfsBlockMeta(blockMetaConst);
long sessionId = blockMeta.getSessionId(); long sessionId = blockMeta.getSessionId();
long blockId = blockMeta.getBlockId(); long blockId = blockMeta.getBlockId();
mLock.lock(); mLock.lock();
Expand Down
Expand Up @@ -21,80 +21,84 @@
* This class represents the metadata of a UFS block. * This class represents the metadata of a UFS block.
*/ */
public final class UfsBlockMeta { public final class UfsBlockMeta {
private final long mSessionId; private ConstMeta mConstMeta;
private final long mBlockId;
private final String mUfsPath;
/** The offset in bytes of the first byte of the block in its corresponding UFS file. */
private final long mOffset;
/** The block size in bytes. */
private final long mBlockSize;


/** The set of session IDs to be committed. */ /** The set of session IDs to be committed. */
private boolean mCommitPending; private boolean mCommitPending;
private BlockReader mBlockReader; private BlockReader mBlockReader;
private BlockWriter mBlockWriter; private BlockWriter mBlockWriter;


/** /**
* Creates {@link UfsBlockMeta} from a {@link LockBlockTOptions}. * The constant metadata of this UFS block.
*
* @param sessionId the session ID
* @param blockId the block ID
* @param options the thrift lock options
* @return the {@link UfsBlockMeta}
*/ */
public static UfsBlockMeta fromLockBlockOptions(long sessionId, long blockId, public static final class ConstMeta {
LockBlockTOptions options) { public final long mSessionId;
return new UfsBlockMeta(sessionId, blockId, options); public final long mBlockId;
public final String mUfsPath;
/** The offset in bytes of the first byte of the block in its corresponding UFS file. */
public final long mOffset;
/** The block size in bytes. */
public final long mBlockSize;

/**
* Creates {@link UfsBlockMeta.ConstMeta} from a {@link LockBlockTOptions}.
*
* @param sessionId the session ID
* @param blockId the block ID
* @param options the thrift lock options
* @return the {@link UfsBlockMeta}
*/
public ConstMeta(long sessionId, long blockId, LockBlockTOptions options) {
mSessionId = sessionId;
mBlockId = blockId;
mUfsPath = options.getUfsPath();
mOffset = options.getOffset();
mBlockSize = options.getBlockSize();
}
} }


/** /**
* Creates a {@link UfsBlockMeta}. * Creates a {@link UfsBlockMeta}.
* *
* @param sessionId the session ID * @param meta the constant metadata of this UFS block
* @param blockId the block ID
* @param options the thrift lock block options
*/ */
private UfsBlockMeta(long sessionId, long blockId, LockBlockTOptions options) { public UfsBlockMeta(ConstMeta meta) {
mSessionId = sessionId; mConstMeta = meta;
mBlockId = blockId;
mUfsPath = options.getUfsPath();
mOffset = options.getOffset();
mBlockSize = options.getBlockSize();
} }


/** /**
* @return the session ID * @return the session ID
*/ */
public long getSessionId() { public long getSessionId() {
return mSessionId; return mConstMeta.mSessionId;
} }


/** /**
* @return the block ID * @return the block ID
*/ */
public long getBlockId() { public long getBlockId() {
return mBlockId; return mConstMeta.mBlockId;
} }


/** /**
* @return the UFS path * @return the UFS path
*/ */
public String getUfsPath() { public String getUfsPath() {
return mUfsPath; return mConstMeta.mUfsPath;
} }


/** /**
* @return the offset of the block in the UFS file * @return the offset of the block in the UFS file
*/ */
public long getOffset() { public long getOffset() {
return mOffset; return mConstMeta.mOffset;
} }


/** /**
* @return the block size in bytes * @return the block size in bytes
*/ */
public long getBlockSize() { public long getBlockSize() {
return mBlockSize; return mConstMeta.mBlockSize;
} }


/** /**
Expand Down
Expand Up @@ -115,12 +115,11 @@ public void openUfsBlock() throws Exception {
long blockId = mRandom.nextLong(); long blockId = mRandom.nextLong();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
long sessionId = i + 1; long sessionId = i + 1;
UfsBlockMeta meta = UfsBlockMeta.ConstMeta meta =
UfsBlockMeta.fromLockBlockOptions(sessionId, blockId, new LockBlockTOptions()); new UfsBlockMeta.ConstMeta(sessionId, blockId, new LockBlockTOptions());
mBlockWorker.openUfsBlock(meta, 10); mBlockWorker.openUfsBlock(meta, 10);
} }
UfsBlockMeta meta = UfsBlockMeta.ConstMeta meta = new UfsBlockMeta.ConstMeta(12, blockId, new LockBlockTOptions());
UfsBlockMeta.fromLockBlockOptions(12, blockId, new LockBlockTOptions());
try { try {
mBlockWorker.openUfsBlock(meta, 10); mBlockWorker.openUfsBlock(meta, 10);
Assert.fail(); Assert.fail();
Expand All @@ -134,12 +133,12 @@ public void closeUfsBlock() throws Exception {
long blockId = mRandom.nextLong(); long blockId = mRandom.nextLong();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
long sessionId = i + 1; long sessionId = i + 1;
UfsBlockMeta meta = UfsBlockMeta.ConstMeta meta =
UfsBlockMeta.fromLockBlockOptions(sessionId, blockId, new LockBlockTOptions()); new UfsBlockMeta.ConstMeta(sessionId, blockId, new LockBlockTOptions());
mBlockWorker.openUfsBlock(meta, 10); mBlockWorker.openUfsBlock(meta, 10);
mBlockWorker.closeUfsBlock(sessionId, blockId); mBlockWorker.closeUfsBlock(sessionId, blockId);
} }
UfsBlockMeta meta = UfsBlockMeta.fromLockBlockOptions(12, blockId, new LockBlockTOptions()); UfsBlockMeta.ConstMeta meta = new UfsBlockMeta.ConstMeta(12, blockId, new LockBlockTOptions());
mBlockWorker.openUfsBlock(meta, 10); mBlockWorker.openUfsBlock(meta, 10);
} }


Expand Down
Expand Up @@ -69,7 +69,7 @@ public void before() throws Exception {
options.setOffset(TEST_BLOCK_SIZE); options.setOffset(TEST_BLOCK_SIZE);
options.setUfsPath(testFilePath); options.setUfsPath(testFilePath);


mUfsBlockMeta = UfsBlockMeta.fromLockBlockOptions(SESSION_ID, BLOCK_ID, options); mUfsBlockMeta = new UfsBlockMeta(new UfsBlockMeta.ConstMeta(SESSION_ID, BLOCK_ID, options));
} }


@Test @Test
Expand Down Expand Up @@ -156,7 +156,7 @@ public void transferFullBlock() throws Exception {
} }


@Test @Test
public void transferParitalBlock() throws Exception { public void transferPartialBlock() throws Exception {
mReader = UfsBlockReader.create(mUfsBlockMeta, 0, false, mAlluxioBlockStore); mReader = UfsBlockReader.create(mUfsBlockMeta, 0, false, mAlluxioBlockStore);
ByteBuf buf = ByteBuf buf =
PooledByteBufAllocator.DEFAULT.buffer((int) TEST_BLOCK_SIZE / 2, (int) TEST_BLOCK_SIZE / 2); PooledByteBufAllocator.DEFAULT.buffer((int) TEST_BLOCK_SIZE / 2, (int) TEST_BLOCK_SIZE / 2);
Expand Down
@@ -0,0 +1,80 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.worker.block;

import alluxio.exception.UfsBlockAccessTokenUnavailableException;
import alluxio.thrift.LockBlockTOptions;
import alluxio.worker.block.meta.UfsBlockMeta;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

public final class UfsBlockStoreTest {
private static final long TEST_BLOCK_SIZE = 1024;
private static final long BLOCK_ID = 2;

private BlockStore mAlluxioBlockStore;
private LockBlockTOptions mLockBlockTOptions;

/** Rule to create a new temporary folder during each test. */
@Rule
public TemporaryFolder mFolder = new TemporaryFolder();

@Before
public void before() throws Exception {
mAlluxioBlockStore = Mockito.mock(BlockStore.class);

LockBlockTOptions options = new LockBlockTOptions();
options.setMaxUfsReadConcurrency(10);
options.setBlockSize(TEST_BLOCK_SIZE);
options.setOffset(TEST_BLOCK_SIZE);
options.setUfsPath(mFolder.newFile().getAbsolutePath());
mLockBlockTOptions = options;
}

@Test
public void acquireAccess() throws Exception {
UfsBlockStore blockStore = new UfsBlockStore(mAlluxioBlockStore);
for (int i = 0; i < 5; i++) {
UfsBlockMeta.ConstMeta constMeta =
new UfsBlockMeta.ConstMeta(i + 1, BLOCK_ID, mLockBlockTOptions);
blockStore.acquireAccess(constMeta, 5);
}

try {
UfsBlockMeta.ConstMeta constMeta =
new UfsBlockMeta.ConstMeta(6, BLOCK_ID, mLockBlockTOptions);
blockStore.acquireAccess(constMeta, 5);
Assert.fail();
} catch (UfsBlockAccessTokenUnavailableException e) {
// expected
}
}

@Test
public void releaseAccess() throws Exception {
UfsBlockStore blockStore = new UfsBlockStore(mAlluxioBlockStore);
for (int i = 0; i < 5; i++) {
UfsBlockMeta.ConstMeta constMeta =
new UfsBlockMeta.ConstMeta(i + 1, BLOCK_ID, mLockBlockTOptions);
blockStore.acquireAccess(constMeta, 5);
blockStore.releaseAccess(constMeta.mSessionId, constMeta.mBlockId);
}

UfsBlockMeta.ConstMeta constMeta = new UfsBlockMeta.ConstMeta(6, BLOCK_ID, mLockBlockTOptions);
blockStore.acquireAccess(constMeta, 5);
}
}

0 comments on commit 7b2a233

Please sign in to comment.