Skip to content

Commit

Permalink
Ensure that PerRowSecondaryIndex is notified of row-level deletes
Browse files Browse the repository at this point in the history
patch by Sam Tunnicliffe; reviewed by jbellis for CASSANDRA-5445
  • Loading branch information
jbellis committed Apr 9, 2013
1 parent 87b350f commit c7eb146
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 89 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Original file line Diff line number Diff line change
@@ -1,5 +1,7 @@
1.2.5 1.2.5
* Include fatal errors in trace events (CASSANDRA-5447) * Include fatal errors in trace events (CASSANDRA-5447)
* Ensure that PerRowSecondaryIndex is notified of row-level deletes
(CASSANDRA-5445)




1.2.4 1.2.4
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/AtomicSortedColumns.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function
} }
while (!ref.compareAndSet(current, modified)); while (!ref.compareAndSet(current, modified));


indexer.commit(); indexer.updateRowLevelIndexes();


return sizeDelta; return sizeDelta;
} }
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/Table.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIn
} }


Tracing.trace("Adding to {} memtable", cf.metadata().cfName); Tracing.trace("Adding to {} memtable", cf.metadata().cfName);
cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key, true) : SecondaryIndexManager.nullUpdater); cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key) : SecondaryIndexManager.nullUpdater);
} }
} }
finally finally
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public LazilyCompactedRow(CompactionController controller, List<? extends ICount
super(rows.get(0).getKey()); super(rows.get(0).getKey());
this.rows = rows; this.rows = rows;
this.controller = controller; this.controller = controller;
indexer = controller.cfs.indexManager.updaterFor(key, false); indexer = controller.cfs.indexManager.updaterFor(key);


long maxDelTimestamp = Long.MIN_VALUE; long maxDelTimestamp = Long.MIN_VALUE;
for (OnDiskAtomIterator row : rows) for (OnDiskAtomIterator row : rows)
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public ColumnFamily call() throws Exception
data.add(FBUtilities.closeableIterator(row.cf.iterator())); data.add(FBUtilities.closeableIterator(row.cf.iterator()));
} }


PrecompactedRow.merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).key, false)); PrecompactedRow.merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).key));
return PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, returnCF); return PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, returnCF);
} }
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean s
// See comment in preceding method // See comment in preceding method
ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf, ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
shouldPurge ? controller.gcBefore : Integer.MIN_VALUE, shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
controller.cfs.indexManager.updaterFor(key, false)); controller.cfs.indexManager.updaterFor(key));
if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative()) if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore); CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
return compacted; return compacted;
Expand Down Expand Up @@ -121,7 +121,7 @@ private static ColumnFamily merge(List<SSTableIdentityIterator> rows, Compaction
} }
} }


merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).getKey(), false)); merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).getKey()));


return returnCF; return returnCF;
} }
Expand Down
93 changes: 11 additions & 82 deletions src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void update(IColumn oldColumn, IColumn column) { }


public void remove(IColumn current) { } public void remove(IColumn current) { }


public void commit() {} public void updateRowLevelIndexes() {}
}; };


/** /**
Expand Down Expand Up @@ -480,11 +480,11 @@ public void deleteFromIndexes(DecoratedKey key, List<IColumn> indexedColumnsInRo
* can get updated. Note: only a CF backed by AtomicSortedColumns implements this behaviour * can get updated. Note: only a CF backed by AtomicSortedColumns implements this behaviour
* fully, other types simply ignore the index updater. * fully, other types simply ignore the index updater.
*/ */
public Updater updaterFor(final DecoratedKey key, boolean includeRowIndexes) public Updater updaterFor(final DecoratedKey key)
{ {
return (includeRowIndexes && !rowLevelIndexMap.isEmpty()) return (indexesByColumn.isEmpty() && rowLevelIndexMap.isEmpty())
? new MixedIndexUpdater(key) ? nullUpdater
: indexesByColumn.isEmpty() ? nullUpdater : new PerColumnIndexUpdater(key); : new StandardUpdater(key);
} }


