Skip to content

Commit

Permalink
initial CAS support
Browse files Browse the repository at this point in the history
patch by jbellis and slebresne for CASSANDRA-5062
  • Loading branch information
jbellis committed Apr 11, 2013
1 parent f5ec4c7 commit 8b0e186
Show file tree
Hide file tree
Showing 50 changed files with 5,105 additions and 2,520 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
2.0
* CAS support (CASSANDRA-5062, )
* Leveled compaction performs size-tiered compactions in L0 (CASSANDRA-5371)
* Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
* Log when a node is down longer than the hint window (CASSANDRA-4554)
Expand Down
3 changes: 3 additions & 0 deletions conf/cassandra.yaml
Expand Up @@ -475,6 +475,9 @@ read_request_timeout_in_ms: 10000
range_request_timeout_in_ms: 10000
# How long the coordinator should wait for writes to complete
write_request_timeout_in_ms: 10000
# how long a coordinator should continue to retry a CAS operation
# that contends with other proposals for the same row
cas_contention_timeout_in_ms: 1000
# How long the coordinator should wait for truncates to complete
# (This can be much longer, because unless auto_snapshot is disabled
# we need to flush first so we can snapshot before removing the data.)
Expand Down
11 changes: 10 additions & 1 deletion interface/cassandra.thrift
Expand Up @@ -55,7 +55,7 @@ namespace rb CassandraThrift
# An effort should be made not to break forward-client-compatibility either
# (e.g. one should avoid removing obsolete fields from the IDL), but no
# guarantees in this respect are made by the Cassandra project.
const string VERSION = "19.36.0"
const string VERSION = "19.37.0"


#
Expand Down Expand Up @@ -635,6 +635,15 @@ service Cassandra {
4:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),

/**
* Atomic compare and set
*/
bool cas(1:required binary key,
2:required string column_family,
3:list<Column> expected,
4:list<Column> updates)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),

/**
Remove data from the row specified by key at the granularity specified by column_path, and the given timestamp. Note
that all the values in column_path besides column_path.column_family are truly optional: you can remove the entire
Expand Down
6,469 changes: 4,028 additions & 2,441 deletions interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion src/java/org/apache/cassandra/config/CFMetaData.java
Expand Up @@ -243,6 +243,16 @@ public final class CFMetaData
+ "inputs set<int>"
+ ") WITH COMMENT='unfinished compactions'");

public static final CFMetaData PaxosCf = compile(18, "CREATE TABLE " + SystemTable.PAXOS_CF + " ("
+ "row_key blob,"
+ "cf_id UUID,"
+ "in_progress_ballot timeuuid,"
+ "proposal blob,"
+ "most_recent_commit_at timeuuid,"
+ "most_recent_commit blob,"
+ "PRIMARY KEY (row_key, cf_id)"
+ ") WITH COMMENT='in-progress paxos proposals'");

public enum Caching
{
ALL, KEYS_ONLY, ROWS_ONLY, NONE;
Expand Down Expand Up @@ -889,7 +899,7 @@ public void reload()
{
Row cfDefRow = SystemTable.readSchemaRow(ksName, cfName);

if (cfDefRow.cf == null || cfDefRow.cf.isEmpty())
if (cfDefRow.cf == null || cfDefRow.cf.getColumnCount() == 0)
throw new RuntimeException(String.format("%s not found in the schema definitions table.", ksName + ":" + cfName));

try
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Expand Up @@ -61,6 +61,8 @@ public class Config

public Long write_request_timeout_in_ms = new Long(10000);

public Long cas_contention_timeout_in_ms = new Long(1000);

public Long truncate_request_timeout_in_ms = new Long(60000);

public Integer streaming_socket_timeout_in_ms = new Integer(0);
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Expand Up @@ -752,6 +752,16 @@ public static void setWriteRpcTimeout(Long timeOutInMillis)
conf.write_request_timeout_in_ms = timeOutInMillis;
}

public static long getCasContentionTimeout()
{
return conf.cas_contention_timeout_in_ms;
}

public static void setCasContentionTimeout(Long timeOutInMillis)
{
conf.cas_contention_timeout_in_ms = timeOutInMillis;
}

public static long getTruncateRpcTimeout()
{
return conf.truncate_request_timeout_in_ms;
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/config/KSMetaData.java
Expand Up @@ -89,6 +89,7 @@ public static KSMetaData systemKeyspace()
CFMetaData.SchemaColumnFamiliesCf,
CFMetaData.SchemaColumnsCf,
CFMetaData.CompactionLogCf,
CFMetaData.PaxosCf,
CFMetaData.OldStatusCf,
CFMetaData.OldHintsCf,
CFMetaData.OldMigrationsCf,
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/config/Schema.java
Expand Up @@ -449,7 +449,7 @@ public synchronized void clear()

public static boolean invalidSchemaRow(Row row)
{
return row.cf == null || (row.cf.isMarkedForDelete() && row.cf.isEmpty());
return row.cf == null || (row.cf.isMarkedForDelete() && row.cf.getColumnCount() == 0);
}

public static boolean ignoredSchemaRow(Row row)
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/cql3/QueryProcessor.java
Expand Up @@ -179,7 +179,7 @@ public static UntypedResultSet processInternal(String query)
}
catch (RequestValidationException e)
{
throw new AssertionError(e);
throw new RuntimeException("Error validating " + query, e);
}
}

Expand Down
Expand Up @@ -172,7 +172,7 @@ protected Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> keys, Column
Map<ByteBuffer, ColumnGroupMap> map = new HashMap<ByteBuffer, ColumnGroupMap>();
for (Row row : rows)
{
if (row.cf == null || row.cf.isEmpty())
if (row.cf == null || row.cf.getColumnCount() == 0)
continue;

ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true);
Expand Down
Expand Up @@ -53,9 +53,4 @@ public void maybeResetDeletionTimes(int gcBefore)
{
deletionInfo = deletionInfo.purge(gcBefore);
}

public boolean isEmpty()
{
return getColumnCount() == 0;
}
}
Expand Up @@ -186,7 +186,7 @@ else if (result == 0)
public void addAll(ColumnFamily cm, Allocator allocator, Function<Column, Column> transformation)
{
delete(cm.deletionInfo());
if (cm.isEmpty())
if (cm.getColumnCount() == 0)
return;

Column[] copy = columns.toArray(new Column[getColumnCount()]);
Expand Down
5 changes: 0 additions & 5 deletions src/java/org/apache/cassandra/db/AtomicSortedColumns.java
Expand Up @@ -231,11 +231,6 @@ public int getColumnCount()
return ref.get().map.size();
}

public boolean isEmpty()
{
return ref.get().map.isEmpty();
}

public Iterator<Column> iterator(ColumnSlice[] slices)
{
return new ColumnSlice.NavigableMapIterator(ref.get().map, slices);
Expand Down
45 changes: 42 additions & 3 deletions src/java/org/apache/cassandra/db/ColumnFamily.java
Expand Up @@ -17,14 +17,18 @@
*/
package org.apache.cassandra.db;

