Skip to content
Permalink
Browse files
Fix auto compaction by adjusting compaction task's interval to align …
…with segmentGranularity when segmentGranularity is set (#12334)

* add impl

* add ITs

* address comments

* address comments

* address comments

* fix failure

* fix checkstyle

* fix checkstyle
  • Loading branch information
maytasm committed Mar 18, 2022
1 parent 6f0e5f2 commit dbb9518f50d87f2a0512e8262ebc85a9f9f0e575
Show file tree
Hide file tree
Showing 8 changed files with 335 additions and 16 deletions.
@@ -24,9 +24,10 @@
import com.google.common.collect.PeekingIterator;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -182,18 +183,18 @@ private static void verifyAscendingSortOrder(Interval previous, Interval current

public static Interval umbrellaInterval(Iterable<Interval> intervals)
{
ArrayList<DateTime> startDates = new ArrayList<>();
ArrayList<DateTime> endDates = new ArrayList<>();
boolean emptyIntervals = true;
DateTimeComparator dateTimeComp = DateTimeComparator.getInstance();
DateTime minStart = new DateTime(Long.MAX_VALUE, ISOChronology.getInstanceUTC());
DateTime maxEnd = new DateTime(Long.MIN_VALUE, ISOChronology.getInstanceUTC());

for (Interval interval : intervals) {
startDates.add(interval.getStart());
endDates.add(interval.getEnd());
emptyIntervals = false;
minStart = Collections.min(ImmutableList.of(minStart, interval.getStart()), dateTimeComp);
maxEnd = Collections.max(ImmutableList.of(maxEnd, interval.getEnd()), dateTimeComp);
}

DateTime minStart = minDateTime(startDates.toArray(new DateTime[0]));
DateTime maxEnd = maxDateTime(endDates.toArray(new DateTime[0]));

if (minStart == null || maxEnd == null) {
if (emptyIntervals) {
throw new IllegalArgumentException("Empty list of intervals");
}
return new Interval(minStart, maxEnd);
@@ -202,7 +202,7 @@ public TaskPayloadResponse getTaskPayload(String taskId)
try {
StatusResponseHolder response = makeRequest(
HttpMethod.GET,
StringUtils.format("%stask/%s", getIndexerURL(), taskId)
StringUtils.format("%stask/%s", getIndexerURL(), StringUtils.urlEncode(taskId))
);
LOG.debug("Task %s response %s", taskId, response.getContent());
return jsonMapper.readValue(
@@ -25,6 +25,7 @@
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.initialization.Initialization;
import org.testng.IModuleFactory;
import org.testng.ITestContext;
@@ -50,7 +51,8 @@ private static List<? extends Module> getModules()
return ImmutableList.of(
new DruidTestModule(),
new IndexingServiceFirehoseModule(),
new IndexingServiceInputSourceModule()
new IndexingServiceInputSourceModule(),
new IndexingServiceTuningConfigModule()
);
}

@@ -30,8 +30,12 @@
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.task.CompactionIntervalSpec;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.overlord.http.TaskPayloadResponse;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -631,6 +635,103 @@ public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranula
}
}

@Test
public void testAutoCompactionDutyWithSegmentGranularityFinerAndNotAlignWithSegment() throws Exception
{
updateCompactionTaskSlot(1, 1, null);
final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.MONTH, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
try (final Closeable ignored = unloader(fullDatasourceName)) {
Map<String, Object> expectedResult = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(Granularities.WEEK, null, null),
false
);
// Before compaction, we have segments with the interval 2013-08-01/2013-09-01 and 2013-09-01/2013-10-01
// We will compact the latest segment, 2013-09-01/2013-10-01, to WEEK.
// Since the start of the week does not align with 2013-09-01 or 2013-10-01, we expect the compaction task's
// interval to be adjusted so that the compacted WEEK segments does not unintentionally remove data of the
// non compacted 2013-08-01/2013-09-01 segment.
// Note that the compacted WEEK segment does not fully cover the original MONTH segment as the MONTH segment
// does not have data on every week on the month
forceTriggerAutoCompaction(3);
// Make sure that no data is lost after compaction
expectedResult = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
List<TaskResponseObject> tasks = indexer.getCompleteTasksForDataSource(fullDatasourceName);
TaskResponseObject compactTask = null;
for (TaskResponseObject task : tasks) {
if (task.getType().equals("compact")) {
compactTask = task;
}
}
Assert.assertNotNull(compactTask);
TaskPayloadResponse task = indexer.getTaskPayload(compactTask.getId());
// Verify that compaction task interval is adjusted to align with segmentGranularity
Assert.assertEquals(Intervals.of("2013-08-26T00:00:00.000Z/2013-10-07T00:00:00.000Z"), ((CompactionIntervalSpec) ((CompactionTask) task.getPayload()).getIoConfig().getInputSpec()).getInterval());
}
}

@Test
public void testAutoCompactionDutyWithSegmentGranularityCoarserAndNotAlignWithSegment() throws Exception
{
updateCompactionTaskSlot(1, 1, null);
final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.WEEK, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
try (final Closeable ignored = unloader(fullDatasourceName)) {
Map<String, Object> expectedResult = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null),
false
);
// Before compaction, we have segments with the interval 2013-08-26T00:00:00.000Z/2013-09-02T00:00:00.000Z
// We will compact the latest segment to MONTH.
// Although the segments before compaction only cover 2013-08-26 to 2013-09-02,
// we expect the compaction task's interval to align with the MONTH segmentGranularity (2013-08-01 to 2013-10-01)
forceTriggerAutoCompaction(2);
// Make sure that no data is lost after compaction
expectedResult = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
List<TaskResponseObject> tasks = indexer.getCompleteTasksForDataSource(fullDatasourceName);
TaskResponseObject compactTask = null;
for (TaskResponseObject task : tasks) {
if (task.getType().equals("compact")) {
compactTask = task;
}
}
Assert.assertNotNull(compactTask);
TaskPayloadResponse task = indexer.getTaskPayload(compactTask.getId());
// Verify that compaction task interval is adjusted to align with segmentGranularity
Assert.assertEquals(Intervals.of("2013-08-01T00:00:00.000Z/2013-10-01T00:00:00.000Z"), ((CompactionIntervalSpec) ((CompactionTask) task.getPayload()).getIoConfig().getInputSpec()).getInterval());
}
}

