Skip to content

Commit

Permalink
Fix closing iterator in SecondaryIndexBuilder
Browse files Browse the repository at this point in the history
Patch by Jacek Lewandowski; reviewed by Andres de la Peña, Piotr Kolaczkowski for CASSANDRA-18361
  • Loading branch information
jacek-lewandowski committed Oct 5, 2023
1 parent 9ccec3d commit 016d91a
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
4.0.12
* Fix closing iterator in SecondaryIndexBuilder (CASSANDRA-18361)
* Update hdrhistogram to 2.1.12 (CASSANDRA-18893)
* Improve performance of compactions when table does not have an index (CASSANDRA-18773)
* JMH improvements - faster build and async profiler (CASSANDRA-18871)
Expand Down
22 changes: 20 additions & 2 deletions src/java/org/apache/cassandra/index/SecondaryIndexManager.java
Expand Up @@ -41,6 +41,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

import org.apache.cassandra.utils.Throwables;
import org.apache.commons.lang3.StringUtils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -519,17 +520,34 @@ private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index>
final SettableFuture build = SettableFuture.create();
Futures.addCallback(CompactionManager.instance.submitIndexBuild(builder), new FutureCallback()
{
@Override
public void onFailure(Throwable t)
private void doOnFailure(Throwable t)
{
logAndMarkIndexesFailed(groupedIndexes, t, false);
unbuiltIndexes.addAll(groupedIndexes);
build.setException(t);
}

@Override
public void onFailure(Throwable t)
{
if (builder instanceof AutoCloseable)
t = Throwables.close(t, Arrays.asList((AutoCloseable) builder));

doOnFailure(t);
}

@Override
public void onSuccess(Object o)
{
if (builder instanceof AutoCloseable)
{
Throwable t = Throwables.close(null, Arrays.asList((AutoCloseable) builder));
if (t != null)
{
doOnFailure(t);
return;
}
}
groupedIndexes.forEach(i -> markIndexBuilt(i, isFullRebuild));
logger.info("Index build of {} completed", getIndexNames(groupedIndexes));
builtIndexes.addAll(groupedIndexes);
Expand Down
36 changes: 26 additions & 10 deletions src/java/org/apache/cassandra/index/internal/CassandraIndex.java
Expand Up @@ -23,19 +23,18 @@
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import com.google.common.collect.ImmutableSet;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.cql3.statements.schema.IndexTarget;
import org.apache.cassandra.db.*;
Expand All @@ -57,7 +56,10 @@
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
Expand Down Expand Up @@ -703,11 +705,25 @@ private void buildBlocking()
metadata.name,
getSSTableNames(sstables));

SecondaryIndexBuilder builder = new CollatedViewIndexBuilder(baseCfs,
Collections.singleton(this),
new ReducingKeyIterator(sstables),
ImmutableSet.copyOf(sstables));
Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
CollatedViewIndexBuilder builder = new CollatedViewIndexBuilder(baseCfs,
Collections.singleton(this),
new ReducingKeyIterator(sstables),
ImmutableSet.copyOf(sstables));
ListenableFuture<?> future = CompactionManager.instance.submitIndexBuild(builder);
Futures.addCallback(future, new FutureCallback<Object>()
{
@Override
public void onSuccess(Object o)
{
builder.close();
}

@Override
public void onFailure(Throwable throwable)
{
builder.close();
}
}, MoreExecutors.directExecutor());
FBUtilities.waitOnFuture(future);
indexCfs.forceBlockingFlush();
}
Expand Down
Expand Up @@ -35,7 +35,7 @@
/**
* Manages building an entire index from column family data. Runs on to compaction manager.
*/
public class CollatedViewIndexBuilder extends SecondaryIndexBuilder
public class CollatedViewIndexBuilder extends SecondaryIndexBuilder implements AutoCloseable
{
private final ColumnFamilyStore cfs;
private final Set<Index> indexers;
Expand Down Expand Up @@ -64,20 +64,20 @@ public CompactionInfo getCompactionInfo()

public void build()
{
try
{

int pageSize = cfs.indexManager.calculateIndexingPageSize();
while (iter.hasNext())
{
if (isStopRequested())
throw new CompactionInterruptedException(getCompactionInfo());
DecoratedKey key = iter.next();
cfs.indexManager.indexPartition(key, indexers, pageSize);
}
}
finally
{
iter.close();
}
}

@Override
public void close()
{
iter.close();
}
}

0 comments on commit 016d91a

Please sign in to comment.