Skip to content

Commit

Permalink
Review suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
adelapena committed May 20, 2020
1 parent 68c45e0 commit dc0030e
Show file tree
Hide file tree
Showing 4 changed files with 294 additions and 224 deletions.
6 changes: 3 additions & 3 deletions src/java/org/apache/cassandra/index/Index.java
Expand Up @@ -196,7 +196,7 @@ public SecondaryIndexBuilder getIndexBuildTask(ColumnFamilyStore cfs, Set<Index>
* single pass through the data. The singleton instance returned from the default method implementation builds
* indexes using a {@code ReducingKeyIterator} to provide a collated view of the SSTable data.
*
* @return an instance of the index build taski helper. Index implementations which return <b>the same instance</b>
* @return an instance of the index build task helper. Index implementations which return <b>the same instance</b>
* will be built using a single task.
*/
default IndexBuildingSupport getBuildTaskSupport()
Expand All @@ -216,11 +216,11 @@ default IndexBuildingSupport getRecoveryTaskSupport()
* Returns the type of operations supported by the index in case its building has failed and it's needing recovery.
*
* @param isInitialBuild {@code true} if the failure is for the initial build task on index creation, {@code false}
* if the failure is for a rebuild or recovery.
* if the failure is for a full rebuild or recovery.
*/
default LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild)
{
return isInitialBuild ? LoadType.WRITE : LoadType.READ;
return isInitialBuild ? LoadType.WRITE : LoadType.ALL;
}

/**
Expand Down
81 changes: 48 additions & 33 deletions src/java/org/apache/cassandra/index/SecondaryIndexManager.java
Expand Up @@ -214,7 +214,7 @@ private synchronized Future<?> createIndex(IndexMetadata indexDef, boolean isNew
final Index index = createInstance(indexDef);
index.register(this);
if (writableIndexes.put(index.getIndexMetadata().name, index) == null)
logger.info("Index [" + index.getIndexMetadata().name + "] registered and writable.");
logger.info("Index [{}] registered and writable.", index.getIndexMetadata().name);

markIndexesBuilding(ImmutableSet.of(index), true, isNewCF);

Expand Down Expand Up @@ -356,27 +356,40 @@ public void markAllIndexesRemoved()
*/
public void rebuildIndexesBlocking(Set<String> indexNames)
{
baseCfs.forceBlockingFlush();
try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
Refs<SSTableReader> allSSTables = viewFragment.refs)
// Get the set of indexes that require blocking build
Set<Index> toRebuild = indexes.values()
.stream()
.filter(index -> indexNames.contains(index.getIndexMetadata().name))
.filter(Index::shouldBuildBlocking)
.collect(Collectors.toSet());

if (toRebuild.isEmpty())
{
logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
return;
}

// Optimistically mark the indexes as writable, so we don't miss incoming writes
boolean needsFlush = false;
for (Index index : toRebuild)
{
Set<Index> toRebuild = indexes.values()
.stream()
.filter(index -> indexNames.contains(index.getIndexMetadata().name))
.filter(Index::shouldBuildBlocking)
.peek(index ->
{
String name = index.getIndexMetadata().name;
if (writableIndexes.put(name, index) == null)
logger.info("Index [" + name + "] became writable starting recovery.");
})
.collect(Collectors.toSet());
if (toRebuild.isEmpty())
String name = index.getIndexMetadata().name;
if (writableIndexes.put(name, index) == null)
{
logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
return;
logger.info("Index [{}] became writable starting recovery.", name);
needsFlush = true;
}
}

// Once we are tracking new writes, flush any memtable contents to not miss them from the sstable-based rebuild
if (needsFlush)
baseCfs.forceBlockingFlush();

// Now that we are tracking new writes and we haven't left untracked contents on the memtables, we are ready to
// index the sstables
try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
Refs<SSTableReader> allSSTables = viewFragment.refs)
{
buildIndexesBlocking(allSSTables, toRebuild, true);
}
}
Expand Down Expand Up @@ -481,19 +494,19 @@ private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index>