@Test
public void testAutoCompactionDutyWithRollup() throws Exception
{
@@ -23,6 +23,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

@@ -38,18 +40,45 @@
*/
public class ClientCompactionIntervalSpec
{
private static final Logger LOGGER = new Logger(ClientCompactionIntervalSpec.class);

private static final String TYPE = "interval";

private final Interval interval;
@Nullable
private final String sha256OfSortedSegmentIds;

public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments)
public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments, @Nullable Granularity segmentGranularity)
{
Interval interval = JodaUtils.umbrellaInterval(segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
LOGGER.info("Original umbrella interval %s in compaction task for datasource %s", interval, segments.get(0).getDataSource());
if (segmentGranularity != null) {
// If segmentGranularity is set, then the segmentGranularity of the segments may not align with the configured segmentGranularity
// We must adjust the interval of the compaction task to fully cover and align with the segmentGranularity
// For example,
// - The umbrella interval of the segments is 2015-04-11/2015-04-12 but configured segmentGranularity is YEAR,
// if the compaction task's interval is 2015-04-11/2015-04-12 then we can run into race condition where after submitting
// the compaction task, a new segment outside of the interval (i.e. 2015-02-11/2015-02-12) got created will be lost as it is
// overshadowed by the compacted segment (compacted segment has interval 2015-01-01/2016-01-01.
// Hence, in this case, we must adjust the compaction task interval to 2015-01-01/2016-01-01.
// - The segment to be compacted has MONTH segmentGranularity with the interval 2015-02-01/2015-03-01 but configured
// segmentGranularity is WEEK. If the compaction task's interval is 2015-02-01/2015-03-01 then compacted segments created will be
// 2015-01-26/2015-02-02, 2015-02-02/2015-02-09, 2015-02-09/2015-02-16, 2015-02-16/2015-02-23, 2015-02-23/2015-03-02.
// This is because Druid's WEEK segments alway start and end on Monday. In the above example, 2015-01-26 and 2015-03-02
// are Mondays but 2015-02-01 and 2015-03-01 are not. Hence, the WEEK segments have to start and end on 2015-01-26 and 2015-03-02.
// If the compaction task's interval is 2015-02-01/2015-03-01, then the compacted segment would cause existing data
// from 2015-01-26 to 2015-02-01 and 2015-03-01 to 2015-03-02 to be lost. Hence, in this case,
// we must adjust the compaction task interval to 2015-01-26/2015-03-02
interval = JodaUtils.umbrellaInterval(segmentGranularity.getIterable(interval));
LOGGER.info(
"Interval adjusted to %s in compaction task for datasource %s with configured segmentGranularity %s",
interval,
segments.get(0).getDataSource(),
segmentGranularity
);
}
return new ClientCompactionIntervalSpec(
JodaUtils.umbrellaInterval(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
),
interval,
null
);
}
@@ -31,6 +31,7 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
@@ -103,10 +104,11 @@ public String compactSegments(
context.put("priority", compactionTaskPriority);

final String taskId = IdUtils.newTaskId(idPrefix, ClientCompactionTaskQuery.TYPE, dataSource, null);
final Granularity segmentGranularity = granularitySpec == null ? null : granularitySpec.getSegmentGranularity();
final ClientTaskQuery taskQuery = new ClientCompactionTaskQuery(
taskId,
dataSource,
new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments), dropExisting),
new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments, segmentGranularity), dropExisting),
tuningConfig,
granularitySpec,
dimensionsSpec,
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.client.indexing;

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;

