diff --git a/common/src/main/java/tachyon/master/block/BlockId.java b/common/src/main/java/tachyon/master/block/BlockId.java index 9b390fc19371..df92d9351abe 100644 --- a/common/src/main/java/tachyon/master/block/BlockId.java +++ b/common/src/main/java/tachyon/master/block/BlockId.java @@ -32,12 +32,10 @@ public final class BlockId { private static final long SEQUENCE_NUMBER_MASK = (1L << SEQUENCE_NUMBER_BITS) - 1; private BlockId() { - // util class + // prevent instantiation of a util class } /** - * Returns a block id with the specified container id and sequence number. - * * @param containerId the container id to create the block id with * @param sequenceNumber the sequence number to create the block id with * @return the block id constructed with the container id and sequence number @@ -49,29 +47,23 @@ public static long createBlockId(long containerId, long sequenceNumber) { } /** - * Returns the container id of the specified block id. - * * @param blockId the block id to get the container id for - * @return the container id + * @return the container id of a specified block id */ public static long getContainerId(long blockId) { return (blockId >> SEQUENCE_NUMBER_BITS) & CONTAINER_ID_MASK; } /** - * Returns the sequence number of the specified block id. - * * @param blockId the block id to get the sequene number for - * @return the sequence number + * @return the sequence number of the specified block id */ public static long getSequenceNumber(long blockId) { return blockId & SEQUENCE_NUMBER_MASK; } /** - * Returns the maximum possible sequence number for block ids. 
- * - * @return the maximum possible sequence number + * @return the maximum possible sequence number for block ids */ public static long getMaxSequenceNumber() { return SEQUENCE_NUMBER_MASK; diff --git a/common/src/thrift/tachyon.thrift b/common/src/thrift/tachyon.thrift index ef2a182a9234..b9055b7c792e 100644 --- a/common/src/thrift/tachyon.thrift +++ b/common/src/thrift/tachyon.thrift @@ -44,7 +44,6 @@ struct FileBlockInfo { 4: list locations } -// TODO: make fileId into i64 struct FileInfo { 1: i64 fileId 2: string name diff --git a/servers/src/main/java/tachyon/master/Master.java b/servers/src/main/java/tachyon/master/Master.java index 7964b7cd9223..04c39e9006f0 100644 --- a/servers/src/main/java/tachyon/master/Master.java +++ b/servers/src/main/java/tachyon/master/Master.java @@ -36,7 +36,7 @@ public interface Master extends JournalCheckpointStreamable { * * @return a {@link String} representing this master service name. */ - String getProcessorName(); + String getServiceName(); /** * Processes the journal checkpoint file. diff --git a/servers/src/main/java/tachyon/master/MasterBase.java b/servers/src/main/java/tachyon/master/MasterBase.java index 3d7d8fc7a64b..8f0979987a61 100644 --- a/servers/src/main/java/tachyon/master/MasterBase.java +++ b/servers/src/main/java/tachyon/master/MasterBase.java @@ -60,15 +60,18 @@ protected MasterBase(Journal journal, ExecutorService executorService) { @Override public void processJournalCheckpoint(JournalInputStream inputStream) throws IOException { JournalEntry entry; - while ((entry = inputStream.getNextEntry()) != null) { - processJournalEntry(entry); + try { + while ((entry = inputStream.getNextEntry()) != null) { + processJournalEntry(entry); + } + } finally { + inputStream.close(); } - inputStream.close(); } @Override public void start(boolean isLeader) throws IOException { - LOG.info(getProcessorName() + ": Starting master. isLeader: " + isLeader); + LOG.info(getServiceName() + ": Starting master. 
isLeader: " + isLeader); mIsLeader = isLeader; if (mIsLeader) { mJournalWriter = mJournal.getNewWriter(); @@ -96,7 +99,7 @@ public void start(boolean isLeader) throws IOException { // Phase 2: Replay all the state of the checkpoint and the completed log files. // TODO: only do this if this is a fresh start, not if this master had already been tailing // the journal. - LOG.info(getProcessorName() + ": process completed logs before becoming master."); + LOG.info(getServiceName() + ": process completed logs before becoming master."); JournalTailer catchupTailer = new JournalTailer(this, mJournal); if (catchupTailer.checkpointExists()) { catchupTailer.processJournalCheckpoint(true); @@ -119,11 +122,12 @@ public void start(boolean isLeader) throws IOException { @Override public void stop() throws IOException { - LOG.info(getProcessorName() + ":Stopping master. isLeader: " + isLeaderMode()); + LOG.info(getServiceName() + ":Stopping master. isLeader: " + isLeaderMode()); if (isStandbyMode()) { if (mStandbyJournalTailer != null) { // stop and wait for the journal tailer thread. mStandbyJournalTailer.shutdownAndJoin(); + mStandbyJournalTailer = null; } } else { // Stop this master. 
@@ -143,9 +147,7 @@ protected boolean isStandbyMode() { } protected void writeJournalEntry(JournalEntry entry) { - if (mJournalWriter == null) { - throw new RuntimeException("Cannot write entry: journal writer is null."); - } + Preconditions.checkNotNull(mJournalWriter, "Cannot write entry: journal writer is null."); try { mJournalWriter.getEntryOutputStream().writeEntry(entry); } catch (IOException ioe) { @@ -154,9 +156,7 @@ protected void writeJournalEntry(JournalEntry entry) { } protected void flushJournal() { - if (mJournalWriter == null) { - throw new RuntimeException("Cannot flush journal: Journal writer is null."); - } + Preconditions.checkNotNull(mJournalWriter, "Cannot flush journal: journal writer is null."); try { mJournalWriter.getEntryOutputStream().flush(); } catch (IOException ioe) { diff --git a/servers/src/main/java/tachyon/master/MasterSource.java b/servers/src/main/java/tachyon/master/MasterSource.java index 6f95ac726099..e3d0b22e6029 100644 --- a/servers/src/main/java/tachyon/master/MasterSource.java +++ b/servers/src/main/java/tachyon/master/MasterSource.java @@ -121,6 +121,7 @@ public Integer getValue() { } }); + // TODO: re-enable when metrics is fully implemented on the master. /* mMetricRegistry.register(MetricRegistry.name("FilesTotal"), new Gauge() { @Override diff --git a/servers/src/main/java/tachyon/master/TachyonMaster.java b/servers/src/main/java/tachyon/master/TachyonMaster.java index 35c110a8aecd..759fdb9b8e62 100644 --- a/servers/src/main/java/tachyon/master/TachyonMaster.java +++ b/servers/src/main/java/tachyon/master/TachyonMaster.java @@ -58,6 +58,7 @@ public static void main(String[] args) { } try { + // TODO: create a master context with the tachyon conf. TachyonConf conf = new TachyonConf(); TachyonMaster master; if (conf.getBoolean(Constants.USE_ZOOKEEPER)) { @@ -150,80 +151,76 @@ public TachyonMaster(TachyonConf tachyonConf) { } /** - * Returns the underlying {@link TachyonConf} instance for the Worker. 
- * - * @return TachyonConf of the Master + * @return the underlying {@link TachyonConf} instance for the master. */ public TachyonConf getTachyonConf() { return mTachyonConf; } /** - * Gets the externally resolvable address of this master. + * @return the externally resolvable address of this master. */ public InetSocketAddress getMasterAddress() { return mMasterAddress; } /** - * Gets the actual bind hostname on RPC service (used by unit test only). + * @return the actual bind hostname on RPC service (used by unit test only). */ public String getRPCBindHost() { return NetworkAddressUtils.getThriftSocket(mTServerSocket).getLocalSocketAddress().toString(); } /** - * Gets the actual port that the RPC service is listening on (used by unit test only) + * @return the actual port that the RPC service is listening on (used by unit test only) */ public int getRPCLocalPort() { return mPort; } /** - * Gets the actual bind hostname on web service (used by unit test only). + * @return the actual bind hostname on web service (used by unit test only). */ public String getWebBindHost() { return mWebServer.getBindHost(); } /** - * Gets the actual port that the web service is listening on (used by unit test only) + * @return the actual port that the web service is listening on (used by unit test only) */ public int getWebLocalPort() { return mWebServer.getLocalPort(); } /** - * Gets internal {@link FileSystemMaster}, for unit test only. + * @return internal {@link FileSystemMaster}, for unit test only. */ public FileSystemMaster getFileSystemMaster() { return mFileSystemMaster; } /** - * Gets internal {@link RawTableMaster}, for unit test only. + * @return internal {@link RawTableMaster}, for unit test only. */ public RawTableMaster getRawTableMaster() { return mRawTableMaster; } /** - * Gets internal {@link BlockMaster}, for unit test only. + * @return internal {@link BlockMaster}, for unit test only. 
*/ public BlockMaster getBlockMaster() { return mBlockMaster; } /** - * Gets the millisecond when Tachyon Master starts serving, return -1 when not started. + * @return the millisecond when Tachyon Master starts serving, return -1 when not started. */ public long getStarttimeMs() { return mStartTimeMs; } /** - * Gets whether the system is serving the rpc server. - * * @return true if the system is the leader (serving the rpc server), false otherwise. */ boolean isServing() { @@ -293,10 +290,10 @@ protected void startServingWebServer() { protected void startServingRPCServer() { // set up multiplexed thrift processors TMultiplexedProcessor processor = new TMultiplexedProcessor(); - processor.registerProcessor(mBlockMaster.getProcessorName(), mBlockMaster.getProcessor()); - processor.registerProcessor(mFileSystemMaster.getProcessorName(), + processor.registerProcessor(mBlockMaster.getServiceName(), mBlockMaster.getProcessor()); + processor.registerProcessor(mFileSystemMaster.getServiceName(), mFileSystemMaster.getProcessor()); - processor.registerProcessor(mRawTableMaster.getProcessorName(), mRawTableMaster.getProcessor()); + processor.registerProcessor(mRawTableMaster.getServiceName(), mRawTableMaster.getProcessor()); // create master thrift service with the multiplexed processor. 
mMasterServiceServer = new TThreadPoolServer(new TThreadPoolServer.Args(mTServerSocket) @@ -313,9 +310,11 @@ protected void startServingRPCServer() { protected void stopServing() throws Exception { if (mMasterServiceServer != null) { mMasterServiceServer.stop(); + mMasterServiceServer = null; } if (mWebServer != null) { mWebServer.shutdownWebServer(); + mWebServer = null; } mIsServing = false; } @@ -330,6 +329,9 @@ protected void stopServing() throws Exception { private boolean isJournalFormatted(String journalDirectory) throws IOException { UnderFileSystem ufs = UnderFileSystem.get(journalDirectory, mTachyonConf); if (!ufs.providesStorage()) { + // TODO: Should the journal really be allowed on a ufs without storage? + // This ufs doesn't provide storage. Allow the master to use this ufs for the journal. + LOG.info("Journal directory doesn't provide storage: " + journalDirectory); return true; } String[] files = ufs.list(journalDirectory); diff --git a/servers/src/main/java/tachyon/master/TachyonMasterFaultTolerant.java b/servers/src/main/java/tachyon/master/TachyonMasterFaultTolerant.java index 74953f691393..9ff12fefad8b 100644 --- a/servers/src/main/java/tachyon/master/TachyonMasterFaultTolerant.java +++ b/servers/src/main/java/tachyon/master/TachyonMasterFaultTolerant.java @@ -20,6 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import tachyon.Constants; @@ -43,6 +44,7 @@ public final class TachyonMasterFaultTolerant extends TachyonMaster { public TachyonMasterFaultTolerant(TachyonConf tachyonConf) { super(tachyonConf); + Preconditions.checkArgument(tachyonConf.getBoolean(Constants.USE_ZOOKEEPER)); // Set up zookeeper specific functionality. 
try { @@ -78,15 +80,14 @@ public void start() throws Exception { while (true) { if (mLeaderSelectorClient.isLeader()) { - if (started) { - stopMasters(); - } + stopMasters(); startMasters(true); started = true; startServing(); } else { // This master should be standby, and not the leader if (isServing() || !started) { + // Need to transition this master to standby mode. stopServing(); stopMasters(); @@ -100,6 +101,7 @@ public void start() throws Exception { startMasters(false); started = true; } + // This master is already in standby mode. No further actions needed. } CommonUtils.sleepMs(LOG, 100); diff --git a/servers/src/main/java/tachyon/master/block/BlockMaster.java b/servers/src/main/java/tachyon/master/block/BlockMaster.java index f03e6e2f1b72..5f4ff0214b26 100644 --- a/servers/src/main/java/tachyon/master/block/BlockMaster.java +++ b/servers/src/main/java/tachyon/master/block/BlockMaster.java @@ -72,17 +72,22 @@ public final class BlockMaster extends MasterBase implements ContainerIdGenerator { private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); + private final TachyonConf mTachyonConf; + /** Block metadata management. */ - /** Blocks on all workers, including active and lost blocks. This state must be journaled. */ + /** + * Blocks on all workers, including active and lost blocks. This state must be journaled. Access + * must be synchronized on mBlocks. If both block and worker metadata must be locked, mBlocks must + * be locked first. + */ private final Map mBlocks = new HashMap(); + /** + * Keeps track of block which are no longer in tachyon storage. Access must be synchronized on + * mBlocks. + */ + private final Set mLostBlocks = new HashSet(); /** This state must be journaled. */ private final BlockIdGenerator mBlockIdGenerator = new BlockIdGenerator(); - /** Keeps track of block which are no longer in tachyon storage. 
*/ - private final Set mLostBlocks = new HashSet(); - /** Keeps track of workers which are no longer in communication with the master. */ - private final BlockingQueue mLostWorkers = - new LinkedBlockingQueue(); - private final TachyonConf mTachyonConf; /** Worker metadata management. */ private final IndexedSet.FieldIndex mIdIndex = @@ -100,12 +105,22 @@ public Object getFieldValue(MasterWorkerInfo o) { } }; @SuppressWarnings("unchecked") + /** + * All worker information. Access must be synchronized on mWorkers. If both block and worker + * metadata must be locked, mBlocks must be locked first. + */ private final IndexedSet mWorkers = new IndexedSet(mIdIndex, mAddressIndex); - /** The next worker id to use. This state must be journaled. */ - private final AtomicLong mNextWorkerId = new AtomicLong(1); + /** + * Keeps track of workers which are no longer in communication with the master. Access must be + * synchronized on mWorkers. + */ + private final BlockingQueue mLostWorkers = + new LinkedBlockingQueue(); /** The service that detects lost worker nodes, and tries to restart the failed workers. */ private Future mLostWorkerDetectionService; + /** The next worker id to use. This state must be journaled. */ + private final AtomicLong mNextWorkerId = new AtomicLong(1); /** * @param baseDirectory the base journal directory @@ -128,7 +143,7 @@ public TProcessor getProcessor() { } @Override - public String getProcessorName() { + public String getServiceName() { return Constants.BLOCK_MASTER_SERVICE_NAME; } @@ -196,13 +211,13 @@ public int getWorkerCount() { * RPC, and the internal web ui. 
*/ public List getWorkerInfoList() { - List workerInfoList = new ArrayList(mWorkers.size()); synchronized (mWorkers) { + List workerInfoList = new ArrayList(mWorkers.size()); for (MasterWorkerInfo masterWorkerInfo : mWorkers) { workerInfoList.add(masterWorkerInfo.generateClientWorkerInfo()); } + return workerInfoList; } - return workerInfoList; } /** @@ -233,25 +248,19 @@ public long getUsedBytes() { return ret; } - /** - * @return set of block ids no longer in tachyon storage. - */ - public Set getLostBlocks() { - return Collections.unmodifiableSet(mLostBlocks); - } - /** * Gets info about the lost workers. Called by the internal web ui. * * @return a list of worker info */ public List getLostWorkersInfo() { - List ret = new ArrayList(); - - for (MasterWorkerInfo worker : mLostWorkers) { - ret.add(worker.generateClientWorkerInfo()); + synchronized (mWorkers) { + List ret = new ArrayList(mLostWorkers.size()); + for (MasterWorkerInfo worker : mLostWorkers) { + ret.add(worker.generateClientWorkerInfo()); + } + return ret; } - return ret; } /** @@ -260,20 +269,24 @@ public List getLostWorkersInfo() { * @param blockIds a list of block ids to remove from Tachyon space. 
*/ public void removeBlocks(List blockIds) { - for (long blockId : blockIds) { - MasterBlockInfo masterBlockInfo = mBlocks.get(blockId); - if (masterBlockInfo == null) { - return; - } - for (long workerId : masterBlockInfo.getWorkers()) { - masterBlockInfo.removeWorker(workerId); - MasterWorkerInfo worker = mWorkers.getFirstByField(mIdIndex, workerId); - if (worker != null) { - worker.updateToRemovedBlock(true, blockId); + synchronized (mBlocks) { + synchronized (mWorkers) { + for (long blockId : blockIds) { + MasterBlockInfo masterBlockInfo = mBlocks.get(blockId); + if (masterBlockInfo == null) { + continue; + } + for (long workerId : masterBlockInfo.getWorkers()) { + masterBlockInfo.removeWorker(workerId); + MasterWorkerInfo worker = mWorkers.getFirstByField(mIdIndex, workerId); + if (worker != null) { + worker.updateToRemovedBlock(true, blockId); + } + } + // remove from lost blocks + mLostBlocks.remove(blockId); } } - // remove from lost blocks - mLostBlocks.remove(blockId); } } @@ -304,23 +317,26 @@ public void commitBlock(long workerId, long usedBytesOnTier, int tierAlias, long long length) { LOG.debug("Commit block from worker: {}", FormatUtils.parametersToString(workerId, usedBytesOnTier, blockId, length)); + synchronized (mBlocks) { + synchronized (mWorkers) { + MasterWorkerInfo workerInfo = mWorkers.getFirstByField(mIdIndex, workerId); + workerInfo.addBlock(blockId); + workerInfo.updateUsedBytes(tierAlias, usedBytesOnTier); + workerInfo.updateLastUpdatedTimeMs(); - MasterWorkerInfo workerInfo = mWorkers.getFirstByField(mIdIndex, workerId); - workerInfo.addBlock(blockId); - workerInfo.updateUsedBytes(tierAlias, usedBytesOnTier); - workerInfo.updateLastUpdatedTimeMs(); - - MasterBlockInfo masterBlockInfo = mBlocks.get(blockId); - if (masterBlockInfo == null) { - masterBlockInfo = new MasterBlockInfo(blockId, length); - mBlocks.put(blockId, masterBlockInfo); - // write new block info to journal. 
- writeJournalEntry( - new BlockInfoEntry(masterBlockInfo.getBlockId(), masterBlockInfo.getLength())); - flushJournal(); + MasterBlockInfo masterBlockInfo = mBlocks.get(blockId); + if (masterBlockInfo == null) { + masterBlockInfo = new MasterBlockInfo(blockId, length); + mBlocks.put(blockId, masterBlockInfo); + // write new block info to journal. + writeJournalEntry( + new BlockInfoEntry(masterBlockInfo.getBlockId(), masterBlockInfo.getLength())); + flushJournal(); + } + masterBlockInfo.addWorker(workerId, tierAlias); + mLostBlocks.remove(blockId); + } } - masterBlockInfo.addWorker(workerId, tierAlias); - mLostBlocks.remove(blockId); } /** @@ -331,16 +347,17 @@ public void commitBlock(long workerId, long usedBytesOnTier, int tierAlias, long * @param length the length of the block */ public void commitBlockInUFS(long blockId, long length) { - LOG.debug("Commit block: {}", FormatUtils.parametersToString(blockId, length)); - - MasterBlockInfo masterBlockInfo = mBlocks.get(blockId); - if (masterBlockInfo == null) { - masterBlockInfo = new MasterBlockInfo(blockId, length); - mBlocks.put(blockId, masterBlockInfo); - // write new block info to journal. - writeJournalEntry( - new BlockInfoEntry(masterBlockInfo.getBlockId(), masterBlockInfo.getLength())); - flushJournal(); + LOG.debug("Commit block to ufs: {}", FormatUtils.parametersToString(blockId, length)); + synchronized (mBlocks) { + MasterBlockInfo masterBlockInfo = mBlocks.get(blockId); + if (masterBlockInfo == null) { + masterBlockInfo = new MasterBlockInfo(blockId, length); + mBlocks.put(blockId, masterBlockInfo); + // write new block info to journal. 
+ writeJournalEntry( + new BlockInfoEntry(masterBlockInfo.getBlockId(), masterBlockInfo.getLength())); + flushJournal(); + } } } @@ -350,12 +367,16 @@ public void commitBlockInUFS(long blockId, long length) { * @throws BlockInfoException */ public BlockInfo getBlockInfo(long blockId) throws BlockInfoException { - MasterBlockInfo masterBlockInfo = mBlocks.get(blockId); - if (masterBlockInfo == null) { - throw new BlockInfoException("Block info not found for " + blockId); + synchronized (mBlocks) { + MasterBlockInfo masterBlockInfo = mBlocks.get(blockId); + if (masterBlockInfo == null) { + throw new BlockInfoException("Block info not found for " + blockId); + } + // Construct the block info object to return. + synchronized (mWorkers) { + return generateBlockInfo(masterBlockInfo); + } } - // Construct the block info object to return. - return generateBlockInfo(masterBlockInfo); } /** @@ -367,14 +388,18 @@ public BlockInfo getBlockInfo(long blockId) throws BlockInfoException { */ public List getBlockInfoList(List blockIds) { List ret = new ArrayList(blockIds.size()); - for (long blockId : blockIds) { - MasterBlockInfo masterBlockInfo = mBlocks.get(blockId); - if (masterBlockInfo != null) { - // Construct the block info object to return. - ret.add(generateBlockInfo(masterBlockInfo)); + synchronized (mBlocks) { + synchronized (mWorkers) { + for (long blockId : blockIds) { + MasterBlockInfo masterBlockInfo = mBlocks.get(blockId); + if (masterBlockInfo != null) { + // Construct the block info object to return. + ret.add(generateBlockInfo(masterBlockInfo)); + } + } + return ret; } } - return ret; } /** @@ -431,7 +456,7 @@ public long getWorkerId(NetAddress workerNetAddress) { mWorkers.add(new MasterWorkerInfo(workerId, workerNetAddress)); // Write worker id to the journal. 
- writeJournalEntry(new WorkerIdGeneratorEntry(mNextWorkerId.get())); + writeJournalEntry(new WorkerIdGeneratorEntry(workerId)); flushJournal(); LOG.info("getWorkerId(): WorkerNetAddress: " + workerAddress + " id: " + workerId); @@ -450,26 +475,28 @@ public long getWorkerId(NetAddress workerNetAddress) { */ public long workerRegister(long workerId, List totalBytesOnTiers, List usedBytesOnTiers, Map> currentBlocksOnTiers) { - synchronized (mWorkers) { - if (!mWorkers.contains(mIdIndex, workerId)) { - LOG.warn("Could not find worker id: " + workerId + " to register."); - return -1L; - } - MasterWorkerInfo workerInfo = mWorkers.getFirstByField(mIdIndex, workerId); - workerInfo.updateLastUpdatedTimeMs(); + synchronized (mBlocks) { + synchronized (mWorkers) { + if (!mWorkers.contains(mIdIndex, workerId)) { + LOG.warn("Could not find worker id: " + workerId + " to register."); + return -1L; + } + MasterWorkerInfo workerInfo = mWorkers.getFirstByField(mIdIndex, workerId); + workerInfo.updateLastUpdatedTimeMs(); - // Gather all blocks on this worker. - HashSet blocks = new HashSet(); - for (List blockIds : currentBlocksOnTiers.values()) { - blocks.addAll(blockIds); - } + // Gather all blocks on this worker. + HashSet blocks = new HashSet(); + for (List blockIds : currentBlocksOnTiers.values()) { + blocks.addAll(blockIds); + } - // Detect any lost blocks on this worker. - Set removedBlocks = workerInfo.register(totalBytesOnTiers, usedBytesOnTiers, blocks); + // Detect any lost blocks on this worker. 
+ Set removedBlocks = workerInfo.register(totalBytesOnTiers, usedBytesOnTiers, blocks); - processWorkerRemovedBlocks(workerInfo, removedBlocks); - processWorkerAddedBlocks(workerInfo, currentBlocksOnTiers); - LOG.info("registerWorker(): " + workerInfo); + processWorkerRemovedBlocks(workerInfo, removedBlocks); + processWorkerAddedBlocks(workerInfo, currentBlocksOnTiers); + LOG.info("registerWorker(): " + workerInfo); + } } return workerId; } @@ -487,39 +514,41 @@ public long workerRegister(long workerId, List totalBytesOnTiers, */ public Command workerHeartbeat(long workerId, List usedBytesOnTiers, List removedBlockIds, Map> addedBlocksOnTiers) { - synchronized (mWorkers) { - if (!mWorkers.contains(mIdIndex, workerId)) { - LOG.warn("Could not find worker id: " + workerId + " for heartbeat."); - return new Command(CommandType.Register, new ArrayList()); - } - MasterWorkerInfo workerInfo = mWorkers.getFirstByField(mIdIndex, workerId); - processWorkerRemovedBlocks(workerInfo, removedBlockIds); - processWorkerAddedBlocks(workerInfo, addedBlocksOnTiers); + synchronized (mBlocks) { + synchronized (mWorkers) { + if (!mWorkers.contains(mIdIndex, workerId)) { + LOG.warn("Could not find worker id: " + workerId + " for heartbeat."); + return new Command(CommandType.Register, new ArrayList()); + } + MasterWorkerInfo workerInfo = mWorkers.getFirstByField(mIdIndex, workerId); + processWorkerRemovedBlocks(workerInfo, removedBlockIds); + processWorkerAddedBlocks(workerInfo, addedBlocksOnTiers); - workerInfo.updateUsedBytes(usedBytesOnTiers); - workerInfo.updateLastUpdatedTimeMs(); + workerInfo.updateUsedBytes(usedBytesOnTiers); + workerInfo.updateLastUpdatedTimeMs(); - List toRemoveBlocks = workerInfo.getToRemoveBlocks(); - if (toRemoveBlocks.isEmpty()) { - return new Command(CommandType.Nothing, new ArrayList()); + List toRemoveBlocks = workerInfo.getToRemoveBlocks(); + if (toRemoveBlocks.isEmpty()) { + return new Command(CommandType.Nothing, new ArrayList()); + } + return new 
Command(CommandType.Free, toRemoveBlocks); } - return new Command(CommandType.Free, toRemoveBlocks); } } /** * Updates the worker and block metadata for blocks removed from a worker. * + * mBlocks should already be locked before calling this method. + * * @param workerInfo The worker metadata object * @param removedBlockIds A list of block ids removed from the worker */ private void processWorkerRemovedBlocks(MasterWorkerInfo workerInfo, Collection removedBlockIds) { - // TODO: lock mBlocks? for (long removedBlockId : removedBlockIds) { MasterBlockInfo masterBlockInfo = mBlocks.get(removedBlockId); if (masterBlockInfo == null) { - // TODO: throw exception? continue; } workerInfo.removeBlock(masterBlockInfo.getBlockId()); @@ -533,12 +562,13 @@ private void processWorkerRemovedBlocks(MasterWorkerInfo workerInfo, /** * Updates the worker and block metadata for blocks added to a worker. * + * mBlocks should already be locked before calling this method. + * * @param workerInfo The worker metadata object * @param addedBlockIds Mapping from StorageDirId to a list of block ids added to the directory. */ private void processWorkerAddedBlocks(MasterWorkerInfo workerInfo, Map> addedBlockIds) { - // TODO: lock mBlocks? for (Entry> blockIds : addedBlockIds.entrySet()) { long storageDirId = blockIds.getKey(); for (long blockId : blockIds.getValue()) { @@ -548,10 +578,8 @@ private void processWorkerAddedBlocks(MasterWorkerInfo workerInfo, // TODO: change upper API so that this is tier level or type, not storage dir id. int tierAlias = StorageDirId.getStorageLevelAliasValue(storageDirId); masterBlockInfo.addWorker(workerInfo.getId(), tierAlias); - mLostBlocks.remove(blockId); } else { - // TODO: throw exception? 
LOG.warn( "failed to register workerId: " + workerInfo.getId() + " to blockId: " + blockId); } @@ -560,6 +588,11 @@ private void processWorkerAddedBlocks(MasterWorkerInfo workerInfo, } /** + * Creates a {@link BlockInfo} from a given {@link MasterBlockInfo}, by populating worker + * locations. + * + * mWorkers should already be locked before calling this method. + * * @param masterBlockInfo the {@link MasterBlockInfo} * @return a {@link BlockInfo} from a {@link MasterBlockInfo}. Populates worker locations. */ @@ -585,9 +618,9 @@ public final class LostWorkerDetectionHeartbeatExecutor implements HeartbeatExec public void heartbeat() { LOG.debug("System status checking."); + int masterWorkerTimeoutMs = mTachyonConf.getInt(Constants.MASTER_WORKER_TIMEOUT_MS); synchronized (mWorkers) { for (MasterWorkerInfo worker : mWorkers) { - int masterWorkerTimeoutMs = mTachyonConf.getInt(Constants.MASTER_WORKER_TIMEOUT_MS); if (CommonUtils.getCurrentMs() - worker.getLastUpdatedTimeMs() > masterWorkerTimeoutMs) { LOG.error("The worker " + worker + " got timed out!"); mLostWorkers.add(worker); diff --git a/servers/src/main/java/tachyon/master/file/FileSystemMaster.java b/servers/src/main/java/tachyon/master/file/FileSystemMaster.java index 95a0ee0dde92..299074b96c2f 100644 --- a/servers/src/main/java/tachyon/master/file/FileSystemMaster.java +++ b/servers/src/main/java/tachyon/master/file/FileSystemMaster.java @@ -119,7 +119,7 @@ public TProcessor getProcessor() { } @Override - public String getProcessorName() { + public String getServiceName() { return Constants.FILE_SYSTEM_MASTER_SERVICE_NAME; } diff --git a/servers/src/main/java/tachyon/master/journal/JournalTailerThread.java b/servers/src/main/java/tachyon/master/journal/JournalTailerThread.java index fa05edb958c0..60cd76f88326 100644 --- a/servers/src/main/java/tachyon/master/journal/JournalTailerThread.java +++ b/servers/src/main/java/tachyon/master/journal/JournalTailerThread.java @@ -51,7 +51,7 @@ public 
JournalTailerThread(Master master, Journal journal) { * Initiate the shutdown of this tailer thread. */ public void shutdown() { - LOG.info(mMaster.getProcessorName() + " Journal tailer shutdown has been initiated."); + LOG.info(mMaster.getServiceName() + " Journal tailer shutdown has been initiated."); mInitiateShutdown = true; } @@ -70,7 +70,7 @@ public void shutdownAndJoin() { @Override public void run() { - LOG.info(mMaster.getProcessorName() + " Journal tailer started."); + LOG.info(mMaster.getServiceName() + " Journal tailer started."); // Continually loop loading the checkpoint file, and then loading all completed files. The loop // only repeats when the checkpoint file is updated after it was read. while (!mInitiateShutdown) { @@ -94,7 +94,7 @@ public void run() { } else if ((CommonUtils.getCurrentMs() - waitForShutdownStart) > SHUTDOWN_QUIET_WAIT_TIME_MS) { // There have been no new logs for the quiet period. Shutdown now. - LOG.info(mMaster.getProcessorName() + LOG.info(mMaster.getServiceName() + " Journal tailer has been shutdown. 
No new logs for the quiet period."); return; } @@ -110,6 +110,6 @@ public void run() { LOG.error(ioe.getMessage()); } } - LOG.info(mMaster.getProcessorName() + " Journal tailer has been shutdown."); + LOG.info(mMaster.getServiceName() + " Journal tailer has been shutdown."); } } diff --git a/servers/src/main/java/tachyon/master/rawtable/RawTableMaster.java b/servers/src/main/java/tachyon/master/rawtable/RawTableMaster.java index d97df4bec210..47e84bebcc0c 100644 --- a/servers/src/main/java/tachyon/master/rawtable/RawTableMaster.java +++ b/servers/src/main/java/tachyon/master/rawtable/RawTableMaster.java @@ -76,7 +76,7 @@ public TProcessor getProcessor() { } @Override - public String getProcessorName() { + public String getServiceName() { return Constants.RAW_TABLE_MASTER_SERVICE_NAME; } @@ -98,17 +98,11 @@ public void streamToJournalCheckpoint(JournalOutputStream outputStream) throws I @Override public void start(boolean isLeader) throws IOException { super.start(isLeader); - if (isLeaderMode()) { - // TODO: start periodic heartbeat threads. - } } @Override public void stop() throws IOException { super.stop(); - if (isLeaderMode()) { - // TODO: stop heartbeat threads. - } } /**