try
{
logger.info("Submitting index recovery/build of {} for data in {}",
logger.info("Submitting index {} of {} for data in {}",
isFullRebuild ? "recovery" : "build",
indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")),
sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));

// Group all building tasks
Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>();
for (Index index : indexes)
{
String indexName = index.getIndexMetadata().name;
IndexBuildingSupport rebuildOrRecoveryTask = needsFullRebuild.contains(indexName)
? index.getBuildTaskSupport()
: index.getRecoveryTaskSupport();
Set<Index> stored = byType.computeIfAbsent(rebuildOrRecoveryTask, i -> new HashSet<>());
IndexBuildingSupport buildOrRecoveryTask = isFullRebuild
? index.getBuildTaskSupport()
: index.getRecoveryTaskSupport();
Set<Index> stored = byType.computeIfAbsent(buildOrRecoveryTask, i -> new HashSet<>());
stored.add(index);
}

Expand Down Expand Up @@ -605,8 +618,8 @@ private String getIndexNames(Set<Index> indexes)
* the SecondaryIndexManager instance, it means all invocations for all different indexes will go through the same
* lock, but this is fine as the work done while holding such lock is trivial.
* <p>
* {@link #markIndexBuilt(Index, boolean)} or {@link #markIndexFailed(Index)} should be always called after the
* rebuilding has finished, so that the index build state can be correctly managed and the index rebuilt.
* {@link #markIndexBuilt(Index, boolean)} or {@link #markIndexFailed(Index, boolean)} should be always called after
* the rebuilding has finished, so that the index build state can be correctly managed and the index rebuilt.
*
* @param indexes the index to be marked as building
* @param isFullRebuild {@code true} if this method is invoked as a full index rebuild, {@code false} otherwise
Expand Down Expand Up @@ -656,10 +669,10 @@ private synchronized void markIndexBuilt(Index index, boolean isFullRebuild)
if (isFullRebuild)
{
if (queryableIndexes.add(indexName))
logger.info("Index [" + indexName + "] became queryable after successful build.");
logger.info("Index [{}] became queryable after successful build.", indexName);

if (writableIndexes.put(indexName, index) == null)
logger.info("Index [" + indexName + "] became writable after successful build.");
logger.info("Index [{}] became writable after successful build.", indexName);
}

AtomicInteger counter = inProgressBuilds.get(indexName);
Expand All @@ -680,7 +693,7 @@ private synchronized void markIndexBuilt(Index index, boolean isFullRebuild)
* {@link #markIndexesBuilding(Set, boolean, boolean)} should always be invoked before this method.
*
* @param index the index to be marked as built
* @param isInitialBuild
* @param isInitialBuild {@code true} if the index failed during its initial build, {@code false} otherwise
*/
private synchronized void markIndexFailed(Index index, boolean isInitialBuild)
{
Expand All @@ -699,9 +712,10 @@ private synchronized void markIndexFailed(Index index, boolean isInitialBuild)
needsFullRebuild.add(indexName);

if (!index.getSupportedLoadTypeOnFailure(isInitialBuild).supportsWrites() && writableIndexes.remove(indexName) != null)
logger.info("Index [" + indexName + "] became not-writable because of failed build.");
logger.info("Index [{}] became not-writable because of failed build.", indexName);

if (!index.getSupportedLoadTypeOnFailure(isInitialBuild).supportsReads() && queryableIndexes.remove(indexName))
logger.info("Index [" + indexName + "] became not-queryable.");
logger.info("Index [{}] became not-queryable because of failed build.", indexName);
}
}

Expand Down Expand Up @@ -1129,9 +1143,10 @@ public void validate(PartitionUpdate update) throws InvalidRequestException
index.validate(update);
}

/**
/*
* IndexRegistry methods
*/

public void registerIndex(Index index)
{
String name = index.getIndexMetadata().name;
Expand Down Expand Up @@ -1161,7 +1176,7 @@ public Collection<Index> listIndexes()
return ImmutableSet.copyOf(indexes.values());
}

/**
/*
* Handling of index updates.
* Implementations of the various IndexTransaction interfaces, for keeping indexes in sync with base data
* during updates, compaction and cleanup. Plus factory methods for obtaining transaction instances.
Expand Down
Expand Up @@ -1069,9 +1069,10 @@ public void testReadOnlyIndex() throws Throwable

// Upon rebuild, both reads and writes still go through
getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName));
assertEquals(1, index.rowsInserted.size());
execute("SELECT value FROM %s WHERE value = 1");
execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1);
assertEquals(3, index.rowsInserted.size()); // rows + flush
assertEquals(2, index.rowsInserted.size());
dropIndex(format("DROP INDEX %s.%s", KEYSPACE, indexName));

// On bad initial build writes are not forwarded to the index
Expand All @@ -1086,7 +1087,7 @@ public void testReadOnlyIndex() throws Throwable
// Upon recovery, we can index data again
index.reset();
getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName));
assertTrue(waitForIndexBuilds(keyspace(), indexName));
assertEquals(2, index.rowsInserted.size());
execute("SELECT value FROM %s WHERE value = 1");
execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1);
assertEquals(3, index.rowsInserted.size());
Expand All @@ -1107,9 +1108,10 @@ public void testWriteOnlyIndex() throws Throwable

// Upon rebuild, both reads and writes still go through
getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName));
assertEquals(1, index.rowsInserted.size());
execute("SELECT value FROM %s WHERE value = 1");
execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1);
assertEquals(3, index.rowsInserted.size()); // rows + flush
assertEquals(2, index.rowsInserted.size());
dropIndex(format("DROP INDEX %s.%s", KEYSPACE, indexName));

// On bad initial build writes are forwarded to the index
Expand All @@ -1124,7 +1126,7 @@ public void testWriteOnlyIndex() throws Throwable
// Upon recovery, we can query data again
index.reset();
getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName));
assertTrue(waitForIndexBuilds(keyspace(), indexName));
assertEquals(2, index.rowsInserted.size());
execute("SELECT value FROM %s WHERE value = 1");
execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1);
assertEquals(3, index.rowsInserted.size());
Expand Down Expand Up @@ -1640,35 +1642,6 @@ public Callable<?> getInvalidateTask()
}
}

/**
* <code>StubIndex</code> that blocks during the initialization.
*/
public static class BlockingStubIndex extends StubIndex
{
private final CountDownLatch latch = new CountDownLatch(1);

public BlockingStubIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
{
super(baseCfs, indexDef);
}

@Override
public Callable<?> getInitializationTask()
{
return () -> {
latch.await();
return null;
};
}

@Override
public Callable<?> getInvalidateTask()
{
latch.countDown();
return super.getInvalidateTask();
}
}

/**
* {@code StubIndex} that only supports some load. Could be intentional or a result of a bad init.
*/
Expand Down

0 comments on commit dc0030e

Please sign in to comment.