Skip to content

Commit

Permalink
[TACHYON-1386] Shut down executors when Masters are stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
aaudiber committed Dec 2, 2015
1 parent ab19431 commit 7404eb6
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 56 deletions.
25 changes: 23 additions & 2 deletions servers/src/main/java/tachyon/master/MasterBase.java
Expand Up @@ -17,6 +17,8 @@

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,6 +34,7 @@
import tachyon.master.journal.JournalWriter;
import tachyon.master.journal.ReadWriteJournal;
import tachyon.proto.journal.Journal.JournalEntry;
import tachyon.util.ThreadFactoryUtils;

/**
* This is the base class for all masters, and contains common functionality. Common functionality
Expand All @@ -41,6 +44,8 @@
public abstract class MasterBase implements Master {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);

private static final long SHUTDOWN_TIMEOUT_MS = 10000;

/** The executor used for running maintenance threads for the master. */
private final ExecutorService mExecutorService;

Expand All @@ -53,9 +58,15 @@ public abstract class MasterBase implements Master {
/** The journal writer for when the master is the leader. */
private JournalWriter mJournalWriter = null;

protected MasterBase(Journal journal, ExecutorService executorService) {
/**
* @param journal the journal to use for tracking master operations
* @param executorServiceNamePattern the name pattern for the executor service to use for
*        asynchronous tasks
*/
protected MasterBase(Journal journal, String executorServiceNamePattern) {
Preconditions.checkNotNull(executorServiceNamePattern);
mJournal = Preconditions.checkNotNull(journal);
mExecutorService = Preconditions.checkNotNull(executorService);
mExecutorService =
Executors.newFixedThreadPool(2, ThreadFactoryUtils.build(executorServiceNamePattern, true));
}

@Override
Expand Down Expand Up @@ -155,6 +166,16 @@ public void stop() throws IOException {
mStandbyJournalTailer.shutdownAndJoin();
}
}
// Ask the executor to stop its tasks, then wait a bounded amount of time for them to finish.
mExecutorService.shutdownNow();
String awaitFailureMessage =
    "waiting for {} executor service to shut down. Daemons may still be running";
try {
  if (!mExecutorService.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
    LOG.warn("Timed out " + awaitFailureMessage, this.getClass().getSimpleName());
  }
} catch (InterruptedException e) {
  LOG.warn("Interrupted while " + awaitFailureMessage, this.getClass().getSimpleName());
  // Catching InterruptedException clears the interrupt flag; restore it so that the caller of
  // stop() can still observe that this thread was interrupted.
  Thread.currentThread().interrupt();
}
}

@Override
Expand Down
19 changes: 6 additions & 13 deletions servers/src/main/java/tachyon/master/block/BlockMaster.java
Expand Up @@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -70,7 +69,6 @@
import tachyon.thrift.WorkerInfo;
import tachyon.util.CommonUtils;
import tachyon.util.FormatUtils;
import tachyon.util.ThreadFactoryUtils;
import tachyon.util.io.PathUtils;

/**
Expand Down Expand Up @@ -133,7 +131,11 @@ public Object getFieldValue(MasterWorkerInfo o) {
@SuppressWarnings("unchecked")
private final IndexedSet<MasterWorkerInfo> mLostWorkers =
new IndexedSet<MasterWorkerInfo>(mIdIndex, mAddressIndex);
/** The service that detects lost worker nodes, and tries to restart the failed workers. */
/**
* The service that detects lost worker nodes, and tries to restart the failed workers.
* We store it here so that it can be accessed from tests.
*/
@SuppressWarnings("unused")
private Future<?> mLostWorkerDetectionService;
/** The next worker id to use. This state must be journaled. */
private final AtomicLong mNextWorkerId = new AtomicLong(1);
Expand All @@ -147,8 +149,7 @@ public static String getJournalDirectory(String baseDirectory) {
}

public BlockMaster(Journal journal) {
super(journal,
Executors.newFixedThreadPool(2, ThreadFactoryUtils.build("block-master-%d", true)));
super(journal, "block-master-%d");
}

@Override
Expand Down Expand Up @@ -216,14 +217,6 @@ HeartbeatContext.MASTER_LOST_WORKER_DETECTION, new LostWorkerDetectionHeartbeatE
}
}

@Override
public void stop() throws IOException {
super.stop();
if (mLostWorkerDetectionService != null) {
mLostWorkerDetectionService.cancel(true);
}
}

/**
* @return the number of workers
*/
Expand Down
19 changes: 6 additions & 13 deletions servers/src/main/java/tachyon/master/file/FileSystemMaster.java
Expand Up @@ -23,7 +23,6 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.lang.exception.ExceptionUtils;
Expand Down Expand Up @@ -97,7 +96,6 @@
import tachyon.thrift.NetAddress;
import tachyon.underfs.UnderFileSystem;
import tachyon.util.IdUtils;
import tachyon.util.ThreadFactoryUtils;
import tachyon.util.io.PathUtils;

