Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

waitForFlush -> forceBlockingFlush. ServerTest.cleanup now flushes an…

…d cleans out

all ColumnFamilyStores and commitlog, allowing remove tests to not step on each
others' toes (all tests pass now).

patch by jbellis; reviewed by Sandeep Tata for #85
  • Loading branch information...
commit bafd4c3f97288aeead2fd0d45811f9dec5e801e9 1 parent d5971e2
Jonathan Ellis authored
View
32 src/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -84,7 +84,7 @@
private AtomicReference<BinaryMemtable> binaryMemtable_;
/* SSTables on disk for this column family */
- Set<String> ssTables_ = new HashSet<String>();
+ private Set<String> ssTables_ = new HashSet<String>();
/* Modification lock used for protecting reads from compactions. */
private ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
@@ -433,11 +433,23 @@ void switchBinaryMemtable(String key, byte[] buffer) throws IOException
void forceFlush() throws IOException
{
- //MemtableManager.instance().submit(getColumnFamilyName(), memtable_.get() , CommitLog.CommitLogContext.NULL);
- //memtable_.get().flush(true, CommitLog.CommitLogContext.NULL);
memtable_.get().forceflush(this);
}
+ void forceBlockingFlush() throws IOException, ExecutionException, InterruptedException
+ {
+ forceFlush();
+ // block for flush to finish by adding a no-op action to the flush executorservice
+ // and waiting for that to finish. (this works since flush ES is single-threaded.)
+ Future f = MemtableManager.instance().flusher_.submit(new Runnable()
+ {
+ public void run()
+ {
+ }
+ });
+ f.get();
+ }
+
void forceFlushBinary()
{
BinaryMemtableManager.instance().submit(getColumnFamilyName(), binaryMemtable_.get());
@@ -1407,4 +1419,18 @@ public int getMemtableSwitchCount()
{
return memtableSwitchCount;
}
+
+ /**
+ * clears out all data associated with this ColumnFamily.
+ * For use in testing.
+ */
+ public void reset() throws IOException, ExecutionException, InterruptedException
+ {
+ forceBlockingFlush();
+ for (String fName : ssTables_)
+ {
+ new File(fName).delete();
+ }
+ ssTables_.clear();
+ }
}
View
7 src/org/apache/cassandra/db/CommitLog.java
@@ -57,7 +57,7 @@
*
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/
-class CommitLog
+public class CommitLog
{
private static final int bufSize_ = 128*1024*1024;
private static Map<String, CommitLog> instances_ = new HashMap<String, CommitLog>();
@@ -623,6 +623,11 @@ public void setForcedRollOver()
forcedRollOver_ = true;
}
+ public static void reset()
+ {
+ CommitLog.instances_.clear();
+ }
+
public static void main(String[] args) throws Throwable
{
LogUtil.init();
View
2  src/org/apache/cassandra/db/Table.java
@@ -425,7 +425,7 @@ public static Table open(String table)
return columnFamilyStores_;
}
- ColumnFamilyStore getColumnFamilyStore(String cfName)
+ public ColumnFamilyStore getColumnFamilyStore(String cfName)
{
return columnFamilyStores_.get(cfName);
}
View
21 test/org/apache/cassandra/ServerTest.java
@@ -3,15 +3,34 @@
import org.testng.annotations.Test;
import org.testng.annotations.BeforeMethod;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.CommitLog;
import java.io.File;
+import java.io.IOException;
@Test(groups={"serial"})
public class ServerTest {
- // TODO clean up static structures too (e.g. memtables)
@BeforeMethod
public void cleanup()
{
+ Table table = Table.open("Table1");
+ for (String cfName : table.getColumnFamilies())
+ {
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+ try
+ {
+ cfs.reset();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ CommitLog.reset();
+
String[] directoryNames = {
DatabaseDescriptor.getBootstrapFileLocation(),
DatabaseDescriptor.getLogFileLocation(),
View
33 test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -67,9 +67,8 @@ public void testNameSort() throws IOException, ColumnFamilyNotDefinedException,
// validateNameSort(table);
- table.getColumnFamilyStore("Standard1").forceFlush();
- table.getColumnFamilyStore("Super1").forceFlush();
- waitForFlush();
+ table.getColumnFamilyStore("Standard1").forceBlockingFlush();
+ table.getColumnFamilyStore("Super1").forceBlockingFlush();
validateNameSort(table);
}
@@ -93,8 +92,7 @@ public void testTimeSort() throws IOException, ColumnFamilyNotDefinedException,
validateTimeSort(table);
- table.getColumnFamilyStore("StandardByTime1").forceFlush();
- waitForFlush();
+ table.getColumnFamilyStore("StandardByTime1").forceBlockingFlush();
validateTimeSort(table);
// interleave some new data to test memtable + sstable
@@ -154,18 +152,6 @@ private void validateTimeSort(Table table) throws IOException, ColumnFamilyNotDe
}
}
- private void waitForFlush()
- throws InterruptedException, ExecutionException
- {
- Future f = MemtableManager.instance().flusher_.submit(new Runnable()
- {
- public void run()
- {
- }
- });
- f.get();
- }
-
private void validateNameSort(Table table)
throws ColumnFamilyNotDefinedException, IOException
{
@@ -213,8 +199,7 @@ public void testRemoveColumn() throws IOException, ColumnFamilyNotDefinedExcepti
rm = new RowMutation("Table1", "key1");
rm.add("Standard1:Column1", "asdf".getBytes(), 0);
rm.apply();
- store.forceFlush();
- waitForFlush();
+ store.forceBlockingFlush();
// remove
rm = new RowMutation("Table1", "key1");
@@ -236,8 +221,7 @@ public void testRemoveSubColumn() throws IOException, ColumnFamilyNotDefinedExce
rm = new RowMutation("Table1", "key1");
rm.add("Super1:SC1:Column1", "asdf".getBytes(), 0);
rm.apply();
- store.forceFlush();
- waitForFlush();
+ store.forceBlockingFlush();
// remove
rm = new RowMutation("Table1", "key1");
@@ -259,8 +243,7 @@ public void testRemoveSuperColumn() throws IOException, ColumnFamilyNotDefinedEx
rm = new RowMutation("Table1", "key1");
rm.add("Super1:SC1:Column1", "asdf".getBytes(), 0);
rm.apply();
- store.forceFlush();
- waitForFlush();
+ store.forceBlockingFlush();
// remove
rm = new RowMutation("Table1", "key1");
@@ -359,7 +342,6 @@ public void testCompaction() throws IOException, ColumnFamilyNotDefinedException
{
Table table = Table.open("Table1");
ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
- store.ssTables_.clear(); // TODO integrate this better into test setup/teardown
for (int j = 0; j < 5; j++) {
for (int i = 0; i < 10; i++) {
@@ -369,8 +351,7 @@ public void testCompaction() throws IOException, ColumnFamilyNotDefinedException
rm.add("Standard1:A", new byte[0], epoch);
rm.apply();
}
- store.forceFlush();
- waitForFlush();
+ store.forceBlockingFlush();
}
Future ft = MinorCompactionManager.instance().submit(store);
ft.get();
Please sign in to comment.
Something went wrong with that request. Please try again.