Merge branch 'cassandra-1.2' into trunk
Vijay2win committed Dec 28, 2012
2 parents 125a8c9 + ac9f478 commit 70e9bed
Showing 21 changed files with 125 additions and 17 deletions.
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/db/Column.java
@@ -102,6 +102,11 @@ public long timestamp()
return timestamp;
}

public long minTimestamp()
{
return timestamp;
}

public long maxTimestamp()
{
return timestamp;
4 changes: 3 additions & 1 deletion src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -376,16 +376,18 @@ public void validateColumnFields() throws MarshalException

public ColumnStats getColumnStats()
{
long minTimestampSeen = deletionInfo() == DeletionInfo.LIVE ? Long.MAX_VALUE : deletionInfo().minTimestamp();
long maxTimestampSeen = deletionInfo().maxTimestamp();
StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);

for (IColumn column : columns)
{
minTimestampSeen = Math.min(minTimestampSeen, column.minTimestamp());
maxTimestampSeen = Math.max(maxTimestampSeen, column.maxTimestamp());
int deletionTime = column.getLocalDeletionTime();
if (deletionTime < Integer.MAX_VALUE)
tombstones.update(deletionTime);
}
return new ColumnStats(getColumnCount(), maxTimestampSeen, tombstones);
return new ColumnStats(getColumnCount(), minTimestampSeen, maxTimestampSeen, tombstones);
}
}
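Two details above carry the correctness of the new statistic: each fold starts from the opposite extreme (Long.MAX_VALUE for the minimum, Long.MIN_VALUE for the maximum) so any real timestamp displaces the seed, and the DeletionInfo.LIVE guard is needed because a live row's top-level tombstone is marked at Long.MIN_VALUE, which would otherwise drag the minimum down to a meaningless value. A minimal standalone sketch (illustrative, not part of the commit):

// Illustrative sketch: why getColumnStats() seeds its folds with
// opposite-extreme sentinels and special-cases the live deletion info.
public class MinMaxFoldDemo
{
    // Stand-in for DeletionInfo.LIVE's top-level markedForDeleteAt.
    private static final long LIVE_MARKED_FOR_DELETE_AT = Long.MIN_VALUE;

    public static void main(String[] args)
    {
        long[] columnTimestamps = { 1355000000L, 1356000000L, 1354000000L };

        // Correct: seed the min fold with MAX_VALUE so any real timestamp replaces it.
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long ts : columnTimestamps)
        {
            min = Math.min(min, ts);
            max = Math.max(max, ts);
        }
        System.out.println("min=" + min + " max=" + max); // min=1354000000 max=1356000000

        // Wrong: folding in a live row's MIN_VALUE deletion marker
        // collapses the minimum and defeats min-timestamp tracking.
        long badMin = Math.min(LIVE_MARKED_FOR_DELETE_AT, min);
        System.out.println("badMin=" + badMin); // badMin=-9223372036854775808
    }
}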
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1967,6 +1967,7 @@ public SSTableWriter createCompactionWriter(long estimatedRows, File location, C
// and adds generation of live ancestors
for (SSTableReader sstable : sstables)
{
sstableMetadataCollector.updateMinTimestamp(sstable.getMinTimestamp());
sstableMetadataCollector.updateMaxTimestamp(sstable.getMaxTimestamp());
sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
for (Integer i : sstable.getAncestors())
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -178,6 +178,16 @@ public DeletionInfo add(DeletionInfo newInfo)
}
}

public long minTimestamp()
{
long minTimestamp = topLevel.markedForDeleteAt;
for (RangeTombstone i : ranges)
{
minTimestamp = Math.min(minTimestamp, i.data.markedForDeleteAt);
}
return minTimestamp;
}

/**
* The maximum timestamp mentioned by this DeletionInfo.
*/
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -34,8 +34,9 @@ public interface OnDiskAtom

/**
* For a standard column, this is the same as timestamp().
* For a super column, this is the max column timestamp of the sub columns.
* For a super column, this is the min/max column timestamp of the sub columns.
*/
public long minTimestamp();
public long maxTimestamp();
public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity

5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -57,6 +57,11 @@ public int getLocalDeletionTime()
return data.localDeletionTime;
}

public long minTimestamp()
{
return data.markedForDeleteAt;
}

public long maxTimestamp()
{
return data.markedForDeleteAt;
8 changes: 8 additions & 0 deletions src/java/org/apache/cassandra/db/SuperColumn.java
@@ -154,6 +154,14 @@ public long timestamp()
throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
}

public long minTimestamp()
{
long minTimestamp = getMarkedForDeleteAt();
for (IColumn subColumn : getSubColumns())
minTimestamp = Math.min(minTimestamp, subColumn.minTimestamp());
return minTimestamp;
}

public long maxTimestamp()
{
long maxTimestamp = getMarkedForDeleteAt();
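One subtlety in the super-column fold: seeding with getMarkedForDeleteAt() is harmless for the maximum, but for a live super column that marker is the Long.MIN_VALUE sentinel, so minTimestamp() collapses to Long.MIN_VALUE. That is lossy but safe, since an understated minimum only makes downstream purge checks more conservative. A tiny sketch of the effect (illustrative, assuming the live marker is Long.MIN_VALUE):

// Illustrative sketch, not part of the commit: a live container's MIN_VALUE
// deletion marker pins the min fold, while the max fold is unaffected.
public class SuperColumnMinDemo
{
    public static void main(String[] args)
    {
        long markedForDeleteAt = Long.MIN_VALUE; // sentinel for "never deleted"
        long min = markedForDeleteAt;
        long max = markedForDeleteAt;
        for (long subColumnTimestamp : new long[] { 1000L, 2000L })
        {
            min = Math.min(min, subColumnTimestamp);
            max = Math.max(max, subColumnTimestamp);
        }
        System.out.println(min); // -9223372036854775808: pinned by the sentinel
        System.out.println(max); // 2000: the sentinel is harmless for the max
    }
}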
@@ -109,12 +109,12 @@ public String getColumnFamily()
* @return true if it's okay to drop tombstones for the given row, i.e., if we know all the versions of the row
* are included in the compaction set
*/
public boolean shouldPurge(DecoratedKey key)
public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
{
List<SSTableReader> filteredSSTables = overlappingTree.search(key);
for (SSTableReader sstable : filteredSSTables)
{
if (sstable.getBloomFilter().isPresent(key.key))
if (sstable.getBloomFilter().isPresent(key.key) && sstable.getMinTimestamp() <= maxDeletionTimestamp)
return false;
}
return true;
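The two-clause test above is the heart of the change: a row's tombstones may be dropped only if no overlapping sstable outside the compaction could still hold data old enough for those tombstones to shadow. A standalone sketch of the decision with concrete numbers (illustrative names, not the project's API):

// Illustrative sketch of the patched purge decision.
import java.util.Arrays;
import java.util.List;

public class ShouldPurgeDemo
{
    /** Minimal stand-in for an overlapping sstable's per-key summary. */
    static final class Overlap
    {
        final boolean bloomFilterHit;
        final long minTimestamp;
        Overlap(boolean bloomFilterHit, long minTimestamp)
        {
            this.bloomFilterHit = bloomFilterHit;
            this.minTimestamp = minTimestamp;
        }
    }

    // Mirrors shouldPurge(key, maxDeletionTimestamp): an overlapping sstable
    // blocks purging only if it may contain the key AND may contain data at
    // least as old as the newest tombstone under consideration.
    static boolean shouldPurge(List<Overlap> overlaps, long maxDeletionTimestamp)
    {
        for (Overlap o : overlaps)
            if (o.bloomFilterHit && o.minTimestamp <= maxDeletionTimestamp)
                return false;
        return true;
    }

    public static void main(String[] args)
    {
        long tombstoneTs = 2000;
        // All overlapping data is newer (min 3000): the tombstone shadows nothing there.
        System.out.println(shouldPurge(Arrays.asList(new Overlap(true, 3000)), tombstoneTs));  // true
        // The overlap may hold older data (min 1000): the tombstone must be kept.
        System.out.println(shouldPurge(Arrays.asList(new Overlap(true, 1000)), tombstoneTs));  // false
        // Bloom filter says the key is absent: this overlap cannot block purging.
        System.out.println(shouldPurge(Arrays.asList(new Overlap(false, 1000)), tombstoneTs)); // true
    }
}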
@@ -929,7 +929,7 @@ public ValidationCompactionController(ColumnFamilyStore cfs, int gcBefore)
}

@Override
public boolean shouldPurge(DecoratedKey key)
public boolean shouldPurge(DecoratedKey key, long delTimestamp)
{
/*
* The main reason we always purge is that including gcable tombstone would mean that the
@@ -71,18 +71,20 @@ public LazilyCompactedRow(CompactionController controller, List<? extends ICount
super(rows.get(0).getKey());
this.rows = rows;
this.controller = controller;
this.shouldPurge = controller.shouldPurge(key);
indexer = controller.cfs.indexManager.updaterFor(key, false);

long maxDelTimestamp = Long.MIN_VALUE;
for (OnDiskAtomIterator row : rows)
{
ColumnFamily cf = row.getColumnFamily();
maxDelTimestamp = Math.max(maxDelTimestamp, cf.deletionInfo().maxTimestamp());

if (emptyColumnFamily == null)
emptyColumnFamily = cf;
else
emptyColumnFamily.delete(cf);
}
this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);

try
{
@@ -94,7 +96,9 @@ public LazilyCompactedRow(CompactionController controller, List<? extends ICount
}
// reach into the reducer used during iteration to get column count, size, and min/max column timestamps
// (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns, reducer == null ? Long.MIN_VALUE : reducer.maxTimestampSeen,
columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
reducer == null ? Long.MIN_VALUE : reducer.maxTimestampSeen,
reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones
);
columnSerializedSize = reducer == null ? 0 : reducer.serializedSize;
@@ -236,6 +240,7 @@ private class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom>

long serializedSize = 4; // int for column count
int columns = 0;
long minTimestampSeen = Long.MAX_VALUE;
long maxTimestampSeen = Long.MIN_VALUE;
StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);

@@ -290,6 +295,7 @@ protected OnDiskAtom getReduced()

serializedSize += reduced.serializedSizeForSSTable();
columns++;
minTimestampSeen = Math.min(minTimestampSeen, reduced.minTimestamp());
maxTimestampSeen = Math.max(maxTimestampSeen, reduced.maxTimestamp());
int deletionTime = reduced.getLocalDeletionTime();
if (deletionTime < Integer.MAX_VALUE)
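The reordering above is the substance of this hunk: the purge decision now needs the newest deletion timestamp across every fragment of the row, which is only known once each fragment's DeletionInfo has been folded in, so the shouldPurge call moves below the loop. A standalone sketch of that aggregation (illustrative names, not the project's API):

// Illustrative sketch: aggregate deletion info across all fragments of a
// row BEFORE asking whether its tombstones may be purged.
import java.util.Arrays;
import java.util.List;

public class PurgeOrderingDemo
{
    static long maxDeletionTimestamp(List<Long> fragmentDeletionTimestamps)
    {
        long maxDelTimestamp = Long.MIN_VALUE; // sentinel: no deletions seen yet
        for (long ts : fragmentDeletionTimestamps)
            maxDelTimestamp = Math.max(maxDelTimestamp, ts);
        return maxDelTimestamp;
    }

    public static void main(String[] args)
    {
        // Three sstable fragments of one row; the last carries the newest tombstone.
        List<Long> fragments = Arrays.asList(1000L, 1500L, 4000L);
        // Deciding purgeability from the first fragment alone (1000) would be wrong;
        // the decision must see the aggregate maximum (4000).
        System.out.println(maxDeletionTimestamp(fragments)); // 4000
    }
}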
@@ -59,7 +59,7 @@ public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, Compactio
Boolean shouldPurge = null;

if (cf.hasIrrelevantData(controller.gcBefore))
shouldPurge = controller.shouldPurge(key);
shouldPurge = controller.shouldPurge(key, cf.maxTimestamp());

// We should only gc tombstones if shouldPurge == true. But otherwise,
// it is still ok to collect columns that are shadowed by their (deleted)
@@ -69,7 +69,7 @@ public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, Compactio
if (compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
{
if (shouldPurge == null)
shouldPurge = controller.shouldPurge(key);
shouldPurge = controller.shouldPurge(key, cf.deletionInfo().maxTimestamp());
if (shouldPurge)
CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
}
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -353,7 +353,7 @@ public ScrubController(ColumnFamilyStore cfs)
}

@Override
public boolean shouldPurge(DecoratedKey key)
public boolean shouldPurge(DecoratedKey key, long delTimestamp)
{
return false;
}
4 changes: 3 additions & 1 deletion src/java/org/apache/cassandra/io/sstable/ColumnStats.java
@@ -28,13 +28,15 @@ public class ColumnStats
public final int columnCount;

/** the smallest and largest (client-supplied) timestamps in the row */
public final long minTimestamp;
public final long maxTimestamp;

/** histogram of tombstone drop time */
public final StreamingHistogram tombstoneHistogram;

public ColumnStats(int columnCount, long maxTimestamp, StreamingHistogram tombstoneHistogram)
public ColumnStats(int columnCount, long minTimestamp, long maxTimestamp, StreamingHistogram tombstoneHistogram)
{
this.minTimestamp = minTimestamp;
this.maxTimestamp = maxTimestamp;
this.columnCount = columnCount;
this.tombstoneHistogram = tombstoneHistogram;
5 changes: 4 additions & 1 deletion src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -47,7 +47,7 @@ public class Descriptor
public static class Version
{
// This needs to be at the beginning for initialization's sake
private static final String current_version = "ia";
private static final String current_version = "ib";

public static final Version LEGACY = new Version("a"); // "pre-history"
// b (0.7.0): added version to sstable filenames
@@ -65,6 +65,7 @@ public static class Version
// ia (1.2.0): column indexes are promoted to the index file
// records estimated histogram of deletion times in tombstones
// bloom filter (keys and columns) upgraded to Murmur3
// ib (1.2.1): tracks min client timestamp in metadata component

public static final Version CURRENT = new Version(current_version);

@@ -77,6 +78,7 @@ public static class Version
public final boolean metadataIncludesReplayPosition;
public final boolean metadataIncludesModernReplayPosition;
public final boolean tracksMaxTimestamp;
public final boolean tracksMinTimestamp;
public final boolean hasCompressionRatio;
public final boolean hasPartitioner;
public final boolean tracksTombstones;
@@ -94,6 +96,7 @@ public Version(String version)
hasCompressionRatio = version.compareTo("hb") >= 0;
hasPartitioner = version.compareTo("hc") >= 0;
tracksMaxTimestamp = version.compareTo("hd") >= 0;
tracksMinTimestamp = version.compareTo("ib") >= 0;
hasAncestors = version.compareTo("he") >= 0;
metadataIncludesModernReplayPosition = version.compareTo("hf") >= 0;
tracksTombstones = version.compareTo("ia") >= 0;
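The new flag rides on the same convention as its neighbours: format versions are short lowercase strings, and a feature is present exactly when the sstable's version compares lexicographically at or above the version that introduced it. A quick standalone check of that convention (illustrative, not part of the commit):

// Illustrative sketch: Descriptor-style feature gating via String.compareTo.
public class VersionGateDemo
{
    static boolean tracksMinTimestamp(String version)
    {
        return version.compareTo("ib") >= 0;
    }

    public static void main(String[] args)
    {
        System.out.println(tracksMinTimestamp("hd")); // false: pre-1.2 format
        System.out.println(tracksMinTimestamp("ia")); // false: 1.2.0 format
        System.out.println(tracksMinTimestamp("ib")); // true:  written by 1.2.1+
        System.out.println(tracksMinTimestamp("ic")); // true:  any later format
    }
}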
21 changes: 18 additions & 3 deletions src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
@@ -51,6 +51,7 @@ public class SSTableMetadata
public final EstimatedHistogram estimatedRowSize;
public final EstimatedHistogram estimatedColumnCount;
public final ReplayPosition replayPosition;
public final long minTimestamp;
public final long maxTimestamp;
public final double compressionRatio;
public final String partitioner;
@@ -62,19 +63,21 @@ private SSTableMetadata()
this(defaultRowSizeHistogram(),
defaultColumnCountHistogram(),
ReplayPosition.NONE,
Long.MAX_VALUE,
Long.MIN_VALUE,
NO_COMPRESSION_RATIO,
null,
Collections.<Integer>emptySet(),
defaultTombstoneDropTimeHistogram());
}

private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long maxTimestamp,
double cr, String partitioner, Set<Integer> ancestors, StreamingHistogram estimatedTombstoneDropTime)
private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long minTimestamp,
long maxTimestamp, double cr, String partitioner, Set<Integer> ancestors, StreamingHistogram estimatedTombstoneDropTime)
{
this.estimatedRowSize = rowSizes;
this.estimatedColumnCount = columnCounts;
this.replayPosition = replayPosition;
this.minTimestamp = minTimestamp;
this.maxTimestamp = maxTimestamp;
this.compressionRatio = cr;
this.partitioner = partitioner;
@@ -129,6 +132,7 @@ public static class Collector
protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
protected EstimatedHistogram estimatedColumnCount = defaultColumnCountHistogram();
protected ReplayPosition replayPosition = ReplayPosition.NONE;
protected long minTimestamp = Long.MAX_VALUE;
protected long maxTimestamp = Long.MIN_VALUE;
protected double compressionRatio = NO_COMPRESSION_RATIO;
protected Set<Integer> ancestors = new HashSet<Integer>();
@@ -158,6 +162,11 @@ public void addCompressionRatio(long compressed, long uncompressed)
compressionRatio = (double) compressed/uncompressed;
}

public void updateMinTimestamp(long potentialMin)
{
minTimestamp = Math.min(minTimestamp, potentialMin);
}

public void updateMaxTimestamp(long potentialMax)
{
maxTimestamp = Math.max(maxTimestamp, potentialMax);
@@ -168,6 +177,7 @@ public SSTableMetadata finalizeMetadata(String partitioner)
return new SSTableMetadata(estimatedRowSize,
estimatedColumnCount,
replayPosition,
minTimestamp,
maxTimestamp,
compressionRatio,
partitioner,
@@ -201,6 +211,7 @@ public Collector addAncestor(int generation)

void update(long size, ColumnStats stats)
{
updateMinTimestamp(stats.minTimestamp);
/*
* The max timestamp is not always collected here (more precisely, row.maxTimestamp() may return Long.MIN_VALUE),
* to avoid deserializing an EchoedRow.
@@ -226,6 +237,7 @@ public void serialize(SSTableMetadata sstableStats, DataOutput dos) throws IOExc
EstimatedHistogram.serializer.serialize(sstableStats.estimatedRowSize, dos);
EstimatedHistogram.serializer.serialize(sstableStats.estimatedColumnCount, dos);
ReplayPosition.serializer.serialize(sstableStats.replayPosition, dos);
dos.writeLong(sstableStats.minTimestamp);
dos.writeLong(sstableStats.maxTimestamp);
dos.writeDouble(sstableStats.compressionRatio);
dos.writeUTF(sstableStats.partitioner);
@@ -269,6 +281,9 @@ public SSTableMetadata deserialize(DataInputStream dis, Descriptor desc) throws
// make sure we don't omit replaying something that we should. see CASSANDRA-4782
replayPosition = ReplayPosition.NONE;
}
long minTimestamp = desc.version.tracksMinTimestamp ? dis.readLong() : Long.MIN_VALUE;
long maxTimestamp = desc.version.containsTimestamp() ? dis.readLong() : Long.MIN_VALUE;
if (!desc.version.tracksMaxTimestamp) // see javadoc to Descriptor.containsTimestamp
maxTimestamp = Long.MIN_VALUE;
@@ -283,7 +298,7 @@ public SSTableMetadata deserialize(DataInputStream dis, Descriptor desc) throws
StreamingHistogram tombstoneHistogram = desc.version.tracksTombstones
? StreamingHistogram.serializer.deserialize(dis)
: defaultTombstoneDropTimeHistogram();
return new SSTableMetadata(rowSizes, columnCounts, replayPosition, maxTimestamp, compressionRatio, partitioner, ancestors, tombstoneHistogram);
return new SSTableMetadata(rowSizes, columnCounts, replayPosition, minTimestamp, maxTimestamp, compressionRatio, partitioner, ancestors, tombstoneHistogram);
}
}
}
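On the read side, a field that the writing version never emitted simply is not in the file, so the deserializer must substitute a default; for the minimum timestamp the conservative choice is Long.MIN_VALUE, which makes a legacy sstable look as old as possible and keeps shouldPurge from dropping tombstones it might still need. A hedged sketch of the version-gated read (hypothetical stream layout, not the project's serializer):

// Illustrative sketch: version-gated deserialization with a conservative
// sentinel for files that predate the field.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class GatedReadDemo
{
    static long readMinTimestamp(DataInputStream in, boolean tracksMinTimestamp) throws IOException
    {
        // Old files carry no such field: assume the oldest possible data so
        // downstream purge checks err on the side of keeping tombstones.
        return tracksMinTimestamp ? in.readLong() : Long.MIN_VALUE;
    }

    public static void main(String[] args) throws IOException
    {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        new DataOutputStream(buf).writeLong(1356000000L); // an 'ib'-style file

        DataInputStream newFile = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        DataInputStream oldFile = new DataInputStream(new ByteArrayInputStream(new byte[0]));

        System.out.println(readMinTimestamp(newFile, true));  // 1356000000
        System.out.println(readMinTimestamp(oldFile, false)); // -9223372036854775808
    }
}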
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1102,6 +1102,11 @@ public ReplayPosition getReplayPosition()
return sstableMetadata.replayPosition;
}

public long getMinTimestamp()
{
return sstableMetadata.minTimestamp;
}

public long getMaxTimestamp()
{
return sstableMetadata.maxTimestamp;
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -234,6 +234,7 @@ public long appendFromStream(DecoratedKey key, CFMetaData metadata, long dataSiz
}

// deserialize each column to obtain the min/max timestamps, and immediately serialize it.
long minTimestamp = Long.MAX_VALUE;
long maxTimestamp = Long.MIN_VALUE;
StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.factory());
@@ -268,6 +269,7 @@ else if (atom instanceof SuperColumn)
{
tombstones.update(deletionTime);
}
minTimestamp = Math.min(minTimestamp, atom.minTimestamp());
maxTimestamp = Math.max(maxTimestamp, atom.maxTimestamp());
try
{
@@ -281,6 +283,7 @@ else if (atom instanceof SuperColumn)

assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
: "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
sstableMetadataCollector.updateMinTimestamp(minTimestamp);
sstableMetadataCollector.updateMaxTimestamp(maxTimestamp);
sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition);
sstableMetadataCollector.addColumnCount(columnCount);
