Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

merge from 1.2

  • Loading branch information...
commit a751d04d7d0e5e96ea7cee79093e090836ca0f5e 2 parents 2fde93d + c7eb146
@jbellis jbellis authored
View
2  CHANGES.txt
@@ -35,6 +35,8 @@
1.2.5
* Include fatal errors in trace events (CASSANDRA-5447)
+ * Ensure that PerRowSecondaryIndex is notified of row-level deletes
+ (CASSANDRA-5445)
1.2.4
View
2  doc/native_protocol.spec
@@ -1,6 +1,6 @@
CQL BINARY PROTOCOL v1
- (draft)
+
Table of Contents
View
2  src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -173,7 +173,7 @@ public long addAllWithSizeDelta(ColumnFamily cm, Allocator allocator, Function<C
}
while (!ref.compareAndSet(current, modified));
- indexer.commit();
+ indexer.updateRowLevelIndexes();
return sizeDelta;
}
View
2  src/java/org/apache/cassandra/db/Table.java
@@ -374,7 +374,7 @@ public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIn
}
Tracing.trace("Adding to {} memtable", cf.metadata().cfName);
- cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key, true) : SecondaryIndexManager.nullUpdater);
+ cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key) : SecondaryIndexManager.nullUpdater);
}
}
finally
View
2  src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -71,7 +71,7 @@ public LazilyCompactedRow(CompactionController controller, List<? extends ICount
super(rows.get(0).getKey());
this.rows = rows;
this.controller = controller;
- indexer = controller.cfs.indexManager.updaterFor(key, false);
+ indexer = controller.cfs.indexManager.updaterFor(key);
long maxDelTimestamp = Long.MIN_VALUE;
for (OnDiskAtomIterator row : rows)
View
2  src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -205,7 +205,7 @@ public ColumnFamily call() throws Exception
data.add(FBUtilities.closeableIterator(row.cf.iterator()));
}
- PrecompactedRow.merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).key, false));
+ PrecompactedRow.merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).key));
return PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, returnCF);
}
}
View
4 src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -87,7 +87,7 @@ public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean s
// See comment in preceding method
ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
- controller.cfs.indexManager.updaterFor(key, false));
+ controller.cfs.indexManager.updaterFor(key));
if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
return compacted;
@@ -121,7 +121,7 @@ private static ColumnFamily merge(List<SSTableIdentityIterator> rows, Compaction
}
}
- merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).getKey(), false));
+ merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).getKey()));
return returnCF;
}
View
101 src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -52,7 +52,7 @@ public void update(Column oldColumn, Column column) { }
public void remove(Column current) { }
- public void commit() {}
+ public void updateRowLevelIndexes() {}
};
/**
@@ -471,11 +471,11 @@ public void deleteFromIndexes(DecoratedKey key, List<Column> indexedColumnsInRow
* can get updated. Note: only a CF backed by AtomicSortedColumns implements this behaviour
* fully, other types simply ignore the index updater.
*/
- public Updater updaterFor(final DecoratedKey key, boolean includeRowIndexes)
+ public Updater updaterFor(final DecoratedKey key)
{
- return (includeRowIndexes && !rowLevelIndexMap.isEmpty())
- ? new MixedIndexUpdater(key)
- : indexesByColumn.isEmpty() ? nullUpdater : new PerColumnIndexUpdater(key);
+ return (indexesByColumn.isEmpty() && rowLevelIndexMap.isEmpty())
+ ? nullUpdater
+ : new StandardUpdater(key);
}
/**
@@ -580,14 +580,14 @@ public boolean validate(Column column)
public void remove(Column current);
/** called after memtable updates are complete (CASSANDRA-5397) */
- public void commit();
+ public void updateRowLevelIndexes();
}
- private class PerColumnIndexUpdater implements Updater
+ private class StandardUpdater implements Updater
{
private final DecoratedKey key;
- public PerColumnIndexUpdater(DecoratedKey key)
+ public StandardUpdater(DecoratedKey key)
{
this.key = key;
}
@@ -598,76 +598,19 @@ public void insert(Column column)
return;
for (SecondaryIndex index : indexFor(column.name()))
- ((PerColumnSecondaryIndex) index).insert(key.key, column);
- }
-
- public void update(Column oldColumn, Column column)
- {
- for (SecondaryIndex index : indexFor(column.name()))
{
- ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
- if (!column.isMarkedForDelete())
+ if (index instanceof PerColumnSecondaryIndex)
((PerColumnSecondaryIndex) index).insert(key.key, column);
}
}
- public void remove(Column column)
- {
- if (column.isMarkedForDelete())
- return;
-
- for (SecondaryIndex index : indexFor(column.name()))
- ((PerColumnSecondaryIndex) index).delete(key.key, column);
- }
-
- public void commit()
- {
- // this is a no-op as per-column index updates are applied immediately
- }
- }
-
- private class MixedIndexUpdater implements Updater
- {
- private final DecoratedKey key;
- ConcurrentHashMap<SecondaryIndex, ByteBuffer> deferredUpdates = new ConcurrentHashMap<SecondaryIndex, ByteBuffer>();
-
- public MixedIndexUpdater(DecoratedKey key)
- {
- this.key = key;
- }
-
- public void insert(Column column)
- {
- if (column.isMarkedForDelete())
- return;
-
- for (SecondaryIndex index : indexFor(column.name()))
- {
- if (index instanceof PerColumnSecondaryIndex)
- {
- ((PerColumnSecondaryIndex) index).insert(key.key, column);
- }
- else
- {
- deferredUpdates.putIfAbsent(index, key.key);
- }
- }
- }
-
public void update(Column oldColumn, Column column)
{
for (SecondaryIndex index : indexFor(column.name()))
{
- if (index instanceof PerColumnSecondaryIndex)
- {
- ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
- if (!column.isMarkedForDelete())
- ((PerColumnSecondaryIndex) index).insert(key.key, column);
- }
- else
- {
- deferredUpdates.putIfAbsent(index, key.key);
- }
+ ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
+ if (!column.isMarkedForDelete())
+ ((PerColumnSecondaryIndex) index).insert(key.key, column);
}
}
@@ -678,25 +621,15 @@ public void remove(Column column)
for (SecondaryIndex index : indexFor(column.name()))
{
- if (index instanceof PerColumnSecondaryIndex)
- {
- ((PerColumnSecondaryIndex) index).delete(key.key, column);
- }
- else
- {
- // per-row secondary indexes are assumed to keep the index up-to-date at insert time, rather
- // than performing lazy updates
- }
+ if (index instanceof PerColumnSecondaryIndex)
+ ((PerColumnSecondaryIndex) index).delete(key.key, column);
}
}
- public void commit()
+ public void updateRowLevelIndexes()
{
- for (Map.Entry<SecondaryIndex, ByteBuffer> update : deferredUpdates.entrySet())
- {
- assert update.getKey() instanceof PerRowSecondaryIndex;
- ((PerRowSecondaryIndex) update.getKey()).index(update.getValue());
- }
+ for (SecondaryIndex index : rowLevelIndexMap.values())
+ ((PerRowSecondaryIndex) index).index(key.key);
}
}
}
View
2  test/unit/org/apache/cassandra/SchemaLoader.java
@@ -311,7 +311,7 @@ private static CFMetaData perRowIndexedCFMD(String ksName, String cfName, boolea
indexOptions,
ByteBufferUtil.bytesToHex(cName),
null, ColumnDefinition.Type.REGULAR));
- }});
+ }});
}
private static void useCompression(List<KSMetaData> schema)
View
55 test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
@@ -25,14 +25,17 @@
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Set;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class PerRowSecondaryIndexTest extends SchemaLoader
{
@@ -43,6 +46,12 @@
// indexed & stashes it in a static variable for inspection
// in the test.
+ @Before
+ public void clearTestStub()
+ {
+ TestIndex.reset();
+ }
+
@Test
public void testIndexInsertAndUpdate() throws IOException
{
@@ -64,11 +73,56 @@ public void testIndexInsertAndUpdate() throws IOException
indexedRow = TestIndex.LAST_INDEXED_ROW;
assertNotNull(indexedRow);
assertEquals(ByteBufferUtil.bytes("bar"), indexedRow.getColumn(ByteBufferUtil.bytes("indexed")).value());
+ assertTrue(Arrays.equals("k1".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
+ }
+
+ @Test
+ public void testColumnDelete() throws IOException
+ {
+ // issue a column delete and test that the configured index instance was notified to update
+ RowMutation rm;
+ rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k2"));
+ rm.delete(new QueryPath("Indexed1", null, ByteBufferUtil.bytes("indexed")), 1);
+ rm.apply();
+
+ ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW;
+ assertNotNull(indexedRow);
+
+ for (IColumn column : indexedRow.getSortedColumns())
+ {
+ assertTrue(column.isMarkedForDelete());
+ }
+ assertTrue(Arrays.equals("k2".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
+ }
+
+ @Test
+ public void testRowDelete() throws IOException
+ {
+ // issue a row level delete and test that the configured index instance was notified to update
+ RowMutation rm;
+ rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k3"));
+ rm.delete(new QueryPath("Indexed1"), 1);
+ rm.apply();
+
+ ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW;
+ assertNotNull(indexedRow);
+ for (IColumn column : indexedRow.getSortedColumns())
+ {
+ assertTrue(column.isMarkedForDelete());
+ }
+ assertTrue(Arrays.equals("k3".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
}
public static class TestIndex extends PerRowSecondaryIndex
{
public static ColumnFamily LAST_INDEXED_ROW;
+ public static ByteBuffer LAST_INDEXED_KEY;
+
+ public static void reset()
+ {
+ LAST_INDEXED_KEY = null;
+ LAST_INDEXED_ROW = null;
+ }
@Override
public void index(ByteBuffer rowKey, ColumnFamily cf)
@@ -81,6 +135,7 @@ public void index(ByteBuffer rowKey)
QueryFilter filter = QueryFilter.getIdentityFilter(DatabaseDescriptor.getPartitioner().decorateKey(rowKey),
baseCfs.getColumnFamilyName());
LAST_INDEXED_ROW = baseCfs.getColumnFamily(filter);
+ LAST_INDEXED_KEY = rowKey;
}
@Override
Please sign in to comment.
Something went wrong with that request. Please try again.