From 0dd916b21bcfc5f2e56c74935d07b483d5d5444d Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Thu, 26 Oct 2023 14:25:22 -0700
Subject: [PATCH] Fix NPE caused by realtime segment closing race, fix
 possible missing-segment retry bug.

Fixes #12168, by returning empty from FireHydrant when the segment is swapped
to null. This causes the SinkQuerySegmentWalker to use
ReportTimelineMissingSegmentQueryRunner, which causes the Broker to look for
the segment somewhere else.

In addition, this patch changes SinkQuerySegmentWalker to acquire references
to all hydrants (subsegments of a sink) at once, and return a
ReportTimelineMissingSegmentQueryRunner if *any* of them could not be
acquired. I suspect, although I have not confirmed, that the prior behavior
could lead to segments being reported as missing even though results from
some hydrants were still included.
---
 .../druid/segment/realtime/FireHydrant.java   |   9 +-
 .../appenderator/SinkQuerySegmentWalker.java  | 127 +++++++++---------
 .../druid/segment/realtime/plumber/Sink.java  |  56 +++++++-
 .../plumber/SinkSegmentReference.java         |  78 +++++++++++
 .../segment/realtime/FireHydrantTest.java     |  16 +++
 5 files changed, 218 insertions(+), 68 deletions(-)
 create mode 100644 server/src/main/java/org/apache/druid/segment/realtime/plumber/SinkSegmentReference.java

