Permalink
Browse files

Merge branch 'cassandra-1.2' into trunk

  • Loading branch information...
jbellis committed Dec 28, 2012
2 parents d077356 + 1d96e32 commit 04ade2ffa8750d53bf4694587ed6b83697f94bda
Showing with 20 additions and 10 deletions.
  1. +20 −10 src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -169,6 +169,7 @@ protected void runWith(File dataDirectory) throws Exception
row.close();
continue;
}
+
// If the row is cached, we call removeDeleted on at read time it to have coherent query returns,
// but if the row is not pushed out of the cache, obsolete tombstones will persist indefinitely.
controller.removeDeletedInCache(row.key);
@@ -187,19 +188,28 @@ protected void runWith(File dataDirectory) throws Exception
}
}
}
- if (!iter.hasNext() || newSSTableSegmentThresholdReached(writer))
+
+ if (newSSTableSegmentThresholdReached(writer))
{
- SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
- cachedKeyMap.put(toIndex, cachedKeys);
- sstables.add(toIndex);
- if (iter.hasNext())
- {
- writer = cfs.createCompactionWriter(keysPerSSTable, cfs.directories.getLocationForDisk(dataDirectory), toCompact);
- writers.add(writer);
- cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
- }
+ SSTableReader sstable = writer.closeAndOpenReader(getMaxDataAge(toCompact));
+ cachedKeyMap.put(sstable, cachedKeys);
+ sstables.add(sstable);
+ writer = cfs.createCompactionWriter(keysPerSSTable, cfs.directories.getLocationForDisk(dataDirectory), toCompact);
+ writers.add(writer);
+ cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
}
}
+
+ if (writer.getFilePointer() > 0)
+ {
+ SSTableReader sstable = writer.closeAndOpenReader(getMaxDataAge(toCompact));
+ cachedKeyMap.put(sstable, cachedKeys);
+ sstables.add(sstable);
+ }
+ else
+ {
+ writer.abort();
+ }
}
catch (Throwable t)
{

0 comments on commit 04ade2f

Please sign in to comment.