Permalink
Browse files

Merge branch 'cassandra-2.1' into trunk

  • Loading branch information...
2 parents e185afa + b4f262e commit 2d92f14baaae7f2dd4a61f602896dd3a4abf7d1f @iamaleksey iamaleksey committed Mar 11, 2014
View
2 CHANGES.txt
@@ -16,6 +16,8 @@
* Fix ClassCastException for compact table with composites (CASSANDRA-6738)
* Fix potentially repairing with wrong nodes (CASSANDRA-6808)
Merged from 2.0:
+ * Fix saving triggers to schema (CASSANDRA-6789)
+ * Fix trigger mutations when base mutation list is immutable (CASSANDRA-6790)
* Avoid race-prone second "scrub" of system keyspace (CASSANDRA-6797)
* Pool CqlRecordWriter clients by inetaddress rather than Range
(CASSANDRA-6665)
View
3 doc/cql3/CQL.textile
@@ -222,9 +222,6 @@ bc(syntax)..
<column-definition> ::= <identifier> <type> ( STATIC )? ( PRIMARY KEY )?
| PRIMARY KEY '(' <partition-key> ( ',' <identifier> )* ')'
-<partition-key> ::= <partition-key>
- | '(' <partition-key> ( ',' <identifier> )* ')'
-
<partition-key> ::= <identifier>
| '(' <identifier> (',' <identifier> )* ')'
View
3 src/java/org/apache/cassandra/config/CFMetaData.java
@@ -1693,6 +1693,9 @@ public void toSchema(Mutation mutation, long timestamp)
{
toSchemaNoColumnsNoTriggers(mutation, timestamp);
+ for (TriggerDefinition td : triggers.values())
+ td.toSchema(mutation, cfName, timestamp);
+
for (ColumnDefinition cd : allColumns())
cd.toSchema(mutation, timestamp);
}
View
6 src/java/org/apache/cassandra/service/StorageProxy.java
@@ -511,13 +511,13 @@ public static void mutate(Collection<? extends IMutation> mutations, Consistency
}
}
- public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically) throws WriteTimeoutException, UnavailableException,
- OverloadedException, InvalidRequestException
+ public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically)
+ throws WriteTimeoutException, UnavailableException, OverloadedException, InvalidRequestException
{
Collection<Mutation> tmutations = TriggerExecutor.instance.execute(mutations);
if (mutateAtomically || tmutations != null)
{
- Collection<Mutation> allMutations = (Collection<Mutation>) mutations;
+ Collection<Mutation> allMutations = new ArrayList<>((Collection<Mutation>) mutations);
if (tmutations != null)
allMutations.addAll(tmutations);
StorageProxy.mutateAtomically(allMutations, consistencyLevel);
View
126 test/unit/org/apache/cassandra/triggers/TriggersSchemaTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.triggers;
+
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.TriggerDefinition;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.service.MigrationManager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TriggersSchemaTest extends SchemaLoader
+{
+ String ksName = "ks" + System.nanoTime();
+ String cfName = "cf" + System.nanoTime();
+ String triggerName = "trigger_" + System.nanoTime();
+ String triggerClass = "org.apache.cassandra.triggers.NoSuchTrigger.class";
+
+ @Test
+ public void newKsContainsCfWithTrigger() throws Exception
+ {
+ TriggerDefinition td = TriggerDefinition.create(triggerName, triggerClass);
+ CFMetaData cfm1 = CFMetaData.compile(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", cfName), ksName);
+ cfm1.addTriggerDefinition(td);
+ KSMetaData ksm = KSMetaData.newKeyspace(ksName,
+ SimpleStrategy.class,
+ Collections.singletonMap("replication_factor", "1"),
+ true,
+ Collections.singletonList(cfm1));
+ MigrationManager.announceNewKeyspace(ksm);
+
+ CFMetaData cfm2 = Schema.instance.getCFMetaData(ksName, cfName);
+ assertFalse(cfm2.getTriggers().isEmpty());
+ assertEquals(1, cfm2.getTriggers().size());
+ assertEquals(td, cfm2.getTriggers().get(triggerName));
+ }
+
+ @Test
+ public void addNewCfWithTriggerToKs() throws Exception
+ {
+ KSMetaData ksm = KSMetaData.newKeyspace(ksName,
+ SimpleStrategy.class,
+ Collections.singletonMap("replication_factor", "1"),
+ true,
+ Collections.EMPTY_LIST);
+ MigrationManager.announceNewKeyspace(ksm);
+
+ CFMetaData cfm1 = CFMetaData.compile(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", cfName), ksName);
+ TriggerDefinition td = TriggerDefinition.create(triggerName, triggerClass);
+ cfm1.addTriggerDefinition(td);
+
+ MigrationManager.announceNewColumnFamily(cfm1);
+
+ CFMetaData cfm2 = Schema.instance.getCFMetaData(ksName, cfName);
+ assertFalse(cfm2.getTriggers().isEmpty());
+ assertEquals(1, cfm2.getTriggers().size());
+ assertEquals(td, cfm2.getTriggers().get(triggerName));
+ }
+
+ @Test
+ public void addTriggerToCf() throws Exception
+ {
+ CFMetaData cfm1 = CFMetaData.compile(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", cfName), ksName);
+ KSMetaData ksm = KSMetaData.newKeyspace(ksName,
+ SimpleStrategy.class,
+ Collections.singletonMap("replication_factor", "1"),
+ true,
+ Collections.singletonList(cfm1));
+ MigrationManager.announceNewKeyspace(ksm);
+
+ CFMetaData cfm2 = Schema.instance.getCFMetaData(ksName, cfName).clone();
+ TriggerDefinition td = TriggerDefinition.create(triggerName, triggerClass);
+ cfm2.addTriggerDefinition(td);
+ MigrationManager.announceColumnFamilyUpdate(cfm2, false);
+
+ CFMetaData cfm3 = Schema.instance.getCFMetaData(ksName, cfName);
+ assertFalse(cfm3.getTriggers().isEmpty());
+ assertEquals(1, cfm3.getTriggers().size());
+ assertEquals(td, cfm3.getTriggers().get(triggerName));
+ }
+
+ @Test
+ public void removeTriggerFromCf() throws Exception
+ {
+ TriggerDefinition td = TriggerDefinition.create(triggerName, triggerClass);
+ CFMetaData cfm1 = CFMetaData.compile(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", cfName), ksName);
+ cfm1.addTriggerDefinition(td);
+ KSMetaData ksm = KSMetaData.newKeyspace(ksName,
+ SimpleStrategy.class,
+ Collections.singletonMap("replication_factor", "1"),
+ true,
+ Collections.singletonList(cfm1));
+ MigrationManager.announceNewKeyspace(ksm);
+
+ CFMetaData cfm2 = Schema.instance.getCFMetaData(ksName, cfName).clone();
+ cfm2.removeTrigger(triggerName);
+ MigrationManager.announceColumnFamilyUpdate(cfm2, false);
+
+ CFMetaData cfm3 = Schema.instance.getCFMetaData(ksName, cfName).clone();
+ assertTrue(cfm3.getTriggers().isEmpty());
+ }
+}
View
179 test/unit/org/apache/cassandra/triggers/TriggersTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.triggers;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.TFramedTransportFactory;
+import org.apache.cassandra.thrift.ThriftServer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+
+public class TriggersTest extends SchemaLoader
+{
+ private static boolean triggerCreated = false;
+ private static ThriftServer thriftServer;
+
+ private static String ksName = "triggers_test_ks";
+ private static String cfName = "test_table";
+
+ @Before
+ public void setup() throws Exception
+ {
+ StorageService.instance.initServer(0);
+ if (thriftServer == null || ! thriftServer.isRunning())
+ {
+ thriftServer = new ThriftServer(InetAddress.getLocalHost(), 9170, 50);
+ thriftServer.start();
+ }
+
+ String cql = String.format("CREATE KEYSPACE IF NOT EXISTS %s " +
+ "WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}",
+ ksName);
+ QueryProcessor.process(cql, ConsistencyLevel.ONE);
+
+ cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (k int, v1 int, v2 int, PRIMARY KEY (k))", ksName, cfName);
+ QueryProcessor.process(cql, ConsistencyLevel.ONE);
+
+ // no conditional execution of create trigger stmt yet
+ if (! triggerCreated)
+ {
+ cql = String.format("CREATE TRIGGER trigger_1 ON %s.%s USING '%s'",
+ ksName, cfName, TestTrigger.class.getName());
+ QueryProcessor.process(cql, ConsistencyLevel.ONE);
+ triggerCreated = true;
+ }
+ }
+
+ @AfterClass
+ public static void teardown()
+ {
+ if (thriftServer != null && thriftServer.isRunning())
+ {
+ thriftServer.stop();
+ }
+ }
+
+ @Test
+ public void executeTriggerOnCqlInsert() throws Exception
+ {
+ String cql = String.format("INSERT INTO %s.%s (k, v1) VALUES (0, 0)", ksName, cfName);
+ QueryProcessor.process(cql, ConsistencyLevel.ONE);
+ assertUpdateIsAugmented(0);
+ }
+
+ @Test
+ public void executeTriggerOnCqlBatchInsert() throws Exception
+ {
+ String cql = String.format("BEGIN BATCH " +
+ " INSERT INTO %s.%s (k, v1) VALUES (1, 1); " +
+ "APPLY BATCH",
+ ksName, cfName);
+ QueryProcessor.process(cql, ConsistencyLevel.ONE);
+ assertUpdateIsAugmented(1);
+ }
+
+ @Test
+ public void executeTriggerOnThriftInsert() throws Exception
+ {
+ Cassandra.Client client = new Cassandra.Client(
+ new TBinaryProtocol(
+ new TFramedTransportFactory().openTransport(
+ InetAddress.getLocalHost().getHostName(), 9170)));
+ client.set_keyspace(ksName);
+ client.insert(bytes(2),
+ new ColumnParent(cfName),
+ getColumnForInsert("v1", 2),
+ org.apache.cassandra.thrift.ConsistencyLevel.ONE);
+
+ assertUpdateIsAugmented(2);
+ }
+
+ @Test
+ public void executeTriggerOnThriftBatchUpdate() throws Exception
+ {
+ Cassandra.Client client = new Cassandra.Client(
+ new TBinaryProtocol(
+ new TFramedTransportFactory().openTransport(
+ InetAddress.getLocalHost().getHostName(), 9170)));
+ client.set_keyspace(ksName);
+ org.apache.cassandra.thrift.Mutation mutation = new org.apache.cassandra.thrift.Mutation();
+ ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+ cosc.setColumn(getColumnForInsert("v1", 3));
+ mutation.setColumn_or_supercolumn(cosc);
+ client.batch_mutate(
+ Collections.singletonMap(bytes(3),
+ Collections.singletonMap(cfName,
+ Collections.singletonList(mutation))),
+ org.apache.cassandra.thrift.ConsistencyLevel.ONE);
+
+ assertUpdateIsAugmented(3);
+ }
+
+ private void assertUpdateIsAugmented(int key)
+ {
+ UntypedResultSet rs = QueryProcessor.processInternal(
+ String.format("SELECT * FROM %s.%s WHERE k=%s", ksName, cfName, key));
+ assertEquals(999, rs.one().getInt("v2"));
+ }
+
+ private org.apache.cassandra.thrift.Column getColumnForInsert(String columnName, int value)
+ {
+ org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
+ column.setName(Schema.instance.getCFMetaData(ksName, cfName).comparator.asAbstractType().fromString(columnName));
+ column.setValue(bytes(value));
+ column.setTimestamp(System.currentTimeMillis());
+ return column;
+ }
+
+ public static class TestTrigger implements ITrigger
+ {
+ public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
+ extraUpdate.addColumn(new Cell(update.metadata().comparator.makeCellName(bytes("v2")),
+ bytes(999)));
+ Mutation mutation = new Mutation(ksName, key);
+ mutation.add(extraUpdate);
+ return Collections.singletonList(mutation);
+ }
+ }
+}

0 comments on commit 2d92f14

Please sign in to comment.