/** /**
Expand Down Expand Up @@ -589,65 +589,14 @@ public static interface Updater
public void remove(IColumn current); public void remove(IColumn current);


/** called after memtable updates are complete (CASSANDRA-5397) */ /** called after memtable updates are complete (CASSANDRA-5397) */
public void commit(); public void updateRowLevelIndexes();
} }


private class PerColumnIndexUpdater implements Updater private class StandardUpdater implements Updater
{ {
private final DecoratedKey key; private final DecoratedKey key;


public PerColumnIndexUpdater(DecoratedKey key) public StandardUpdater(DecoratedKey key)
{
this.key = key;
}

public void insert(IColumn column)
{
if (column.isMarkedForDelete())
return;

SecondaryIndex index = indexFor(column.name());
if (index == null)
return;

((PerColumnSecondaryIndex) index).insert(key.key, column);
}

public void update(IColumn oldColumn, IColumn column)
{
SecondaryIndex index = indexFor(column.name());
if (index == null)
return;

((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
if (!column.isMarkedForDelete())
((PerColumnSecondaryIndex) index).insert(key.key, column);
}

public void remove(IColumn column)
{
if (column.isMarkedForDelete())
return;

SecondaryIndex index = indexFor(column.name());
if (index == null)
return;

((PerColumnSecondaryIndex) index).delete(key.key, column);
}

public void commit()
{
// this is a no-op as per-column index updates are applied immediately
}
}

private class MixedIndexUpdater implements Updater
{
private final DecoratedKey key;
ConcurrentHashMap<SecondaryIndex, ByteBuffer> deferredUpdates = new ConcurrentHashMap<SecondaryIndex, ByteBuffer>();

public MixedIndexUpdater(DecoratedKey key)
{ {
this.key = key; this.key = key;
} }
Expand All @@ -662,13 +611,7 @@ public void insert(IColumn column)
return; return;


if (index instanceof PerColumnSecondaryIndex) if (index instanceof PerColumnSecondaryIndex)
{
((PerColumnSecondaryIndex) index).insert(key.key, column); ((PerColumnSecondaryIndex) index).insert(key.key, column);
}
else
{
deferredUpdates.putIfAbsent(index, key.key);
}
} }


public void update(IColumn oldColumn, IColumn column) public void update(IColumn oldColumn, IColumn column)
Expand All @@ -683,10 +626,6 @@ public void update(IColumn oldColumn, IColumn column)
if (!column.isMarkedForDelete()) if (!column.isMarkedForDelete())
((PerColumnSecondaryIndex) index).insert(key.key, column); ((PerColumnSecondaryIndex) index).insert(key.key, column);
} }
else
{
deferredUpdates.putIfAbsent(index, key.key);
}
} }


public void remove(IColumn column) public void remove(IColumn column)
Expand All @@ -699,23 +638,13 @@ public void remove(IColumn column)
return; return;


if (index instanceof PerColumnSecondaryIndex) if (index instanceof PerColumnSecondaryIndex)
{
((PerColumnSecondaryIndex) index).delete(key.key, column); ((PerColumnSecondaryIndex) index).delete(key.key, column);
}
else
{
// per-row secondary indexes are assumed to keep the index up-to-date at insert time, rather
// than performing lazy updates
}
} }


public void commit() public void updateRowLevelIndexes()
{ {
for (Map.Entry<SecondaryIndex, ByteBuffer> update : deferredUpdates.entrySet()) for (SecondaryIndex index : rowLevelIndexMap.values())
{ ((PerRowSecondaryIndex) index).index(key.key);
assert update.getKey() instanceof PerRowSecondaryIndex;
((PerRowSecondaryIndex) update.getKey()).index(update.getValue());
}
} }
} }
} }
2 changes: 1 addition & 1 deletion test/unit/org/apache/cassandra/SchemaLoader.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ private static CFMetaData perRowIndexedCFMD(String ksName, String cfName, boolea
indexOptions, indexOptions,
ByteBufferUtil.bytesToHex(cName), ByteBufferUtil.bytesToHex(cName),
null)); null));
}}); }});
} }


