Automatic compaction by coordinators #5102

Merged: 33 commits, merged on Jan 13, 2018. Changes shown below are from 28 of the 33 commits.

Commits (33)
a4acb33
Automatic compaction by coordinator
jihoonson Nov 18, 2017
5d2b153
add links
jihoonson Nov 18, 2017
ff77e53
skip compaction for very recent segments if they are small
jihoonson Nov 21, 2017
d452694
Merge branch 'master' of https://github.com/druid-io/druid into coord…
jihoonson Nov 21, 2017
c19b3f4
fix finding search interval
jihoonson Nov 22, 2017
8abff3d
fix finding search interval
jihoonson Nov 23, 2017
75606d2
fix TimelineHolder iteration
jihoonson Nov 23, 2017
1cc6e3f
add test for newestSegmentFirstPolicy
jihoonson Nov 28, 2017
3d19dee
add CompactionSegmentIterator
jihoonson Nov 29, 2017
82c14a1
add numTargetCompactionSegments
jihoonson Dec 1, 2017
9c6d7d8
add missing config
jihoonson Dec 1, 2017
87bd171
fix skipping huge shards
jihoonson Dec 2, 2017
2f768f6
fix handling large number of segments per shard
jihoonson Dec 5, 2017
e0455f7
fix test failure
jihoonson Dec 5, 2017
5088323
change recursive call to loop
jihoonson Dec 5, 2017
bfb5d45
fix logging
jihoonson Dec 5, 2017
f389122
fix build
jihoonson Dec 5, 2017
b29a1c7
fix test failure
jihoonson Dec 5, 2017
28d0232
address comments
jihoonson Dec 12, 2017
a6be73d
Merge branch 'master' of https://github.com/druid-io/druid into coord…
jihoonson Dec 21, 2017
e3478ff
change dataSources type
jihoonson Dec 21, 2017
a387102
check running pendingTasks at each run
jihoonson Dec 21, 2017
b8b58c8
fix test
jihoonson Dec 21, 2017
fae32e8
address comments
jihoonson Dec 22, 2017
ba2d766
fix build
jihoonson Dec 22, 2017
4514cd3
Merge branch 'master' of https://github.com/druid-io/druid into coord…
jihoonson Dec 22, 2017
f036538
Merge branch 'master' of https://github.com/druid-io/druid into coord…
jihoonson Jan 5, 2018
5d561a3
fix test
jihoonson Jan 5, 2018
6dee41e
address comments
jihoonson Jan 6, 2018
048d09d
address comments
jihoonson Jan 9, 2018
b44aeb0
add doc for segment size optimization
jihoonson Jan 9, 2018
80c0989
address comment
jihoonson Jan 10, 2018
b091a9b
Merge branch 'master' of https://github.com/druid-io/druid into coord…
jihoonson Jan 13, 2018
9 changes: 9 additions & 0 deletions api/src/main/java/io/druid/indexer/TaskStatusPlus.java
@@ -29,6 +29,7 @@
public class TaskStatusPlus
{
private final String id;
private final String type;
private final DateTime createdTime;
private final DateTime queueInsertionTime;
private final TaskState state;
@@ -38,6 +39,7 @@ public class TaskStatusPlus
@JsonCreator
public TaskStatusPlus(
@JsonProperty("id") String id,
@JsonProperty("type") String type,
@JsonProperty("createdTime") DateTime createdTime,
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
@JsonProperty("state") @Nullable TaskState state,
@@ -49,6 +51,7 @@ public TaskStatusPlus(
Preconditions.checkNotNull(duration, "duration");
}
this.id = Preconditions.checkNotNull(id, "id");
this.type = Preconditions.checkNotNull(type, "type");
this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime");
this.queueInsertionTime = Preconditions.checkNotNull(queueInsertionTime, "queueInsertionTime");
this.state = state;
@@ -62,6 +65,12 @@ public String getId()
return id;
}

@JsonProperty
public String getType()
{
return type;
}

@JsonProperty
public DateTime getCreatedTime()
{
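For illustration, a hedged sketch of how a caller such as the coordinator might use the new `type` field to count running compact tasks. The class, method, and the `"compact"` type string below are assumptions for the example, not part of this patch:

```java
import java.util.List;

import io.druid.indexer.TaskStatusPlus;

public class CompactTaskCounter
{
  // Hypothetical type string for compact tasks; the real constant lives elsewhere in Druid.
  private static final String COMPACT_TASK_TYPE = "compact";

  // Counts how many of the given task statuses report the compact task type.
  public static long countCompactTasks(List<TaskStatusPlus> statuses)
  {
    return statuses.stream()
                   .filter(status -> COMPACT_TASK_TYPE.equals(status.getType()))
                   .count();
  }
}
```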
@@ -0,0 +1,144 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.coordinator;

import com.google.common.collect.ImmutableList;
import io.druid.java.util.common.DateTimes;
import io.druid.server.coordinator.helper.CompactionSegmentIterator;
import io.druid.server.coordinator.helper.CompactionSegmentSearchPolicy;
import io.druid.server.coordinator.helper.NewestSegmentFirstPolicy;
import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Fork(value = 1, jvmArgsAppend = "-XX:+UseG1GC")
public class NewestSegmentFirstPolicyBenchmark
{
private static final String DATA_SOURCE_PREFIX = "dataSource_";

private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy();

@Param("100")
private int numDataSources;

@Param("10000")
private int numDayIntervalsPerDataSource;

@Param("10")
private int numPartitionsPerDayInterval;

@Param("800000000")
private long targetCompactionSizeBytes;

@Param("1000000")
private long segmentSizeBytes;

@Param("10")
private int numCompactionTaskSlots;

private Map<String, CoordinatorCompactionConfig> compactionConfigs;
private Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;

@Setup(Level.Trial)
public void setup()
{
compactionConfigs = new HashMap<>();
for (int i = 0; i < numDataSources; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
compactionConfigs.put(
dataSource,
new CoordinatorCompactionConfig(
dataSource,
0,
targetCompactionSizeBytes,
null,
null,
null,
null
)
);
}

dataSources = new HashMap<>();
for (int i = 0; i < numDataSources; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;

VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(
String.CASE_INSENSITIVE_ORDER
);

final int startYear = ThreadLocalRandom.current().nextInt(2000, 2040);
DateTime date = DateTimes.of(startYear, 1, 1, 0, 0);

for (int j = 0; j < numDayIntervalsPerDataSource; j++, date = date.plusDays(1)) {
for (int k = 0; k < numPartitionsPerDayInterval; k++) {
final ShardSpec shardSpec = new NumberedShardSpec(numPartitionsPerDayInterval, k);
final DataSegment segment = new DataSegment(
dataSource,
new Interval(date, date.plusDays(1)),
"version",
null,
ImmutableList.of(),
ImmutableList.of(),
shardSpec,
0,
segmentSizeBytes
);
timeline.add(segment.getInterval(), segment.getVersion(), shardSpec.createChunk(segment));
}
}

dataSources.put(dataSource, timeline);
}
}

@Benchmark
public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
{
final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources);
for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
final List<DataSegment> segments = iterator.next();
blackhole.consume(segments);
}
}
}
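As a usage note, one way to launch this benchmark programmatically is through the standard JMH runner API; the wrapper class below is hypothetical and not part of the patch:

```java
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class NewestSegmentFirstPolicyBenchmarkRunner
{
  public static void main(String[] args) throws RunnerException
  {
    // Select the benchmark above by class name and run it with the defaults declared via @Param.
    final Options options = new OptionsBuilder()
        .include(NewestSegmentFirstPolicyBenchmark.class.getSimpleName())
        .build();
    new Runner(options).run();
  }
}
```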
@@ -224,6 +224,48 @@ public List<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompleteP
}
}

public boolean isEmpty()
{
try {
lock.readLock().lock();
return completePartitionsTimeline.isEmpty();
}
finally {
lock.readLock().unlock();
}
}

public TimelineObjectHolder<VersionType, ObjectType> first()
{
try {
lock.readLock().lock();
return timelineEntryToObjectHolder(completePartitionsTimeline.firstEntry().getValue());
}
finally {
lock.readLock().unlock();
}
}

public TimelineObjectHolder<VersionType, ObjectType> last()
{
try {
lock.readLock().lock();
return timelineEntryToObjectHolder(completePartitionsTimeline.lastEntry().getValue());
}
finally {
lock.readLock().unlock();
}
}

private TimelineObjectHolder<VersionType, ObjectType> timelineEntryToObjectHolder(TimelineEntry entry)
{
return new TimelineObjectHolder<>(
entry.getTrueInterval(),
entry.getVersion(),
new PartitionHolder<>(entry.getPartitionHolder())
);
}

public Set<TimelineObjectHolder<VersionType, ObjectType>> findOvershadowed()
{
try {
@@ -260,13 +302,7 @@ public Set<TimelineObjectHolder<VersionType, ObjectType>> findOvershadowed()
for (Map.Entry<Interval, Map<VersionType, TimelineEntry>> versionEntry : overShadowed.entrySet()) {
for (Map.Entry<VersionType, TimelineEntry> entry : versionEntry.getValue().entrySet()) {
TimelineEntry object = entry.getValue();
retVal.add(
new TimelineObjectHolder<VersionType, ObjectType>(
object.getTrueInterval(),
object.getVersion(),
new PartitionHolder<ObjectType>(object.getPartitionHolder())
)
);
retVal.add(timelineEntryToObjectHolder(object));
}
}

@@ -557,14 +593,14 @@ public class TimelineEntry
private final VersionType version;
private final PartitionHolder<ObjectType> partitionHolder;

public TimelineEntry(Interval trueInterval, VersionType version, PartitionHolder<ObjectType> partitionHolder)
TimelineEntry(Interval trueInterval, VersionType version, PartitionHolder<ObjectType> partitionHolder)
{
this.trueInterval = Preconditions.checkNotNull(trueInterval);
this.version = Preconditions.checkNotNull(version);
this.partitionHolder = Preconditions.checkNotNull(partitionHolder);
}

public Interval getTrueInterval()
Interval getTrueInterval()
{
return trueInterval;
}
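A minimal sketch of how the new `isEmpty()`, `first()`, and `last()` accessors might be used to bound a compaction search interval; the helper class and method below are hypothetical, not part of the patch:

```java
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;

public class SearchIntervalExample
{
  // Returns the interval spanning all complete segments in the timeline, or null if the timeline is empty.
  public static Interval totalSearchInterval(VersionedIntervalTimeline<String, DataSegment> timeline)
  {
    if (timeline.isEmpty()) {
      return null;
    }
    final TimelineObjectHolder<String, DataSegment> first = timeline.first();
    final TimelineObjectHolder<String, DataSegment> last = timeline.last();
    return new Interval(first.getInterval().getStart(), last.getInterval().getEnd());
  }
}
```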
40 changes: 39 additions & 1 deletion docs/content/configuration/coordinator.md
@@ -24,7 +24,7 @@ The coordinator node uses several of the global configs in [Configuration](../co
|Property|Description|Default|
|--------|-----------|-------|
|`druid.coordinator.period`|The run period for the coordinator. The coordinator operates by maintaining the current state of the world in memory and periodically looking at the set of segments available and segments being served to make decisions about whether any changes need to be made to the data topology. This property sets the delay between each of these runs.|PT60S|
|`druid.coordinator.period.indexingPeriod`|How often to send indexing tasks to the indexing service. Only applies if merge or conversion is turned on.|PT1800S (30 mins)|
|`druid.coordinator.period.indexingPeriod`|How often to send compact/merge/conversion tasks to the indexing service. It is recommended that this period be longer than `druid.manager.segments.pollDuration`.|PT1800S (30 mins)|
|`druid.coordinator.startDelay`|The operation of the Coordinator works on the assumption that it has an up-to-date view of the state of the world when it runs, the current ZK interaction code, however, is written in a way that doesn’t allow the Coordinator to know for a fact that it’s done loading the current state of the world. This delay is a hack to give it enough time to believe that it has all the data.|PT300S|
|`druid.coordinator.merge.on`|Boolean flag for whether or not the coordinator should try and merge small segments into a more optimal segment size.|false|
|`druid.coordinator.conversion.on`|Boolean flag for converting old segment indexing versions to the latest segment indexing version.|false|
@@ -106,6 +106,9 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`killDataSourceWhitelist`|List of dataSources for which kill tasks are sent if property `druid.coordinator.kill.on` is true.|none|
|`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true then `killDataSourceWhitelist` must not be specified or be empty list.|false|
|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that can be queued for loading to any given server. This parameter can be used to speed up the segment loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too many segments are scheduled to be replicated to some particular node (faster loading could be preferred to better segment distribution). The desired value depends on segment loading speed, acceptable replication time, and the number of nodes. A value of 1000 could be a starting point for a rather big cluster. The default value of 0 means the loading queue is unbounded.|0|
|`compactionConfigs`|Compaction config list. See [Compaction Config](#compaction-config) below.|none|
|`compactionTaskSlotRatio`|The ratio of compaction task slots to total task slots. The actual maximum number of compaction tasks is `min(maxCompactionTaskSlots, compactionTaskSlotRatio * total task slots)`.|0.1|
|`maxCompactionTaskSlots`|The maximum number of task slots for compaction tasks. The actual maximum number of compaction tasks is `min(maxCompactionTaskSlots, compactionTaskSlotRatio * total task slots)`.|Unbounded|
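As a minimal sketch of the slot arithmetic in the two rows above (the class and method names are illustrative, not an actual Druid API):

```java
public class CompactionTaskSlots
{
  // Effective slots = min(maxCompactionTaskSlots, compactionTaskSlotRatio * total task slots), as described above.
  static int effectiveCompactionTaskSlots(int totalTaskSlots, double compactionTaskSlotRatio, int maxCompactionTaskSlots)
  {
    final int ratioSlots = (int) (totalTaskSlots * compactionTaskSlotRatio);
    return Math.min(maxCompactionTaskSlots, ratioSlots);
  }
}
```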

To view the audit history of coordinator dynamic config issue a GET request to the URL -

@@ -121,6 +124,41 @@ To view last <n> entries of the audit history of coordinator dynamic config issu
http://<COORDINATOR_IP>:<PORT>/druid/coordinator/v1/config/history?count=<n>
```

# Compaction Config

|Property|Description|Required|
|--------|-----------|--------|
|`dataSource`|Name of the dataSource to be compacted.|yes|
|`taskPriority`|[Priority](../ingestion/tasks.html#task-priorities) of compact tasks.|no (default = 25)|
|`targetCompactionSizeBytes`|The target segment size, in bytes, for compaction. The actual size of a compacted segment might be slightly larger or smaller than this value.|no (default = 838860800)|
|`numTargetCompactionSegments`|Maximum number of segments to compact together.|no (default = 150)|
|`skipOffsetFromLatest`|The offset from the latest segment when searching for segments to compact. Setting this is strongly recommended for realtime dataSources.|no (default = "P1D")|
|`tuningConfig`|Tuning config for compact tasks. See [Compact Task TuningConfig](#compact-task-tuningconfig) below.|no|
|`taskContext`|[Task context](../ingestion/tasks.html#task-context) for compact tasks.|no|

An example compaction config is:

```json
{
"dataSource": "wikiticker",
"targetCompactionSizeBytes": 800000000,
"skipOffsetFromLatest": "P1D"
}
```

For realtime dataSources, it is recommended to set `skipOffsetFromLatest` to a sufficiently large value to avoid frequent compact task failures.

## Compact Task TuningConfig

The compact task tuning config is a subset of the IndexTask tuningConfig. See [TuningConfig](../ingestion/tasks.html#tuningconfig) for more details.

|Property|Required|
|--------|--------|
|`maxRowsInMemory`|no|
|`maxTotalRows`|no|
|`indexSpec`|no|
|`maxPendingPersists`|no|
|`publishTimeout`|no|

# Lookups Dynamic Config (EXPERIMENTAL)
These configuration options control the behavior of the Lookup dynamic configuration described in the [lookups page](../querying/lookups.html)