Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

use data size ratio in liveRatio instead of live size : serialized th…

…roughput

patch by jbellis; reviewed by slebresne for CASSANDRA-4399
  • Loading branch information...
commit 67dec69f53d2bfd3818fea4ede40e5d5a6b2356b 1 parent 8674784
@jbellis jbellis authored
View
8 src/java/org/apache/cassandra/db/AbstractColumnContainer.java
@@ -84,9 +84,11 @@ public void maybeResetDeletionTimes(int gcBefore)
columns.maybeResetDeletionTimes(gcBefore);
}
- /**
- * We need to go through each column in the column container and resolve it before adding
- */
+ public long addAllWithSizeDelta(AbstractColumnContainer cc, Allocator allocator, Function<IColumn, IColumn> transformation)
+ {
+ return columns.addAllWithSizeDelta(cc.columns, allocator, transformation);
+ }
+
public void addAll(AbstractColumnContainer cc, Allocator allocator, Function<IColumn, IColumn> transformation)
{
columns.addAll(cc.columns, allocator, transformation);
View
6 src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
@@ -93,6 +93,12 @@ else if (c < 0)
// having to care about the deletion infos
protected abstract void addAllColumns(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation);
+ public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+ {
+ // sizeDelta is only needed by memtable updates which should not be using thread-unsafe containers
+ throw new UnsupportedOperationException();
+ }
+
public void addAll(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation)
{
addAllColumns(columns, allocator, transformation);
View
31 src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -154,6 +154,11 @@ public void addColumn(IColumn column, Allocator allocator)
public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
{
+ addAllWithSizeDelta(cm, allocator, transformation);
+ }
+
+ public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+ {
/*
* This operation needs to atomicity and isolation. To that end, we
* add the new column to a copy of the map (a cheap O(1) snapTree
@@ -166,9 +171,12 @@ public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, ICo
* we bail early, avoiding unnecessary work if possible.
*/
Holder current, modified;
+ long sizeDelta;
+
main_loop:
do
{
+ sizeDelta = 0;
current = ref.get();
DeletionInfo newDelInfo = current.deletionInfo;
if (newDelInfo.markedForDeleteAt < cm.getDeletionInfo().markedForDeleteAt)
@@ -177,13 +185,15 @@ public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, ICo
for (IColumn column : cm.getSortedColumns())
{
- modified.addColumn(transformation.apply(column), allocator);
+ sizeDelta += modified.addColumn(transformation.apply(column), allocator);
// bail early if we know we've been beaten
if (ref.get() != current)
continue main_loop;
}
}
while (!ref.compareAndSet(current, modified));
+
+ return sizeDelta;
}
public boolean replace(IColumn oldColumn, IColumn newColumn)
@@ -329,16 +339,26 @@ Holder clear()
return new Holder(new SnapTreeMap<ByteBuffer, IColumn>(map.comparator()), deletionInfo);
}
- void addColumn(IColumn column, Allocator allocator)
+ long addColumn(IColumn column, Allocator allocator)
{
ByteBuffer name = column.name();
IColumn oldColumn;
- while ((oldColumn = map.putIfAbsent(name, column)) != null)
+ long sizeDelta = 0;
+ while (true)
{
+ oldColumn = map.putIfAbsent(name, column);
+ if (oldColumn == null)
+ {
+ sizeDelta += column.serializedSize();
+ break;
+ }
+
if (oldColumn instanceof SuperColumn)
{
assert column instanceof SuperColumn;
+ long previousSize = oldColumn.serializedSize();
((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
+ sizeDelta += oldColumn.serializedSize() - previousSize;
break; // Delegated to SuperColumn
}
else
@@ -346,12 +366,17 @@ void addColumn(IColumn column, Allocator allocator)
// calculate reconciled col from old (existing) col and new col
IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
if (map.replace(name, oldColumn, reconciledColumn))
+ {
+ sizeDelta += reconciledColumn.serializedSize() - oldColumn.serializedSize();
break;
+ }
// We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
// (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
}
}
+
+ return sizeDelta;
}
void retainAll(ISortedColumns columns)
View
7 src/java/org/apache/cassandra/db/ISortedColumns.java
@@ -68,6 +68,13 @@
* add(c);
* </code>
* but is potentially faster.
+ *
+ * @return the difference in size seen after merging the given columns
+ */
+ public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation);
+
+ /**
+ * Adds the columns without necessarily computing the size delta
*/
public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation);
View
24 src/java/org/apache/cassandra/db/Memtable.java
@@ -80,7 +80,7 @@ protected void afterExecute(Runnable r, Throwable t)
volatile static Memtable activelyMeasuring;
private volatile boolean isFrozen;
- private final AtomicLong currentThroughput = new AtomicLong(0);
+ private final AtomicLong currentSize = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);
// We index the memtable by RowPosition only for the purpose of being able
@@ -122,12 +122,12 @@ public long getLiveSize()
{
// 25% fudge factor on the base throughput * liveRatio calculation. (Based on observed
// pre-slabbing behavior -- not sure what accounts for this. May have changed with introduction of slabbing.)
- return (long) (currentThroughput.get() * cfs.liveRatio * 1.25);
+ return (long) (currentSize.get() * cfs.liveRatio * 1.25);
}
public long getSerializedSize()
{
- return currentThroughput.get();
+ return currentSize.get();
}
public long getOperations()
@@ -190,7 +190,7 @@ public void run()
deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
objects += entry.getValue().getColumnCount();
}
- double newRatio = (double) deepSize / currentThroughput.get();
+ double newRatio = (double) deepSize / currentSize.get();
if (newRatio < MIN_SANE_LIVE_RATIO)
{
@@ -226,12 +226,6 @@ public void run()
private void resolve(DecoratedKey key, ColumnFamily cf)
{
- currentThroughput.addAndGet(cf.size());
- currentOperations.addAndGet((cf.getColumnCount() == 0)
- ? cf.isMarkedForDelete() ? 1 : 0
- : cf.getColumnCount());
-
-
ColumnFamily previous = columnFamilies.get(key);
if (previous == null)
@@ -244,7 +238,11 @@ private void resolve(DecoratedKey key, ColumnFamily cf)
previous = empty;
}
- previous.addAll(cf, allocator, localCopyFunction);
+ long sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction);
+ currentSize.addAndGet(sizeDelta);
+ currentOperations.addAndGet((cf.getColumnCount() == 0)
+ ? cf.isMarkedForDelete() ? 1 : 0
+ : cf.getColumnCount());
}
// for debugging
@@ -274,7 +272,7 @@ private SSTableReader writeSortedContents(Future<ReplayPosition> context) throws
}
long estimatedSize = (long) ((keySize // index entries
+ keySize // keys in data file
- + currentThroughput.get()) // data
+ + currentSize.get()) // data
* 1.2); // bloom filter and row index overhead
SSTableReader ssTable;
// errors when creating the writer that may leave empty temp files.
@@ -325,7 +323,7 @@ public void runMayThrow() throws Exception
public String toString()
{
return String.format("Memtable-%s@%s(%s/%s serialized/live bytes, %s ops)",
- cfs.getColumnFamilyName(), hashCode(), currentThroughput, getLiveSize(), currentOperations);
+ cfs.getColumnFamilyName(), hashCode(), currentSize, getLiveSize(), currentOperations);
}
/**
Please sign in to comment.
Something went wrong with that request. Please try again.