Skip to content

Commit

Permalink
Optimize batchlog replay to avoid full scans
Browse files Browse the repository at this point in the history
patch by Branimir Lambov; reviewed by Aleksey Yeschenko for
CASSANDRA-7237
  • Loading branch information
blambov authored and iamaleksey committed Aug 6, 2015
1 parent ef59624 commit 762db47
Show file tree
Hide file tree
Showing 9 changed files with 395 additions and 156 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-beta1
* Optimize batchlog replay to avoid full scans (CASSANDRA-7237)
* Repair improvements when using vnodes (CASSANDRA-5220)
* Disable scripted UDFs by default (CASSANDRA-9889)
* Add transparent data encryption core classes (CASSANDRA-9945)
Expand All @@ -11,6 +12,7 @@ Merged from 2.1:
Merged from 2.0:
* Don't cast expected bf size to an int (CASSANDRA-9959)


3.0.0-alpha1
* Implement proper sandboxing for UDFs (CASSANDRA-9402)
* Simplify (and unify) cleanup of compaction leftovers (CASSANDRA-7066)
Expand Down
2 changes: 2 additions & 0 deletions NEWS.txt
Expand Up @@ -58,6 +58,8 @@ Upgrading
be done by setting the new option `enabled` to `false`.
- Only map syntax is now allowed for caching options. ALL/NONE/KEYS_ONLY/ROWS_ONLY syntax
has been deprecated since 2.1.0 and is being removed in 3.0.0.
- Batchlog entries are now stored in a new table - system.batches.
The old one has been deprecated.


2.2
Expand Down
226 changes: 125 additions & 101 deletions src/java/org/apache/cassandra/db/BatchlogManager.java

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Expand Up @@ -2054,6 +2054,19 @@ public int getMeanColumns()
return count > 0 ? (int) (sum / count) : 0;
}

/**
 * Computes the mean partition size over the canonical sstable set.
 *
 * Each sstable contributes the mean of its estimated partition size histogram,
 * weighted by that histogram's count, so sstables holding more entries have
 * proportionally more influence on the result.
 *
 * @return the weighted mean, or 0 when the combined histogram count is not positive
 */
public double getMeanPartitionSize()
{
    long weightedTotal = 0;
    long partitions = 0;
    for (SSTableReader reader : getSSTables(SSTableSet.CANONICAL))
    {
        long inThisSSTable = reader.getEstimatedPartitionSize().count();
        partitions += inThisSSTable;
        weightedTotal += reader.getEstimatedPartitionSize().mean() * inThisSSTable;
    }

    // nothing to average over: avoid dividing by zero
    if (partitions <= 0)
        return 0;

    // multiply by 1.0 first so the division is carried out in floating point
    return weightedTotal * 1.0 / partitions;
}

public long estimateKeys()
{
long n = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/Memtable.java
Expand Up @@ -342,7 +342,7 @@ class FlushRunnable extends DiskAwareRunnable
+ liveDataSize.get()) // data
* 1.2); // bloom filter and row index overhead

this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHLOG) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
}

public long getExpectedWriteSize()
Expand Down
28 changes: 22 additions & 6 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Expand Up @@ -42,6 +42,7 @@
import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
Expand Down Expand Up @@ -88,7 +89,7 @@ private SystemKeyspace()
public static final String NAME = "system";

public static final String HINTS = "hints";
public static final String BATCHLOG = "batchlog";
public static final String BATCHES = "batches";
public static final String PAXOS = "paxos";
public static final String BUILT_INDEXES = "IndexInfo";
public static final String LOCAL = "local";
Expand All @@ -102,6 +103,7 @@ private SystemKeyspace()
public static final String MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS = "materialized_views_builds_in_progress";
public static final String BUILT_MATERIALIZED_VIEWS = "built_materialized_views";

@Deprecated public static final String LEGACY_BATCHLOG = "batchlog";
@Deprecated public static final String LEGACY_KEYSPACES = "schema_keyspaces";
@Deprecated public static final String LEGACY_COLUMNFAMILIES = "schema_columnfamilies";
@Deprecated public static final String LEGACY_COLUMNS = "schema_columns";
Expand All @@ -123,15 +125,15 @@ private SystemKeyspace()
.compaction(CompactionParams.scts(singletonMap("enabled", "false")))
.gcGraceSeconds(0);

