Skip to content

Commit

Permalink
[Druid] Pulling in 'Support assign tasks to run on different categori…
Browse files Browse the repository at this point in the history
…es of MiddleManagers apache#7066'

Summary:
Pulling in 'Support assign tasks to run on different categories of MiddleManagers apache#7066'

apache#7066

Reviewers: O1139 Druid, jwang

Reviewed By: O1139 Druid, jwang

Subscribers: jenkins, mleonard, #realtime-analytics

Differential Revision: https://phabricator.pinadmin.com/D651472
  • Loading branch information
itallam committed Dec 1, 2020
1 parent a0a0e49 commit 0a6e5d6
Show file tree
Hide file tree
Showing 35 changed files with 1,101 additions and 89 deletions.
60 changes: 60 additions & 0 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,37 @@ useful if you want work evenly distributed across your MiddleManagers.
|`type`|`equalDistribution`.|required; must be `equalDistribution`|
|`affinityConfig`|[Affinity config](#affinity) object|null (no affinity)|

###### Equal Distribution With Category Spec

This strategy is a variant of `Equal Distribution`, which support `workerCategorySpec` field rather than `affinityConfig`. By specifying `workerCategorySpec`, you can assign tasks to run on different categories of MiddleManagers based on the tasks' **taskType** and **dataSource name**. This strategy can't work with `AutoScaler` since the behavior is undefined.

|Property|Description|Default|
|--------|-----------|-------|
|`type`|`equalDistributionWithCategorySpec`.|required; must be `equalDistributionWithCategorySpec`|
|`workerCategorySpec`|[Worker Category Spec](#workercategoryspec) object|null (no worker category spec)|

Example: specify tasks default to run on **c1** whose task
type is "index_kafka", while dataSource "ds1" run on **c2**.

```json
{
"selectStrategy": {
"type": "equalDistributionWithCategorySpec",
"workerCategorySpec": {
"strong": false,
"categoryMap": {
"index_kafka": {
"defaultCategory": "c1",
"categoryAffinity": {
"ds1": "c2"
}
}
}
}
}
}
```

###### Fill Capacity

Tasks are assigned to the worker with the most currently-running tasks at the time the task begins running. This is
Expand All @@ -997,6 +1028,17 @@ MiddleManagers up to capacity simultaneously, rather than a single MiddleManager
|`type`|`fillCapacity`.|required; must be `fillCapacity`|
|`affinityConfig`|[Affinity config](#affinity) object|null (no affinity)|

###### Fill Capacity With Category Spec

This strategy is a variant of `Fill Capacity`, which support `workerCategorySpec` field rather than `affinityConfig`. The usage is the same with _equalDistributionWithCategorySpec_ strategy. This strategy can't work with `AutoScaler` since the behavior is undefined.

|Property|Description|Default|
|--------|-----------|-------|
|`type`|`fillCapacityWithCategorySpec`.|required; must be `fillCapacityWithCategorySpec`|
|`workerCategorySpec`|[Worker Category Spec](#workercategoryspec) object|null (no worker category spec)|

> Before using the _equalDistributionWithCategorySpec_ and _fillCapacityWithCategorySpec_ strategies, you must upgrade overlord and all MiddleManagers to the version that support this feature.
<a name="javascript-worker-select-strategy"></a>

###### JavaScript
Expand Down Expand Up @@ -1033,6 +1075,23 @@ field. If not provided, the default is to not use affinity at all.
|`affinity`|JSON object mapping a datasource String name to a list of indexing service MiddleManager host:port String values. Druid doesn't perform DNS resolution, so the 'host' value must match what is configured on the MiddleManager and what the MiddleManager announces itself as (examine the Overlord logs to see what your MiddleManager announces itself as).|{}|
|`strong`|With weak affinity (the default), tasks for a dataSource may be assigned to other MiddleManagers if their affinity-mapped MiddleManagers are not able to run all pending tasks in the queue for that dataSource. With strong affinity, tasks for a dataSource will only ever be assigned to their affinity-mapped MiddleManagers, and will wait in the pending queue if necessary.|false|

###### WorkerCategorySpec

WorkerCategorySpec can be provided to the _equalDistributionWithCategorySpec_ and _fillCapacityWithCategorySpec_ strategies using the "workerCategorySpec"
field. If not provided, the default is to not use it at all.

|Property|Description|Default|
|--------|-----------|-------|
|`categoryMap`|A JSON map object mapping a task type String name to a [CategoryConfig](#categoryconfig) object, by which you can specify category config for different task type.|{}|
|`strong`|With weak workerCategorySpec (the default), tasks for a dataSource may be assigned to other MiddleManagers if the MiddleManagers specified in `categoryMap` are not able to run all pending tasks in the queue for that dataSource. With strong workerCategorySpec, tasks for a dataSource will only ever be assigned to their specified MiddleManagers, and will wait in the pending queue if necessary.|false|

###### CategoryConfig

|Property|Description|Default|
|--------|-----------|-------|
|`defaultCategory`|Specify default category for a task type.|null|
|`categoryAffinity`|A JSON map object mapping a datasource String name to a category String name of the MiddleManager. If category isn't specified for a datasource, then using the `defaultCategory`. If no specified category and the `defaultCategory` is also null, then tasks can run on any available MiddleManagers.|null|

##### Autoscaler

Amazon's EC2 is currently the only supported autoscaler.
Expand Down Expand Up @@ -1084,6 +1143,7 @@ Middle managers pass their configurations down to their child peons. The MiddleM
|`druid.worker.ip`|The IP of the worker.|localhost|
|`druid.worker.version`|Version identifier for the MiddleManager.|0|
|`druid.worker.capacity`|Maximum number of tasks the MiddleManager can accept.|Number of available processors - 1|
|`druid.worker.category`|A string to name the category that the MiddleManager node belongs to.|`__default_worker_category`|

#### Peon Processing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.EmittingLogger;
Expand Down Expand Up @@ -477,7 +478,7 @@ private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo immutableW
private static ImmutableWorkerInfo createDummyWorker(String scheme, String host, int capacity, String version)
{
return new ImmutableWorkerInfo(
new Worker(scheme, host, "-2", capacity, version),
new Worker(scheme, host, "-2", capacity, version, WorkerConfig.DEFAULT_CATEGORY),
0,
new HashSet<>(),
new HashSet<>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
Expand Down Expand Up @@ -484,7 +485,8 @@ private Worker toWorker(DiscoveryDruidNode node)
node.getDruidNode().getHostAndPortToUse(),
((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getIp(),
((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getCapacity(),
((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getVersion()
((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getVersion(),
WorkerConfig.DEFAULT_CATEGORY
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ public ImmutableWorkerInfo toImmutable()
worker.getHost(),
worker.getIp(),
worker.getCapacity(),
""
"",
worker.getCategory()
);
}
w = disabledWorker;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.overlord.setup;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;

import javax.annotation.Nullable;
import java.util.Objects;

public class EqualDistributionWithCategorySpecWorkerSelectStrategy implements WorkerSelectStrategy
{
private final WorkerCategorySpec workerCategorySpec;

@JsonCreator
public EqualDistributionWithCategorySpecWorkerSelectStrategy(
@JsonProperty("workerCategorySpec") WorkerCategorySpec workerCategorySpec
)
{
this.workerCategorySpec = workerCategorySpec;
}

@JsonProperty
public WorkerCategorySpec getWorkerCategorySpec()
{
return workerCategorySpec;
}

@Nullable
@Override
public ImmutableWorkerInfo findWorkerForTask(
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)
{
return WorkerSelectUtils.selectWorker(
task,
zkWorkers,
config,
workerCategorySpec,
EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers
);
}

@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final EqualDistributionWithCategorySpecWorkerSelectStrategy that = (EqualDistributionWithCategorySpecWorkerSelectStrategy) o;
return Objects.equals(workerCategorySpec, that.workerCategorySpec);
}

@Override
public int hashCode()
{
return Objects.hash(workerCategorySpec);
}

@Override
public String toString()
{
return "EqualDistributionWithCategorySpecWorkerSelectStrategy{" +
"workerCategorySpec=" + workerCategorySpec +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public ImmutableWorkerInfo findWorkerForTask(
);
}

private static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
{
return eligibleWorkers.values().stream().max(
Comparator.comparing(ImmutableWorkerInfo::getAvailableCapacity)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.overlord.setup;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;

import javax.annotation.Nullable;
import java.util.Objects;

public class FillCapacityWithCategorySpecWorkerSelectStrategy implements WorkerSelectStrategy
{
private final WorkerCategorySpec workerCategorySpec;

@JsonCreator
public FillCapacityWithCategorySpecWorkerSelectStrategy(
@JsonProperty("workerCategorySpec") WorkerCategorySpec workerCategorySpec
)
{
this.workerCategorySpec = workerCategorySpec;
}

@JsonProperty
public WorkerCategorySpec getWorkerCategorySpec()
{
return workerCategorySpec;
}

@Nullable
@Override
public ImmutableWorkerInfo findWorkerForTask(
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)
{
return WorkerSelectUtils.selectWorker(
task,
zkWorkers,
config,
workerCategorySpec,
FillCapacityWorkerSelectStrategy::selectFromEligibleWorkers
);
}

@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final FillCapacityWithCategorySpecWorkerSelectStrategy that = (FillCapacityWithCategorySpecWorkerSelectStrategy) o;
return Objects.equals(workerCategorySpec, that.workerCategorySpec);
}

@Override
public int hashCode()
{
return Objects.hash(workerCategorySpec);
}

@Override
public String toString()
{
return "FillCapacityWithCategorySpecWorkerSelectStrategy{" +
"workerCategorySpec=" + workerCategorySpec +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public ImmutableWorkerInfo findWorkerForTask(
);
}

private static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
{
return eligibleWorkers.values().stream().max(
Comparator.comparing(ImmutableWorkerInfo::getCurrCapacityUsed)
Expand Down
Loading

0 comments on commit 0a6e5d6

Please sign in to comment.