diff --git a/build/findbugs/findbugs-exclude.xml b/build/findbugs/findbugs-exclude.xml index e9cca8fe62c2..4c539602dbb3 100644 --- a/build/findbugs/findbugs-exclude.xml +++ b/build/findbugs/findbugs-exclude.xml @@ -39,7 +39,7 @@ UC: Useless non-empty void method UPM: Private method is never called --> - + diff --git a/clients/unshaded/src/main/java/tachyon/client/block/BlockStoreContext.java b/clients/unshaded/src/main/java/tachyon/client/block/BlockStoreContext.java index e71ec3b2f124..7a653b5c2295 100644 --- a/clients/unshaded/src/main/java/tachyon/client/block/BlockStoreContext.java +++ b/clients/unshaded/src/main/java/tachyon/client/block/BlockStoreContext.java @@ -35,6 +35,12 @@ * A shared context in each client JVM for common block master client functionality such as a pool * of master clients and a pool of local worker clients. Any remote clients will be created and * destroyed on a per use basis. This class is thread safe. + * + * NOTE: The context maintains a pool of block master clients that is already thread-safe. + * Synchronizing {@link BlockStoreContext} methods could lead to deadlock: thread A attempts to + * acquire a client when there are no clients left in the pool and blocks holding a lock on the + * {@link BlockStoreContext}, when thread B attempts to release a client it owns, it is unable to do + * so, because thread A holds the lock on {@link BlockStoreContext}. */ public enum BlockStoreContext { INSTANCE; @@ -107,11 +113,6 @@ public BlockMasterClient acquireMasterClient() { /** * Releases a block master client into the block master client pool. * - * NOTE: the client pool is already thread-safe. Synchronizing on {@link BlockStoreContext} will - * lead to deadlock: thread A acquired a client and awaits for {@link BlockStoreContext} to - * release the client, while thread B holds the lock of {@link BlockStoreContext} but waits for - * available clients. - * * @param masterClient a block master client to release */ public void releaseMasterClient(BlockMasterClient masterClient) { @@ -125,7 +126,7 @@ public void releaseMasterClient(BlockMasterClient masterClient) { * * @return a {@link BlockWorkerClient} to a worker in the Tachyon system */ - public synchronized BlockWorkerClient acquireWorkerClient() { + public BlockWorkerClient acquireWorkerClient() { BlockWorkerClient client = acquireLocalWorkerClient(); if (client == null) { // Get a worker client for any worker in the system. @@ -142,7 +143,7 @@ public synchronized BlockWorkerClient acquireWorkerClient() { * @return a {@link BlockWorkerClient} connected to the worker with the given hostname * @throws IOException if no Tachyon worker is available for the given hostname */ - public synchronized BlockWorkerClient acquireWorkerClient(String hostname) throws IOException { + public BlockWorkerClient acquireWorkerClient(String hostname) throws IOException { BlockWorkerClient client; if (hostname.equals(NetworkAddressUtils.getLocalHostName(ClientContext.getConf()))) { client = acquireLocalWorkerClient(); @@ -162,7 +163,7 @@ public synchronized BlockWorkerClient acquireWorkerClient(String hostname) throw * @return a {@link BlockWorkerClient} connected to the worker with the given hostname * @throws IOException if no Tachyon worker is available for the given hostname */ - public synchronized BlockWorkerClient acquireWorkerClient(NetAddress address) + public BlockWorkerClient acquireWorkerClient(NetAddress address) throws IOException { BlockWorkerClient client; if (address == null) { @@ -185,11 +186,10 @@ public synchronized BlockWorkerClient acquireWorkerClient(NetAddress address) * * @return a {@link BlockWorkerClient} to a worker in the Tachyon system or null if failed */ - public synchronized BlockWorkerClient acquireLocalWorkerClient() { + public BlockWorkerClient acquireLocalWorkerClient() { if (!mLocalBlockWorkerClientPoolInitialized) { initializeLocalBlockWorkerClientPool(); } - if (mLocalBlockWorkerClientPool == null) { return null; } @@ -204,7 +204,7 @@ public synchronized BlockWorkerClient acquireLocalWorkerClient() { * @param hostname the worker hostname to connect to, empty string for any worker * @return a worker client with a connection to the specified hostname */ - private synchronized BlockWorkerClient acquireRemoteWorkerClient(String hostname) { + private BlockWorkerClient acquireRemoteWorkerClient(String hostname) { NetAddress workerAddress = getWorkerAddress(hostname); return acquireRemoteWorkerClient(workerAddress); } @@ -217,7 +217,7 @@ private synchronized BlockWorkerClient acquireRemoteWorkerClient(String hostname * @param address the address of the worker * @return a worker client with a connection to the specified hostname */ - private synchronized BlockWorkerClient acquireRemoteWorkerClient(NetAddress address) { + private BlockWorkerClient acquireRemoteWorkerClient(NetAddress address) { // If we couldn't find a worker, crash. if (address == null) { // TODO(calvin): Better exception usage. @@ -235,11 +235,6 @@ private synchronized BlockWorkerClient acquireRemoteWorkerClient(NetAddress addr * Releases the {@link BlockWorkerClient} back to the client pool, or destroys it if it was a * remote client. * - * NOTE: the client pool is already thread-safe. Synchronizing on {@link BlockStoreContext} will - * lead to deadlock: thread A acquired a client and awaits for {@link BlockStoreContext} to - * release the client, while thread B holds the lock of {@link BlockStoreContext} but waits for - * available clients. - * * @param blockWorkerClient the worker client to release, the client should not be accessed after * this method is called */ @@ -264,7 +259,7 @@ public void releaseWorkerClient(BlockWorkerClient blockWorkerClient) { // TODO(calvin): Handle the case when the local worker starts up after the client or shuts down // before the client does. Also, when this is fixed, fix the TODO(cc) in // TachyonBlockStore#getInStream too. - public synchronized boolean hasLocalWorker() { + public boolean hasLocalWorker() { if (!mLocalBlockWorkerClientPoolInitialized) { initializeLocalBlockWorkerClientPool(); } @@ -275,7 +270,7 @@ public synchronized boolean hasLocalWorker() { * Re-initializes the {@link BlockStoreContext}. This method should only be used in * {@link ClientContext}. */ - public synchronized void reset() { + public void reset() { if (mBlockMasterClientPool != null) { mBlockMasterClientPool.close(); } diff --git a/clients/unshaded/src/main/java/tachyon/client/file/FileSystemContext.java b/clients/unshaded/src/main/java/tachyon/client/file/FileSystemContext.java index f5b86d7c5235..3c26a32fe6c8 100644 --- a/clients/unshaded/src/main/java/tachyon/client/file/FileSystemContext.java +++ b/clients/unshaded/src/main/java/tachyon/client/file/FileSystemContext.java @@ -22,11 +22,19 @@ * A shared context in each client JVM for common file master client functionality such as a pool of * master clients. Any remote clients will be created and destroyed on a per use basis. This class * is thread safe. + * + * + * NOTE: The context maintains a pool of file system master clients that is already thread-safe. + * Synchronizing {@link FileSystemContext} methods could lead to deadlock: thread A attempts to + * acquire a client when there are no clients left in the pool and blocks holding a lock on the + * {@link FileSystemContext}, when thread B attempts to release a client it owns it is unable to do + * so, because thread A holds the lock on {@link FileSystemContext}. */ public enum FileSystemContext { INSTANCE; private FileSystemMasterClientPool mFileSystemMasterClientPool; + private final TachyonBlockStore mTachyonBlockStore; /** @@ -50,11 +58,6 @@ public FileSystemMasterClient acquireMasterClient() { /** * Releases a block master client into the block master client pool. * - * NOTE: the client pool is already thread-safe. Synchronizing on {@link FileSystemContext} will - * lead to deadlock: thread A acquired a client and awaits for {@link FileSystemContext} to - * release the client, while thread B holds the lock of {@link FileSystemContext} but waits for - * available clients. - * * @param masterClient a block master client to release */ public void releaseMasterClient(FileSystemMasterClient masterClient) { @@ -64,7 +67,7 @@ public void releaseMasterClient(FileSystemMasterClient masterClient) { /** * @return the Tachyon block store */ - public synchronized TachyonBlockStore getTachyonBlockStore() { + public TachyonBlockStore getTachyonBlockStore() { return mTachyonBlockStore; } @@ -72,7 +75,7 @@ public synchronized TachyonBlockStore getTachyonBlockStore() { * Re-initializes the Block Store context. This method should only be used in * {@link ClientContext}. */ - public synchronized void reset() { + public void reset() { mFileSystemMasterClientPool.close(); mFileSystemMasterClientPool = new FileSystemMasterClientPool(ClientContext.getMasterAddress()); diff --git a/clients/unshaded/src/main/java/tachyon/client/worker/BlockWorkerClient.java b/clients/unshaded/src/main/java/tachyon/client/worker/BlockWorkerClient.java index 5c173ec2e0ea..bab7a1359fd8 100644 --- a/clients/unshaded/src/main/java/tachyon/client/worker/BlockWorkerClient.java +++ b/clients/unshaded/src/main/java/tachyon/client/worker/BlockWorkerClient.java @@ -161,7 +161,7 @@ public Void call() throws TachyonTException, TException { } @Override - protected void beforeDisconnect() { + protected synchronized void beforeDisconnect() { // Heartbeat to send the client metrics. if (mHeartbeatExecutor != null) { mHeartbeatExecutor.heartbeat(); @@ -169,14 +169,14 @@ protected void beforeDisconnect() { } @Override - protected void afterDisconnect() { + protected synchronized void afterDisconnect() { if (mHeartbeat != null) { mHeartbeat.cancel(true); } } @Override - protected TachyonService.Client getClient() { + protected synchronized TachyonService.Client getClient() { return mClient; } @@ -402,6 +402,7 @@ public Boolean call() throws TException { * @throws IOException if an I/O error occurs */ public synchronized void sessionHeartbeat() throws ConnectionFailedException, IOException { + System.out.println("this is wrong"); retryRPC(new RpcCallable() { @Override public Void call() throws TException { diff --git a/clients/unshaded/src/test/java/tachyon/client/block/BlockStoreContextTest.java b/clients/unshaded/src/test/java/tachyon/client/block/BlockStoreContextTest.java index 951990f775d5..cf2163e117d7 100644 --- a/clients/unshaded/src/test/java/tachyon/client/block/BlockStoreContextTest.java +++ b/clients/unshaded/src/test/java/tachyon/client/block/BlockStoreContextTest.java @@ -19,15 +19,23 @@ import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import com.google.common.collect.Lists; import tachyon.Constants; import tachyon.client.ClientContext; +import tachyon.client.worker.BlockWorkerClient; /** * Tests {@link BlockStoreContext}. */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({BlockStoreContext.class, BlockWorkerClient.class, BlockWorkerClientPool.class}) public final class BlockStoreContextTest { /** * This test ensures acquiring all the available BlockStore master clients blocks further @@ -38,7 +46,7 @@ public final class BlockStoreContextTest { * @throws Exception if an unexpected error occurs during the test */ @Test(timeout = 10000) - public void acquireAtMaxLimitTest() throws Exception { + public void acquireMasterLimitTest() throws Exception { final List clients = Lists.newArrayList(); // Acquire all the clients @@ -48,7 +56,7 @@ public void acquireAtMaxLimitTest() throws Exception { } // Spawn another thread to acquire a master client - Thread acquireThread = new Thread(new AcquireClient()); + Thread acquireThread = new Thread(new AcquireMasterClient()); acquireThread.start(); // Wait for the spawned thread to complete. If it is able to acquire a master client before @@ -75,11 +83,77 @@ public void acquireAtMaxLimitTest() throws Exception { } } - class AcquireClient implements Runnable { + class AcquireMasterClient implements Runnable { @Override public void run() { BlockMasterClient client = BlockStoreContext.INSTANCE.acquireMasterClient(); BlockStoreContext.INSTANCE.releaseMasterClient(client); } } + + /** + * This test ensures acquiring all the available BlockStore worker clients blocks further requests + * for clients. It also ensures clients are available for reuse after they are released by the + * previous owners. If the test takes longer than 10 seconds, a deadlock most likely occurred + * preventing the release of the worker clients. + * + * @throws Exception if an unexpected error occurs during the test + */ + @Test(timeout = 10000) + public void acquireWorkerLimitTest() throws Exception { + // Use mocks for the block worker client to prevent it from trying to invoke the session + // heartbeat RPC. + BlockWorkerClient mock = PowerMockito.mock(BlockWorkerClient.class); + PowerMockito.doNothing().when(mock).sessionHeartbeat(); + PowerMockito.doReturn(true).when(mock).isLocal(); + PowerMockito + .whenNew(BlockWorkerClient.class) + .withArguments(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyLong(), + Mockito.anyBoolean(), Mockito.any()).thenReturn(mock); + + final List clients = Lists.newArrayList(); + + // Acquire all the clients + for (int i = 0; i < ClientContext.getConf() + .getInt(Constants.USER_BLOCK_WORKER_CLIENT_THREADS); i ++) { + clients.add(BlockStoreContext.INSTANCE.acquireWorkerClient()); + } + + // Spawn another thread to acquire a worker client + Thread acquireThread = new Thread(new AcquireWorkerClient()); + acquireThread.start(); + + // Wait for the spawned thread to complete. If it is able to acquire a worker client before + // the defined timeout, fail + long timeoutMs = Constants.SECOND_MS / 2; + long start = System.currentTimeMillis(); + acquireThread.join(timeoutMs); + if (System.currentTimeMillis() - start < timeoutMs) { + Assert.fail("Acquired a worker client when the client pool was full."); + } + + // Release all the clients + // Set the RPC number of retries to -1 to prevent the worker client from trying to send a + // heartbeat message when it is released. + for (BlockWorkerClient client : clients) { + BlockStoreContext.INSTANCE.releaseWorkerClient(client); + } + + // Wait for the spawned thread to complete. If it is unable to acquire a worker client before + // the defined timeout, fail. + timeoutMs = 5 * Constants.SECOND_MS; + start = System.currentTimeMillis(); + acquireThread.join(timeoutMs); + if (System.currentTimeMillis() - start >= timeoutMs) { + Assert.fail("Failed to acquire a worker client within " + timeoutMs + "ms. Deadlock?"); + } + } + + class AcquireWorkerClient implements Runnable { + @Override + public void run() { + BlockWorkerClient client = BlockStoreContext.INSTANCE.acquireWorkerClient(); + BlockStoreContext.INSTANCE.releaseWorkerClient(client); + } + } } diff --git a/clients/unshaded/src/test/java/tachyon/client/file/FileSystemContextTest.java b/clients/unshaded/src/test/java/tachyon/client/file/FileSystemContextTest.java index adfaa919bd41..aeb5a8c7d2ed 100644 --- a/clients/unshaded/src/test/java/tachyon/client/file/FileSystemContextTest.java +++ b/clients/unshaded/src/test/java/tachyon/client/file/FileSystemContextTest.java @@ -76,8 +76,11 @@ public void acquireAtMaxLimitTest() throws Exception { class AcquireClient implements Runnable { @Override public void run() { + System.out.println("1"); FileSystemMasterClient client = FileSystemContext.INSTANCE.acquireMasterClient(); + System.out.println("2"); FileSystemContext.INSTANCE.releaseMasterClient(client); + System.out.println("3"); } } } diff --git a/common/src/main/java/tachyon/ClientBase.java b/common/src/main/java/tachyon/ClientBase.java index db1059c8b7eb..f48a4d533cb6 100644 --- a/common/src/main/java/tachyon/ClientBase.java +++ b/common/src/main/java/tachyon/ClientBase.java @@ -287,7 +287,8 @@ protected interface RpcCallableThrowsTachyonTException { * been called before calling this method or during the retry * @throws ConnectionFailedException if network connection failed */ - protected V retryRPC(RpcCallable rpc) throws IOException, ConnectionFailedException { + protected synchronized V retryRPC(RpcCallable rpc) throws IOException, + ConnectionFailedException { int retry = 0; while (!mClosed && (retry ++) <= RPC_MAX_NUM_RETRY) { connect(); @@ -315,7 +316,7 @@ protected V retryRPC(RpcCallable rpc) throws IOException, ConnectionFaile * @throws IOException when retries exceeds {@link #RPC_MAX_NUM_RETRY} or {@link #close()} has * been called before calling this method or during the retry */ - protected V retryRPC(RpcCallableThrowsTachyonTException rpc) + protected synchronized V retryRPC(RpcCallableThrowsTachyonTException rpc) throws TachyonException, IOException { int retry = 0; while (!mClosed && (retry ++) <= RPC_MAX_NUM_RETRY) { diff --git a/servers/src/main/java/tachyon/master/block/meta/MasterWorkerInfo.java b/servers/src/main/java/tachyon/master/block/meta/MasterWorkerInfo.java index ede76b1fbd19..86f59a0a93bd 100644 --- a/servers/src/main/java/tachyon/master/block/meta/MasterWorkerInfo.java +++ b/servers/src/main/java/tachyon/master/block/meta/MasterWorkerInfo.java @@ -97,7 +97,7 @@ public MasterWorkerInfo(long id, WorkerNetAddress address) { * @param blocks set of block ids on this worker * @return A Set of blocks removed (or lost) from this worker */ - public Set register(final StorageTierAssoc globalStorageTierAssoc, + public synchronized Set register(final StorageTierAssoc globalStorageTierAssoc, final List storageTierAliases, final Map totalBytesOnTiers, final Map usedBytesOnTiers, final Set blocks) { // If the storage aliases do not have strictly increasing ordinal value based on the total @@ -190,7 +190,7 @@ public synchronized WorkerInfo generateClientWorkerInfo() { /** * @return the worker's address */ - public WorkerNetAddress getAddress() { + public synchronized WorkerNetAddress getAddress() { return mWorkerAddress; } @@ -211,7 +211,7 @@ public synchronized Set getBlocks() { /** * @return the capacity of the worker in bytes */ - public long getCapacityBytes() { + public synchronized long getCapacityBytes() { return mCapacityBytes; } @@ -267,7 +267,7 @@ public synchronized Map getUsedBytesOnTiers() { /** * @return the start time in milliseconds */ - public long getStartTime() { + public synchronized long getStartTime() { return mStartTimeMs; } diff --git a/servers/src/main/java/tachyon/master/file/FileSystemMaster.java b/servers/src/main/java/tachyon/master/file/FileSystemMaster.java index 5ad829648c36..dbca84dcc879 100644 --- a/servers/src/main/java/tachyon/master/file/FileSystemMaster.java +++ b/servers/src/main/java/tachyon/master/file/FileSystemMaster.java @@ -364,7 +364,6 @@ public PersistenceState getPersistenceState(long fileId) } private FileInfo getFileInfoInternal(Inode inode) throws FileDoesNotExistException { - // This function should only be called from within synchronized (mInodeTree) blocks. FileInfo fileInfo = inode.generateClientFileInfo(mInodeTree.getPath(inode).toString()); fileInfo.inMemoryPercentage = getInMemoryPercentage(inode); TachyonURI path = mInodeTree.getPath(inode); @@ -479,7 +478,6 @@ public void completeFile(long fileId, CompleteFileOptions options) void completeFileInternal(List blockIds, long fileId, long length, long opTimeMs) throws FileDoesNotExistException, InvalidPathException, InvalidFileSizeException, FileAlreadyCompletedException { - // This function should only be called from within synchronized (mInodeTree) blocks. InodeFile inode = (InodeFile) mInodeTree.getInodeById(fileId); inode.setBlockIds(blockIds); inode.setLastModificationTimeMs(opTimeMs); @@ -534,7 +532,6 @@ public long create(TachyonURI path, CreateOptions options) InodeTree.CreatePathResult createInternal(TachyonURI path, CreateOptions options) throws InvalidPathException, FileAlreadyExistsException, BlockInfoException, IOException { - // This function should only be called from within synchronized (mInodeTree) blocks. CreatePathOptions createPathOptions = new CreatePathOptions.Builder(MasterContext.getConf()) .setBlockSizeBytes(options.getBlockSizeBytes()).setDirectory(false) .setOperationTimeMs(options.getOperationTimeMs()).setPersisted(options.isPersisted()) @@ -703,8 +700,6 @@ boolean deleteFileRecursiveInternal(long fileId, boolean replayed, long opTimeMs */ boolean deleteFileInternal(long fileId, boolean recursive, boolean replayed, long opTimeMs) throws FileDoesNotExistException, IOException, DirectoryNotEmptyException { - // This function should only be called from within synchronized (mInodeTree) blocks. - // // TODO(jiri): A crash after any UFS object is deleted and before the delete operation is // journaled will result in an inconsistency between Tachyon and UFS. Inode inode = mInodeTree.getInodeById(fileId); @@ -849,7 +844,6 @@ public List getFileBlockInfoList(TachyonURI path) */ private FileBlockInfo generateFileBlockInfo(InodeFile file, BlockInfo blockInfo) throws InvalidPathException { - // This function should only be called from within synchronized (mInodeTree) blocks. FileBlockInfo fileBlockInfo = new FileBlockInfo(); fileBlockInfo.blockInfo = blockInfo; fileBlockInfo.ufsLocations = new ArrayList(); @@ -1136,7 +1130,6 @@ public boolean rename(long fileId, TachyonURI dstPath) */ boolean renameInternal(long fileId, TachyonURI dstPath, boolean replayed, long opTimeMs) throws FileDoesNotExistException, InvalidPathException, IOException { - // This function should only be called from within synchronized (mInodeTree) blocks. Inode srcInode = mInodeTree.getInodeById(fileId); TachyonURI srcPath = mInodeTree.getPath(srcInode); LOG.debug("Renaming {} to {}", srcPath, dstPath); diff --git a/servers/src/main/java/tachyon/master/file/meta/Inode.java b/servers/src/main/java/tachyon/master/file/meta/Inode.java index 61ed99507400..e60467aa3223 100644 --- a/servers/src/main/java/tachyon/master/file/meta/Inode.java +++ b/servers/src/main/java/tachyon/master/file/meta/Inode.java @@ -210,7 +210,7 @@ public synchronized boolean equals(Object o) { /** * @return the create time, in milliseconds */ - public long getCreationTimeMs() { + public synchronized long getCreationTimeMs() { return mCreationTimeMs; } @@ -257,21 +257,21 @@ public synchronized int hashCode() { /** * @return true if the inode is deleted, false otherwise */ - public boolean isDeleted() { + public synchronized boolean isDeleted() { return mDeleted; } /** * @return true if the inode is a directory, false otherwise */ - public boolean isDirectory() { + public synchronized boolean isDirectory() { return mDirectory; } /** * @return true if the inode is a file, false otherwise */ - public boolean isFile() { + public synchronized boolean isFile() { return !mDirectory; } diff --git a/servers/src/main/java/tachyon/master/file/meta/InodeDirectoryIdGenerator.java b/servers/src/main/java/tachyon/master/file/meta/InodeDirectoryIdGenerator.java index 182b0a788cba..060396c40118 100644 --- a/servers/src/main/java/tachyon/master/file/meta/InodeDirectoryIdGenerator.java +++ b/servers/src/main/java/tachyon/master/file/meta/InodeDirectoryIdGenerator.java @@ -72,7 +72,7 @@ public synchronized JournalEntry toJournalEntry() { * * @param entry {@link InodeDirectoryIdGeneratorEntry} to use for initialization */ - public void initFromJournalEntry(InodeDirectoryIdGeneratorEntry entry) { + public synchronized void initFromJournalEntry(InodeDirectoryIdGeneratorEntry entry) { mContainerId = entry.getContainerId(); mSequenceNumber = entry.getSequenceNumber(); mInitialized = true; diff --git a/servers/src/main/java/tachyon/master/file/meta/InodeFile.java b/servers/src/main/java/tachyon/master/file/meta/InodeFile.java index 497a8cfda433..ee14935f83cd 100644 --- a/servers/src/main/java/tachyon/master/file/meta/InodeFile.java +++ b/servers/src/main/java/tachyon/master/file/meta/InodeFile.java @@ -154,7 +154,7 @@ private InodeFile(InodeFile.Builder builder) { } @Override - public FileInfo generateClientFileInfo(String path) { + public synchronized FileInfo generateClientFileInfo(String path) { FileInfo ret = new FileInfo(); // note: in-memory percentage is NOT calculated here, because it needs blocks info stored in // block master @@ -182,7 +182,7 @@ public FileInfo generateClientFileInfo(String path) { /** * Resets the file inode. */ - public void reset() { + public synchronized void reset() { mBlocks = Lists.newArrayList(); mLength = 0; mCompleted = false; @@ -192,7 +192,7 @@ public void reset() { /** * @param blockSizeBytes the block size to use */ - public void setBlockSize(long blockSizeBytes) { + public synchronized void setBlockSize(long blockSizeBytes) { Preconditions.checkArgument(blockSizeBytes >= 0, "Block size cannot be negative"); mBlockSizeBytes = blockSizeBytes; } @@ -200,7 +200,7 @@ public void setBlockSize(long blockSizeBytes) { /** * @param ttl the TTL to use, in milliseconds */ - public void setTtl(long ttl) { + public synchronized void setTtl(long ttl) { mTtl = ttl; } @@ -214,7 +214,7 @@ public synchronized List getBlockIds() { /** * @return the block size in bytes */ - public long getBlockSizeBytes() { + public synchronized long getBlockSizeBytes() { return mBlockSizeBytes; } @@ -325,7 +325,7 @@ public synchronized void complete(long length) } @Override - public String toString() { + public synchronized String toString() { StringBuilder sb = new StringBuilder("InodeFile("); sb.append(super.toString()).append(", LENGTH: ").append(mLength); sb.append(", Cacheable: ").append(mCacheable); diff --git a/servers/src/main/java/tachyon/master/journal/JournalWriter.java b/servers/src/main/java/tachyon/master/journal/JournalWriter.java index 439ab59913a2..7ff0649c8289 100644 --- a/servers/src/main/java/tachyon/master/journal/JournalWriter.java +++ b/servers/src/main/java/tachyon/master/journal/JournalWriter.java @@ -151,7 +151,7 @@ public synchronized JournalOutputStream getEntryOutputStream() throws IOExceptio * * @throws IOException if an I/O error occurs */ - public void close() throws IOException { + public synchronized void close() throws IOException { if (mCheckpointOutputStream != null) { mCheckpointOutputStream.close(); } @@ -168,7 +168,7 @@ public void close() throws IOException { * @return the output stream for the current log file * @throws IOException if an I/O error occurs */ - private OutputStream openCurrentLog() throws IOException { + private synchronized OutputStream openCurrentLog() throws IOException { String currentLogFile = mJournal.getCurrentLogFilePath(); OutputStream os = mUfs.create(currentLogFile); LOG.info("Opened current log file: {}", currentLogFile); @@ -180,7 +180,7 @@ private OutputStream openCurrentLog() throws IOException { * * @throws IOException if an I/O error occurs */ - private void deleteCompletedLogs() throws IOException { + private synchronized void deleteCompletedLogs() throws IOException { LOG.info("Deleting all completed log files..."); // Loop over all complete logs starting from the beginning. // TODO(gpang): should the deletes start from the end? @@ -205,7 +205,7 @@ private void deleteCompletedLogs() throws IOException { * * @throws IOException if an I/O error occurs */ - private void completeCurrentLog() throws IOException { + private synchronized void completeCurrentLog() throws IOException { String currentLog = mJournal.getCurrentLogFilePath(); if (!mUfs.exists(currentLog)) { // All logs are already complete, so nothing to do.