Skip to content

Commit

Permalink
move AsyncEvictor out of TieredBlockStore
Browse files Browse the repository at this point in the history
  • Loading branch information
Mingfei committed Sep 23, 2015
1 parent c49f162 commit 62d8872
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 305 deletions.
107 changes: 107 additions & 0 deletions servers/src/main/java/tachyon/worker/block/AsyncEvictor.java
@@ -0,0 +1,107 @@
/*
* Licensed to the University of California, Berkeley under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package tachyon.worker.block;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tachyon.Constants;
import tachyon.Pair;
import tachyon.Sessions;
import tachyon.exception.AlreadyExistsException;
import tachyon.exception.InvalidStateException;
import tachyon.exception.NotFoundException;
import tachyon.exception.OutOfSpaceException;
import tachyon.worker.WorkerContext;

public class AsyncEvictor implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
private final BlockStore mBlockStore;
private final List<Pair<BlockStoreLocation, Long>> mReservedBytesOnTiers =
new ArrayList<Pair<BlockStoreLocation, Long>>();
private final Timer mTimer;
private final Semaphore mSemaphore = new Semaphore(1);
private final Thread mEvictorThread;

public AsyncEvictor(BlockStore blockStore) {
mBlockStore = blockStore;
List<Long> capOnTiers = blockStore.getBlockStoreMeta().getCapacityBytesOnTiers();
long lastTierReservedBytes = 0;
for (int idx = 0; idx < capOnTiers.size(); idx ++) {
if (capOnTiers.get(idx) == 0) {
// If this tier is empty, just skip it
break;
}
String tierReservedSpaceProp =
String.format(Constants.WORKER_TIERED_STORAGE_LEVEL_RESERVED_RATIO_FORMAT, idx);
long reservedSpaceBytes =
(long)(capOnTiers.get(idx) * WorkerContext.getConf().getDouble(tierReservedSpaceProp));
mReservedBytesOnTiers.add(new Pair<BlockStoreLocation, Long>(
BlockStoreLocation.anyDirInTier(idx + 1), reservedSpaceBytes + lastTierReservedBytes));
lastTierReservedBytes += reservedSpaceBytes;
}
mEvictorThread = new Thread(this);
mEvictorThread.setDaemon(true);
mEvictorThread.setName("async-evictor");
mTimer = new Timer(true);
}

public void initialize() {
mEvictorThread.start();
mTimer.schedule(new TimerTask() {
public void run() {
mSemaphore.release();
}
}, 0L, WorkerContext.getConf().getLong(Constants
.WORKER_TIERED_STORAGE_EVICTION_ASYNC_PERIOD_MS_FORMAT));
}

@Override
public void run() {
try {
while (true) {
mSemaphore.acquire();
for (int tierIdx = mReservedBytesOnTiers.size() - 1; tierIdx >= 0 ; tierIdx --) {
Pair<BlockStoreLocation, Long> bytesReservedOnTier = mReservedBytesOnTiers.get(tierIdx);
BlockStoreLocation location = bytesReservedOnTier.getFirst();
long bytesReserved = bytesReservedOnTier.getSecond();
try {
mBlockStore.freeSpace(Sessions.DATASERVER_SESSION_ID, bytesReserved, location);
} catch (OutOfSpaceException e) {
LOG.warn(e.getMessage());
} catch (NotFoundException e) {
LOG.warn(e.getMessage());
} catch (AlreadyExistsException e) {
LOG.warn(e.getMessage());
} catch (InvalidStateException e) {
LOG.warn(e.getMessage());
} catch (IOException e) {
LOG.warn(e.getMessage());
}
}
}
} catch (InterruptedException e) {
LOG.info("Asynchronous evictor exits!");
}
}
}
20 changes: 10 additions & 10 deletions servers/src/main/java/tachyon/worker/block/BlockDataManager.java
Expand Up @@ -18,7 +18,6 @@
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService;


import org.apache.thrift.TException; import org.apache.thrift.TException;


Expand Down Expand Up @@ -74,6 +73,8 @@ public final class BlockDataManager implements Testable<BlockDataManager> {
private Sessions mSessions; private Sessions mSessions;
/** Id of this worker */ /** Id of this worker */
private long mWorkerId; private long mWorkerId;
/** Asynchronous evictor for the block store */
private AsyncEvictor mAsyncEvictor;


/** /**
* Creates a BlockDataManager based on the configuration values. * Creates a BlockDataManager based on the configuration values.
Expand Down Expand Up @@ -109,6 +110,14 @@ public BlockDataManager(WorkerSource workerSource,
// Register the heartbeat reporter so it can record block store changes // Register the heartbeat reporter so it can record block store changes
mBlockStore.registerBlockStoreEventListener(mHeartbeatReporter); mBlockStore.registerBlockStoreEventListener(mHeartbeatReporter);
mBlockStore.registerBlockStoreEventListener(mMetricsReporter); mBlockStore.registerBlockStoreEventListener(mMetricsReporter);

// Check whether to start asynchronous evictor
if (mTachyonConf.getBoolean(Constants.WORKER_EVICT_ASYNC_ENABLE)) {
mAsyncEvictor = new AsyncEvictor(mBlockStore);
mAsyncEvictor.initialize();
} else {
mAsyncEvictor = null;
}
} }


class PrivateAccess { class PrivateAccess {
Expand Down Expand Up @@ -314,15 +323,6 @@ public void createBlockRemote(long sessionId, long blockId, int tierAlias, long
FileUtils.createBlockPath(createdBlock.getPath()); FileUtils.createBlockPath(createdBlock.getPath());
} }


/**
* Gets the block store which contains all blocks in the manager.
*
* @return the block store
*/
public BlockStore getBlockStore() {
return mBlockStore;
}

