Skip to content

Commit

Permalink
Fixes to make BinaryMemtable useful. Highlights are configurable thre…
Browse files Browse the repository at this point in the history
…ads for [binary]memtable flushing and flushAndShutdown JMX/nodeprobe directive.

patch by Chris Goffinet; reviewed by jbellis for CASSANDRA-337

git-svn-id: https://svn.apache.org/repos/asf/incubator/cassandra/trunk@808942 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
jbellis committed Aug 28, 2009
1 parent 29515c1 commit dc79416
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 27 deletions.
15 changes: 15 additions & 0 deletions conf/storage-conf.xml
Expand Up @@ -308,4 +308,19 @@
~ ten days.
-->
<GCGraceSeconds>864000</GCGraceSeconds>

<!--
~ Number of threads to run when flushing memtables to disk. Set this to
~ twice the number of physical disks in your machine that are allocated to DataDirectory.
~ If you are planning to use the Binary Memtable, it's recommended to increase the max threads
~ to maintain a higher quality of service under load while normal memtables are flushing to disk.
-->
<FlushMinThreads>1</FlushMinThreads>
<FlushMaxThreads>1</FlushMaxThreads>

<!--
~ The threshold size in megabytes the binary memtable must grow to, before it's submitted for flushing to disk.
-->
<BinaryMemtableSizeInMB>256</BinaryMemtableSizeInMB>

</Storage>
36 changes: 36 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Expand Up @@ -74,6 +74,9 @@ public static enum CommitLogSync {
private static int slicedReadBufferSizeInKB_ = 64;
private static List<String> tables_ = new ArrayList<String>();
private static Set<String> applicationColumnFamilies_ = new HashSet<String>();
private static int flushMinThreads_ = 1;
private static int flushMaxThreads_ = 1;
private static int bmtThreshold_ = 256;

// Default descriptive names for introspection. The user can override
// these choices in the config file. These are not case sensitive.
Expand Down Expand Up @@ -271,6 +274,24 @@ else if (commitLogSync_ == CommitLogSync.batch)
slicedReadBufferSizeInKB_ = Integer.parseInt(rawSlicedBuffer);
}

String rawflushMinThreads = xmlUtils.getNodeValue("/Storage/FlushMinThreads");
if (rawflushMinThreads != null)
{
flushMinThreads_ = Integer.parseInt(rawflushMinThreads);
}

String rawflushMaxThreads = xmlUtils.getNodeValue("/Storage/FlushMaxThreads");
if (rawflushMaxThreads != null)
{
flushMaxThreads_ = Integer.parseInt(rawflushMaxThreads);
}

String bmtThreshold = xmlUtils.getNodeValue("/Storage/BinaryMemtableSizeInMB");
if (bmtThreshold != null)
{
bmtThreshold_ = Integer.parseInt(bmtThreshold);
}

/* TCP port on which the storage system listens */
String port = xmlUtils.getNodeValue("/Storage/StoragePort");
if ( port != null )
Expand Down Expand Up @@ -999,4 +1020,19 @@ public static int getSlicedReadBufferSizeInKB()
{
return slicedReadBufferSizeInKB_;
}

/**
 * Returns the configured minimum number of memtable flush threads.
 * The value defaults to 1 and is replaced by /Storage/FlushMinThreads
 * when that element is present in the configuration.
 *
 * @return the minimum flush thread count
 */
public static int getFlushMinThreads()
{
return flushMinThreads_;
}

/**
 * Returns the configured maximum number of memtable flush threads.
 * The value defaults to 1 and is replaced by /Storage/FlushMaxThreads
 * when that element is present in the configuration.
 *
 * @return the maximum flush thread count
 */
public static int getFlushMaxThreads()
{
return flushMaxThreads_;
}

/**
 * Returns the binary memtable flush threshold in megabytes.
 * The value defaults to 256 and is replaced by /Storage/BinaryMemtableSizeInMB
 * when that element is present in the configuration.
 *
 * NOTE(review): BinaryMemtable converts this to bytes with int arithmetic
 * (value * 1024 * 1024), so a setting of 2048 or more would overflow there.
 *
 * @return the threshold in megabytes
 */
public static int getBMTThreshold()
{
return bmtThreshold_;
}
}
29 changes: 26 additions & 3 deletions src/java/org/apache/cassandra/db/BinaryMemtable.java
Expand Up @@ -34,11 +34,13 @@

import org.apache.log4j.Logger;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import java.util.*;
import org.apache.cassandra.dht.IPartitioner;