public static final CFMetaData Batchlog =
compile(BATCHLOG,
public static final CFMetaData Batches =
compile(BATCHES,
"batches awaiting replay",
"CREATE TABLE %s ("
+ "id uuid,"
+ "id timeuuid,"
+ "data blob,"
+ "version int,"
+ "written_at timestamp,"
+ "PRIMARY KEY ((id)))")
.copy(new LocalPartitioner(TimeUUIDType.instance))
.compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
.gcGraceSeconds(0);

Expand Down Expand Up @@ -279,6 +281,19 @@ private SystemKeyspace()
+ "view_name text,"
+ "PRIMARY KEY ((keyspace_name), view_name))");

/**
 * Metadata for the deprecated batchlog table, whose name is given by LEGACY_BATCHLOG.
 *
 * The table is keyed by a single {@code uuid} column ({@code id}) and carries the
 * {@code data}, {@code version} and {@code written_at} columns. It is declared with
 * gc grace set to 0 and with compaction parameters built by
 * {@code CompactionParams.scts} using a {@code min_threshold} of 2.
 *
 * NOTE(review): per the NEWS.txt entry in this change, batchlog entries are now stored
 * in the new {@code batches} table; this definition presumably remains so that the old
 * table is still known to the schema - confirm against BatchlogManager.
 */
@Deprecated
public static final CFMetaData LegacyBatchlog =
compile(LEGACY_BATCHLOG,
"*DEPRECATED* batchlog entries",
"CREATE TABLE %s ("
+ "id uuid,"
+ "data blob,"
+ "version int,"
+ "written_at timestamp,"
+ "PRIMARY KEY ((id)))")
.compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
.gcGraceSeconds(0);

@Deprecated
public static final CFMetaData LegacyKeyspaces =
compile(LEGACY_KEYSPACES,
Expand Down Expand Up @@ -409,7 +424,7 @@ private static Tables tables()
{
return Tables.of(BuiltIndexes,
Hints,
Batchlog,
Batches,
Paxos,
Local,
Peers,
Expand All @@ -421,6 +436,7 @@ private static Tables tables()
AvailableRanges,
MaterializedViewsBuildsInProgress,
BuiltMaterializedViews,
LegacyBatchlog,
LegacyKeyspaces,
LegacyColumnfamilies,
LegacyColumns,
Expand Down
30 changes: 29 additions & 1 deletion src/java/org/apache/cassandra/dht/LocalPartitioner.java
Expand Up @@ -66,9 +66,37 @@ public LocalToken getRandomToken()

public Token.TokenFactory getTokenFactory()
{
throw new UnsupportedOperationException();
return tokenFactory;
}

/**
 * Factory that converts between {@link LocalToken}s and their byte / string forms.
 * All string handling is delegated to this partitioner's {@code comparator}.
 */
private final Token.TokenFactory tokenFactory = new Token.TokenFactory()
{
    /** Returns the byte buffer held by the given token, which must be a {@link LocalToken}. */
    public ByteBuffer toByteArray(Token token)
    {
        LocalToken local = (LocalToken) token;
        return local.token;
    }

    /** Wraps the given bytes in a new {@link LocalToken}. */
    public Token fromByteArray(ByteBuffer bytes)
    {
        return new LocalToken(bytes);
    }

    /** Renders the token's bytes as a string using the comparator. */
    public String toString(Token token)
    {
        ByteBuffer raw = ((LocalToken) token).token;
        return comparator.getString(raw);
    }

    /** Parses the string with the comparator and validates the resulting bytes. */
    public void validate(String token)
    {
        ByteBuffer parsed = comparator.fromString(token);
        comparator.validate(parsed);
    }

    /** Parses the string with the comparator and wraps the result in a new {@link LocalToken}. */
    public Token fromString(String string)
    {
        ByteBuffer parsed = comparator.fromString(string);
        return new LocalToken(parsed);
    }
};

public boolean preservesOrder()
{
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/service/StorageProxy.java
Expand Up @@ -863,7 +863,7 @@ private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, U
null,
WriteType.SIMPLE);
Mutation mutation = new Mutation(
PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog,
PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
UUIDType.instance.decompose(uuid),
FBUtilities.timestampMicros(),
FBUtilities.nowInSeconds()));
Expand Down

0 comments on commit 762db47

Please sign in to comment.