Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
dd3293a
MSQ compaction runner run test
cecemei Jan 22, 2026
f723153
fix test
cecemei Jan 26, 2026
4004d08
fix test 2
cecemei Jan 26, 2026
2531140
lock input interval
cecemei Jan 27, 2026
a335f9d
test
cecemei Jan 27, 2026
02f8030
Merge branch 'master' into compact-test
cecemei Jan 28, 2026
9050d70
test coverage
cecemei Jan 28, 2026
e43ec58
allowNonAlignedInterval and forceDropExisting
cecemei Jan 29, 2026
80ead3d
fix test
cecemei Jan 29, 2026
c3de9ea
Merge branch 'master' into compact-test
cecemei Jan 29, 2026
5d17e68
Update indexing-service/src/main/java/org/apache/druid/indexing/commo…
cecemei Feb 2, 2026
95f4e09
Update indexing-service/src/main/java/org/apache/druid/indexing/commo…
cecemei Feb 2, 2026
f918259
update
cecemei Feb 2, 2026
58c7197
style
cecemei Feb 2, 2026
4024d87
drop-existing
cecemei Feb 2, 2026
d56410f
Apply suggestion from @kfaraz
cecemei Feb 3, 2026
c79ca10
format
cecemei Feb 3, 2026
05c8688
aligned
cecemei Feb 3, 2026
8e44e7c
build
cecemei Feb 3, 2026
529dcd7
mis-aligned
cecemei Feb 4, 2026
c99ee19
format
cecemei Feb 4, 2026
27fa2ed
test
cecemei Feb 4, 2026
f4af751
lock-interval
cecemei Feb 4, 2026
c19202e
lock
cecemei Feb 4, 2026
d85d0fc
test
cecemei Feb 4, 2026
1fb1342
force drop existing, revert non-aligned, deprecated allowNonAlignedIn…
cecemei Feb 5, 2026
c38fbe1
revert THREE_HOUR
cecemei Feb 5, 2026
7221970
revert format change
cecemei Feb 5, 2026
782d2ac
test
cecemei Feb 5, 2026
241b2a7
comment
cecemei Feb 5, 2026
1079703
use-queue
cecemei Feb 5, 2026
3c7df1a
reduce test
cecemei Feb 5, 2026
a0ed966
batchSegmentAllocation
cecemei Feb 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -356,42 +355,6 @@ public boolean determineLockGranularityAndTryLock(TaskActionClient client, List<
}
}

/**
 * Attempts to acquire a lock that covers certain segments.
 * <p>
 * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
 * <p>
 * This method will initialize {@link #taskLockHelper} as a side effect.
 *
 * @return whether the lock was acquired
 */
boolean determineLockGranularityAndTryLockWithSegments(
    TaskActionClient client,
    List<DataSegment> segments,
    BiConsumer<LockGranularity, List<DataSegment>> segmentCheckFunction
) throws IOException
{
  final boolean forceTimeChunkLock = getContextValue(
      Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
      Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
  );

  if (!forceTimeChunkLock) {
    // Inspect the segments themselves to pick between time-chunk and segment lock granularity.
    final LockGranularityDetermineResult result = determineSegmentGranularity(segments);
    taskLockHelper = createLockHelper(result.lockGranularity);
    segmentCheckFunction.accept(result.lockGranularity, segments);
    return tryLockWithDetermineResult(client, result);
  }

  log.info("[%s] is set to true in task context. Use timeChunk lock", Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
  taskLockHelper = createLockHelper(LockGranularity.TIME_CHUNK);
  segmentCheckFunction.accept(LockGranularity.TIME_CHUNK, segments);
  // Lock each distinct segment interval once (the set de-duplicates repeated intervals).
  final List<Interval> distinctIntervals =
      new ArrayList<>(segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()));
  return tryTimeChunkLock(client, distinctIntervals);
}

private LockGranularityDetermineResult determineSegmentGranularity(TaskActionClient client, List<Interval> intervals)
throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class CompactionIOConfig implements IOConfig
@JsonCreator
public CompactionIOConfig(
@JsonProperty("inputSpec") CompactionInputSpec inputSpec,
@JsonProperty("allowNonAlignedInterval") boolean allowNonAlignedInterval,
@Deprecated @JsonProperty("allowNonAlignedInterval") boolean allowNonAlignedInterval,
@JsonProperty("dropExisting") @Nullable Boolean dropExisting
)
{
Expand All @@ -60,6 +60,10 @@ public CompactionInputSpec getInputSpec()
return inputSpec;
}

