Skip to content

Commit

Permalink
Fixing a concurrency bug and enabling IS in FindBugs.
Browse files Browse the repository at this point in the history
  • Loading branch information
jsimsa committed Jan 12, 2016
1 parent 799b08b commit be951de
Show file tree
Hide file tree
Showing 13 changed files with 131 additions and 61 deletions.
2 changes: 1 addition & 1 deletion build/findbugs/findbugs-exclude.xml
Expand Up @@ -39,7 +39,7 @@
UC: Useless non-empty void method
UPM: Private method is never called
-->
<Bug code="DC,DE,DLS,Dm,DP,EI,Eq,IS,JLM,LI,Nm,NP,PT,PZLA,RCN,REC,RR,RV,SBSC,Se,SIC,UC,UPM"/>
<Bug code="DC,DE,DLS,Dm,DP,EI,Eq,JLM,LI,Nm,NP,PT,PZLA,RCN,REC,RR,RV,SBSC,Se,SIC,UC,UPM"/>
</Match>

<Match>
Expand Down
Expand Up @@ -35,6 +35,12 @@
* A shared context in each client JVM for common block master client functionality such as a pool
* of master clients and a pool of local worker clients. Any remote clients will be created and
* destroyed on a per use basis. This class is thread safe.
*
* NOTE: The context maintains a pool of block master clients that is already thread-safe.
* Synchronizing {@link BlockStoreContext} methods could lead to deadlock: thread A attempts to
* acquire a client when there are no clients left in the pool and blocks holding a lock on the
* {@link BlockStoreContext}, when thread B attempts to release a client it owns, it is unable to do
* so, because thread A holds the lock on {@link BlockStoreContext}.
*/
public enum BlockStoreContext {
INSTANCE;
Expand Down Expand Up @@ -107,11 +113,6 @@ public BlockMasterClient acquireMasterClient() {
/**
* Releases a block master client into the block master client pool.
*
* NOTE: the client pool is already thread-safe. Synchronizing on {@link BlockStoreContext} will
* lead to deadlock: thread A acquired a client and awaits for {@link BlockStoreContext} to
* release the client, while thread B holds the lock of {@link BlockStoreContext} but waits for
* available clients.
*
* @param masterClient a block master client to release
*/
public void releaseMasterClient(BlockMasterClient masterClient) {
Expand All @@ -125,7 +126,7 @@ public void releaseMasterClient(BlockMasterClient masterClient) {
*
* @return a {@link BlockWorkerClient} to a worker in the Tachyon system
*/
public synchronized BlockWorkerClient acquireWorkerClient() {
public BlockWorkerClient acquireWorkerClient() {
BlockWorkerClient client = acquireLocalWorkerClient();
if (client == null) {
// Get a worker client for any worker in the system.
Expand All @@ -142,7 +143,7 @@ public synchronized BlockWorkerClient acquireWorkerClient() {
* @return a {@link BlockWorkerClient} connected to the worker with the given hostname
* @throws IOException if no Tachyon worker is available for the given hostname
*/
public synchronized BlockWorkerClient acquireWorkerClient(String hostname) throws IOException {
public BlockWorkerClient acquireWorkerClient(String hostname) throws IOException {
BlockWorkerClient client;
if (hostname.equals(NetworkAddressUtils.getLocalHostName(ClientContext.getConf()))) {
client = acquireLocalWorkerClient();
Expand All @@ -162,7 +163,7 @@ public synchronized BlockWorkerClient acquireWorkerClient(String hostname) throw
* @return a {@link BlockWorkerClient} connected to the worker with the given hostname
* @throws IOException if no Tachyon worker is available for the given hostname
*/
public synchronized BlockWorkerClient acquireWorkerClient(NetAddress address)
public BlockWorkerClient acquireWorkerClient(NetAddress address)
throws IOException {
BlockWorkerClient client;
if (address == null) {
Expand All @@ -185,11 +186,10 @@ public synchronized BlockWorkerClient acquireWorkerClient(NetAddress address)
*
* @return a {@link BlockWorkerClient} to a worker in the Tachyon system or null if failed
*/
public synchronized BlockWorkerClient acquireLocalWorkerClient() {
public BlockWorkerClient acquireLocalWorkerClient() {
if (!mLocalBlockWorkerClientPoolInitialized) {
initializeLocalBlockWorkerClientPool();
}

if (mLocalBlockWorkerClientPool == null) {
return null;
}
Expand All @@ -204,7 +204,7 @@ public synchronized BlockWorkerClient acquireLocalWorkerClient() {
* @param hostname the worker hostname to connect to, empty string for any worker
* @return a worker client with a connection to the specified hostname
*/
private synchronized BlockWorkerClient acquireRemoteWorkerClient(String hostname) {
private BlockWorkerClient acquireRemoteWorkerClient(String hostname) {
NetAddress workerAddress = getWorkerAddress(hostname);
return acquireRemoteWorkerClient(workerAddress);
}
Expand All @@ -217,7 +217,7 @@ private synchronized BlockWorkerClient acquireRemoteWorkerClient(String hostname
* @param address the address of the worker
* @return a worker client with a connection to the specified hostname
*/
private synchronized BlockWorkerClient acquireRemoteWorkerClient(NetAddress address) {
private BlockWorkerClient acquireRemoteWorkerClient(NetAddress address) {
// If we couldn't find a worker, crash.
if (address == null) {
// TODO(calvin): Better exception usage.
Expand All @@ -235,11 +235,6 @@ private synchronized BlockWorkerClient acquireRemoteWorkerClient(NetAddress addr
* Releases the {@link BlockWorkerClient} back to the client pool, or destroys it if it was a
* remote client.
*
* NOTE: the client pool is already thread-safe. Synchronizing on {@link BlockStoreContext} will
* lead to deadlock: thread A acquired a client and awaits for {@link BlockStoreContext} to
* release the client, while thread B holds the lock of {@link BlockStoreContext} but waits for
* available clients.
*
* @param blockWorkerClient the worker client to release, the client should not be accessed after
* this method is called
*/
Expand All @@ -264,7 +259,7 @@ public void releaseWorkerClient(BlockWorkerClient blockWorkerClient) {
// TODO(calvin): Handle the case when the local worker starts up after the client or shuts down
// before the client does. Also, when this is fixed, fix the TODO(cc) in
// TachyonBlockStore#getInStream too.
public synchronized boolean hasLocalWorker() {
public boolean hasLocalWorker() {
if (!mLocalBlockWorkerClientPoolInitialized) {
initializeLocalBlockWorkerClientPool();
}
Expand All @@ -275,7 +270,7 @@ public synchronized boolean hasLocalWorker() {
* Re-initializes the {@link BlockStoreContext}. This method should only be used in
* {@link ClientContext}.
*/
public synchronized void reset() {
public void reset() {
if (mBlockMasterClientPool != null) {
mBlockMasterClientPool.close();
}
Expand Down
Expand Up @@ -22,11 +22,19 @@
* A shared context in each client JVM for common file master client functionality such as a pool of
* master clients. Any remote clients will be created and destroyed on a per use basis. This class
* is thread safe.
*
*
* NOTE: The context maintains a pool of file system master clients that is already thread-safe.
* Synchronizing {@link FileSystemContext} methods could lead to deadlock: thread A attempts to
* acquire a client when there are no clients left in the pool and blocks holding a lock on the
* {@link FileSystemContext}, when thread B attempts to release a client it owns it is unable to do
* so, because thread A holds the lock on {@link FileSystemContext}.
*/
public enum FileSystemContext {
INSTANCE;

private FileSystemMasterClientPool mFileSystemMasterClientPool;

private final TachyonBlockStore mTachyonBlockStore;

/**
Expand All @@ -50,11 +58,6 @@ public FileSystemMasterClient acquireMasterClient() {
/**
* Releases a block master client into the block master client pool.
*
* NOTE: the client pool is already thread-safe. Synchronizing on {@link FileSystemContext} will
* lead to deadlock: thread A acquired a client and awaits for {@link FileSystemContext} to
* release the client, while thread B holds the lock of {@link FileSystemContext} but waits for
* available clients.
*
* @param masterClient a block master client to release
*/
public void releaseMasterClient(FileSystemMasterClient masterClient) {
Expand All @@ -64,15 +67,15 @@ public void releaseMasterClient(FileSystemMasterClient masterClient) {
/**
* @return the Tachyon block store
*/
public synchronized TachyonBlockStore getTachyonBlockStore() {
public TachyonBlockStore getTachyonBlockStore() {
return mTachyonBlockStore;
}

/**
* Re-initializes the Block Store context. This method should only be used in
* {@link ClientContext}.
*/
public synchronized void reset() {
public void reset() {
mFileSystemMasterClientPool.close();
mFileSystemMasterClientPool =
new FileSystemMasterClientPool(ClientContext.getMasterAddress());
Expand Down
Expand Up @@ -161,22 +161,22 @@ public Void call() throws TachyonTException, TException {
}

@Override
protected void beforeDisconnect() {
protected synchronized void beforeDisconnect() {
// Heartbeat to send the client metrics.
if (mHeartbeatExecutor != null) {
mHeartbeatExecutor.heartbeat();
}
}

@Override
protected void afterDisconnect() {
protected synchronized void afterDisconnect() {
if (mHeartbeat != null) {
mHeartbeat.cancel(true);
}
}

@Override
protected TachyonService.Client getClient() {
protected synchronized TachyonService.Client getClient() {
return mClient;
}

Expand Down Expand Up @@ -402,6 +402,7 @@ public Boolean call() throws TException {
* @throws IOException if an I/O error occurs
*/
public synchronized void sessionHeartbeat() throws ConnectionFailedException, IOException {
System.out.println("this is wrong");
retryRPC(new RpcCallable<Void>() {
@Override
public Void call() throws TException {
Expand Down
Expand Up @@ -19,15 +19,23 @@

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import com.google.common.collect.Lists;

import tachyon.Constants;
import tachyon.client.ClientContext;
import tachyon.client.worker.BlockWorkerClient;

/**
* Tests {@link BlockStoreContext}.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({BlockStoreContext.class, BlockWorkerClient.class, BlockWorkerClientPool.class})
public final class BlockStoreContextTest {
/**
* This test ensures acquiring all the available BlockStore master clients blocks further
Expand All @@ -38,7 +46,7 @@ public final class BlockStoreContextTest {
* @throws Exception if an unexpected error occurs during the test
*/
@Test(timeout = 10000)
public void acquireAtMaxLimitTest() throws Exception {
public void acquireMasterLimitTest() throws Exception {
final List<BlockMasterClient> clients = Lists.newArrayList();

// Acquire all the clients
Expand All @@ -48,7 +56,7 @@ public void acquireAtMaxLimitTest() throws Exception {
}

// Spawn another thread to acquire a master client
Thread acquireThread = new Thread(new AcquireClient());
Thread acquireThread = new Thread(new AcquireMasterClient());
acquireThread.start();

// Wait for the spawned thread to complete. If it is able to acquire a master client before
Expand All @@ -75,11 +83,77 @@ public void acquireAtMaxLimitTest() throws Exception {
}
}

class AcquireClient implements Runnable {
class AcquireMasterClient implements Runnable {
@Override
public void run() {
BlockMasterClient client = BlockStoreContext.INSTANCE.acquireMasterClient();
BlockStoreContext.INSTANCE.releaseMasterClient(client);
}
}

/**
* This test ensures acquiring all the available BlockStore worker clients blocks further requests
* for clients. It also ensures clients are available for reuse after they are released by the
* previous owners. If the test takes longer than 10 seconds, a deadlock most likely occurred
* preventing the release of the worker clients.
*
* @throws Exception if an unexpected error occurs during the test
*/
@Test(timeout = 10000)
public void acquireWorkerLimitTest() throws Exception {
// Use mocks for the block worker client to prevent it from trying to invoke the session
// heartbeat RPC.
BlockWorkerClient mock = PowerMockito.mock(BlockWorkerClient.class);
PowerMockito.doNothing().when(mock).sessionHeartbeat();
PowerMockito.doReturn(true).when(mock).isLocal();
PowerMockito
.whenNew(BlockWorkerClient.class)
.withArguments(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyLong(),
Mockito.anyBoolean(), Mockito.any()).thenReturn(mock);

final List<BlockWorkerClient> clients = Lists.newArrayList();

// Acquire all the clients
for (int i = 0; i < ClientContext.getConf()
.getInt(Constants.USER_BLOCK_WORKER_CLIENT_THREADS); i ++) {
clients.add(BlockStoreContext.INSTANCE.acquireWorkerClient());
}

// Spawn another thread to acquire a worker client
Thread acquireThread = new Thread(new AcquireWorkerClient());
acquireThread.start();

// Wait for the spawned thread to complete. If it is able to acquire a worker client before
// the defined timeout, fail
long timeoutMs = Constants.SECOND_MS / 2;
long start = System.currentTimeMillis();
acquireThread.join(timeoutMs);
if (System.currentTimeMillis() - start < timeoutMs) {
Assert.fail("Acquired a worker client when the client pool was full.");
}

// Release all the clients
// Set the RPC number of retries to -1 to prevent the worker client from trying to send a
// heartbeat message when it is released.
for (BlockWorkerClient client : clients) {
BlockStoreContext.INSTANCE.releaseWorkerClient(client);
}

// Wait for the spawned thread to complete. If it is unable to acquire a worker client before
// the defined timeout, fail.
timeoutMs = 5 * Constants.SECOND_MS;
start = System.currentTimeMillis();
acquireThread.join(timeoutMs);
if (System.currentTimeMillis() - start >= timeoutMs) {
Assert.fail("Failed to acquire a worker client within " + timeoutMs + "ms. Deadlock?");
}
}

class AcquireWorkerClient implements Runnable {
@Override
public void run() {
BlockWorkerClient client = BlockStoreContext.INSTANCE.acquireWorkerClient();
BlockStoreContext.INSTANCE.releaseWorkerClient(client);
}
}
}
Expand Up @@ -76,8 +76,11 @@ public void acquireAtMaxLimitTest() throws Exception {
class AcquireClient implements Runnable {
@Override
public void run() {
System.out.println("1");
FileSystemMasterClient client = FileSystemContext.INSTANCE.acquireMasterClient();
System.out.println("2");
FileSystemContext.INSTANCE.releaseMasterClient(client);
System.out.println("3");
}
}
}
5 changes: 3 additions & 2 deletions common/src/main/java/tachyon/ClientBase.java
Expand Up @@ -287,7 +287,8 @@ protected interface RpcCallableThrowsTachyonTException<V> {
* been called before calling this method or during the retry
* @throws ConnectionFailedException if network connection failed
*/
protected <V> V retryRPC(RpcCallable<V> rpc) throws IOException, ConnectionFailedException {
protected synchronized <V> V retryRPC(RpcCallable<V> rpc) throws IOException,
ConnectionFailedException {
int retry = 0;
while (!mClosed && (retry ++) <= RPC_MAX_NUM_RETRY) {
connect();
Expand Down Expand Up @@ -315,7 +316,7 @@ protected <V> V retryRPC(RpcCallable<V> rpc) throws IOException, ConnectionFaile
* @throws IOException when retries exceeds {@link #RPC_MAX_NUM_RETRY} or {@link #close()} has
* been called before calling this method or during the retry
*/
protected <V> V retryRPC(RpcCallableThrowsTachyonTException<V> rpc)
protected synchronized <V> V retryRPC(RpcCallableThrowsTachyonTException<V> rpc)
throws TachyonException, IOException {
int retry = 0;
while (!mClosed && (retry ++) <= RPC_MAX_NUM_RETRY) {
Expand Down
Expand Up @@ -97,7 +97,7 @@ public MasterWorkerInfo(long id, WorkerNetAddress address) {
* @param blocks set of block ids on this worker
* @return A Set of blocks removed (or lost) from this worker
*/
public Set<Long> register(final StorageTierAssoc globalStorageTierAssoc,
public synchronized Set<Long> register(final StorageTierAssoc globalStorageTierAssoc,
final List<String> storageTierAliases, final Map<String, Long> totalBytesOnTiers,
final Map<String, Long> usedBytesOnTiers, final Set<Long> blocks) {
// If the storage aliases do not have strictly increasing ordinal value based on the total
Expand Down Expand Up @@ -190,7 +190,7 @@ public synchronized WorkerInfo generateClientWorkerInfo() {
/**
* @return the worker's address
*/
public WorkerNetAddress getAddress() {
public synchronized WorkerNetAddress getAddress() {
return mWorkerAddress;
}

Expand All @@ -211,7 +211,7 @@ public synchronized Set<Long> getBlocks() {
/**
* @return the capacity of the worker in bytes
*/
public long getCapacityBytes() {
public synchronized long getCapacityBytes() {
return mCapacityBytes;
}

Expand Down Expand Up @@ -267,7 +267,7 @@ public synchronized Map<String, Long> getUsedBytesOnTiers() {
/**
* @return the start time in milliseconds
*/
public long getStartTime() {
public synchronized long getStartTime() {
return mStartTimeMs;
}

Expand Down

0 comments on commit be951de

Please sign in to comment.