
Support assign tasks to run on different categories of MiddleManagers #7066

Merged (10 commits), Oct 17, 2019

Conversation

@QiuMM (Member) commented Feb 13, 2019

Motivation

  • Tasks of different task types might need different resources to run; e.g., a realtime task needs much more CPU and memory than a Hadoop batch task or a kill task. If all the MiddleManager workers share the same resource configuration, a lot of resources will be wasted.

  • Support cross-data-center deployments: tasks for different datasources might need to run in different data centers.

  • The current worker select strategy uses affinityConfig to make different datasources run on different workers. However, this requires maintaining the workers' IP and port information, which is fragile since IPs and ports can change, especially when MiddleManagers run in the cloud. Besides, affinityConfig cannot assign tasks to different workers based on task type.

Solution

Separate MiddleManager nodes into categories, so that tasks can be assigned to a category based on their task type and datasource name. First, add a category property to the MiddleManager worker. Then, implement WorkerSelectStrategy implementations that support assigning tasks to different categories. This PR provides two such strategies, named EqualDistributionWithCategorySpecWorkerSelectStrategy and FillCapacityWithCategorySpecWorkerSelectStrategy. See the docs in this PR for details of usage; a sketch of the resulting Overlord dynamic config is shown below.
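
A rough sketch of how one of these strategies might be configured in the Overlord dynamic config. The type and property names follow the docs table and code snippets reviewed below (with the later tier → category rename applied); the task type, datasource, and category names are purely illustrative:

    {
      "selectStrategy": {
        "type": "equalDistributionWithCategorySpec",
        "workerCategorySpec": {
          "strong": false,
          "categoryMap": {
            "index_kafka": {
              "defaultCategory": "realtime-category",
              "categoryAffinity": {
                "wikipedia": "dedicated-category"
              }
            }
          }
        }
      }
    }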

@egor-ryashin (Contributor)

I suppose AutoScaler can provision only one type of workers still?

Review thread on docs/content/configuration/index.md (outdated, resolved)
{
private final String defaultTier;
// key: datasource, value: tier
private final Map<String, String> tiers;
Contributor:

tiersByDatasources ?

Member Author:

How about datasourceToTier or datasource2Tier?


Contributor:

@QiuMM I think there's no strong argument for choosing between the valueByKey and keyToValue templates; I would take the one that is more common throughout the project.

@sascha-coenen:

instead of supporting only a fixed use case like tiers per datasource or tiers per indexing type, why not just support a "tier" property within the "context" section of an ingestion spec?

This would allow for arbitrary tier mappings, including the ones you have in mind.
The context section is already used for these purposes in queries, to set priorities, timeouts, etc. I think it would therefore also be a valid design choice to support standard context properties for ingestion tasks.
    {
      "type": "index_hadoop",
      "context": {
        "tier": "my-very-personal-tier-token",
        ...
      }
      ...
    }

Good point about the autoscaler. Its interface should then probably also receive a tier name for which a scale operation should happen...
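
Purely illustrative of the shape such a change might take (this is not the current AutoScaler API, just a hypothetical sketch):

    // Hypothetical tier-aware variant: each scaling operation names the tier
    // whose worker pool should grow or shrink.
    public interface TierAwareAutoScaler
    {
      AutoScalingData provision(String tier);

      AutoScalingData terminate(String tier, List<String> ips);
    }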

@QiuMM (Member Author), Mar 29, 2019:

@sascha-coenen thanks for your suggestion. I had also considered this, but it would involve much more code modification, and we might need to change the current design of WorkerSelectStrategy as well as AutoScaler, which is not worth that much effort. My solution is undoubtedly logical within the current architecture, and I think it's sufficient.

),
true
);
final FillCapacityWithTierSpecWorkerSelectStrategy strategy = new FillCapacityWithTierSpecWorkerSelectStrategy(
Contributor:

It seems this block is the same everywhere

final FillCapacityWithTierSpecWorkerSelectStrategy strategy = new FillCapacityWithTierSpecWorkerSelectStrategy(
       workerTierSpec);

   ImmutableWorkerInfo worker = strategy.findWorkerForTask(
       new RemoteTaskRunnerConfig(),
       WORKERS_FOR_TIER_TESTS,
       new NoopTask(null, "ds1", 1, 0, null, null, null)
   );

Extracting a method will make the test class look more concise.
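
For instance, a helper along these lines could be extracted (a sketch only; the method name and parameters are illustrative, mirroring the snippet above):

    private static ImmutableWorkerInfo findWorkerForTier(WorkerTierSpec workerTierSpec, String dataSource)
    {
      // Build the strategy under test and run it against the shared test workers.
      final FillCapacityWithTierSpecWorkerSelectStrategy strategy =
          new FillCapacityWithTierSpecWorkerSelectStrategy(workerTierSpec);
      return strategy.findWorkerForTask(
          new RemoteTaskRunnerConfig(),
          WORKERS_FOR_TIER_TESTS,
          new NoopTask(null, dataSource, 1, 0, null, null, null)
      );
    }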

@QiuMM (Member Author) commented Feb 15, 2019

I suppose AutoScaler can provision only one type of workers still?

Thanks for the reminder. I hadn't considered this, since I have never used the AutoScaler. The AutoScaler needs to support tiers, since different tiers might have different resource configurations; I have no idea which tier's configuration the current auto scale strategy would use to create new workers.

@fjy added this to the 0.15.0 milestone Mar 11, 2019
@a2l007 (Contributor) commented May 2, 2019

Hello @QiuMM, just checking in to see if you have plans to continue working on this PR. We have a few use cases internally where this feature would fit in nicely.

@QiuMM (Member Author) commented May 5, 2019

@a2l007 I'm going to complete this PR next week.

@jihoonson (Contributor)

@QiuMM thank you for working on this issue. I'm untagging the milestone since this issue is not necessarily a release blocker. Feel free to let me know if you think it should be.

@jihoonson removed this from the 0.15.0 milestone May 7, 2019
@QiuMM (Member Author) commented May 17, 2019

@egor-ryashin sorry for the delay, I have made some changes.

@leventov (Member)

@QiuMM did you consider a name different from "tier"? I'm concerned about the collision with historical tiers; it may be confusing. Maybe call it "class", or "kind", or "category"?

@QiuMM (Member Author) commented May 17, 2019

@leventov sounds reasonable. I'm not sure which one is better, maybe "category"?

@himanshug (Contributor) left a comment:

do you need this patch because you're running into this on your cluster, and is this patch already running on your cluster? (sorry, it was not clear from the PR description, and I am generally more in favor of features that come out of real-world production needs :) )

FWIW: the use of the name "tier" wasn't confusing to me when I read the PR title, due to the context of MM, and I wouldn't be against keeping it. but of course YMMV.

.stream()
.filter(worker -> worker.canRunTask(task)
&& worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion()))
.collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity()));
Contributor:

nit: it would be nice to extract this into a separate method used by both versions of selectWorker(..), as this is a mouthful.
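
Something along these lines (a sketch only; the helper name is illustrative):

    // Filter to the workers that can run the task and meet the minimum
    // version, keyed by worker host.
    private static Map<String, ImmutableWorkerInfo> getRunnableWorkers(
        Task task,
        Collection<ImmutableWorkerInfo> allWorkers,
        WorkerTaskRunnerConfig workerTaskRunnerConfig
    )
    {
      return allWorkers.stream()
                       .filter(worker -> worker.canRunTask(task)
                                         && worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion()))
                       .collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity()));
    }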

Member Author:

Done.

.collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity()));

// select worker according to worker tier spec
if (workerTierSpec != null && workerTierSpec.getTierMap() != null) {
Contributor:

maybe default the tierMap in WorkerTierSpec to Collections.EMPTY_MAP so that the second null check here is not needed.

Member Author:

Done.

final WorkerTierSpec.TierConfig tierConfig = workerTierSpec.getTierMap()
.getOrDefault(
task.getType(),
new WorkerTierSpec.TierConfig(null, null)
Contributor:

why create a TierConfig with nulls when we could simply check if (tierConfig == null) next and do the default action, which would happen anyway?
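
That is, something like the following (a sketch only; defaultWorkerSelection() is a hypothetical stand-in for the default action):

    final WorkerTierSpec.TierConfig tierConfig = workerTierSpec.getTierMap().get(task.getType());
    if (tierConfig == null) {
      // No tier config for this task type: fall back to the default selection.
      return defaultWorkerSelection();
    }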

Member Author:

Fixed.

final Map<String, String> tierAffinity = tierConfig.getTierAffinity();

// select worker from preferred tier
final String preferredTier = tierAffinity != null ? tierAffinity.get(task.getDataSource()) : defaultTier;
Contributor:

maybe default the tierAffinity in TierConfig to Collections.EMPTY_MAP so that the null check here is not needed.

Member Author:

Fixed.

@JsonProperty("strong") boolean strong
)
{
this.tierMap = tierMap;
Contributor:

to remove null checks in other places...

Suggested change:

    - this.tierMap = tierMap;
    + this.tierMap = tierMap == null ? Collections.EMPTY_MAP : tierMap;

Member Author:

Done.

)
{
this.defaultTier = defaultTier;
this.tierAffinity = tierAffinity;
Contributor:

to remove null checks in other places...

Suggested change:

    - this.tierAffinity = tierAffinity;
    + this.tierAffinity = tierAffinity == null ? Collections.EMPTY_MAP : tierAffinity;

Member Author:

Done.

)
{
this.ip = ip;
this.capacity = capacity;
this.version = version;
this.tier = tier;
Contributor:

Many Druid users follow rolling upgrades (as described in http://druid.io/docs/latest/operations/rolling-updates.html). If you only upgrade the Overlord, then at the Overlord the tier is going to be null, which would lead to an NPE in WorkerSelectUtils.getTierWorkers(..) at the line "Maps.filterValues(workerMap, workerInfo -> workerInfo.getWorker().getTier().equals(tier))", and maybe in some other places as well.
That should probably be fixed, or something needs to go in the release notes along the lines of making sure that the new tier-based strategies are only used once the Overlord has been restarted at least once after all MiddleManagers have been upgraded (due to a second cluster upgrade, a forced restart of the Overlord after the MM upgrade, or whatever).
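
One possible minimal guard (a sketch only; it just inverts the comparison so a null worker tier cannot NPE):

    // Workers that have not reported a tier yet (e.g. not yet upgraded)
    // simply fail the equality check instead of throwing.
    Maps.filterValues(workerMap, workerInfo -> tier.equals(workerInfo.getWorker().getTier()));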

Member Author:

If users want to use this feature, they must upgrade the Overlord and all MiddleManagers (following the rolling upgrade is fine). I have noted this in the documentation. An extra restart of the Overlord is not necessary.

@QiuMM (Member Author) commented May 18, 2019

@himanshug yes, this patch is already running on my cluster.

@leventov (Member)

@QiuMM "category" sounds good.

@himanshug (Contributor)

@QiuMM great, thanks.

stale bot commented Jul 19, 2019

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

stale bot added the stale label Jul 19, 2019
@clintropolis (Member)

Hi @QiuMM, any chance you have some time to spare to resolve conflicts and address outstanding review? This seems like a rather useful feature for Druid and not so far from being finished. I would hate to see it get killed by the stale bot.

stale bot commented Aug 9, 2019

This issue is no longer marked as stale.

stale bot removed the stale label Aug 9, 2019
@a2l007 (Contributor) commented Aug 22, 2019

@QiuMM It would be great if we could get this into 0.16.0, as we're planning to use this feature once it is available. Would it be possible for you to finish this up at your earliest convenience?

@gianm (Contributor) commented Sep 19, 2019

Hey @QiuMM, it looks like of all the open PRs, this one has the most positive reactions up top ❤️

Any chance you are interested in wrapping it up?

@sascha-coenen commented Sep 20, 2019

@gianm @QiuMM I'm very interested in seeing this feature happen.
If there is nobody who can bring this effort across the finish line, I could put two people onto this task. However, these people have not made any code contributions yet and would need to work themselves in.

@QiuMM (Member Author) commented Sep 26, 2019

Oops, I had forgotten about this, sorry. I'll update it this week @sascha-coenen @gianm

@QiuMM (Member Author) commented Sep 29, 2019

I'm buried in work this week. I'll update next week ):

@QiuMM changed the title from "Support assign tasks to run on different tiers of MiddleManagers" to "Support assign tasks to run on different categorys of MiddleManagers" Oct 7, 2019
@QiuMM changed the title from "Support assign tasks to run on different categorys of MiddleManagers" to "Support assign tasks to run on different categories of MiddleManagers" Oct 7, 2019
@VladimirIordanov commented Oct 8, 2019

I suppose AutoScaler can provision only one type of workers still?

That makes sense to me. I think realtime tasks can require a cluster with a different configuration than batch ones, so the Overlord could probably accept an array of worker config specs via the dynamic configuration API call.
I looked into the code and found that the current ProvisioningStrategy implementations could be changed to select an available Autoscaler from DefaultWorkerBehaviorConfig (which should also be changed to support multi-autoscaler declarations).
I could contribute such changes if it makes sense for everybody.

@QiuMM (Member Author) commented Oct 8, 2019

I could contribute such changes to code if it makes sense for everybody.

@VladimirIordanov Great! I have never used the AutoScaler; I think we can finish this PR first, and then you can open a new PR to add the features you want.

@QiuMM (Member Author) commented Oct 8, 2019

The CI build failed:

Site built successfully. Generated files in 'build' folder.
Could not find self anchor '#WorkerCategorySpec' in './build/ApacheDruid/docs/configuration/index.html'
Could not find self anchor '#WorkerCategorySpec' in './build/ApacheDruid/docs/configuration/index.html'
Could not find self anchor '#CategoryConfig' in './build/ApacheDruid/docs/configuration/index.html'
There are 3 issues

I have checked the doc and can't find any error; can anyone help me?

@clintropolis (Member) left a comment:

overall lgtm 👍

|Property|Description|Default|
|--------|-----------|-------|
|`type`|`equalDistributionWithCategorySpec`.|required; must be `equalDistributionWithCategorySpec`|
|`workerCategorySpec`|[Worker Category Spec](#WorkerCategorySpec) object|null (no worker category spec)|
@clintropolis (Member), Oct 11, 2019:

it looks like these links should be all lowercase, e.g. #workercategoryspec; I think that should fix CI

@himanshug (Contributor) left a comment:

LGTM when build is successful. Thanks @QiuMM

@clintropolis (Member)

Aw, now it is triggering false spelling errors for config names: https://travis-ci.org/apache/incubator-druid/jobs/596417943#L274

You can add entries to https://github.com/apache/incubator-druid/blob/master/website/.spelling to make it ignore them. I'm +1 after CI too, though.

@QiuMM (Member Author) commented Oct 13, 2019

@clintropolis It seems it didn't work.

@clintropolis (Member)

@clintropolis It seems it didn't work.

Oops, I guess you added it to the wrong part of the file, so the exclusions are applying only to

 - ../docs/tutorials/index.md

The first part of the file is exclusions that apply to everything, and then for some reason after that we have file-specific exclusions; I'm not really sure why it's split up like that instead of being one big global list.
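
Illustrative layout, based on the description above (the global words are examples from this PR's CI failure; the file-specific word is hypothetical):

    workerCategorySpec
    CategoryConfig

     - ../docs/tutorials/index.md
    some-tutorial-specific-word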

@QiuMM (Member Author) commented Oct 17, 2019

The CI has passed, thank you very much @clintropolis.

@glasser (Contributor) commented Nov 15, 2019

Very excited for this! We are working on making it easier to do rolling upgrades of our MiddleManagers. As part of that, we're limiting the task durations of our index_kafka tasks and using the MiddleManager disable API to gracefully drain them. But it's hard to limit the duration of an index_parallel task. With this API we'll be able to run our index_parallel tasks on their own MiddleManager, which can also use fewer resources because index_parallel is a fairly non-resource-intensive task.

@sascha-coenen
Just FYI for whoever showed interest in this feature:
along with introducing several worker categories, one also needs to be able to autoscale each worker category separately.

The following proposal adds support for this:
#8695

An implementation is also available here: #8989
It should be good for early review and might receive some finishing touches like documentation/javadoc.

If you are interested in seeing this feature in Druid as well, feel free to take a look and upvote the proposal.

sachinsagare added a commit to pinterest/druid that referenced this pull request Apr 28, 2022

81344f9  add metrics for active processing thread count and thread pending task
135fcc9  Add metrics for historical intermediate result buffer available count & max processing thread count
3f2604d  Add support to attempt converting to top n query when two group by dimensions present
0a6e5d6  Pulling in 'Support assign tasks to run on different categories of MiddleManagers apache#7066'
debasatwa29 pushed a commit to debasatwa29/druid that referenced this pull request Jun 2, 2022: Pulling in 'Support assign tasks to run on different categories of MiddleManagers apache#7066'

Summary:
Pulling in 'Support assign tasks to run on different categories of MiddleManagers apache#7066'

apache#7066

Reviewers: O1139 Druid, jwang

Reviewed By: O1139 Druid, jwang

Subscribers: jenkins, mleonard, #realtime-analytics

Differential Revision: https://phabricator.pinadmin.com/D651472