/** /**
* Opens a {@link BlockWriter} for an existing temporary block. This method is only called from a * Opens a {@link BlockWriter} for an existing temporary block. This method is only called from a
* data server. * data server.
Expand Down
34 changes: 33 additions & 1 deletion servers/src/main/java/tachyon/worker/block/BlockMasterSync.java
Expand Up @@ -17,6 +17,7 @@


import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -25,6 +26,8 @@
import tachyon.Sessions; import tachyon.Sessions;
import tachyon.client.WorkerBlockMasterClient; import tachyon.client.WorkerBlockMasterClient;
import tachyon.conf.TachyonConf; import tachyon.conf.TachyonConf;
import tachyon.exception.InvalidStateException;
import tachyon.exception.NotFoundException;
import tachyon.thrift.Command; import tachyon.thrift.Command;
import tachyon.thrift.NetAddress; import tachyon.thrift.NetAddress;
import tachyon.util.CommonUtils; import tachyon.util.CommonUtils;
Expand All @@ -45,6 +48,7 @@
*/ */
public final class BlockMasterSync implements Runnable { public final class BlockMasterSync implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
private static final int DEFAULT_BLOCK_REMOVER_POOL_SIZE = 10;
/** Block data manager responsible for interacting with Tachyon and UFS storage */ /** Block data manager responsible for interacting with Tachyon and UFS storage */
private final BlockDataManager mBlockDataManager; private final BlockDataManager mBlockDataManager;
/** The net address of the worker */ /** The net address of the worker */
Expand Down Expand Up @@ -82,7 +86,6 @@ public final class BlockMasterSync implements Runnable {


mRunning = true; mRunning = true;
mWorkerId = 0; mWorkerId = 0;
mExecutorService = executorService;
} }


/** /**
Expand Down Expand Up @@ -211,4 +214,33 @@ private void handleMasterCommand(Command cmd) throws Exception {
throw new RuntimeException("Un-recognized command from master " + cmd); throw new RuntimeException("Un-recognized command from master " + cmd);
} }
} }

/**
* Thread to remove block from master
*/
private class BlockRemover implements Runnable {
private BlockDataManager mBlockDataManager;
private long mSessionId;
private long mBlockId;

public BlockRemover(BlockDataManager blockDataManager, long sessionId, long blockId) {
mBlockDataManager = blockDataManager;
mSessionId = sessionId;
mBlockId = blockId;
}

@Override
public void run() {
try {
mBlockDataManager.removeBlock(mSessionId, mBlockId);
} catch (IOException ioe) {
LOG.warn("Failed master free block cmd for: " + mBlockId + " due to concurrent read.");
} catch (InvalidStateException e) {
LOG.warn("Failed master free block cmd for: " + mBlockId + " due to block uncommitted.");
} catch (NotFoundException e) {
LOG.warn("Failed master free block cmd for: " + mBlockId + " due to block not found.");
}
}

}
} }
72 changes: 0 additions & 72 deletions servers/src/main/java/tachyon/worker/block/BlockMover.java

This file was deleted.

64 changes: 0 additions & 64 deletions servers/src/main/java/tachyon/worker/block/BlockRemover.java

This file was deleted.

9 changes: 1 addition & 8 deletions servers/src/main/java/tachyon/worker/block/BlockWorker.java
Expand Up @@ -96,8 +96,6 @@ public final class BlockWorker {
private final UIWebServer mWebServer; private final UIWebServer mWebServer;
/** Worker metrics system */ /** Worker metrics system */
private MetricsSystem mWorkerMetricsSystem; private MetricsSystem mWorkerMetricsSystem;
/** Shared executor service */
private ExecutorService mSharedExecutor;


/** /**
* @return the worker service handler * @return the worker service handler
Expand Down Expand Up @@ -170,11 +168,6 @@ public BlockWorker() throws IOException {
NetworkAddressUtils.getConnectAddress(ServiceType.MASTER_RPC, mTachyonConf), NetworkAddressUtils.getConnectAddress(ServiceType.MASTER_RPC, mTachyonConf),
mMasterClientExecutorService, mTachyonConf); mMasterClientExecutorService, mTachyonConf);


// Set up ExecutorService
mSharedExecutor = Executors.newFixedThreadPool(
mTachyonConf.getInt(Constants.WORKER_SHARED_EXECUTOR_CORES),
ThreadFactoryUtils.build("shared-executor-%d", false));

// Set up BlockDataManager // Set up BlockDataManager
WorkerSource workerSource = new WorkerSource(); WorkerSource workerSource = new WorkerSource();
mBlockDataManager = mBlockDataManager =
Expand Down Expand Up @@ -219,7 +212,7 @@ public BlockWorker() throws IOException {
Executors.newFixedThreadPool(3, ThreadFactoryUtils.build("worker-heartbeat-%d", true)); Executors.newFixedThreadPool(3, ThreadFactoryUtils.build("worker-heartbeat-%d", true));


mBlockMasterSync = new BlockMasterSync(mBlockDataManager, mWorkerNetAddress, mBlockMasterSync = new BlockMasterSync(mBlockDataManager, mWorkerNetAddress,
mBlockMasterClient); mBlockMasterClient);
// Get the worker id // Get the worker id
// TODO(calvin): Do this at TachyonWorker. // TODO(calvin): Do this at TachyonWorker.
mBlockMasterSync.setWorkerId(); mBlockMasterSync.setWorkerId();
Expand Down

0 comments on commit 62d8872

Please sign in to comment.