Make tombstones ingestible by having them return an empty result set. (#12392)

* Make tombstones ingestible by having them return an empty result set.

* Spotbug

* Coverage

* Coverage

* Remove unnecessary exception (checkstyle)

* Fix integration test and add one more to test dropExisting set to false over tombstones

* Force dropExisting to true in auto-compaction when the interval contains only tombstones

* Checkstyle, fix unit test

* Changed flag by mistake, fixing it

* Remove method from interface since this method is specific only to DruidSegmentInputEntity

* Fix typo

* Adapt to latest code

* Update comments when only tombstones to compact

* Move empty iterator to a new DruidTombstoneSegmentReader

* Code review feedback

* Checkstyle

* Review feedback

* Coverage
loquisgon committed Apr 15, 2022
1 parent a22d413 commit 0460d45e92a15ebdadb4455afa16a7d977f74388
Showing 14 changed files with 844 additions and 104 deletions.
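The heart of this commit is that a tombstone segment becomes readable by handing it a reader that yields no rows. A minimal sketch of that idea follows; the new DruidTombstoneSegmentReader is added by this commit, but its file is not among the hunks shown below, so the supertype and method bodies here are assumptions:

    import java.io.IOException;
    import java.util.Collections;
    import org.apache.druid.data.input.InputEntityReader;
    import org.apache.druid.data.input.InputRow;
    import org.apache.druid.data.input.InputRowListPlusRawValues;
    import org.apache.druid.java.util.common.CloseableIterators;
    import org.apache.druid.java.util.common.parsers.CloseableIterator;

    // Sketch only: the one contract that matters is "empty result set".
    public class DruidTombstoneSegmentReader implements InputEntityReader
    {
      @Override
      public CloseableIterator<InputRow> read() throws IOException
      {
        // A tombstone carries no rows, so reading it yields an empty iterator.
        return CloseableIterators.withEmptyBaggage(Collections.emptyIterator());
      }

      @Override
      public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
      {
        return CloseableIterators.withEmptyBaggage(Collections.emptyIterator());
      }
    }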
@@ -71,7 +71,7 @@ public static <T extends Enum<T>> T getEnumIfPresent(final Class<T> enumClass, f

/**
* If first argument is not null, return it, else return the other argument. Sort of like
- * {@link com.google.common.base.Objects#firstNonNull(Object, Object)} except will not explode if both arguments are
+ * {@link com.google.common.base.Objects#firstNonNull(T, T)} except will not explode if both arguments are
* null.
*/
@Nullable
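For reference, the contract in this javadoc differs from Guava's only when both arguments are null. A quick illustration (assuming the enclosing class is GuavaUtils, which the surrounding utility methods suggest):

    // Guava's Objects.firstNonNull(null, null) throws NullPointerException;
    // this variant simply returns null.
    GuavaUtils.firstNonNull("a", "b");   // -> "a"
    GuavaUtils.firstNonNull(null, "b");  // -> "b"
    GuavaUtils.firstNonNull(null, null); // -> null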
@@ -85,7 +85,8 @@ public static <T> T firstNonNull(@Nullable T arg1, @Nullable T arg2)

/**
* Cancel futures manually, because sometime we can't cancel all futures in {@link com.google.common.util.concurrent.Futures.CombinedFuture}
- * automatically. Especially when we call {@link com.google.common.util.concurrent.Futures#allAsList(Iterable)} to create a batch of
+ * automatically. Especially when we call
+ * {@link static <V> ListenableFuture<List<V>> com.google.common.util.concurrent.Futures#allAsList(Iterable<? extends ListenableFuture <? extends V>> futures)} to create a batch of
* future.
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
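The hunk above shows only the javadoc of the cancellation helper; a minimal sketch of the pattern it describes (the real signature is not visible here, so this shape is an assumption):

    import java.util.List;
    import com.google.common.util.concurrent.ListenableFuture;

    // Cancel each future explicitly instead of relying on cancellation
    // propagating through the combined future.
    public static <F extends ListenableFuture<?>> void cancelAll(
        boolean mayInterruptIfRunning,
        List<F> futures
    )
    {
      for (F future : futures) {
        future.cancel(mayInterruptIfRunning);
      }
    }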
@@ -136,4 +136,5 @@ default Predicate<Throwable> getRetryCondition()
{
return Predicates.alwaysFalse();
  }
+
 }
@@ -69,6 +69,7 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -111,6 +112,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -550,16 +552,18 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
segmentProvider,
lockGranularityInUse
);

final Map<DataSegment, File> segmentFileMap = pair.lhs;
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = pair.rhs;

if (timelineSegments.size() == 0) {
return Collections.emptyList();
}

-      // find metadata for interval
+      // find metadata for intervals with real data segments
       // queryableIndexAndSegments is sorted by the interval of the dataSegment
-      final List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
+      // Note that this list will contain null QueryableIndex values for tombstones
+      final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
timelineSegments,
segmentFileMap,
toolbox.getIndexIO()
@@ -568,8 +572,10 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
final CompactionTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig();

if (granularitySpec == null || granularitySpec.getSegmentGranularity() == null) {
+      final List<ParallelIndexIngestionSpec> specs = new ArrayList<>();
+
       // original granularity
-      final Map<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> intervalToSegments = new TreeMap<>(
+      final Map<Interval, List<Pair<QueryableIndex, DataSegment>>> intervalToSegments = new TreeMap<>(
Comparators.intervalsByStartThenEnd()
);
queryableIndexAndSegments.forEach(
@@ -578,11 +584,11 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
);

// unify overlapping intervals to ensure overlapping segments compacting in the same indexSpec
-      List<NonnullPair<Interval, List<NonnullPair<QueryableIndex, DataSegment>>>> intervalToSegmentsUnified =
+      List<NonnullPair<Interval, List<Pair<QueryableIndex, DataSegment>>>> intervalToSegmentsUnified =
           new ArrayList<>();
       Interval union = null;
-      List<NonnullPair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
-      for (Entry<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
+      List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
+      for (Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
Interval cur = entry.getKey();
if (union == null) {
union = cur;
@@ -596,12 +602,12 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
segments = new ArrayList<>(entry.getValue());
}
}

intervalToSegmentsUnified.add(new NonnullPair<>(union, segments));

-      final List<ParallelIndexIngestionSpec> specs = new ArrayList<>(intervalToSegmentsUnified.size());
-      for (NonnullPair<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> entry : intervalToSegmentsUnified) {
+      for (NonnullPair<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegmentsUnified) {
         final Interval interval = entry.lhs;
-        final List<NonnullPair<QueryableIndex, DataSegment>> segmentsToCompact = entry.rhs;
+        final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact = entry.rhs;
// If granularitySpec is not null, then set segmentGranularity. Otherwise,
// creates new granularitySpec and set segmentGranularity
Granularity segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
@@ -700,22 +706,19 @@ private static NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<Str
LockGranularity lockGranularityInUse
) throws IOException, SegmentLoadingException
{
-    final List<DataSegment> usedSegmentsMinusTombstones =
-        segmentProvider.findSegments(toolbox.getTaskActionClient())
-                       .stream()
-                       .filter(dataSegment -> !dataSegment.isTombstone()) // skip tombstones
-                       .collect(Collectors.toList());
-    segmentProvider.checkSegments(lockGranularityInUse, usedSegmentsMinusTombstones);
-    final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegmentsMinusTombstones);
+    final List<DataSegment> usedSegments =
+        segmentProvider.findSegments(toolbox.getTaskActionClient());
+    segmentProvider.checkSegments(lockGranularityInUse, usedSegments);
+    final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments);
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = VersionedIntervalTimeline
-        .forSegments(usedSegmentsMinusTombstones)
+        .forSegments(usedSegments)
.lookup(segmentProvider.interval);
return new NonnullPair<>(segmentFileMap, timelineSegments);
}

private static DataSchema createDataSchema(
String dataSource,
-      List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
+      List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
@Nullable DimensionsSpec dimensionsSpec,
@Nullable ClientCompactionTaskTransformSpec transformSpec,
@Nullable AggregatorFactory[] metricsSpec,
@@ -781,34 +784,36 @@ private static DataSchema createDataSchema(
private static void decideRollupAndQueryGranularityCarryOver(
SettableSupplier<Boolean> rollup,
SettableSupplier<Granularity> queryGranularity,
-      List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments
+      List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments
   )
   {
     final SettableSupplier<Boolean> rollupIsValid = new SettableSupplier<>(true);
-    for (NonnullPair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
+    for (Pair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
       final QueryableIndex index = pair.lhs;
-      if (index.getMetadata() == null) {
-        throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getId());
-      }
-      // carry-overs (i.e. query granularity & rollup) are valid iff they are the same in every segment:
-
-      // Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false
-      if (rollupIsValid.get()) {
-        Boolean isRollup = index.getMetadata().isRollup();
-        if (isRollup == null) {
-          rollupIsValid.set(false);
-          rollup.set(false);
-        } else if (rollup.get() == null) {
-          rollup.set(isRollup);
-        } else if (!rollup.get().equals(isRollup.booleanValue())) {
-          rollupIsValid.set(false);
-          rollup.set(false);
-        }
-      }
-
-      // Pick the finer, non-null, of the query granularities of the segments being compacted
-      Granularity current = index.getMetadata().getQueryGranularity();
-      queryGranularity.set(compareWithCurrent(queryGranularity.get(), current));
+      if (index != null) { // avoid tombstones
+        if (index.getMetadata() == null) {
+          throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getId());
+        }
+        // carry-overs (i.e. query granularity & rollup) are valid iff they are the same in every segment:
+
+        // Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false
+        if (rollupIsValid.get()) {
+          Boolean isRollup = index.getMetadata().isRollup();
+          if (isRollup == null) {
+            rollupIsValid.set(false);
+            rollup.set(false);
+          } else if (rollup.get() == null) {
+            rollup.set(isRollup);
+          } else if (!rollup.get().equals(isRollup.booleanValue())) {
+            rollupIsValid.set(false);
+            rollup.set(false);
+          }
+        }
+
+        // Pick the finer, non-null, of the query granularities of the segments being compacted
+        Granularity current = index.getMetadata().getQueryGranularity();
+        queryGranularity.set(compareWithCurrent(queryGranularity.get(), current));
+      }
     }
   }
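The carry-over rules above reduce to: rollup survives only if every real (non-tombstone) segment agrees on it, and the query granularity resolves to the finest one seen. A fragment making that concrete (expected results inferred from the comments above, since compareWithCurrent's body is not shown on this page):

    import org.apache.druid.java.util.common.granularity.Granularities;
    import org.apache.druid.java.util.common.granularity.Granularity;

    // Compacting an HOUR-granularity segment with a MINUTE-granularity one:
    Granularity carried = compareWithCurrent(Granularities.HOUR, Granularities.MINUTE);
    // expected: Granularities.MINUTE, the finer of the two
    Granularity fromNull = compareWithCurrent(null, Granularities.DAY);
    // expected: Granularities.DAY, since a null side never wins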

@@ -828,22 +833,28 @@ static Granularity compareWithCurrent(Granularity queryGranularity, Granularity
}

private static AggregatorFactory[] createMetricsSpec(
-      List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments
+      List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments
)
{
final List<AggregatorFactory[]> aggregatorFactories = queryableIndexAndSegments
.stream()
+        .filter(pair -> pair.lhs != null) // avoid tombstones
.map(pair -> pair.lhs.getMetadata().getAggregators()) // We have already done null check on index.getMetadata()
.collect(Collectors.toList());
final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(aggregatorFactories);

if (mergedAggregators == null) {
-      throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories);
+      Optional<Pair<QueryableIndex, DataSegment>> pair =
+          queryableIndexAndSegments.stream().filter(p -> !p.rhs.isTombstone()).findFirst();
+      if (pair.isPresent()) {
+        // this means that there are true data segments, so something went wrong
+        throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories);
+      }
}
return mergedAggregators;
}

-  private static DimensionsSpec createDimensionsSpec(List<NonnullPair<QueryableIndex, DataSegment>> queryableIndices)
+  private static DimensionsSpec createDimensionsSpec(List<Pair<QueryableIndex, DataSegment>> queryableIndices)
{
final BiMap<String, Integer> uniqueDims = HashBiMap.create();
final Map<String, DimensionSchema> dimensionSchemaMap = new HashMap<>();
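A subtlety in the createMetricsSpec hunk above: when an interval holds only tombstones, the new filter leaves aggregatorFactories empty, mergeAggregators comes back null, and the guarded throw is skipped because findFirst() finds no real data segment. In isolation (assuming mergeAggregators returns null for an empty list, which is what the new guard implies):

    import java.util.Collections;
    import java.util.List;
    import org.apache.druid.query.aggregation.AggregatorFactory;

    // Tombstone-only interval: every pair was filtered out by pair.lhs != null.
    List<AggregatorFactory[]> aggregatorFactories = Collections.emptyList();
    AggregatorFactory[] merged = AggregatorFactory.mergeAggregators(aggregatorFactories);
    // merged == null, yet no ISE is thrown: there is no data to aggregate,
    // so a null metrics spec is the correct outcome for this interval.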
@@ -859,33 +870,35 @@ private static DimensionsSpec createDimensionsSpec(List<NonnullPair<QueryableInd
);

int index = 0;
-    for (NonnullPair<QueryableIndex, DataSegment> pair : Lists.reverse(queryableIndices)) {
+    for (Pair<QueryableIndex, DataSegment> pair : Lists.reverse(queryableIndices)) {
       final QueryableIndex queryableIndex = pair.lhs;
-      final Map<String, DimensionHandler> dimensionHandlerMap = queryableIndex.getDimensionHandlers();
-
-      for (String dimension : queryableIndex.getAvailableDimensions()) {
-        final ColumnHolder columnHolder = Preconditions.checkNotNull(
-            queryableIndex.getColumnHolder(dimension),
-            "Cannot find column for dimension[%s]",
-            dimension
-        );
-
-        if (!uniqueDims.containsKey(dimension)) {
-          final DimensionHandler dimensionHandler = Preconditions.checkNotNull(
-              dimensionHandlerMap.get(dimension),
-              "Cannot find dimensionHandler for dimension[%s]",
-              dimension
-          );
-
-          uniqueDims.put(dimension, index++);
-          dimensionSchemaMap.put(
-              dimension,
-              createDimensionSchema(
-                  dimension,
-                  columnHolder.getCapabilities(),
-                  dimensionHandler.getMultivalueHandling()
-              )
-          );
-        }
-      }
+      if (queryableIndex != null) { // avoid tombstones
+        final Map<String, DimensionHandler> dimensionHandlerMap = queryableIndex.getDimensionHandlers();
+
+        for (String dimension : queryableIndex.getAvailableDimensions()) {
+          final ColumnHolder columnHolder = Preconditions.checkNotNull(
+              queryableIndex.getColumnHolder(dimension),
+              "Cannot find column for dimension[%s]",
+              dimension
+          );
+
+          if (!uniqueDims.containsKey(dimension)) {
+            final DimensionHandler dimensionHandler = Preconditions.checkNotNull(
+                dimensionHandlerMap.get(dimension),
+                "Cannot find dimensionHandler for dimension[%s]",
+                dimension
+            );
+
+            uniqueDims.put(dimension, index++);
+            dimensionSchemaMap.put(
+                dimension,
+                createDimensionSchema(
+                    dimension,
+                    columnHolder.getCapabilities(),
+                    dimensionHandler.getMultivalueHandling()
+                )
+            );
+          }
+        }
+      }
     }
@@ -905,25 +918,33 @@ private static DimensionsSpec createDimensionsSpec(List<NonnullPair<QueryableInd
return new DimensionsSpec(dimensionSchemas);
}

-  private static List<NonnullPair<QueryableIndex, DataSegment>> loadSegments(
+  /**
+   * This private method does not load (i.e. does not create a QueryableIndex for) tombstones, since
+   * tombstones have no file image and are never pushed to deep storage. Thus, for a tombstone, the
+   * returned list contains a null in the QueryableIndex slot of the pair (lhs).
+   */
+  private static List<Pair<QueryableIndex, DataSegment>> loadSegments(
       List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolders,
       Map<DataSegment, File> segmentFileMap,
       IndexIO indexIO
   ) throws IOException
   {
-    final List<NonnullPair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
+    final List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();

     for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timelineObjectHolders) {
       final PartitionHolder<DataSegment> partitionHolder = timelineObjectHolder.getObject();
       for (PartitionChunk<DataSegment> chunk : partitionHolder) {
+        QueryableIndex queryableIndex = null;
         final DataSegment segment = chunk.getObject();
-        final QueryableIndex queryableIndex = indexIO.loadIndex(
-            Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getId())
-        );
-        segments.add(new NonnullPair<>(queryableIndex, segment));
+        if (!chunk.getObject().isTombstone()) {
+          queryableIndex = indexIO.loadIndex(
+              Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getId())
+          );
+        }
+        segments.add(new Pair<>(queryableIndex, segment));
       }
     }
}
}

return segments;
}
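Because loadSegments now returns Pair rather than NonnullPair, every consumer has to treat a null lhs as a tombstone, exactly as the guarded loops in the hunks above do. The pattern in isolation (a sketch, not a quote from the commit):

    for (Pair<QueryableIndex, DataSegment> pair : segments) {
      if (pair.lhs == null) {
        // Tombstone: no file image was loaded, so there is nothing to inspect.
        continue;
      }
      final QueryableIndex index = pair.lhs; // safe: a real data segment
      // ... read metadata, dimensions, and aggregators from the index ...
    }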

@@ -237,7 +237,6 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu
//noinspection ConstantConditions
return FluentIterable
.from(partitionHolder)
-        .filter(chunk -> !chunk.getObject().isTombstone())
.transform(chunk -> new DruidSegmentInputEntity(segmentCacheManager, chunk.getObject(), holder.getInterval()));
}).iterator();

@@ -91,4 +91,10 @@ public void close()
}
};
}

+  public boolean isFromTombstone()
+  {
+    return segment.isTombstone();
+  }

}
@@ -19,6 +19,7 @@

package org.apache.druid.indexing.input;

+import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
@@ -55,14 +56,28 @@ public InputEntityReader createReader(
File temporaryDirectory
)
{
-    return new DruidSegmentReader(
-        source,
-        indexIO,
-        inputRowSchema.getTimestampSpec(),
-        inputRowSchema.getDimensionsSpec(),
-        inputRowSchema.getColumnsFilter(),
-        dimFilter,
-        temporaryDirectory
-    );
+    // this method handles both tombstone entities and regular segment entities
+    Preconditions.checkArgument(
+        source instanceof DruidSegmentInputEntity,
+        DruidSegmentInputEntity.class.getName() + " required, but "
+        + source.getClass().getName() + " provided."
+    );
+
+    final InputEntityReader retVal;
+    // Cast is safe because of the precondition check above
+    if (((DruidSegmentInputEntity) source).isFromTombstone()) {
+      retVal = new DruidTombstoneSegmentReader(source);
+    } else {
+      retVal = new DruidSegmentReader(
+          source,
+          indexIO,
+          inputRowSchema.getTimestampSpec(),
+          inputRowSchema.getDimensionsSpec(),
+          inputRowSchema.getColumnsFilter(),
+          dimFilter,
+          temporaryDirectory
+      );
+    }
+    return retVal;
   }
}
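From the caller's side nothing changes: the input format still hands back an InputEntityReader, and only the row stream differs. A hypothetical caller (variable names and the process() handler are illustrative, not from the commit):

    InputEntityReader reader = inputFormat.createReader(inputRowSchema, segmentEntity, temporaryDirectory);
    try (CloseableIterator<InputRow> rows = reader.read()) {
      // For a tombstone entity this iterator is empty, so ingestion simply
      // records an empty result set for the tombstone's interval.
      while (rows.hasNext()) {
        process(rows.next()); // hypothetical row handler
      }
    }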