private static void useCompression(List<KSMetaData> schema) private static void useCompression(List<KSMetaData> schema)
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@
import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.ByteBufferUtil;


import org.junit.Before;
import org.junit.Test; import org.junit.Test;


import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Set; import java.util.Set;


import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertNotNull; import static junit.framework.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;


public class PerRowSecondaryIndexTest extends SchemaLoader public class PerRowSecondaryIndexTest extends SchemaLoader
{ {
Expand All @@ -44,6 +47,12 @@ public class PerRowSecondaryIndexTest extends SchemaLoader
// indexed & stashes it in a static variable for inspection // indexed & stashes it in a static variable for inspection
// in the test. // in the test.


@Before
public void clearTestStub()
{
TestIndex.reset();
}

@Test @Test
public void testIndexInsertAndUpdate() throws IOException public void testIndexInsertAndUpdate() throws IOException
{ {
Expand All @@ -65,11 +74,56 @@ public void testIndexInsertAndUpdate() throws IOException
indexedRow = TestIndex.LAST_INDEXED_ROW; indexedRow = TestIndex.LAST_INDEXED_ROW;
assertNotNull(indexedRow); assertNotNull(indexedRow);
assertEquals(ByteBufferUtil.bytes("bar"), indexedRow.getColumn(ByteBufferUtil.bytes("indexed")).value()); assertEquals(ByteBufferUtil.bytes("bar"), indexedRow.getColumn(ByteBufferUtil.bytes("indexed")).value());
assertTrue(Arrays.equals("k1".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
}

@Test
public void testColumnDelete() throws IOException
{
// issue a column delete and test that the configured index instance was notified to update
RowMutation rm;
rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k2"));
rm.delete(new QueryPath("Indexed1", null, ByteBufferUtil.bytes("indexed")), 1);
rm.apply();

ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW;
assertNotNull(indexedRow);

for (IColumn column : indexedRow.getSortedColumns())
{
assertTrue(column.isMarkedForDelete());
}
assertTrue(Arrays.equals("k2".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
}

@Test
public void testRowDelete() throws IOException
{
// issue a row level delete and test that the configured index instance was notified to update
RowMutation rm;
rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k3"));
rm.delete(new QueryPath("Indexed1"), 1);
rm.apply();

ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW;
assertNotNull(indexedRow);
for (IColumn column : indexedRow.getSortedColumns())
{
assertTrue(column.isMarkedForDelete());
}
assertTrue(Arrays.equals("k3".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
} }


public static class TestIndex extends PerRowSecondaryIndex public static class TestIndex extends PerRowSecondaryIndex
{ {
public static ColumnFamily LAST_INDEXED_ROW; public static ColumnFamily LAST_INDEXED_ROW;
public static ByteBuffer LAST_INDEXED_KEY;

public static void reset()
{
LAST_INDEXED_KEY = null;
LAST_INDEXED_ROW = null;
}


@Override @Override
public void index(ByteBuffer rowKey, ColumnFamily cf) public void index(ByteBuffer rowKey, ColumnFamily cf)
Expand All @@ -82,6 +136,7 @@ public void index(ByteBuffer rowKey)
QueryFilter filter = QueryFilter.getIdentityFilter(DatabaseDescriptor.getPartitioner().decorateKey(rowKey), QueryFilter filter = QueryFilter.getIdentityFilter(DatabaseDescriptor.getPartitioner().decorateKey(rowKey),
new QueryPath(baseCfs.getColumnFamilyName())); new QueryPath(baseCfs.getColumnFamilyName()));
LAST_INDEXED_ROW = baseCfs.getColumnFamily(filter); LAST_INDEXED_ROW = baseCfs.getColumnFamily(filter);
LAST_INDEXED_KEY = rowKey;
} }


@Override @Override
Expand Down

0 comments on commit c7eb146

Please sign in to comment.