ACCUMULO-578 merge sandbox to trunk
git-svn-id: https://svn.apache.org/repos/asf/accumulo/trunk@1346380 13f79535-47bb-0310-9956-ffa450edef68
Eric C. Newton committed Jun 5, 2012
1 parent 8ac33bb commit 2db5ce6
Showing 88 changed files with 3,277 additions and 20,262 deletions.
6 changes: 2 additions & 4 deletions bin/tdown.sh
@@ -29,7 +29,6 @@ echo 'stopping unresponsive tablet servers (if any) ...'
for server in `cat $SLAVES | grep -v '^#' `; do
# stop the server if it is running
$ACCUMULO_HOME/bin/stop-server.sh $server "$ACCUMULO_HOME/.*/accumulo-start.*.jar" tserver TERM &
$ACCUMULO_HOME/bin/stop-server.sh $server "$ACCUMULO_HOME/.*/accumulo-start.*.jar" logger TERM &
done

sleep 10
@@ -38,8 +37,7 @@ echo 'stopping unresponsive tablet servers hard (if any) ...'
for server in `cat $SLAVES | grep -v '^#' `; do
# stop the server if it is running
$ACCUMULO_HOME/bin/stop-server.sh $server "$ACCUMULO_HOME/.*/accumulo-start.*.jar" tserver KILL &
$ACCUMULO_HOME/bin/stop-server.sh $server "$ACCUMULO_HOME/.*/accumulo-start.*.jar" logger KILL &
done

echo 'Cleaning tablet server and logger entries from zookeeper'
$ACCUMULO_HOME/bin/accumulo org.apache.accumulo.server.util.ZooZap -tservers -loggers
echo 'Cleaning tablet server entries from zookeeper'
$ACCUMULO_HOME/bin/accumulo org.apache.accumulo.server.util.ZooZap -tservers
3 changes: 1 addition & 2 deletions bin/tup.sh
@@ -23,13 +23,12 @@ bin=`cd "$bin"; pwd`

SLAVES=$ACCUMULO_HOME/conf/slaves

echo -n "Starting tablet servers and loggers ..."
echo -n "Starting tablet servers ..."

count=1
for server in `grep -v '^#' "$SLAVES"`
do
echo -n "."
${bin}/start-server.sh $server logger &
${bin}/start-server.sh $server tserver "tablet server" &
count=`expr $count + 1`
if [ `expr $count % 72` -eq 0 ] ;
12 changes: 9 additions & 3 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -62,9 +62,6 @@ public class Constants {

public static final String ZDEAD = "/dead";
public static final String ZDEADTSERVERS = "/dead/tservers";
public static final String ZDEADLOGGERS = "/dead/loggers";

public static final String ZLOGGERS = "/loggers";

public static final String ZTRACERS = "/tracers";

@@ -78,6 +75,7 @@ public class Constants {
public static final String ZNEXT_FILE = "/next_file";

public static final String ZHDFS_RESERVATIONS = "/hdfs_reservations";
public static final String ZRECOVERY = "/recovery";

public static final String METADATA_TABLE_ID = "!0";
public static final String METADATA_TABLE_NAME = "!METADATA";
@@ -184,4 +182,12 @@ public static String getRootTabletDir(AccumuloConfiguration conf) {
return getMetadataTableDir(conf) + ZROOT_TABLET;
}

/**
* @param conf the Accumulo configuration from which the base directory is read
* @return the directory where write-ahead logs are stored, i.e. the base directory with "/wal" appended
*/
public static String getWalDirectory(AccumuloConfiguration conf) {
return getBaseDir(conf) + "/wal";
}

}
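
The new getWalDirectory helper points tablet servers at a write-ahead log directory under the instance's base directory in HDFS, replacing the per-node logger.dir.walog local directories removed further down in this commit. A hypothetical usage follows; the configuration accessor and the "/accumulo" default are assumptions, not part of this diff:

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;

public class WalDirExample {
  public static void main(String[] args) {
    // With the stock defaults, getBaseDir(conf) resolves to "/accumulo",
    // so the shared WAL directory prints as "/accumulo/wal" (illustrative values).
    AccumuloConfiguration conf = AccumuloConfiguration.getDefaultConfiguration();
    System.out.println(Constants.getWalDirectory(conf));
  }
}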
@@ -77,6 +77,7 @@
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.core.util.MetadataTable;
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.util.OpTimer;
import org.apache.accumulo.core.util.StringUtil;
import org.apache.accumulo.core.util.TextUtil;
@@ -386,7 +387,7 @@ public void addSplits(String tableName, SortedSet<Text> partitionKeys) throws Ta
CountDownLatch latch = new CountDownLatch(splits.size());
AtomicReference<Exception> exception = new AtomicReference<Exception>(null);

ExecutorService executor = Executors.newFixedThreadPool(16);
ExecutorService executor = Executors.newFixedThreadPool(16, new NamingThreadFactory("addSplits"));
try {
executor.submit(new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits));

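
NamingThreadFactory is added elsewhere in this commit and its source is not part of this excerpt. Judging from the inline ThreadFactory implementations it replaces below, all of which numbered their threads and marked them daemon, a minimal sketch might look like this (an inference, not the committed source):

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamingThreadFactory implements ThreadFactory {
  private final AtomicInteger threadNum = new AtomicInteger(1);
  private final String baseName;

  public NamingThreadFactory(String baseName) {
    this.baseName = baseName;
  }

  @Override
  public Thread newThread(Runnable r) {
    // Number the thread and mark it daemon, as each of the replaced
    // inline factories did.
    Thread thread = new Thread(r, baseName + " " + threadNum.getAndIncrement());
    thread.setDaemon(true);
    return thread;
  }
}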
@@ -23,10 +23,8 @@
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -42,6 +40,7 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;

@@ -70,19 +69,8 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>> {

private static final List<KeyValue> EMPTY_LIST = Collections.emptyList();

private static AtomicInteger threadCounter = new AtomicInteger(1);

private static ThreadPoolExecutor readaheadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3l, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactory() {

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("Accumulo scanner read ahead thread " + threadCounter.getAndIncrement());
return t;
}
});
new NamingThreadFactory("Accumulo scanner read ahead thread"));

private class Reader implements Runnable {

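
As an aside, the readahead pool retained above (zero core threads, unbounded maximum, SynchronousQueue hand-off) is the same shape of pool that the JDK's Executors.newCachedThreadPool builds; only the keep-alive differs. A sketch for comparison, reusing the hypothetical NamingThreadFactory above:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ReadaheadPoolComparison {
  // Equivalent pool via the JDK factory method; the only difference is the
  // keep-alive: 60 seconds here versus the 3 seconds used above.
  static final ExecutorService equivalent =
      Executors.newCachedThreadPool(new NamingThreadFactory("Accumulo scanner read ahead thread"));
}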
@@ -21,11 +21,6 @@
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchScanner;
@@ -36,6 +31,7 @@
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.core.util.ArgumentChecker;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.log4j.Logger;

public class TabletServerBatchReader extends ScannerOptions implements BatchScanner {
Expand All @@ -59,25 +55,6 @@ private static synchronized int getNextBatchReaderInstance() {

private final int batchReaderInstance = getNextBatchReaderInstance();

private static class BatchReaderThreadFactory implements ThreadFactory {

private ThreadFactory dtf = Executors.defaultThreadFactory();
private int threadNum = 1;
private final int batchReaderInstance;

BatchReaderThreadFactory(int batchReaderInstance) {
this.batchReaderInstance = batchReaderInstance;
}

public Thread newThread(Runnable r) {
Thread thread = dtf.newThread(r);
thread.setName("batch scanner " + batchReaderInstance + "-" + threadNum++);
thread.setDaemon(true);
return thread;
}

}

public TabletServerBatchReader(Instance instance, AuthInfo credentials, String table, Authorizations authorizations, int numQueryThreads) {
ArgumentChecker.notNull(instance, credentials, table, authorizations);
this.instance = instance;
@@ -86,8 +63,7 @@ public TabletServerBatchReader(Instance instance, AuthInfo credentials, String t
this.table = table;
this.numThreads = numQueryThreads;

queryThreadPool = new ThreadPoolExecutor(numQueryThreads, numQueryThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new BatchReaderThreadFactory(batchReaderInstance));
queryThreadPool = new SimpleThreadPool(numQueryThreads, "batch scanner " + batchReaderInstance + "-");

ranges = null;
}
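
SimpleThreadPool is another utility added elsewhere in the commit. Judging from the fixed-size ThreadPoolExecutor it replaces here, a plausible sketch (an assumption, not the committed source) is:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SimpleThreadPool extends ThreadPoolExecutor {
  public SimpleThreadPool(int numThreads, String name) {
    // Fixed-size pool with an unbounded work queue and named daemon threads,
    // matching the executor and factory it replaces above.
    super(numThreads, numThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
        new NamingThreadFactory(name));
  }
}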
@@ -60,6 +60,7 @@
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -545,7 +546,7 @@ private class MutationWriter {
public MutationWriter(int numSendThreads) {
serversMutations = new HashMap<String,TabletServerMutations>();
queued = new HashSet<String>();
sendThreadPool = Executors.newFixedThreadPool(numSendThreads);
sendThreadPool = Executors.newFixedThreadPool(numSendThreads, new NamingThreadFactory(this.getClass().getName()));
}

private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations> binnedMutations) {
35 changes: 10 additions & 25 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -58,14 +58,14 @@ public enum Property {
MASTER_CLIENTPORT("master.port.client", "9999", PropertyType.PORT, "The port used for handling client connections on the master"),
MASTER_TABLET_BALANCER("master.tablet.balancer", "org.apache.accumulo.server.master.balancer.TableLoadBalancer", PropertyType.CLASSNAME,
"The balancer class that accumulo will use to make tablet assignment and migration decisions."),
MASTER_LOGGER_BALANCER("master.logger.balancer", "org.apache.accumulo.server.master.balancer.SimpleLoggerBalancer", PropertyType.CLASSNAME,
"The balancer class that accumulo will use to make logger assignment decisions."),
MASTER_RECOVERY_MAXAGE("master.recovery.max.age", "60m", PropertyType.TIMEDURATION, "Recovery files older than this age will be removed."),
MASTER_RECOVERY_MAXTIME("master.recovery.time.max", "30m", PropertyType.TIMEDURATION, "The maximum time to attempt recovery before giving up"),
MASTER_BULK_RETRIES("master.bulk.retries", "3", PropertyType.COUNT, "The number of attempts to bulk-load a file before giving up."),
MASTER_BULK_THREADPOOL_SIZE("master.bulk.threadpool.size", "5", PropertyType.COUNT, "The number of threads to use when coordinating a bulk-import."),
MASTER_MINTHREADS("master.server.threads.minimum", "2", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,
"When a tablet server's lock is deleted, it takes time for it to completely quit. This delay gives it time before log recoveries begin."),

// properties that are specific to tablet server behavior
TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the tablet servers"),
@@ -115,10 +115,6 @@ public enum Property {
TSERV_BLOOM_LOAD_MAXCONCURRENT("tserver.bloom.load.concurrent.max", "4", PropertyType.COUNT,
"The number of concurrent threads that will load bloom filters in the background. "
+ "Setting this to zero will make bloom filters load in the foreground."),
TSERV_LOGGER_TIMEOUT("tserver.logger.timeout", "30s", PropertyType.TIMEDURATION, "The time to wait for a logger to respond to a write-ahead request"),
TSERV_LOGGER_COUNT("tserver.logger.count", "2", PropertyType.COUNT, "The number of loggers that each tablet server should use."),
TSERV_LOGGER_STRATEGY("tserver.logger.strategy", "org.apache.accumulo.server.tabletserver.log.RoundRobinLoggerStrategy", PropertyType.STRING,
"The classname used to decide which loggers to use."),
TSERV_MONITOR_FS(
"tserver.monitor.fs",
"true",
@@ -148,29 +144,18 @@
TSERV_HOLD_TIME_SUICIDE("tserver.hold.time.max", "5m", PropertyType.TIMEDURATION,
"The maximum time for a tablet server to be in the \"memory full\" state. If the tablet server cannot write out memory"
+ " in this much time, it will assume there is some failure local to its node, and quit. A value of zero is equivalent to forever."),
TSERV_WAL_BLOCKSIZE("tserver.wal.blocksize", "0", PropertyType.MEMORY,
"The size of the HDFS blocks used to write to the Write-Ahead log. If zero, it will be 110% of tserver.walog.max.size (that is, try to use just one block)"),
TSERV_WAL_REPLICATION("tserver.wal.replication", "0", PropertyType.COUNT,
"The replication to use when writing the Write-Ahead log to HDFS. If zero, it will use the HDFS default replication setting."),
TSERV_RECOVERY_MAX_CONCURRENT("tserver.recovery.concurrent.max", "2", PropertyType.COUNT, "The maximum number of threads to use to sort logs during recovery"),
TSERV_SORT_BUFFER_SIZE("tserver.sort.buffer.size", "200M", PropertyType.MEMORY, "The amount of memory to use when sorting logs during recovery."),
TSERV_ARCHIVE_WALOGS("tserver.archive.walogs", "false", PropertyType.BOOLEAN, "Keep copies of the WALOGs for debugging purposes"),

// properties that are specific to logger server behavior
LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the write-ahead logger servers"),
LOGGER_PORT("logger.port.client", "11224", PropertyType.PORT, "The port used for write-ahead logger services"),
LOGGER_COPY_THREADPOOL_SIZE("logger.copy.threadpool.size", "2", PropertyType.COUNT,
"size of the thread pool used to copy files from the local log area to HDFS"),
LOGGER_DIR("logger.dir.walog", "walogs", PropertyType.PATH,
"The directory used to store write-ahead logs on the local filesystem. It is possible to specify a comma-separated list of directories."),
LOGGER_PORTSEARCH("logger.port.search", "false", PropertyType.BOOLEAN, "if the port above is in use, search higher ports until one is available"),
LOGGER_ARCHIVE("logger.archive", "false", PropertyType.BOOLEAN, "determines if logs are archived in hdfs"),
LOGGER_ARCHIVE_REPLICATION("logger.archive.replication", "0", PropertyType.COUNT,
"determines the replication factor for walogs archived in hdfs, set to zero to use default"),
LOGGER_MONITOR_FS(
"logger.monitor.fs",
"true",
PropertyType.BOOLEAN,
"When enabled the logger will monitor file systems and kill itself when one switches from rw to ro. This is usually and indication that Linux has detected a bad disk."),
LOGGER_SORT_BUFFER_SIZE("logger.sort.buffer.size", "200M", PropertyType.MEMORY,
"The amount of memory to use when sorting logs during recovery. Only used when *not* sorting logs with map/reduce."),
LOGGER_RECOVERY_FILE_REPLICATION("logger.recovery.file.replication", "2", PropertyType.COUNT,
"When a logger puts a WALOG into HDFS, it will use this as the replication factor."),
LOGGER_MINTHREADS("logger.server.threads.minimum", "2", PropertyType.COUNT, "The miniumum number of threads to use to handle incoming requests."),
LOGGER_THREADCHECK("logger.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),

// accumulo garbage collector properties
GC_PREFIX("gc.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the accumulo garbage collector."),
@@ -347,7 +332,7 @@ public static boolean isValidTablePropertyKey(String key) {
}

private static final EnumSet<Property> fixedProperties = EnumSet.of(Property.TSERV_CLIENTPORT, Property.TSERV_NATIVEMAP_ENABLED,
Property.TSERV_SCAN_MAX_OPENFILES, Property.TSERV_LOGGER_COUNT, Property.LOGGER_PORT, Property.MASTER_CLIENTPORT, Property.GC_PORT);
Property.TSERV_SCAN_MAX_OPENFILES, Property.MASTER_CLIENTPORT, Property.GC_PORT);

public static boolean isFixedZooPropertyKey(Property key) {
return fixedProperties.contains(key);
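
For illustration, the new master.recovery.delay property would be read through the standard typed accessor; the getTimeInMillis call and the use of the default configuration are assumptions based on the property table above, not part of this diff:

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;

public class RecoveryDelayExample {
  public static void main(String[] args) {
    AccumuloConfiguration conf = AccumuloConfiguration.getDefaultConfiguration();
    // TIMEDURATION values accept suffixed strings, so the "10s" default
    // comes back as 10000 milliseconds.
    long delayMillis = conf.getTimeInMillis(Property.MASTER_RECOVERY_DELAY);
    System.out.println(delayMillis);
  }
}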
@@ -29,10 +29,9 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,6 +50,7 @@
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.LoggingRunnable;
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.start.classloader.AccumuloClassLoader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -68,19 +68,6 @@ public class BloomFilterLayer {
public static final String BLOOM_FILE_NAME = "acu_bloom";
public static final int HASH_COUNT = 5;

private static class BloomLoaderThreadFactory implements ThreadFactory {

private ThreadFactory dtf = Executors.defaultThreadFactory();
private int threadNum = 1;

public Thread newThread(Runnable r) {
Thread thread = dtf.newThread(r);
thread.setName("bloom-loader-" + threadNum++);
thread.setDaemon(true);
return thread;
}
}

private static ExecutorService loadThreadPool = null;

private static synchronized ExecutorService getLoadThreadPool(int maxLoadThreads) {
@@ -89,7 +76,8 @@ private static synchronized ExecutorService getLoadThreadPool(int maxLoadThreads
}

if (maxLoadThreads > 0) {
loadThreadPool = new ThreadPoolExecutor(0, maxLoadThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new BloomLoaderThreadFactory());
BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
loadThreadPool = new ThreadPoolExecutor(0, maxLoadThreads, 60, TimeUnit.SECONDS, q, new NamingThreadFactory("bloom-loader"));
}

return loadThreadPool;
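
The null return from getLoadThreadPool when maxLoadThreads is not positive is what makes a zero tserver.bloom.load.concurrent.max load bloom filters in the foreground, per the property documentation above. A hypothetical call pattern, with illustrative names not taken from the diff:

import java.util.concurrent.ExecutorService;

class BloomLoadDispatch {
  static void dispatch(ExecutorService pool, Runnable loadTask) {
    if (pool != null) {
      pool.execute(loadTask); // background load on a "bloom-loader" daemon thread
    } else {
      loadTask.run(); // foreground load in the calling thread
    }
  }
}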
@@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

@@ -96,7 +97,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
private final EvictionThread evictionThread;

/** Statistics thread schedule pool (for heavy debugging, could remove) */
private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1);
private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new NamingThreadFactory("LRUBlockCacheStats"));

/** Current size of cache */
private final AtomicLong size;