diff --git a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
index 29a8986a04cf..5f1a88f2ea97 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
@@ -175,6 +175,12 @@ public Optional<Pair<SegmentReference, Closeable>> getSegmentForQuery(
   )
   {
     ReferenceCountingSegment sinkSegment = adapter.get();
+
+    if (sinkSegment == null) {
+      // adapter can be null if this segment is removed (swapped to null) while being queried.
+      return Optional.empty();
+    }
+
     SegmentReference segment = segmentMapFn.apply(sinkSegment);
     while (true) {
       Optional<Closeable> reference = segment.acquireReferences();
@@ -186,7 +192,8 @@ public Optional<Pair<SegmentReference, Closeable>> getSegmentForQuery(
         // segment swap, the new segment should already be visible.
         ReferenceCountingSegment newSinkSegment = adapter.get();
         if (newSinkSegment == null) {
-          throw new ISE("FireHydrant was 'closed' by swapping segment to null while acquiring a segment");
+          // adapter can be null if this segment is removed (swapped to null) while being queried.
+          return Optional.empty();
         }
         if (sinkSegment == newSinkSegment) {
           if (newSinkSegment.isClosed()) {
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 81cd4db25561..b4cd659c88bb 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -40,7 +40,6 @@
 import org.apache.druid.query.DirectQueryProcessingPool;
 import org.apache.druid.query.FinalizeResultsQueryRunner;
 import org.apache.druid.query.MetricsEmittingQueryRunner;
-import org.apache.druid.query.NoopQueryRunner;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.QueryMetrics;
@@ -62,6 +61,7 @@
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.realtime.FireHydrant;
 import org.apache.druid.segment.realtime.plumber.Sink;
+import org.apache.druid.segment.realtime.plumber.SinkSegmentReference;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
@@ -69,6 +69,7 @@
 import org.joda.time.Interval;

 import java.io.Closeable;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -169,17 +170,17 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final
     final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);

     // Make sure this query type can handle the subquery, if present.
-    if ((dataSourceFromQuery instanceof QueryDataSource) && !toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery())) {
+    if ((dataSourceFromQuery instanceof QueryDataSource)
+        && !toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery())) {
       throw new ISE("Cannot handle subquery: %s", dataSourceFromQuery);
     }

     // segmentMapFn maps each base Segment into a joined Segment if necessary.
-    final Function<SegmentReference, SegmentReference> segmentMapFn = dataSourceFromQuery
-        .createSegmentMapFunction(
-            query,
-            cpuTimeAccumulator
-        );
-
+    final Function<SegmentReference, SegmentReference> segmentMapFn =
+        dataSourceFromQuery.createSegmentMapFunction(
+            query,
+            cpuTimeAccumulator
+        );
     // We compute the join cache key here itself so it doesn't need to be re-computed for every segment
     final Optional<byte[]> cacheKeyPrefix = Optional.ofNullable(query.getDataSource().getCacheKey());
@@ -200,44 +201,32 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final
           final Sink theSink = chunk.getObject();
           final SegmentId sinkSegmentId = theSink.getSegment().getId();
+          final List<SinkSegmentReference> segmentReferences =
+              theSink.acquireSegmentReferences(segmentMapFn, skipIncrementalSegment);

-          Iterable<QueryRunner<T>> perHydrantRunners = new SinkQueryRunners<>(
-              Iterables.transform(
-                  theSink,
-                  hydrant -> {
-                    // Hydrant might swap at any point, but if it's swapped at the start
-                    // then we know it's *definitely* swapped.
-                    final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
-
-                    if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
-                      return new Pair<>(hydrant.getSegmentDataInterval(), new NoopQueryRunner<>());
-                    }
-
-                    // Prevent the underlying segment from swapping when its being iterated
-                    final Optional<Pair<SegmentReference, Closeable>> maybeSegmentAndCloseable =
-                        hydrant.getSegmentForQuery(segmentMapFn);
+          if (segmentReferences == null) {
+            // We failed to acquire references for all subsegments. Bail and report the entire sink missing.
+            return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+          }

-                    // if optional isn't present, we failed to acquire reference to the segment or any joinables
-                    if (!maybeSegmentAndCloseable.isPresent()) {
-                      return new Pair<>(
-                          hydrant.getSegmentDataInterval(),
-                          new ReportTimelineMissingSegmentQueryRunner<>(descriptor)
-                      );
-                    }
-                    final Pair<SegmentReference, Closeable> segmentAndCloseable = maybeSegmentAndCloseable.get();
-                    try {
+          final Closeable releaser = () -> CloseableUtils.closeAll(segmentReferences);

-                      QueryRunner<T> runner = factory.createRunner(segmentAndCloseable.lhs);
+          try {
+            Iterable<QueryRunner<T>> perHydrantRunners = new SinkQueryRunners<>(
+                Iterables.transform(
+                    segmentReferences,
+                    segmentReference -> {
+                      QueryRunner<T> runner = factory.createRunner(segmentReference.getSegment());

                       // 1) Only use caching if data is immutable
                       // 2) Hydrants are not the same between replicas, make sure cache is local
-                      if (hydrantDefinitelySwapped && cache.isLocal()) {
-                        StorageAdapter storageAdapter = segmentAndCloseable.lhs.asStorageAdapter();
+                      if (segmentReference.isImmutable() && cache.isLocal()) {
+                        StorageAdapter storageAdapter = segmentReference.getSegment().asStorageAdapter();
                         long segmentMinTime = storageAdapter.getMinTime().getMillis();
                         long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
                         Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
                         runner = new CachingQueryRunner<>(
-                            makeHydrantCacheIdentifier(hydrant),
+                            makeHydrantCacheIdentifier(sinkSegmentId, segmentReference.getHydrantNumber()),
                             cacheKeyPrefix,
                             descriptor,
                             actualDataInterval,
@@ -254,35 +243,33 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final
                             cacheConfig
                         );
                       }
-                      // Make it always use Closeable to decrement()
-                      runner = QueryRunnerHelper.makeClosingQueryRunner(
-                          runner,
-                          segmentAndCloseable.rhs
-                      );
-                      return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner);
-                    }
-                    catch (Throwable e) {
-                      throw CloseableUtils.closeAndWrapInCatch(e, segmentAndCloseable.rhs);
+                      return new Pair<>(segmentReference.getSegment().getDataInterval(), runner);
                     }
-                  }
-              )
-          );
-          return new SpecificSegmentQueryRunner<>(
-              withPerSinkMetrics(
-                  new BySegmentQueryRunner<>(
-                      sinkSegmentId,
-                      descriptor.getInterval().getStart(),
-                      factory.mergeRunners(
-                          DirectQueryProcessingPool.INSTANCE,
-                          perHydrantRunners
-                      )
-                  ),
-                  toolChest,
-                  sinkSegmentId,
-                  cpuTimeAccumulator
-              ),
-              new SpecificSegmentSpec(descriptor)
-          );
+                )
+            );
+            return QueryRunnerHelper.makeClosingQueryRunner(
+                new SpecificSegmentQueryRunner<>(
+                    withPerSinkMetrics(
+                        new BySegmentQueryRunner<>(
+                            sinkSegmentId,
+                            descriptor.getInterval().getStart(),
+                            factory.mergeRunners(
+                                DirectQueryProcessingPool.INSTANCE,
+                                perHydrantRunners
+                            )
+                        ),
+                        toolChest,
+                        sinkSegmentId,
+                        cpuTimeAccumulator
+                    ),
+                    new SpecificSegmentSpec(descriptor)
+                ),
+                releaser
+            );
+          }
+          catch (Throwable e) {
+            throw CloseableUtils.closeAndWrapInCatch(e, releaser);
+          }
         }
     );
     final QueryRunner<T> mergedRunner =
@@ -361,8 +348,16 @@ public VersionedIntervalTimeline<String, Sink> getSinkTimeline()
     return sinkTimeline;
   }

