Avoid failing compactions with very large partitions
Patch by Caleb Rackliffe; Reviewed by Chris Lohfink and Benjamin Lerer for CASSANDRA-15164
maedhroz authored and clohfink committed Sep 23, 2020
1 parent 1dd6a10 commit 4782fd3
Showing 7 changed files with 99 additions and 9 deletions.
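
For context on the failure being fixed (a summary, not part of the patch): EstimatedHistogram.percentile() and mean() throw IllegalStateException once any recorded value lands in the histogram's overflow bucket, so the partition-size and cell-count stats written for a sufficiently large partition could fail the compactions that later read them. A minimal sketch of the problem and of the new escape hatch, assuming the patched class; the bucket count of 90 and the sample values are arbitrary:

import org.apache.cassandra.utils.EstimatedHistogram;

public class OverflowSketch
{
    public static void main(String[] args)
    {
        EstimatedHistogram histogram = new EstimatedHistogram(90);
        histogram.add(1024);           // a normal sample, tracked by a regular bucket
        histogram.add(Long.MAX_VALUE); // too large for any bucket; counted as overflow

        try
        {
            histogram.percentile(0.99); // throws while the overflow bucket is non-empty
        }
        catch (IllegalStateException e)
        {
            System.out.println("stats unusable: " + e.getMessage());
        }

        histogram.clearOverflow(); // new in this patch: drop the overflowed samples
        System.out.println("degraded p99: " + histogram.percentile(0.99));
    }
}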
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta3
+ * Avoid failing compactions with very large partitions (CASSANDRA-15164)
  * Prevent NPE in StreamMessage in type lookup (CASSANDRA-16131)
  * Avoid invalid state transition exception during incremental repair (CASSANDRA-16067)
  * Allow zero padding in timestamp serialization (CASSANDRA-16105)
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -53,8 +53,8 @@ public class MetadataCollector implements PartitionStatisticsCollector

     static EstimatedHistogram defaultCellPerPartitionCountHistogram()
     {
-        // EH of 114 can track a max value of 2395318855, i.e., > 2B columns
-        return new EstimatedHistogram(114);
+        // EH of 118 can track a max value of 4139110981, i.e., > 4B cells
+        return new EstimatedHistogram(118);
     }
 
     static EstimatedHistogram defaultPartitionSizeHistogram()
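
Why a handful of extra buckets buys so much headroom (a back-of-the-envelope note, not from the patch): EstimatedHistogram's bucket offsets grow by roughly 20% per bucket, so each additional bucket multiplies the largest trackable value by about 1.2, which is how the range nearly doubles from ~2.4B to ~4.1B here. A sketch that approximates the progression; it glosses over the exact off-by-one accounting of the real offset construction:

// Approximates how EstimatedHistogram's bucket offsets grow; illustrative only.
public class BucketRangeSketch
{
    public static void main(String[] args)
    {
        long offset = 1;
        for (int buckets = 1; buckets <= 118; buckets++)
        {
            if (buckets == 114 || buckets == 118)
                System.out.printf("~%d buckets -> largest offset around %d%n", buckets, offset);
            long next = Math.round(offset * 1.2); // ~20% growth per bucket
            offset = (next == offset) ? offset + 1 : next;
        }
    }
}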
31 changes: 28 additions & 3 deletions src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -23,14 +23,17 @@
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.cassandra.db.rows.EncodingStats;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -248,6 +251,8 @@ public int hashCode()

     public static class StatsMetadataSerializer implements IMetadataComponentSerializer<StatsMetadata>
     {
+        private static final Logger logger = LoggerFactory.getLogger(StatsMetadataSerializer.class);
+
         public int serializedSize(Version version, StatsMetadata component) throws IOException
         {
             int size = 0;
@@ -340,7 +345,27 @@ public void serialize(Version version, StatsMetadata component, DataOutputPlus out)
         public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException
         {
             EstimatedHistogram partitionSizes = EstimatedHistogram.serializer.deserialize(in);
+
+            if (partitionSizes.isOverflowed())
+            {
+                logger.warn("Deserialized partition size histogram with {} values greater than the maximum of {}. " +
+                            "Clearing the overflow bucket to allow for degraded mean and percentile calculations...",
+                            partitionSizes.overflowCount(), partitionSizes.getLargestBucketOffset());
+
+                partitionSizes.clearOverflow();
+            }
+
             EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
+
+            if (columnCounts.isOverflowed())
+            {
+                logger.warn("Deserialized partition cell count histogram with {} values greater than the maximum of {}. " +
+                            "Clearing the overflow bucket to allow for degraded mean and percentile calculations...",
+                            columnCounts.overflowCount(), columnCounts.getLargestBucketOffset());
+
+                columnCounts.clearOverflow();
+            }
+
             CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE, commitLogUpperBound;
             commitLogUpperBound = CommitLogPosition.serializer.deserialize(in);
             long minTimestamp = in.readLong();
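
A design note on the block above (my reading of the change): clearOverflow() mutates only the in-memory StatsMetadata being rebuilt here; the sstable's -Statistics.db component on disk is untouched, so the warning will recur each time the metadata is deserialized until the sstable itself is rewritten, e.g. by a compaction.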
31 changes: 28 additions & 3 deletions src/java/org/apache/cassandra/utils/EstimatedHistogram.java
@@ -22,12 +22,13 @@
 import java.util.concurrent.atomic.AtomicLongArray;
 
 import com.google.common.base.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.slf4j.Logger;
 
 public class EstimatedHistogram
 {
@@ -275,11 +276,27 @@ public long getLargestBucketOffset()
     }
 
     /**
-     * @return true if this histogram has overflowed -- that is, a value larger than our largest bucket could bound was added
+     * @return true if a value larger than our largest bucket offset has been recorded, and false otherwise
      */
     public boolean isOverflowed()
     {
-        return buckets.get(buckets.length() - 1) > 0;
+        return overflowCount() > 0;
     }
 
+    /**
+     * @return the number of recorded values larger than the largest bucket offset
+     */
+    public long overflowCount()
+    {
+        return buckets.get(buckets.length() - 1);
+    }
+
+    /**
+     * Resets the count in the overflow bucket to zero. Subsequent calls to {@link #isOverflowed()} will return false.
+     */
+    public void clearOverflow()
+    {
+        buckets.set(buckets.length() - 1, 0);
+    }
+
     /**
@@ -367,8 +384,16 @@ public int hashCode()

     public static class EstimatedHistogramSerializer implements ISerializer<EstimatedHistogram>
     {
+        private static final Logger logger = LoggerFactory.getLogger(EstimatedHistogramSerializer.class);
+
         public void serialize(EstimatedHistogram eh, DataOutputPlus out) throws IOException
         {
+            if (eh.isOverflowed())
+            {
+                logger.warn("Serializing a histogram with {} values greater than the maximum of {}...",
+                            eh.overflowCount(), eh.getLargestBucketOffset());
+            }
+
             long[] offsets = eh.getBucketOffsets();
             long[] buckets = eh.getBuckets(false);
             out.writeInt(buckets.length);
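
Note the asymmetry between the two ends of the pipeline: EstimatedHistogramSerializer.serialize() above only warns and still writes the overflow bucket, while the clearing happens in StatsMetadataSerializer.deserialize(). A round-trip sketch of the raw serializer, assuming org.apache.cassandra.io.util.DataOutputBuffer and DataInputBuffer for the in-memory buffers:

import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.EstimatedHistogram;

public class HistogramRoundTrip
{
    public static void main(String[] args) throws Exception
    {
        EstimatedHistogram original = new EstimatedHistogram(90);
        original.add(Long.MAX_VALUE); // lands in the overflow bucket

        try (DataOutputBuffer out = new DataOutputBuffer())
        {
            // Logs the new warning but still serializes every bucket, overflow included.
            EstimatedHistogram.serializer.serialize(original, out);

            try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
            {
                EstimatedHistogram copy = EstimatedHistogram.serializer.deserialize(in);
                System.out.println(copy.isOverflowed()); // true: the raw serializer preserves overflow
            }
        }
    }
}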
27 changes: 27 additions & 0 deletions test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -76,6 +76,33 @@ public void testSerialization() throws IOException
         }
     }
 
+    @Test
+    public void testHistogramSerialization() throws IOException
+    {
+        Map<MetadataType, MetadataComponent> originalMetadata = constructMetadata();
+
+        // Modify the histograms to overflow:
+        StatsMetadata originalStats = (StatsMetadata) originalMetadata.get(MetadataType.STATS);
+        originalStats.estimatedCellPerPartitionCount.add(Long.MAX_VALUE);
+        originalStats.estimatedPartitionSize.add(Long.MAX_VALUE);
+        assertTrue(originalStats.estimatedCellPerPartitionCount.isOverflowed());
+        assertTrue(originalStats.estimatedPartitionSize.isOverflowed());
+
+        // Serialize with the overflowed histograms:
+        MetadataSerializer serializer = new MetadataSerializer();
+        File statsFile = serialize(originalMetadata, serializer, BigFormat.latestVersion);
+        Descriptor desc = new Descriptor(statsFile.getParentFile(), "", "", 0, SSTableFormat.Type.BIG);
+
+        try (RandomAccessReader in = RandomAccessReader.open(statsFile))
+        {
+            // Deserialize and verify that the two histograms have had their overflow buckets cleared:
+            Map<MetadataType, MetadataComponent> deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class));
+            StatsMetadata deserializedStats = (StatsMetadata) deserialized.get(MetadataType.STATS);
+            assertFalse(deserializedStats.estimatedCellPerPartitionCount.isOverflowed());
+            assertFalse(deserializedStats.estimatedPartitionSize.isOverflowed());
+        }
+    }
+
     public File serialize(Map<MetadataType, MetadataComponent> metadata, MetadataSerializer serializer, Version version)
     throws IOException
     {
12 changes: 12 additions & 0 deletions test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java
@@ -21,6 +21,8 @@
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 
 public class EstimatedHistogramTest
@@ -167,4 +169,14 @@ public void testPercentile()
             assertEquals(1, histogram.percentile(0.99));
         }
     }
+
+    @Test
+    public void testClearOverflow()
+    {
+        EstimatedHistogram histogram = new EstimatedHistogram(1);
+        histogram.add(100);
+        assertTrue(histogram.isOverflowed());
+        histogram.clearOverflow();
+        assertFalse(histogram.isOverflowed());
+    }
 }
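
A note on the fixture above (an assumption about the constructor, not stated in the patch): EstimatedHistogram(1) allocates no regular buckets at all, only the overflow bucket, so add(100) overflows immediately and the assertions stay independent of the bucket-offset progression.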
2 changes: 1 addition & 1 deletion test/unit/org/apache/cassandra/utils/SerializationsTest.java
@@ -159,7 +159,7 @@ private static void testEstimatedHistogramWrite() throws IOException
             offsets[i] = i;
             data[i] = 10 * i;
         }
-        data[offsets.length] = 100000;
+        data[offsets.length] = 100000; // write into the overflow bucket
         EstimatedHistogram hist2 = new EstimatedHistogram(offsets, data);
 
         try (DataOutputStreamPlus out = getOutput("utils.EstimatedHistogram.bin"))