public class BinaryMemtable
{
private static Logger logger_ = Logger.getLogger( Memtable.class );
private int threshold_ = 512*1024*1024;
private int threshold_ = DatabaseDescriptor.getBMTThreshold()*1024*1024;
private AtomicInteger currentSize_ = new AtomicInteger(0);

/* Table and ColumnFamily name are used to determine the ColumnFamilyStore */
Expand Down Expand Up @@ -138,10 +140,31 @@ void flush() throws IOException
* Use the SSTable to write the contents of the TreeMap
* to disk.
*/

String path;
SSTableWriter writer;
ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), keys.size(), StorageService.getPartitioner());
Collections.sort(keys);
/*
Adding a lock here so data directories are evenly used. By default currentIndex
is incremented, not an AtomicInteger. Let's fix this!
*/
lock_.lock();
try
{
path = cfStore.getTempSSTablePath();
writer = new SSTableWriter(path, keys.size(), StorageService.getPartitioner());
}
finally
{
lock_.unlock();
}

final IPartitioner partitioner = StorageService.getPartitioner();
final Comparator<String> dc = partitioner.getDecoratedKeyComparator();
Collections.sort(keys, dc);


/* Use this BloomFilter to decide if a key exists in a SSTable */
for ( String key : keys )
{
Expand Down
7 changes: 4 additions & 3 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Expand Up @@ -38,6 +38,7 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.AbstractType;

Expand All @@ -55,8 +56,8 @@ public final class ColumnFamilyStore implements ColumnFamilyStoreMBean
private static final int BUFSIZE = 128 * 1024 * 1024;

private static NonBlockingHashMap<String, Set<Memtable>> memtablesPendingFlush = new NonBlockingHashMap<String, Set<Memtable>>();
private static ExecutorService flusher_ = new DebuggableThreadPoolExecutor("MEMTABLE-FLUSHER-POOL");

private static ExecutorService flusher_ = new DebuggableThreadPoolExecutor(DatabaseDescriptor.getFlushMinThreads(), DatabaseDescriptor.getFlushMaxThreads(), Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("MEMTABLE-FLUSHER-POOL"));
private final String table_;
public final String columnFamily_;
private final boolean isSuper_;
Expand Down Expand Up @@ -457,7 +458,7 @@ public void run()
assert oldMemtable.isFlushed() || oldMemtable.isClean();
}

void forceFlushBinary()
public void forceFlushBinary()
{
submitFlush(binaryMemtable_.get());
}
Expand Down
6 changes: 3 additions & 3 deletions src/java/org/apache/cassandra/db/Table.java
Expand Up @@ -642,11 +642,11 @@ void load(Row row) throws IOException
for (ColumnFamily columnFamily : row.getColumnFamilies())
{
Collection<IColumn> columns = columnFamily.getSortedColumns();
for(IColumn column : columns)
for (IColumn column : columns)
{
ColumnFamilyStore cfStore = columnFamilyStores_.get(column.name());
ColumnFamilyStore cfStore = columnFamilyStores_.get(new String(column.name(), "UTF-8"));
cfStore.applyBinary(key, column.value());
}
}
}
row.clear();
}
Expand Down
41 changes: 26 additions & 15 deletions src/java/org/apache/cassandra/net/MessagingService.java
Expand Up @@ -478,14 +478,26 @@ public static void setStreamingMode(boolean bVal)
{
isStreaming_.set(bVal);
}
/**
 * Pushes out the pending writes of every connection held by the connection
 * managers in poolTable_, then delegates to {@link #shutdown()}.
 */
public static void flushAndshutdown()
{
    // the map key is never used, so walk the connection managers directly
    for (TcpConnectionManager manager : poolTable_.values())
    {
        for (TcpConnection pending : manager.getConnections())
        {
            pending.doPendingWrites();
        }
    }

    // every connection has been asked to send its writes; now tear down
    shutdown();
}

public static void shutdown()
{
logger_.info("Shutting down ...");
synchronized ( MessagingService.class )
{
/* Stop listening on any socket */
for( SelectionKey skey : listenSockets_.values() )
synchronized (MessagingService.class)
{
/* Stop listening on any socket */
for (SelectionKey skey : listenSockets_.values())
{
skey.cancel();
try
Expand All @@ -495,26 +507,25 @@ public static void shutdown()
catch (IOException e) {}
}
listenSockets_.clear();
/* Shutdown the threads in the EventQueue's */
messageDeserializationExecutor_.shutdownNow();

/* Shutdown the threads in the EventQueue's */
messageDeserializationExecutor_.shutdownNow();
messageSerializerExecutor_.shutdownNow();
messageDeserializerExecutor_.shutdownNow();
streamExecutor_.shutdownNow();

/* shut down the cachetables */
taskCompletionMap_.shutdown();
callbackMap_.shutdown();
callbackMap_.shutdown();

/* Interrupt the selector manager thread */
SelectorManager.getSelectorManager().interrupt();
poolTable_.clear();
verbHandlers_.clear();

poolTable_.clear();
verbHandlers_.clear();
bShutdown_ = true;
}
if (logger_.isDebugEnabled())
logger_.debug("Shutdown invocation complete.");
logger_.info("Shutdown invocation complete.");
}

public static void receive(Message message)
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/net/TcpConnection.java
Expand Up @@ -387,7 +387,7 @@ public void write(SelectionKey key)
resumeStreaming();
}