-  public static String makeHydrantCacheIdentifier(FireHydrant input)
+  public static String makeHydrantCacheIdentifier(final FireHydrant hydrant)
+  {
+    return makeHydrantCacheIdentifier(hydrant.getSegmentId(), hydrant.getCount());
+  }
+
+  public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final int hydrantNumber)
   {
-    return input.getSegmentId() + "_" + input.getCount();
+    // Cache ID like segmentId_H0, etc. The 'H' disambiguates subsegment [foo_x_y_z partition 0 hydrant 1]
+    // from full segment [foo_x_y_z partition 1], and is therefore useful if we ever want the cache to mix full segments
+    // with subsegments (hydrants).
+    return segmentId + "_H" + hydrantNumber;
   }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
index 43e3f096541f..95ed92aafed2 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
@@ -26,9 +26,13 @@
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.Query;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.SegmentReference;
 import org.apache.druid.segment.column.ColumnFormat;
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
@@ -42,6 +46,9 @@
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.joda.time.Interval;

+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -50,14 +57,18 @@
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;

 public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
 {
   private static final IncrementalIndexAddResult ALREADY_SWAPPED = new IncrementalIndexAddResult(-1, -1, "write after index swapped");
+  private static final Logger log = new Logger(Sink.class);

   private final Object hydrantLock = new Object();
   private final Interval interval;
@@ -228,6 +239,7 @@ public boolean finished()

   /**
    * Marks sink as 'finished', preventing further writes.
+   *
    * @return 'true' if sink was successfully finished, 'false' if sink was already finished
    */
   public boolean finishWriting()
@@ -288,6 +300,47 @@ public long getBytesInMemory()
     }
   }

