From 016d91a7d7d6c7998d1f3cb35726baf8dd9bac03 Mon Sep 17 00:00:00 2001 From: Jacek Lewandowski Date: Thu, 28 Sep 2023 14:23:42 +0200 Subject: [PATCH] Fix closing iterator in SecondaryIndexBuilder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Patch by Jacek Lewandowski; reviewed by Andres de la Peña, Piotr Kolaczkowski for CASSANDRA-18361 --- CHANGES.txt | 1 + .../index/SecondaryIndexManager.java | 22 ++++++++++-- .../index/internal/CassandraIndex.java | 36 +++++++++++++------ .../internal/CollatedViewIndexBuilder.java | 16 ++++----- 4 files changed, 55 insertions(+), 20 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 7db0dc4111f1..e06d99b01117 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0.12 + * Fix closing iterator in SecondaryIndexBuilder (CASSANDRA-18361) * Update hdrhistogram to 2.1.12 (CASSANDRA-18893) * Improve performance of compactions when table does not have an index (CASSANDRA-18773) * JMH improvements - faster build and async profiler (CASSANDRA-18871) diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index e9b22ef6d561..aa8feb295ce7 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -41,6 +41,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; +import org.apache.cassandra.utils.Throwables; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -519,17 +520,34 @@ private void buildIndexesBlocking(Collection sstables, Set final SettableFuture build = SettableFuture.create(); Futures.addCallback(CompactionManager.instance.submitIndexBuild(builder), new FutureCallback() { - @Override - public void onFailure(Throwable t) + private void doOnFailure(Throwable t) { logAndMarkIndexesFailed(groupedIndexes, t, false); unbuiltIndexes.addAll(groupedIndexes); build.setException(t); } + @Override + public void onFailure(Throwable t) + { + if (builder instanceof AutoCloseable) + t = Throwables.close(t, Arrays.asList((AutoCloseable) builder)); + + doOnFailure(t); + } + @Override public void onSuccess(Object o) { + if (builder instanceof AutoCloseable) + { + Throwable t = Throwables.close(null, Arrays.asList((AutoCloseable) builder)); + if (t != null) + { + doOnFailure(t); + return; + } + } groupedIndexes.forEach(i -> markIndexBuilt(i, isFullRebuild)); logger.info("Index build of {} completed", getIndexNames(groupedIndexes)); builtIndexes.addAll(groupedIndexes); diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java index 256104057769..6d04d523d4d1 100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java @@ -23,19 +23,18 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.Callable; -import java.util.concurrent.Future; import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import com.google.common.collect.ImmutableSet; - +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.schema.TableMetadataRef; -import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.cql3.statements.schema.IndexTarget; import org.apache.cassandra.db.*; @@ -57,7 +56,10 @@ import org.apache.cassandra.index.transactions.UpdateTransaction; import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.Refs; @@ -703,11 +705,25 @@ private void buildBlocking() metadata.name, getSSTableNames(sstables)); - SecondaryIndexBuilder builder = new CollatedViewIndexBuilder(baseCfs, - Collections.singleton(this), - new ReducingKeyIterator(sstables), - ImmutableSet.copyOf(sstables)); - Future future = CompactionManager.instance.submitIndexBuild(builder); + CollatedViewIndexBuilder builder = new CollatedViewIndexBuilder(baseCfs, + Collections.singleton(this), + new ReducingKeyIterator(sstables), + ImmutableSet.copyOf(sstables)); + ListenableFuture future = CompactionManager.instance.submitIndexBuild(builder); + Futures.addCallback(future, new FutureCallback() + { + @Override + public void onSuccess(Object o) + { + builder.close(); + } + + @Override + public void onFailure(Throwable throwable) + { + builder.close(); + } + }, MoreExecutors.directExecutor()); FBUtilities.waitOnFuture(future); indexCfs.forceBlockingFlush(); } diff --git a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java index 3c005c42525b..eaf557799424 100644 --- a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java @@ -35,7 +35,7 @@ /** * Manages building an entire index from column family data. Runs on to compaction manager. */ -public class CollatedViewIndexBuilder extends SecondaryIndexBuilder +public class CollatedViewIndexBuilder extends SecondaryIndexBuilder implements AutoCloseable { private final ColumnFamilyStore cfs; private final Set indexers; @@ -64,8 +64,7 @@ public CompactionInfo getCompactionInfo() public void build() { - try - { + int pageSize = cfs.indexManager.calculateIndexingPageSize(); while (iter.hasNext()) { @@ -73,11 +72,12 @@ public void build() throw new CompactionInterruptedException(getCompactionInfo()); DecoratedKey key = iter.next(); cfs.indexManager.indexPartition(key, indexers, pageSize); - } - } - finally - { - iter.close(); } } + + @Override + public void close() + { + iter.close(); + } }