void doPendingWrites()
public void doPendingWrites()
{
synchronized(this)
{
Expand Down
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/net/TcpConnectionManager.java
Expand Up @@ -211,4 +211,8 @@ boolean contains(TcpConnection connection)
{
return allConnections_.contains(connection);
}
/**
 * Returns the connections tracked by this manager.
 * The internal allConnections_ list itself is returned, not a copy.
 *
 * @return the list of connections held by this manager
 */
List<TcpConnection> getConnections()
{
return allConnections_;
}
}
18 changes: 18 additions & 0 deletions src/java/org/apache/cassandra/service/StorageService.java
Expand Up @@ -778,6 +778,24 @@ public void clearSnapshot() throws IOException
logger_.debug("Cleared out all snapshot directories");
}

/**
 * Forces a flush of the binary memtable of every column family in a table.
 *
 * @param tableName name of the table (keyspace) whose column families are flushed
 * @throws IOException if DatabaseDescriptor knows no table with the given name
 */
public void forceTableFlushBinary(String tableName) throws IOException
{
    if (DatabaseDescriptor.getTable(tableName) == null)
    {
        // a separating space is required here, otherwise the table name
        // runs straight into "does not exist" in the reported message
        throw new IOException("Table " + tableName + " does not exist");
    }

    Table table = Table.open(tableName);
    Set<String> columnFamilies = table.getColumnFamilies();
    for (String columnFamily : columnFamilies)
    {
        ColumnFamilyStore cfStore = table.getColumnFamilyStore(columnFamily);
        logger_.debug("Forcing flush on keyspace " + tableName + " on CF " + columnFamily);
        cfStore.forceFlushBinary();
    }
}


/* End of MBean interface methods */

/**
Expand Down
Expand Up @@ -84,4 +84,11 @@ public interface StorageServiceMBean
* Remove all the existing snapshots.
*/
public void clearSnapshot() throws IOException;

/**
* Flush the binary memtables of all column families in a table.
* @param tableName name of the table (keyspace) to flush
* @throws IOException if the flush request cannot be carried out, e.g. the table does not exist
*/
public void forceTableFlushBinary(String tableName) throws IOException;
}
23 changes: 21 additions & 2 deletions src/java/org/apache/cassandra/tools/NodeProbe.java
Expand Up @@ -257,7 +257,16 @@ public void forceTableCompaction() throws IOException
{
ssProxy.forceTableCompaction();
}


/**
* Trigger a binary flush on CFs of a table by delegating to the
* storage service proxy.
*
* @param tableName name of the table (keyspace) whose CFs are flushed
* @throws IOException propagated from the storage service proxy call
*/
public void forceTableFlushBinary(String tableName) throws IOException
{
ssProxy.forceTableFlushBinary(tableName);
}


/**
* Write a textual representation of the Cassandra ring.
*
Expand Down Expand Up @@ -517,7 +526,7 @@ private static void printUsage()
{
HelpFormatter hf = new HelpFormatter();
String header = String.format(
"%nAvailable commands: ring, cluster, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, bootstrap, tpstats");
"%nAvailable commands: ring, cluster, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, bootstrap, tpstats, flush_binary");
String usage = String.format("java %s -host <arg> <command>%n", NodeProbe.class.getName());
hf.printHelp(usage, "", options, header);
}
Expand Down Expand Up @@ -609,6 +618,16 @@ else if (cmdName.equals("tpstats"))
{
probe.printThreadPoolStats(System.out);
}
else if (cmdName.equals("flush_binary"))
{
if (probe.getArgs().length < 2)
{
System.err.println("Missing keyspace argument.");
NodeProbe.printUsage();
System.exit(1);
}
probe.forceTableFlushBinary(probe.getArgs()[1]);
}
else
{
System.err.println("Unrecognized command: " + cmdName + ".");
Expand Down

0 comments on commit dc79416

Please sign in to comment.