Automatic compaction by coordinators #5102

Merged: 33 commits merged into apache:master on Jan 13, 2018

Conversation

jihoonson (Contributor) commented Nov 18, 2017

Part of #4479.

This patch introduces automatic compaction by coordinators.

How it works

On each run, the Druid coordinator compacts small segments that abut each other. It first finds the segments to compact together based on the segment search policy, then launches a compactTask to compact those segments. The maximum number of concurrently running compact tasks is limited to max(sum of worker capacity * compactionTaskSlotRatio, 1).
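A minimal sketch of the slot cap described above (the method name is illustrative, not Druid's actual API):

```java
public class CompactionSlots
{
  // max(sum of worker capacity * compactionTaskSlotRatio, 1): the cap never
  // drops below one slot, so compaction can always make progress.
  static int maxCompactionTaskSlots(int totalWorkerCapacity, double compactionTaskSlotRatio)
  {
    return Math.max((int) (totalWorkerCapacity * compactionTaskSlotRatio), 1);
  }
}
```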

Segment Search Policy

Currently there is only one segment search policy, newest segment first policy.

Newest Segment First Policy

This policy searches the segments of all dataSources in descending order of their intervals, newest first.
For example, suppose there are 3 dataSources (ds1, ds2, ds3) and 5 segments (seg_ds1_2017-10-01_2017-10-02, seg_ds1_2017-11-01_2017-11-02, seg_ds2_2017-08-01_2017-08-02, seg_ds3_2017-07-01_2017-07-02, seg_ds3_2017-12-01_2017-12-02) for those dataSources, where each segment name indicates its dataSource and interval. The search result of newestSegmentFirstPolicy is [seg_ds3_2017-12-01_2017-12-02, seg_ds1_2017-11-01_2017-11-02, seg_ds1_2017-10-01_2017-10-02, seg_ds2_2017-08-01_2017-08-02, seg_ds3_2017-07-01_2017-07-02].
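The ordering can be sketched as follows. This toy version sorts segment-id strings of the form seg_&lt;dataSource&gt;_&lt;start&gt;_&lt;end&gt; by interval start; the real policy iterates VersionedIntervalTimelines of DataSegments, not id strings:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class NewestSegmentFirstOrder
{
  // Orders segment ids newest-first by interval start. ISO-8601 dates sort
  // correctly as plain strings, so a lexicographic comparison suffices here.
  static List<String> newestFirst(List<String> segmentIds)
  {
    final List<String> sorted = new ArrayList<>(segmentIds);
    sorted.sort(Comparator.comparing((String id) -> id.split("_")[2]).reversed());
    return sorted;
  }
}
```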

On every run, this policy starts searching from (latest interval - skipOffsetFromLatest). The offset leaves the most recent data untouched, which handles late segments still being ingested into realtime dataSources.

This policy currently cannot handle the situation where a shard consists of many small segments whose total size exceeds targetCompactionSizeBytes. If it finds such shards, it simply skips them.

I did some benchmarking of this policy's performance. It took about 1 second to iterate over 10,000,000 segments on my Mac. I think this is fine because 1) having that many segments is uncommon, and 2) the interval between runs of this policy is usually much larger than 1 second (30 minutes by default).

How you can test it

You can post, get, and delete a compaction config for a dataSource with a new coordinator API http://{COORDINATOR_IP:PORT}/druid/coordinator/v1/config/{dataSource}.

A sample compaction config is:

```json
{
  "dataSource": "wikiticker",
  "targetCompactionSizeBytes": 800000000,
  "skipOffsetFromLatest": "P1D"
}
```


@jihoonson jihoonson added the WIP label Nov 27, 2017

@jihoonson jihoonson removed the WIP label Dec 2, 2017

jihoonson (Contributor) commented Dec 2, 2017

This patch is ready for review.

himanshug (Contributor) commented Dec 8, 2017

Started reviewing this PR; some explanatory javadocs on the NewestSegmentFirstIterator class would be helpful.

himanshug (Contributor), Dec 8, 2017:

Is this always done, or only when compaction is enabled and this information is needed?

jihoonson (Contributor), Dec 12, 2017:

This is always done. Currently, all segments of all dataSources are gathered to emit some statistics in DruidCoordinatorLogger. I only changed the type of dataSources in DruidCoordinatorRuntimeParams from Set&lt;ImmutableDataSource&gt; to Map&lt;String, VersionedIntervalTimeline&lt;String, DataSegment&gt;&gt; so this information can be used in DruidCoordinatorSegmentCompactor as well.

An alternative is to keep the current type of dataSources in DruidCoordinatorRuntimeParams and convert it to Map&lt;String, VersionedIntervalTimeline&lt;String, DataSegment&gt;&gt; in DruidCoordinatorSegmentCompactor when needed, but this may increase memory usage. As an extreme example, there might be a lot of segments with compaction enabled for all dataSources; in that case, this alternative approach would double the memory consumption. I expect users to enable automatic compaction for all their realtime dataSources as well as some batch dataSources, so the memory burden might not be small.

himanshug (Contributor), Jan 5, 2018:

It's alright, especially since it was there already.

himanshug (Contributor) commented Dec 8, 2017

Is this feature general enough that any Druid user can just turn it on and forget about it? If not, please add some blurb to the docs explaining when it should be enabled and when not.

jihoonson (Contributor) commented Dec 12, 2017

himanshug (Contributor) commented Dec 18, 2017

@jihoonson please let me know when you're done with this; I'm assuming #5149 needs to be merged first to complete this PR.

```diff
      State state
  )
  {
-     super(taskId, worker, location);
+     super(taskId, task == null ? null : task.getType(), worker, location);
```
himanshug (Contributor), Jan 5, 2018:

taskType should be updated in the setTask(Task) method as well.

jihoonson (Contributor), Jan 6, 2018:

Thanks. Added.

```diff
@@ -473,15 +500,28 @@ public Response getPendingTasks(@Context final HttpServletRequest req)
  @GET
  @Path("/runningTasks")
  @Produces(MediaType.APPLICATION_JSON)
- public Response getRunningTasks(@Context final HttpServletRequest req)
+ public Response getRunningTasks(
+     @QueryParam("type") String taskType,
```

himanshug (Contributor), Jan 5, 2018:

So all the taskType business is added to support this use case? Why did we need this?

jihoonson (Contributor), Jan 6, 2018:

First, in this PR, it's needed for coordinators to find only running compact tasks. At each run, the coordinator computes the available compaction task slots as (total worker capacity - running compact tasks).

Second, I'm planning to refactor the code around taskStatus. We have several classes relating to taskStatus, and meta information (like taskLocation, task creation time, etc.) is scattered across those classes. This meta information is important for clients as well as Druid itself, but the scattering makes it difficult to have the same view everywhere.
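The per-run availability computation mentioned above can be sketched as follows (names are illustrative, and clamping at zero is an assumption, not confirmed behavior):

```java
public class AvailableSlots
{
  // (total worker capacity - running compact tasks), floored at zero so a
  // burst of compact tasks never yields a negative slot count.
  static int availableCompactionTaskSlots(int totalWorkerCapacity, int runningCompactTasks)
  {
    return Math.max(totalWorkerCapacity - runningCompactTasks, 0);
  }
}
```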

```java
public class ClientCompactQueryTuningConfig
{
  // These default values should be synchronized with those of IndexTuningConfig
```

himanshug (Contributor), Jan 5, 2018:

I think you meant RealtimeTuningConfig. However, in that case we should probably declare these as public static final xxx in one of these classes and have the other class refer to them; then they would stay the same automatically.

jihoonson (Contributor), Jan 6, 2018:

A compaction task is essentially an indexTask + ingestSegmentFirehose, so this class should be synchronized with IndexTuningConfig.
Unfortunately, druid-server currently has no dependency on druid-indexing-service, so it cannot use IndexTuningConfig for now. That is why this discussion comes up.

himanshug (Contributor), Jan 7, 2018:

Yeah, I know... it's unfortunate to have all these classes in server/ with counterparts in indexing-service/. We should probably think about merging them; maybe the solution is to merge everything from indexing-service/ into server/.

Created #5229 to track it.

```java
/**
 * Should be synchronized with io.druid.indexing.worker.Worker
 */
public class IndexingWorker
```

himanshug (Contributor), Jan 5, 2018:

Is it possible to move specific classes like Worker to the server module for now?

jihoonson (Contributor), Jan 6, 2018:

It's not possible to move it for now because Worker is used in a lot of places in the druid-indexing-service module.

himanshug (Contributor), Jan 7, 2018:

#5229 would probably be the right solution

```java
/**
 * Should be synchronized with io.druid.indexing.overlord.ImmutableWorkerInfo
 */
public class IndexingWorkerInfo
```

himanshug (Contributor), Jan 5, 2018:

same here

```java
/**
 * Should be synchronized with io.druid.indexing.common.TaskStatus.
 */
public class QueryStatus
```

himanshug (Contributor), Jan 5, 2018:

same

```diff
-     @JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue
+     @JsonProperty("killPendingSegmentsSkipList") Object killPendingSegmentsSkipList,
+     @JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue,
+     @JsonProperty("compactionConfigs") List<CoordinatorCompactionConfig> compactionConfigs,
```

himanshug (Contributor), Jan 5, 2018:

this needs to take Object for the same reason as that of killDataSourceWhitelist

jihoonson (Contributor), Jan 6, 2018:

Hmm, CoordinatorDynamicConfigsResource provides an interface to enable/disable compaction for each dataSource, like POST/GET/DELETE /druid/coordinator/v1/config/compaction/{dataSource}. Maybe the coordinator console can use these APIs?

himanshug (Contributor), Jan 7, 2018:

Independent of /druid/coordinator/v1/config/compaction/{dataSource}, the compaction config can also be updated via the general coordinator dynamic config endpoint POST /druid/coordinator/v1/config..., and the coordinator console already has the ability to do that.
E.g., after this PR the compaction config would show up in the coordinator console and appear updatable there, but updates would fail because the console doesn't have the ability to send a JSON list.

jihoonson (Contributor), Jan 9, 2018:

Hmm. I'm not very familiar with CoffeeScript and Node, but let's fix the coordinator console. Unlike killDataSourceWhitelist, which allows a set of simple strings, compactionConfigs is a list of CoordinatorCompactionConfig, each of which is a large JSON object. I don't think it's good UI if users have to input configuration values as raw JSON objects.

himanshug (Contributor), Jan 10, 2018:

My intention wasn't really to change things on the UI side, and now I see your problem: it's not a simple List&lt;String&gt;.
However, this leads to a problem for users of auto compaction: they wouldn't be able to update other coordinator dynamic configs from the console either, and would have to use manual POST requests for those too.

jihoonson (Contributor), Jan 10, 2018:

@gianm @himanshug ok. I'll make a new config object in a follow-up PR.

gianm (Contributor), Jan 10, 2018:

@jihoonson that's ok, but this follow-up PR must be done before a release including this feature (to avoid config migration issues), so please raise an issue with milestone 0.13.0 before this current PR is committed.

jihoonson (Contributor), Jan 10, 2018:

Raised #5242.

gianm (Contributor), Jan 10, 2018:

Thanks

himanshug (Contributor), Jan 10, 2018:

Yep... using a separate space for storing the compaction config sounds good to me. Thanks.

himanshug (Contributor) commented Jan 5, 2018

Looked through to the CompactionSegmentIterator class... will continue later.

jihoonson added some commits Jan 9, 2018

himanshug (Contributor) commented Jan 10, 2018

Forgot to see the #5102 (comment) discussion before; mentioning it here separately in case there are opinions on this.

jihoonson (Contributor) commented Jan 11, 2018

@himanshug thanks for the review.

@jihoonson jihoonson merged commit 241efaf into apache:master Jan 13, 2018

2 checks passed:
- Inspections: pull requests (Druid) — TeamCity build finished
- continuous-integration/travis-ci/pr — The Travis CI build passed
himanshug (Contributor) commented Jan 30, 2018

@jihoonson is this PR rolling-upgrade compatible? In many cases, users will upgrade the Overlord first and the MiddleManagers later. With the Overlord running this patch, is it possible that the runningTasks endpoint doesn't work when the Overlord is restarted and taskType is null, because the MiddleManager announcements stored in ZooKeeper wouldn't have it?
If that is true, we should probably ignore a null taskType in the runningTasks endpoint instead of throwing an NPE.
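The null-tolerant behavior suggested above could look like this sketch (names are illustrative, not the actual overlord code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RunningTaskFilter
{
  // A task announced by an un-upgraded MiddleManager has no taskType (null),
  // so it must be skipped when filtering by type rather than dereferenced.
  static List<String> runningTaskIds(Map<String, String> taskIdToType, String requestedType)
  {
    final List<String> result = new ArrayList<>();
    for (Map.Entry<String, String> entry : taskIdToType.entrySet()) {
      // requestedType.equals(null) is simply false, so null types never NPE here
      if (requestedType == null || requestedType.equals(entry.getValue())) {
        result.add(entry.getKey());
      }
    }
    return result;
  }
}
```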

jihoonson (Contributor) commented Jan 30, 2018

@himanshug thanks for finding it! Raised #5309.

@michas2 michas2 referenced this pull request May 29, 2018

Closed

Merging sharded segments #5809

jihoonson added a commit to implydata/druid that referenced this pull request Jun 11, 2018

Dylan1312 pushed a commit to spotxchange/druid that referenced this pull request Jul 4, 2018

Dylan1312 added a commit to spotxchange/druid that referenced this pull request Jul 4, 2018