+  /**
+   * Acquire references to all {@link FireHydrant}s that represent this sink. Returns null if they cannot all be
+   * acquired, possibly because they were closed (swapped to null) concurrently with this method being called.
+   *
+   * @param segmentMapFn           from {@link org.apache.druid.query.DataSource#createSegmentMapFunction(Query, AtomicLong)}
+   * @param skipIncrementalSegment whether in-memory {@link IncrementalIndex} segments should be skipped
+   */
+  @Nullable
+  public List<SinkSegmentReference> acquireSegmentReferences(
+      final Function<SegmentReference, SegmentReference> segmentMapFn,
+      final boolean skipIncrementalSegment
+  )
+  {
+    final List<SinkSegmentReference> retVal = new ArrayList<>(hydrants.size());
+
+    for (final FireHydrant hydrant : hydrants) {
+      // Hydrant might swap at any point, but if it's swapped at the start
+      // then we know it's *definitely* swapped.
+      final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
+
+      if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
+        continue;
+      }
+
+      final Optional<Pair<SegmentReference, Closeable>> maybeHolder = hydrant.getSegmentForQuery(segmentMapFn);
+      if (maybeHolder.isPresent()) {
+        final Pair<SegmentReference, Closeable> holder = maybeHolder.get();
+        retVal.add(new SinkSegmentReference(hydrant.getCount(), holder.lhs, hydrantDefinitelySwapped, holder.rhs));
+      } else {
+        // Cannot acquire this hydrant. Release all others previously acquired and return null.
+        for (final SinkSegmentReference reference : retVal) {
+          reference.close();
+        }
+
+        return null;
+      }
+    }
+
+    return retVal;
+  }
+
   private boolean checkInDedupSet(InputRow row)
   {
     if (dedupColumn != null) {
@@ -335,7 +388,8 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
         .build();

     // Build the incremental-index according to the spec that was chosen by the user
-    final IncrementalIndex newIndex = appendableIndexSpec.builder()
+    final IncrementalIndex newIndex = appendableIndexSpec
+        .builder()
         .setIndexSchema(indexSchema)
         .setMaxRowCount(maxRowsInMemory)
         .setMaxBytesInMemory(maxBytesInMemory)
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/SinkSegmentReference.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/SinkSegmentReference.java
new file mode 100644
index 000000000000..10dfc2b275ed
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/SinkSegmentReference.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.plumber;
+
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.utils.CloseableUtils;
+
+import java.io.Closeable;
+import java.util.function.Function;
+
+/**
+ * Segment reference returned by {@link Sink#acquireSegmentReferences(Function, boolean)}. Must be closed in order
+ * to release the reference.
+ */ +public class SinkSegmentReference implements Closeable +{ + private final int hydrantNumber; + private final SegmentReference segment; + private final boolean immutable; + private final Closeable releaser; + + public SinkSegmentReference(int hydrantNumber, SegmentReference segment, boolean immutable, Closeable releaser) + { + this.hydrantNumber = hydrantNumber; + this.segment = segment; + this.immutable = immutable; + this.releaser = releaser; + } + + /** + * Index of the {@link org.apache.druid.segment.realtime.FireHydrant} within the {@link Sink} that this segment + * reference came from. + */ + public int getHydrantNumber() + { + return hydrantNumber; + } + + /** + * The segment reference. + */ + public SegmentReference getSegment() + { + return segment; + } + + /** + * Whether the segment is immutable. + */ + public boolean isImmutable() + { + return immutable; + } + + @Override + public void close() + { + CloseableUtils.closeAndWrapExceptions(releaser); + } +} diff --git a/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java index 0085cb12ead5..38c3fda1e7e2 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java @@ -146,6 +146,22 @@ public void testGetSegmentForQuerySwapped() throws IOException Assert.assertEquals(0, queryableSegmentReference.getNumReferences()); } + @Test + public void testGetSegmentForQuerySwappedWithNull() + { + ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment(); + hydrant.swapSegment(null); + ReferenceCountingSegment queryableSegmentReference = hydrant.getHydrantSegment(); + Assert.assertEquals(0, incrementalSegmentReference.getNumReferences()); + Assert.assertNull(queryableSegmentReference); + + Optional> maybeSegmentAndCloseable = hydrant.getSegmentForQuery( + Function.identity() + ); + Assert.assertEquals(0, incrementalSegmentReference.getNumReferences()); + Assert.assertFalse(maybeSegmentAndCloseable.isPresent()); + } + @Test public void testGetSegmentForQueryButNotAbleToAcquireReferences() {