/**
Expand All @@ -116,7 +114,11 @@ public final class FileSystemMaster extends MasterBase {

private final PrefixList mWhitelist;

/** The service that tries to check inodefiles with ttl set */
/**
* The service that tries to check inodefiles with ttl set.
* We store it here so that it can be accessed from tests.
*/
@SuppressWarnings("unused")
private Future<?> mTTLCheckerService;

private final TTLBucketList mTTLBuckets = new TTLBucketList();
Expand All @@ -130,8 +132,7 @@ public static String getJournalDirectory(String baseDirectory) {
}

public FileSystemMaster(BlockMaster blockMaster, Journal journal) {
super(journal,
Executors.newFixedThreadPool(2, ThreadFactoryUtils.build("file-system-master-%d", true)));
super(journal, "file-system-master-%d");
mBlockMaster = blockMaster;

mDirectoryIdGenerator = new InodeDirectoryIdGenerator(mBlockMaster);
Expand Down Expand Up @@ -253,14 +254,6 @@ public void start(boolean isLeader) throws IOException {
super.start(isLeader);
}

@Override
public void stop() throws IOException {
super.stop();
if (mTTLCheckerService != null) {
mTTLCheckerService.cancel(true);
}
}

/**
* Whether the filesystem contains a directory with the id. Called by internal masters.
*
Expand Down
26 changes: 9 additions & 17 deletions servers/src/main/java/tachyon/master/lineage/LineageMaster.java
Expand Up @@ -20,7 +20,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.thrift.TProcessor;
Expand Down Expand Up @@ -83,7 +82,6 @@
import tachyon.thrift.LineageMasterClientService;
import tachyon.thrift.LineageMasterWorkerService;
import tachyon.util.IdUtils;
import tachyon.util.ThreadFactoryUtils;
import tachyon.util.io.PathUtils;

/**
Expand All @@ -98,9 +96,15 @@ public final class LineageMaster extends MasterBase {
private final FileSystemMaster mFileSystemMaster;
private final LineageIdGenerator mLineageIdGenerator;

/** The service that checkpoints lineages. */
/**
* The service that checkpoints lineages. We store it here so that it can be accessed from tests.
*/
@SuppressWarnings("unused")
private Future<?> mCheckpointExecutionService;
/** The service that recomputes lineages. */
/**
* The service that recomputes lineages. We store it here so that it can be accessed from tests.
*/
@SuppressWarnings("unused")
private Future<?> mRecomputeExecutionService;

/** Map from worker to the files to checkpoint on that worker. Used by checkpoint service. */
Expand All @@ -121,8 +125,7 @@ public static String getJournalDirectory(String baseDirectory) {
* @param journal the journal
*/
public LineageMaster(FileSystemMaster fileSystemMaster, Journal journal) {
super(journal,
Executors.newFixedThreadPool(2, ThreadFactoryUtils.build("lineage-master-%d", true)));
super(journal, "lineage-master-%d");

mTachyonConf = MasterContext.getConf();
mFileSystemMaster = Preconditions.checkNotNull(fileSystemMaster);
Expand Down Expand Up @@ -188,17 +191,6 @@ public void start(boolean isLeader) throws IOException {
}
}

@Override
public void stop() throws IOException {
super.stop();
if (mCheckpointExecutionService != null) {
mCheckpointExecutionService.cancel(true);
}
if (mRecomputeExecutionService != null) {
mRecomputeExecutionService.cancel(true);
}
}

@Override
public synchronized void streamToJournalCheckpoint(JournalOutputStream outputStream)
throws IOException {
Expand Down
12 changes: 2 additions & 10 deletions servers/src/main/java/tachyon/master/rawtable/RawTableMaster.java
Expand Up @@ -19,13 +19,12 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;

import com.google.common.base.Preconditions;
import org.apache.thrift.TProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;

Expand Down Expand Up @@ -55,7 +54,6 @@
import tachyon.thrift.RawTableInfo;
import tachyon.thrift.RawTableMasterClientService;
import tachyon.util.IdUtils;
import tachyon.util.ThreadFactoryUtils;
import tachyon.util.io.PathUtils;

public class RawTableMaster extends MasterBase {
Expand All @@ -72,8 +70,7 @@ public static String getJournalDirectory(String baseDirectory) {
}

public RawTableMaster(FileSystemMaster fileSystemMaster, Journal journal) {
super(journal,
Executors.newFixedThreadPool(2, ThreadFactoryUtils.build("raw-table-master-%d", true)));
super(journal, "raw-table-master-%d");
TachyonConf conf = MasterContext.getConf();
mMaxTableMetadataBytes = conf.getBytes(Constants.MAX_TABLE_METADATA_BYTE);
mMaxColumns = conf.getInt(Constants.MAX_COLUMNS);
Expand Down Expand Up @@ -126,11 +123,6 @@ public void start(boolean isLeader) throws IOException {
super.start(isLeader);
}

@Override
public void stop() throws IOException {
super.stop();
}

/**
* Creates a raw table. A table is a directory with sub-directories representing columns.
*
Expand Down
15 changes: 15 additions & 0 deletions servers/src/test/java/tachyon/master/block/BlockMasterTest.java
Expand Up @@ -20,6 +20,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.After;
Expand Down Expand Up @@ -280,6 +282,19 @@ public void detectLostWorkerTest() throws Exception {
Assert.assertNotNull(mPrivateAccess.getWorkerById(workerId));
}

/**
 * Checks that stopping the master shuts down its executor service and leaves the lost worker
 * detection future in the done state.
 */
@Test
public void stopTest() throws Exception {
  // Both fields are private, so they are read reflectively.
  Object executorField = Whitebox.getInternalState(mMaster, "mExecutorService");
  Object detectionField = Whitebox.getInternalState(mMaster, "mLostWorkerDetectionService");
  ExecutorService executor = (ExecutorService) executorField;
  Future<?> detectionFuture = (Future<?>) detectionField;

  // Before stop(): the detection task is still pending and the executor accepts work.
  Assert.assertFalse(detectionFuture.isDone());
  Assert.assertFalse(executor.isShutdown());

  mMaster.stop();

  // After stop(): the detection task is done and the executor has been shut down.
  Assert.assertTrue(detectionFuture.isDone());
  Assert.assertTrue(executor.isShutdown());
}

private void addWorker(BlockMaster master, long workerId, List<String> storageTierAliases,
Map<String, Long> totalBytesOnTiers, Map<String, Long> usedBytesOnTiers)
throws TachyonException {
Expand Down
Expand Up @@ -17,6 +17,8 @@

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
Expand All @@ -26,6 +28,7 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.mockito.internal.util.reflection.Whitebox;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -34,10 +37,10 @@
import tachyon.Constants;
import tachyon.TachyonURI;
import tachyon.client.file.options.SetStateOptions;
import tachyon.exception.DirectoryNotEmptyException;
import tachyon.exception.ExceptionMessage;
import tachyon.exception.FileDoesNotExistException;
import tachyon.exception.InvalidPathException;
import tachyon.exception.DirectoryNotEmptyException;
import tachyon.heartbeat.HeartbeatContext;
import tachyon.heartbeat.HeartbeatScheduler;
import tachyon.master.MasterContext;
Expand Down Expand Up @@ -366,6 +369,19 @@ public void freeDirTest() throws Exception {
Assert.assertEquals(0, mBlockMaster.getBlockInfo(blockId).getLocations().size());
}

/**
 * Checks that stopping the file system master shuts down its executor service and leaves the
 * TTL checker future in the done state.
 */
@Test
public void stopTest() throws Exception {
  // Both fields are private, so they are read reflectively.
  Object executorField = Whitebox.getInternalState(mFileSystemMaster, "mExecutorService");
  Object ttlField = Whitebox.getInternalState(mFileSystemMaster, "mTTLCheckerService");
  ExecutorService executor = (ExecutorService) executorField;
  Future<?> ttlFuture = (Future<?>) ttlField;

  // Before stop(): the TTL checker is still pending and the executor accepts work.
  Assert.assertFalse(ttlFuture.isDone());
  Assert.assertFalse(executor.isShutdown());

  mFileSystemMaster.stop();

  // After stop(): the TTL checker is done and the executor has been shut down.
  Assert.assertTrue(ttlFuture.isDone());
  Assert.assertTrue(executor.isShutdown());
}

private long createFileWithSingleBlock(TachyonURI uri) throws Exception {
long fileId = mFileSystemMaster.create(uri, sNestedFileOptions);
long blockId = mFileSystemMaster.getNewBlockIdForFile(fileId);
Expand Down
Expand Up @@ -18,6 +18,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -182,6 +184,23 @@ public void heartbeatTest() throws Exception {
Assert.assertEquals(blockId, (long) command.checkpointFiles.get(0).blockIds.get(0));
}

/**
 * Checks that stopping the lineage master shuts down its executor service and leaves both the
 * checkpoint and the recompute futures in the done state.
 */
@Test
public void stopTest() throws Exception {
  // All three fields are private, so they are read reflectively.
  Object executorField = Whitebox.getInternalState(mLineageMaster, "mExecutorService");
  Object checkpointField =
      Whitebox.getInternalState(mLineageMaster, "mCheckpointExecutionService");
  Object recomputeField =
      Whitebox.getInternalState(mLineageMaster, "mRecomputeExecutionService");
  ExecutorService executor = (ExecutorService) executorField;
  Future<?> checkpointFuture = (Future<?>) checkpointField;
  Future<?> recomputeFuture = (Future<?>) recomputeField;

  // Before stop(): both services are still pending and the executor accepts work.
  Assert.assertFalse(checkpointFuture.isDone());
  Assert.assertFalse(recomputeFuture.isDone());
  Assert.assertFalse(executor.isShutdown());

  mLineageMaster.stop();

  // After stop(): both services are done and the executor has been shut down.
  Assert.assertTrue(checkpointFuture.isDone());
  Assert.assertTrue(recomputeFuture.isDone());
  Assert.assertTrue(executor.isShutdown());
}

@SuppressWarnings("unchecked")
@Test
public void queueForCheckpointTest() throws Exception {
Expand Down

0 comments on commit 7404eb6

Please sign in to comment.