diff --git a/conf/storage-conf.xml b/conf/storage-conf.xml
index 8aaeeb663a02..25d2901da60a 100644
--- a/conf/storage-conf.xml
+++ b/conf/storage-conf.xml
@@ -308,4 +308,19 @@
~ ten days.
-->
864000
+
+
+ 1
+ 1
+
+
+ 256
+
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 90166761fac2..e86b1349d1b1 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -74,6 +74,9 @@ public static enum CommitLogSync {
private static int slicedReadBufferSizeInKB_ = 64;
private static List tables_ = new ArrayList();
private static Set applicationColumnFamilies_ = new HashSet();
+ private static int flushMinThreads_ = 1;
+ private static int flushMaxThreads_ = 1;
+ private static int bmtThreshold_ = 256;
// Default descriptive names for introspection. The user can override
// these choices in the config file. These are not case sensitive.
@@ -271,6 +274,24 @@ else if (commitLogSync_ == CommitLogSync.batch)
slicedReadBufferSizeInKB_ = Integer.parseInt(rawSlicedBuffer);
}
+ String rawflushMinThreads = xmlUtils.getNodeValue("/Storage/FlushMinThreads");
+ if (rawflushMinThreads != null)
+ {
+ flushMinThreads_ = Integer.parseInt(rawflushMinThreads);
+ }
+
+ String rawflushMaxThreads = xmlUtils.getNodeValue("/Storage/FlushMaxThreads");
+ if (rawflushMaxThreads != null)
+ {
+ flushMaxThreads_ = Integer.parseInt(rawflushMaxThreads);
+ }
+
+ String bmtThreshold = xmlUtils.getNodeValue("/Storage/BinaryMemtableSizeInMB");
+ if (bmtThreshold != null)
+ {
+ bmtThreshold_ = Integer.parseInt(bmtThreshold);
+ }
+
/* TCP port on which the storage system listens */
String port = xmlUtils.getNodeValue("/Storage/StoragePort");
if ( port != null )
@@ -999,4 +1020,19 @@ public static int getSlicedReadBufferSizeInKB()
{
return slicedReadBufferSizeInKB_;
}
+
+ public static int getFlushMinThreads()
+ {
+ return flushMinThreads_;
+ }
+
+ public static int getFlushMaxThreads()
+ {
+ return flushMaxThreads_;
+ }
+
+ public static int getBMTThreshold()
+ {
+ return bmtThreshold_;
+ }
}
diff --git a/src/java/org/apache/cassandra/db/BinaryMemtable.java b/src/java/org/apache/cassandra/db/BinaryMemtable.java
index 2cd439a1ba61..4530e8b28b71 100644
--- a/src/java/org/apache/cassandra/db/BinaryMemtable.java
+++ b/src/java/org/apache/cassandra/db/BinaryMemtable.java
@@ -34,11 +34,13 @@
import org.apache.log4j.Logger;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import java.util.*;
+import org.apache.cassandra.dht.IPartitioner;
public class BinaryMemtable
{
private static Logger logger_ = Logger.getLogger( Memtable.class );
- private int threshold_ = 512*1024*1024;
+ private int threshold_ = DatabaseDescriptor.getBMTThreshold()*1024*1024;
private AtomicInteger currentSize_ = new AtomicInteger(0);
/* Table and ColumnFamily name are used to determine the ColumnFamilyStore */
@@ -138,10 +140,31 @@ void flush() throws IOException
* Use the SSTable to write the contents of the TreeMap
* to disk.
*/
+
+ String path;
+ SSTableWriter writer;
ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
List keys = new ArrayList( columnFamilies_.keySet() );
- SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), keys.size(), StorageService.getPartitioner());
- Collections.sort(keys);
+ /*
+ Adding a lock here so data directories are evenly used. By default currentIndex
+ is incremented, not an AtomicInteger. Let's fix this!
+ */
+ lock_.lock();
+ try
+ {
+ path = cfStore.getTempSSTablePath();
+ writer = new SSTableWriter(path, keys.size(), StorageService.getPartitioner());
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+
+ final IPartitioner partitioner = StorageService.getPartitioner();
+ final Comparator dc = partitioner.getDecoratedKeyComparator();
+ Collections.sort(keys, dc);
+
+
/* Use this BloomFilter to decide if a key exists in a SSTable */
for ( String key : keys )
{
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b08cf6b2d263..af6e247adada 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -38,6 +38,7 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -55,8 +56,8 @@ public final class ColumnFamilyStore implements ColumnFamilyStoreMBean
private static final int BUFSIZE = 128 * 1024 * 1024;
private static NonBlockingHashMap> memtablesPendingFlush = new NonBlockingHashMap>();
- private static ExecutorService flusher_ = new DebuggableThreadPoolExecutor("MEMTABLE-FLUSHER-POOL");
-
+ private static ExecutorService flusher_ = new DebuggableThreadPoolExecutor(DatabaseDescriptor.getFlushMinThreads(), DatabaseDescriptor.getFlushMaxThreads(), Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactoryImpl("MEMTABLE-FLUSHER-POOL"));
+
private final String table_;
public final String columnFamily_;
private final boolean isSuper_;
@@ -457,7 +458,7 @@ public void run()
assert oldMemtable.isFlushed() || oldMemtable.isClean();
}
- void forceFlushBinary()
+ public void forceFlushBinary()
{
submitFlush(binaryMemtable_.get());
}
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 54389e5dfb1c..04b28fb2e911 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -642,11 +642,11 @@ void load(Row row) throws IOException
for (ColumnFamily columnFamily : row.getColumnFamilies())
{
Collection columns = columnFamily.getSortedColumns();
- for(IColumn column : columns)
+ for (IColumn column : columns)
{
- ColumnFamilyStore cfStore = columnFamilyStores_.get(column.name());
+ ColumnFamilyStore cfStore = columnFamilyStores_.get(new String(column.name(), "UTF-8"));
cfStore.applyBinary(key, column.value());
- }
+ }
}
row.clear();
}
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 954324b88b98..21f97d21893b 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -478,14 +478,26 @@ public static void setStreamingMode(boolean bVal)
{
isStreaming_.set(bVal);
}
+ public static void flushAndshutdown()
+ {
+ // safely shutdown and send all writes
+ for(Map.Entry entry : poolTable_.entrySet() )
+ {
+ for(TcpConnection connection: entry.getValue().getConnections())
+ {
+ connection.doPendingWrites();
+ }
+ }
+ shutdown();
+ }
public static void shutdown()
{
logger_.info("Shutting down ...");
- synchronized ( MessagingService.class )
- {
- /* Stop listening on any socket */
- for( SelectionKey skey : listenSockets_.values() )
+ synchronized (MessagingService.class)
+ {
+ /* Stop listening on any socket */
+ for (SelectionKey skey : listenSockets_.values())
{
skey.cancel();
try
@@ -495,26 +507,25 @@ public static void shutdown()
catch (IOException e) {}
}
listenSockets_.clear();
-
- /* Shutdown the threads in the EventQueue's */
- messageDeserializationExecutor_.shutdownNow();
+
+ /* Shutdown the threads in the EventQueue's */
+ messageDeserializationExecutor_.shutdownNow();
messageSerializerExecutor_.shutdownNow();
messageDeserializerExecutor_.shutdownNow();
streamExecutor_.shutdownNow();
-
+
/* shut down the cachetables */
taskCompletionMap_.shutdown();
- callbackMap_.shutdown();
-
+ callbackMap_.shutdown();
+
/* Interrupt the selector manager thread */
SelectorManager.getSelectorManager().interrupt();
-
- poolTable_.clear();
- verbHandlers_.clear();
+
+ poolTable_.clear();
+ verbHandlers_.clear();
bShutdown_ = true;
}
- if (logger_.isDebugEnabled())
- logger_.debug("Shutdown invocation complete.");
+ logger_.info("Shutdown invocation complete.");
}
public static void receive(Message message)
diff --git a/src/java/org/apache/cassandra/net/TcpConnection.java b/src/java/org/apache/cassandra/net/TcpConnection.java
index 5039833876db..08afda822ad3 100644
--- a/src/java/org/apache/cassandra/net/TcpConnection.java
+++ b/src/java/org/apache/cassandra/net/TcpConnection.java
@@ -387,7 +387,7 @@ public void write(SelectionKey key)
resumeStreaming();
}
- void doPendingWrites()
+ public void doPendingWrites()
{
synchronized(this)
{
diff --git a/src/java/org/apache/cassandra/net/TcpConnectionManager.java b/src/java/org/apache/cassandra/net/TcpConnectionManager.java
index e29eb9d20931..b8fa9096a4a2 100644
--- a/src/java/org/apache/cassandra/net/TcpConnectionManager.java
+++ b/src/java/org/apache/cassandra/net/TcpConnectionManager.java
@@ -211,4 +211,8 @@ boolean contains(TcpConnection connection)
{
return allConnections_.contains(connection);
}
+ List getConnections()
+ {
+ return allConnections_;
+ }
}
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 62cc11086afb..0e2dd245d520 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -778,6 +778,24 @@ public void clearSnapshot() throws IOException
logger_.debug("Cleared out all snapshot directories");
}
+ public void forceTableFlushBinary(String tableName) throws IOException
+ {
+ if (DatabaseDescriptor.getTable(tableName) == null)
+ {
+ throw new IOException("Table " + tableName + "does not exist");
+ }
+
+ Table table = Table.open(tableName);
+ Set columnFamilies = table.getColumnFamilies();
+ for (String columnFamily : columnFamilies)
+ {
+ ColumnFamilyStore cfStore = table.getColumnFamilyStore(columnFamily);
+ logger_.debug("Forcing flush on keyspace " + tableName + " on CF " + columnFamily);
+ cfStore.forceFlushBinary();
+ }
+ }
+
+
/* End of MBean interface methods */
/**
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 2e21ecbf6589..046fb267dfdd 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -84,4 +84,11 @@ public interface StorageServiceMBean
* Remove all the existing snapshots.
*/
public void clearSnapshot() throws IOException;
+
+ /**
+ * Flush all binary memtables for a table
+ * @param tableName
+ * @throws IOException
+ */
+ public void forceTableFlushBinary(String tableName) throws IOException;
}
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index f0df925cb35f..b6d846f9297a 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -257,7 +257,16 @@ public void forceTableCompaction() throws IOException
{
ssProxy.forceTableCompaction();
}
-
+
+ /**
+ * Trigger a binary flush on CFs of a table.
+ */
+ public void forceTableFlushBinary(String tableName) throws IOException
+ {
+ ssProxy.forceTableFlushBinary(tableName);
+ }
+
+
/**
* Write a textual representation of the Cassandra ring.
*
@@ -517,7 +526,7 @@ private static void printUsage()
{
HelpFormatter hf = new HelpFormatter();
String header = String.format(
- "%nAvailable commands: ring, cluster, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, bootstrap, tpstats");
+ "%nAvailable commands: ring, cluster, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, bootstrap, tpstats, flush_binary");
String usage = String.format("java %s -host %n", NodeProbe.class.getName());
hf.printHelp(usage, "", options, header);
}
@@ -609,6 +618,16 @@ else if (cmdName.equals("tpstats"))
{
probe.printThreadPoolStats(System.out);
}
+ else if (cmdName.equals("flush_binary"))
+ {
+ if (probe.getArgs().length < 2)
+ {
+ System.err.println("Missing keyspace argument.");
+ NodeProbe.printUsage();
+ System.exit(1);
+ }
+ probe.forceTableFlushBinary(probe.getArgs()[1]);
+ }
else
{
System.err.println("Unrecognized command: " + cmdName + ".");