public class ClientCompactionIntervalSpecTest
{
private final DataSegment dataSegment1 = new DataSegment(
"test",
Intervals.of("2015-04-11/2015-04-12"),
DateTimes.nowUtc().toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
11
);
private final DataSegment dataSegment2 = new DataSegment(
"test",
Intervals.of("2015-04-12/2015-04-14"),
DateTimes.nowUtc().toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
11
);
private final DataSegment dataSegment3 = new DataSegment(
"test",
Intervals.of("2015-02-12/2015-03-13"),
DateTimes.nowUtc().toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
11
);

@Test
public void testFromSegmentWithNoSegmentGranularity()
{
// The umbrella interval of segments is 2015-02-12/2015-04-14
ClientCompactionIntervalSpec actual = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), null);
Assert.assertEquals(Intervals.of("2015-02-12/2015-04-14"), actual.getInterval());
}

@Test
public void testFromSegmentWitSegmentGranularitySameAsSegment()
{
// The umbrella interval of segments is 2015-04-11/2015-04-12
ClientCompactionIntervalSpec actual = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(dataSegment1), Granularities.DAY);
Assert.assertEquals(Intervals.of("2015-04-11/2015-04-12"), actual.getInterval());
}

@Test
public void testFromSegmentWithCoarserSegmentGranularity()
{
// The umbrella interval of segments is 2015-02-12/2015-04-14
ClientCompactionIntervalSpec actual = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), Granularities.YEAR);
// The compaction interval should be expanded to start of the year and end of the year to cover the segmentGranularity
Assert.assertEquals(Intervals.of("2015-01-01/2016-01-01"), actual.getInterval());
}

@Test
public void testFromSegmentWithFinerSegmentGranularityAndUmbrellaIntervalAlign()
{
// The umbrella interval of segments is 2015-02-12/2015-04-14
ClientCompactionIntervalSpec actual = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), Granularities.DAY);
// The segmentGranularity of DAY align with the umbrella interval (umbrella interval can be evenly divide into the segmentGranularity)
Assert.assertEquals(Intervals.of("2015-02-12/2015-04-14"), actual.getInterval());
}

@Test
public void testFromSegmentWithFinerSegmentGranularityAndUmbrellaIntervalNotAlign()
{
// The umbrella interval of segments is 2015-02-12/2015-04-14
ClientCompactionIntervalSpec actual = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), Granularities.WEEK);
// The segmentGranularity of WEEK does not align with the umbrella interval (umbrella interval cannot be evenly divide into the segmentGranularity)
// Hence the compaction interval is modified to aling with the segmentGranularity
Assert.assertEquals(Intervals.of("2015-02-09/2015-04-20"), actual.getInterval());
}
}

0 comments on commit dbb9518

Please sign in to comment.