import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;

import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableMap;

import org.apache.commons.lang.builder.HashCodeBuilder;

Expand All @@ -35,6 +39,8 @@
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.sstable.ColumnStats;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.*;

/**
Expand Down Expand Up @@ -209,9 +215,12 @@ public void addAtom(OnDiskAtom atom)
public abstract int getColumnCount();

/**
* Returns true if this map is empty, false otherwise.
* Returns true if this contains no columns or deletion info
*/
public abstract boolean isEmpty();
public boolean isEmpty()
{
return deletionInfo().isLive() && getColumnCount() == 0;
}

/**
* Returns an iterator over the columns of this map that returns only the matching @param slices.
Expand Down Expand Up @@ -271,7 +280,7 @@ public ColumnFamily diff(ColumnFamily cfComposite)
}
}

if (!cfDiff.isEmpty() || cfDiff.isMarkedForDelete())
if (!cfDiff.isEmpty())
return cfDiff;
return null;
}
Expand Down Expand Up @@ -418,6 +427,36 @@ public boolean hasIrrelevantData(int gcBefore)
return false;
}

public Map<ByteBuffer, ByteBuffer> asMap()
{
ImmutableMap.Builder<ByteBuffer, ByteBuffer> builder = ImmutableMap.builder();
for (Column column : this)
builder.put(column.name, column.value);
return builder.build();
}

public static ColumnFamily fromBytes(ByteBuffer bytes)
{
if (bytes == null)
return null;

try
{
return serializer.deserialize(new DataInputStream(ByteBufferUtil.inputStream(bytes)), MessagingService.current_version);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}

public ByteBuffer toBytes()
{
DataOutputBuffer out = new DataOutputBuffer();
serializer.serialize(this, out, MessagingService.current_version);
return ByteBuffer.wrap(out.getData(), 0, out.getLength());
}

public abstract static class Factory <T extends ColumnFamily>
{
/**
Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/db/ConsistencyLevel.java
Expand Up @@ -47,7 +47,8 @@ public enum ConsistencyLevel
QUORUM (4),
ALL (5),
LOCAL_QUORUM(6),
EACH_QUORUM (7);
EACH_QUORUM (7),
SERIAL (8);

private static final Logger logger = LoggerFactory.getLogger(ConsistencyLevel.class);

Expand Down
12 changes: 6 additions & 6 deletions src/java/org/apache/cassandra/db/DefsTable.java
Expand Up @@ -347,7 +347,7 @@ private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> old, M
ColumnFamily ksAttrs = entry.getValue();

// we don't care about nested ColumnFamilies here because those are going to be processed separately
if (!ksAttrs.isEmpty())
if (!(ksAttrs.getColumnCount() == 0))
addKeyspace(KSMetaData.fromSchema(new Row(entry.getKey(), entry.getValue()), Collections.<CFMetaData>emptyList()));
}

Expand All @@ -367,7 +367,7 @@ private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> old, M
ColumnFamily prevValue = entry.getValue().leftValue();
ColumnFamily newValue = entry.getValue().rightValue();

if (prevValue.isEmpty())
if (prevValue.getColumnCount() == 0)
{
addKeyspace(KSMetaData.fromSchema(new Row(entry.getKey(), newValue), Collections.<CFMetaData>emptyList()));
continue;
Expand All @@ -391,7 +391,7 @@ private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> old, M

ColumnFamily newState = valueDiff.rightValue();

if (newState.isEmpty())
if (newState.getColumnCount() == 0)
keyspacesToDrop.add(AsciiType.instance.getString(key.key));
else
updateKeyspace(KSMetaData.fromSchema(new Row(key, newState), Collections.<CFMetaData>emptyList()));
Expand All @@ -411,7 +411,7 @@ private static void mergeColumnFamilies(Map<DecoratedKey, ColumnFamily> old, Map
{
ColumnFamily cfAttrs = entry.getValue();

if (!cfAttrs.isEmpty())
if (!(cfAttrs.getColumnCount() == 0))
{
Map<String, CFMetaData> cfDefs = KSMetaData.deserializeColumnFamilies(new Row(entry.getKey(), cfAttrs));

Expand All @@ -432,12 +432,12 @@ private static void mergeColumnFamilies(Map<DecoratedKey, ColumnFamily> old, Map

Row newRow = new Row(keyspace, newValue);

if (prevValue.isEmpty()) // whole keyspace was deleted and now it's re-created
if (prevValue.getColumnCount() == 0) // whole keyspace was deleted and now it's re-created
{
for (CFMetaData cfm : KSMetaData.deserializeColumnFamilies(newRow).values())
addColumnFamily(cfm);
}
else if (newValue.isEmpty()) // whole keyspace is deleted
else if (newValue.getColumnCount() == 0) // whole keyspace is deleted
{
for (CFMetaData cfm : KSMetaData.deserializeColumnFamilies(new Row(keyspace, prevValue)).values())
dropColumnFamily(cfm.ksName, cfm.cfName);
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/Memtable.java
Expand Up @@ -467,7 +467,7 @@ private SSTableReader writeSortedContents(Future<ReplayPosition> context, File s
// and BL data is strictly local, so we don't need to preserve tombstones for repair.
// If we have a data row + row level tombstone, then writing it is effectively an expensive no-op so we skip it.
// See CASSANDRA-4667.
if (cfs.name.equals(SystemTable.BATCHLOG_CF) && cfs.table.getName().equals(Table.SYSTEM_KS) && !cf.isEmpty())
if (cfs.name.equals(SystemTable.BATCHLOG_CF) && cfs.table.getName().equals(Table.SYSTEM_KS) && !(cf.getColumnCount() == 0))
continue;

// Pedantically, you could purge column level tombstones that are past GcGRace when writing to the SSTable.
Expand Down
8 changes: 8 additions & 0 deletions src/java/org/apache/cassandra/db/Row.java
Expand Up @@ -18,9 +18,12 @@
package org.apache.cassandra.db;

import java.io.*;
import java.nio.ByteBuffer;

import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;

Expand All @@ -39,6 +42,11 @@ public Row(DecoratedKey key, ColumnFamily cf)
this.cf = cf;
}

public Row(ByteBuffer key, ColumnFamily updates)
{
this(StorageService.getPartitioner().decorateKey(key), updates);
}

@Override
public String toString()
{
Expand Down
8 changes: 6 additions & 2 deletions src/java/org/apache/cassandra/db/RowMutation.java
Expand Up @@ -40,7 +40,7 @@ public class RowMutation implements IMutation
public static final String FORWARD_TO = "FWD_TO";
public static final String FORWARD_FROM = "FWD_FRM";

private final String table;
private final String table; // todo this is redundant
private final ByteBuffer key;
// map of column family id to mutations for that column family.
private final Map<UUID, ColumnFamily> modifications;
Expand All @@ -67,6 +67,11 @@ protected RowMutation(String table, ByteBuffer key, Map<UUID, ColumnFamily> modi
this.modifications = modifications;
}

public RowMutation(ByteBuffer key, ColumnFamily cf)
{
this(Schema.instance.getCFMetaData(cf.id()).ksName, key, cf);
}

public String getTable()
{
return table;
Expand Down Expand Up @@ -230,7 +235,6 @@ public String toString(boolean shallow)
return buff.append("])").toString();
}


public static class RowMutationSerializer implements IVersionedSerializer<RowMutation>
{
public void serialize(RowMutation rm, DataOutput out, int version) throws IOException
Expand Down

0 comments on commit 8b0e186

Please sign in to comment.