/**
* @deprecated Provided for backwards compatibility. Can lead to data being accidentally overshadowed.
*/
@Deprecated
@JsonProperty("allowNonAlignedInterval")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public boolean isAllowNonAlignedInterval()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ public int getPriority()
/**
 * Finds the segments to compact and attempts to lock the single umbrella interval
 * covering both those segments and the requested compaction interval.
 */
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
  final List<DataSegment> candidateSegments = segmentProvider.findSegments(taskActionClient);
  final Interval lockInterval = umbrellaInterval(candidateSegments, segmentProvider);
  return determineLockGranularityAndTryLock(taskActionClient, List.of(lockInterval));
}

@Override
Expand Down Expand Up @@ -474,6 +474,7 @@ public boolean isPerfectRollup()
* <li> Rollup is done on a multi-valued string dimension or an unknown dimension
* (since MSQ requires multi-valued string dimensions to be converted to arrays for rollup) </li>
* </ul>
*
* @return false for native engine, true for MSQ engine only when partitioning or rollup is done on a multi-valued
* string or unknown dimension.
*/
Expand Down Expand Up @@ -635,12 +636,7 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
toolbox.getEmitter(),
metricBuilder,
segmentProvider.dataSource,
JodaUtils.umbrellaInterval(
Iterables.transform(
timelineSegments,
DataSegment::getInterval
)
),
umbrellaInterval(timelineSegments, segmentProvider),
lazyFetchSegments(
timelineSegments,
toolbox.getSegmentCacheManager()
Expand Down Expand Up @@ -671,6 +667,15 @@ private static Iterable<DataSegment> retrieveRelevantTimelineHolders(
return VersionedIntervalTimeline.getAllObjects(timelineSegments);
}

/**
 * Returns the smallest interval covering both the given segments and the interval
 * requested in the {@link SegmentProvider}.
 */
private static Interval umbrellaInterval(Iterable<DataSegment> segments, SegmentProvider segmentProvider)
{
  final Interval segmentsUmbrella =
      JodaUtils.umbrellaInterval(Iterables.transform(segments, DataSegment::getInterval));
  return JodaUtils.umbrellaInterval(List.of(segmentsUmbrella, segmentProvider.interval));
}

private static DataSchema createDataSchema(
ServiceEmitter emitter,
ServiceMetricEvent.Builder metricBuilder,
Expand Down Expand Up @@ -964,9 +969,9 @@ public DimensionsSpec getDimensionsSpec()
// Store forceSegmentSortByTime only if false, for compatibility with legacy compaction states.
final Boolean forceSegmentSortByTime = includeTimeAsDimension ? false : null;
return DimensionsSpec.builder()
.setDimensions(dimensionSchemas)
.setForceSegmentSortByTime(forceSegmentSortByTime)
.build();
.setDimensions(dimensionSchemas)
.setForceSegmentSortByTime(forceSegmentSortByTime)
.build();
}

public AggregatorFactory[] getMetricsSpec()
Expand Down Expand Up @@ -1016,7 +1021,7 @@ public Set<String> getMultiValuedDimensions()
* Sort {@link #segmentsIterable} in order, such that we look at later segments prior to earlier ones. Useful when
* analyzing dimensions, as it allows us to take the latest value we see, and therefore prefer types from more
* recent segments, if there was a change.
*
* <p>
* Returns a List copy of the original Iterable.
*/
private List<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> sortSegmentsListNewestFirst()
Expand Down Expand Up @@ -1146,7 +1151,9 @@ private void processProjections(final QueryableIndex index)
final QueryableIndex projectionIndex = Preconditions.checkNotNull(
index.getProjectionQueryableIndex(schema.getName())
);
final List<DimensionSchema> columnSchemas = Lists.newArrayListWithExpectedSize(schema.getGroupingColumns().size());
final List<DimensionSchema> columnSchemas = Lists.newArrayListWithExpectedSize(
schema.getGroupingColumns().size()
);
for (String groupingColumn : schema.getGroupingColumns()) {
if (groupingColumn.equals(schema.getTimeColumnName())) {
columnSchemas.add(
Expand Down Expand Up @@ -1291,6 +1298,11 @@ public Builder interval(Interval interval)
return inputSpec(new CompactionIntervalSpec(interval, null));
}

/**
 * Convenience overload of {@code interval(Interval)} that also sets the dropExisting flag.
 */
public Builder interval(Interval interval, boolean dropExisting)
{
  final CompactionInputSpec spec = new CompactionIntervalSpec(interval, null);
  return inputSpec(spec, dropExisting);
}

public Builder segments(List<DataSegment> segments)
{
return inputSpec(SpecificSegmentsSpec.fromSegments(segments));
Expand Down Expand Up @@ -1394,7 +1406,7 @@ public CompactionTask build()

/**
* Compaction Task Tuning Config.
*
* <p>
* An extension of ParallelIndexTuningConfig. As of now, all this TuningConfig
* does is fail if the TuningConfig contains
* `awaitSegmentAvailabilityTimeoutMillis` that is != 0 since it is not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public Set<DataSegment> computeTombstoneSegmentsForReplace(

/**
* See the method body for an example and an in-depth explanation as to how the replace interval is created
*
* @param intervalsToDrop Empty intervals in the query that need to be dropped. They should be aligned with the
* replaceGranularity
* @param intervalsToReplace Intervals in the query which are eligible for replacement with new data.
Expand Down Expand Up @@ -278,7 +279,8 @@ public DataSegment createTombstoneForTimeChunkInterval(
.version(version)
.shardSpec(shardSpec)
.loadSpec(tombstoneLoadSpec) // load spec is special for tombstone
.size(1); // in case coordinator segment balancing chokes with zero size
.size(1) // in case coordinator segment balancing chokes with zero size
.totalRows(0);

return dataSegmentBuilder.build();

Expand All @@ -291,8 +293,8 @@ public DataSegment createTombstoneForTimeChunkInterval(
* For a datasource having segments for 2020-01-01/2020-12-31 and 2022-01-01/2022-12-31, this method would return
* the segment 2020-01-01/2020-12-31 if the input intervals asked for the segment between 2019 and 2021.
*
* @param inputIntervals Intervals corresponding to the task
* @param dataSource Datasource corresponding to the task
* @param inputIntervals Intervals corresponding to the task
* @param dataSource Datasource corresponding to the task
* @return Intervals corresponding to used segments that overlap with any of the spec's input intervals
* @throws IOException If used segments cannot be retrieved
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.actions;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
Expand Down Expand Up @@ -48,10 +49,11 @@
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.rules.ExternalResource;

import java.util.concurrent.atomic.AtomicBoolean;

public class TaskActionTestKit extends ExternalResource
{
private final MetadataStorageTablesConfig metadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase("druid");
Expand All @@ -66,7 +68,46 @@ public class TaskActionTestKit extends ExternalResource
private BlockingExecutorService metadataCachePollExec;

private boolean useSegmentMetadataCache = false;
private boolean useCentralizedDatasourceSchema = false;
private boolean batchSegmentAllocation = true;
private boolean skipSegmentPayloadFetchForAllocation = new TaskLockConfig().isBatchAllocationReduceMetadataIO();
private AtomicBoolean configFinalized = new AtomicBoolean();

/**
 * Sets whether the segment metadata cache is used by this test kit.
 * Must be called before {@code before()} runs, since the config is finalized there.
 */
public TaskActionTestKit setUseSegmentMetadataCache(boolean useSegmentMetadataCache)
{
if (configFinalized.get()) {
// Fail fast if a test tries to change the config after before() has already built the toolbox.
throw new IllegalStateException("Test config already finalized");
}
this.useSegmentMetadataCache = useSegmentMetadataCache;
return this;
}

public TaskActionTestKit setUseCentralizedDatasourceSchema(boolean useCentralizedDatasourceSchema)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did centralized schema also run into an issue similar to batch allocation?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not supported by msq engine, schema is not saved

{
if (configFinalized.get()) {
throw new IllegalStateException("Test config already finalized");
}
this.useCentralizedDatasourceSchema = useCentralizedDatasourceSchema;
return this;
}

public TaskActionTestKit setBatchSegmentAllocation(boolean batchSegmentAllocation)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, have we added new tests which need this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i think i had some issues with batch segment allocation with segment lock, and some pending segments cache was not removed properly or something (maybe i should have written it down), and have decided to disable the batch allocation to avoid this issue

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea so it's set to false in CompactionTaskRunBase

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, if that is the case, we would need to dig deeper into it since it sounds like a bug and batch segment allocation is the default. It should work for all cases.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's only for segment lock, and i wasn't sure it's due to test setup or anything.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, in that case, let's skip the test case for segment lock and add a comment or add a test case and mark it disabled. That way, we would be aware that there is a bug and can address it later.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i could not reproduce the test cases any more, maybe it disappeared after we switched to use the widen interval. anyways, so i added the segmentQueue test param.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, okay. So do the tests now use batch segment allocation or not?

{
if (configFinalized.get()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want to impose this limit? It should be okay to be able to change the config between tests.
The before method would be invoked before every test, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The before method is called before each @Test — won't that mean we can't change the config between tests, but only in setup?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, this is just to prevent people from accidentally calling it inside a test method.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to protect devs against that? This is test-only code.

wont that mean we can't change the config between tests, but in setup only?

If the setup method (i.e. before) is called before every @Test, then each test can potentially have its own setup with a different set of configs if needed. Changing the config while a @Test method is in progress will have no effect anyway.

throw new IllegalStateException("Test config already finalized");
}
this.batchSegmentAllocation = batchSegmentAllocation;
return this;
}

/**
 * Sets whether segment allocation should skip fetching segment payloads
 * (defaults to {@code TaskLockConfig#isBatchAllocationReduceMetadataIO()}).
 * Must be called before {@code before()} runs, since the config is finalized there.
 */
public TaskActionTestKit setSkipSegmentPayloadFetchForAllocation(boolean skipSegmentPayloadFetchForAllocation)
{
if (configFinalized.get()) {
// Fail fast if a test tries to change the config after before() has already built the toolbox.
throw new IllegalStateException("Test config already finalized");
}
this.skipSegmentPayloadFetchForAllocation = skipSegmentPayloadFetchForAllocation;
return this;
}

public StubServiceEmitter getServiceEmitter()
{
Expand All @@ -88,6 +129,11 @@ public GlobalTaskLockbox getTaskLockbox()
return taskLockbox;
}

/**
 * Returns the task storage created in {@code before()}.
 */
public TaskStorage getTaskStorage()
{
return taskStorage;
}

public IndexerMetadataStorageCoordinator getMetadataStorageCoordinator()
{
return metadataStorageCoordinator;
Expand All @@ -98,16 +144,6 @@ public TaskActionToolbox getTaskActionToolbox()
return taskActionToolbox;
}

// Plain setter; unlike the builder-style setters, it performs no config-finalization check.
public void setSkipSegmentPayloadFetchForAllocation(boolean skipSegmentPayloadFetchForAllocation)
{
this.skipSegmentPayloadFetchForAllocation = skipSegmentPayloadFetchForAllocation;
}

// Plain setter; unlike the builder-style setters, it performs no config-finalization check.
public void setUseSegmentMetadataCache(boolean useSegmentMetadataCache)
{
this.useSegmentMetadataCache = useSegmentMetadataCache;
}

public void syncSegmentMetadataCache()
{
metadataCachePollExec.finishNextPendingTasks(4);
Expand All @@ -116,11 +152,13 @@ public void syncSegmentMetadataCache()
@Override
public void before()
{
Preconditions.checkState(configFinalized.compareAndSet(false, true));
emitter = new StubServiceEmitter();
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H")));
testDerbyConnector = new TestDerbyConnector(
new MetadataStorageConnectorConfig(),
metadataStorageTablesConfig
metadataStorageTablesConfig,
CentralizedDatasourceSchemaConfig.enabled(useCentralizedDatasourceSchema)
);
final ObjectMapper objectMapper = new TestUtils().getTestObjectMapper();
final SegmentSchemaManager segmentSchemaManager = new SegmentSchemaManager(
Expand All @@ -136,13 +174,19 @@ public void before()
metadataStorageTablesConfig,
testDerbyConnector,
segmentSchemaManager,
CentralizedDatasourceSchemaConfig.create(),
CentralizedDatasourceSchemaConfig.enabled(useCentralizedDatasourceSchema),
new HeapMemoryIndexingStateStorage()
);
taskLockbox = new GlobalTaskLockbox(taskStorage, metadataStorageCoordinator);
taskLockbox.syncFromStorage();
final TaskLockConfig taskLockConfig = new TaskLockConfig()
{
@Override
public boolean isBatchSegmentAllocation()
{
return batchSegmentAllocation;
}

@Override
public long getBatchAllocationWaitTime()
{
Expand All @@ -156,22 +200,28 @@ public boolean isBatchAllocationReduceMetadataIO()
}
};

SupervisorManager supervisorManager = new SupervisorManager(objectMapper, null);
SegmentAllocationQueue segmentAllocationQueue = new SegmentAllocationQueue(
taskLockbox,
taskLockConfig,
metadataStorageCoordinator,
emitter,
ScheduledExecutors::fixed
);
if (segmentAllocationQueue.isEnabled()) {
segmentAllocationQueue.becomeLeader();
}
taskActionToolbox = new TaskActionToolbox(
taskLockbox,
taskStorage,
metadataStorageCoordinator,
new SegmentAllocationQueue(
taskLockbox,
taskLockConfig,
metadataStorageCoordinator,
emitter,
ScheduledExecutors::fixed
),
segmentAllocationQueue,
emitter,
EasyMock.createMock(SupervisorManager.class),
supervisorManager,
objectMapper
);
testDerbyConnector.createDataSourceTable();
testDerbyConnector.createUpgradeSegmentsTable();
testDerbyConnector.createPendingSegmentsTable();
testDerbyConnector.createSegmentSchemasTable();
testDerbyConnector.createSegmentTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,7 @@ public void testRunParallelWithRangePartitioningAndNoUpfrontSegmentFetching() th
Granularities.HOUR,
Granularities.MINUTE,
true,

// Umbrella interval for all segments, since CompactionTasks generated a single granularitySpec.
ImmutableList.of(Intervals.of("2014-01-01/2014-01-01T03:00:00"))
ImmutableList.of(INTERVAL_TO_INDEX)
),
null
);
Expand Down
Loading
Loading