Support assign tasks to run on different categories of MiddleManagers #7066

Merged
merged 10 commits into from
Oct 17, 2019
Changes from 2 commits
58 changes: 58 additions & 0 deletions docs/content/configuration/index.md
@@ -1028,6 +1028,37 @@ useful if you want work evenly distributed across your middleManagers.
|`type`|`equalDistribution`.|required; must be `equalDistribution`|
|`affinityConfig`|[Affinity config](#affinity) object|null (no affinity)|

###### Equal Distribution With Tier Spec

This strategy is a variant of `Equal Distribution` that supports a `workerTierSpec` field instead of `affinityConfig`. By specifying `workerTierSpec`, you can assign tasks to different tiers of MiddleManagers based on each task's **taskType** and **dataSource name**.

|Property|Description|Default|
|--------|-----------|-------|
|`type`|`equalDistributionWithTierSpec`.|required; must be `equalDistributionWithTierSpec`|
|`workerTierSpec`|[Worker Tier Spec](#workerTierSpec) object|null (no worker tier spec)|

Example: tasks of type "index_kafka" run on _tier1_ by default,
while "index_kafka" tasks for dataSource "ds1" run on _tier2_.

```json
{
"selectStrategy": {
"type": "equalDistributionWithTierSpec",
"workerTierSpec": {
"strong": false,
"tierMap": {
"index_kafka": {
"defaultTier": "tier1",
"tiers": {
"ds1": "tier2"
}
}
}
}
}
}
```

###### Fill Capacity

Tasks are assigned to the worker with the most currently-running tasks at the time the task begins running. This is
@@ -1042,6 +1073,15 @@ middleManagers up to capacity simultaneously, rather than a single middleManager
|`type`|`fillCapacity`.|required; must be `fillCapacity`|
|`affinityConfig`|[Affinity config](#affinity) object|null (no affinity)|

###### Fill Capacity With Tier Spec

This strategy is a variant of `Fill Capacity` that supports a `workerTierSpec` field instead of `affinityConfig`. It is used in the same way as the _equalDistributionWithTierSpec_ strategy.

|Property|Description|Default|
|--------|-----------|-------|
|`type`|`fillCapacityWithTierSpec`.|required; must be `fillCapacityWithTierSpec`|
|`workerTierSpec`|[Worker Tier Spec](#workerTierSpec) object|null (no worker tier spec)|
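
The two tier-aware strategies differ only in how they pick a worker from the eligible set. The following is a minimal sketch of that difference, assuming a simplified worker model: `WorkerSketch` and `SelectionSketch` are hypothetical names used for illustration and are not classes from this PR. It mirrors the comparators visible in the diff (`getAvailableCapacity` for equal distribution, `getCurrCapacityUsed` for fill capacity) and ignores any tie-breaking or eligibility filtering the real strategies may apply.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-in for a MiddleManager; not the PR's ImmutableWorkerInfo.
final class WorkerSketch {
    final String host;
    final int capacity; // maximum number of task slots
    final int used;     // slots currently in use

    WorkerSketch(String host, int capacity, int used) {
        this.host = host;
        this.capacity = capacity;
        this.used = used;
    }

    int availableCapacity() {
        return capacity - used;
    }

    int used() {
        return used;
    }
}

final class SelectionSketch {
    // Equal distribution: prefer the worker with the most free slots.
    static Optional<WorkerSketch> equalDistribution(List<WorkerSketch> eligible) {
        return eligible.stream().max(Comparator.comparingInt(WorkerSketch::availableCapacity));
    }

    // Fill capacity: prefer the worker that is already running the most tasks.
    static Optional<WorkerSketch> fillCapacity(List<WorkerSketch> eligible) {
        return eligible.stream().max(Comparator.comparingInt(WorkerSketch::used));
    }

    public static void main(String[] args) {
        List<WorkerSketch> tier1 = List.of(
            new WorkerSketch("mm-a", 4, 3),
            new WorkerSketch("mm-b", 4, 1)
        );
        System.out.println(equalDistribution(tier1).get().host); // prints "mm-b"
        System.out.println(fillCapacity(tier1).get().host);      // prints "mm-a"
    }
}
```

In both cases the eligible set would already have been narrowed to the workers in the tier chosen by `workerTierSpec`, so the tier spec decides where a task may run and the strategy decides which worker within that set receives it.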

###### Javascript<a id="javascript-worker-select-strategy"></a>

Allows defining arbitrary logic for selecting workers to run task using a JavaScript function.
@@ -1078,6 +1118,23 @@ field. If not provided, the default is to not use affinity at all.
|`affinity`|JSON object mapping a datasource String name to a list of indexing service middleManager host:port String values. Druid doesn't perform DNS resolution, so the 'host' value must match what is configured on the middleManager and what the middleManager announces itself as (examine the Overlord logs to see what your middleManager announces itself as).|{}|
|`strong`|With weak affinity (the default), tasks for a dataSource may be assigned to other middleManagers if their affinity-mapped middleManagers are not able to run all pending tasks in the queue for that dataSource. With strong affinity, tasks for a dataSource will only ever be assigned to their affinity-mapped middleManagers, and will wait in the pending queue if necessary.|false|

###### WorkerTierSpec

WorkerTierSpec can be provided to the _equalDistributionWithTierSpec_ and _fillCapacityWithTierSpec_ strategies using the "workerTierSpec"
field. If not provided, the default is to not use it at all.

|Property|Description|Default|
|--------|-----------|-------|
|`tierMap`|A JSON map object mapping a task type String name to a [TierConfig](#tierConfig) object, which specifies the tier configuration for that task type.|{}|
|`strong`|With weak workerTierSpec (the default), tasks for a dataSource may be assigned to other middleManagers if the middleManagers specified in `tierMap` are not able to run all pending tasks in the queue for that dataSource. With strong workerTierSpec, tasks for a dataSource will only ever be assigned to their specified middleManagers, and will wait in the pending queue if necessary.|false|

###### TierConfig

|Property|Description|Default|
|--------|-----------|-------|
|`defaultTier`|The default tier for a task type.|null|
|`tiers`|A JSON map object mapping a datasource String name to the tier String name of a MiddleManager. If no tier is specified for a datasource, the `defaultTier` is used. If no tier is specified and the `defaultTier` is also null, tasks can run on any available MiddleManager.|null|
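
The lookup order described in the two tables above can be sketched as follows. This is an illustrative sketch only: `TierConfigSketch` and `TierResolutionSketch` are hypothetical names, not the classes added in this PR, and treating a task type that is missing from `tierMap` as "run anywhere" is an assumption rather than documented behavior.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the documented TierConfig object.
final class TierConfigSketch {
    final String defaultTier;        // may be null
    final Map<String, String> tiers; // dataSource name -> tier name; may be null

    TierConfigSketch(String defaultTier, Map<String, String> tiers) {
        this.defaultTier = defaultTier;
        this.tiers = tiers;
    }
}

final class TierResolutionSketch {
    // Returns the preferred tier for a task, or empty when the task may run
    // on any available MiddleManager.
    static Optional<String> resolveTier(
        Map<String, TierConfigSketch> tierMap,
        String taskType,
        String dataSource
    ) {
        TierConfigSketch config = tierMap == null ? null : tierMap.get(taskType);
        if (config == null) {
            // Assumption: no entry for this task type means no tier preference.
            return Optional.empty();
        }
        // A per-datasource tier wins; otherwise fall back to defaultTier (possibly null).
        String tier = config.tiers == null ? null : config.tiers.get(dataSource);
        return Optional.ofNullable(tier != null ? tier : config.defaultTier);
    }

    public static void main(String[] args) {
        Map<String, TierConfigSketch> tierMap = Map.of(
            "index_kafka", new TierConfigSketch("tier1", Map.of("ds1", "tier2"))
        );
        System.out.println(resolveTier(tierMap, "index_kafka", "ds1"));   // prints "Optional[tier2]"
        System.out.println(resolveTier(tierMap, "index_kafka", "ds2"));   // prints "Optional[tier1]"
        System.out.println(resolveTier(tierMap, "index_hadoop", "ds1")); // prints "Optional.empty"
    }
}
```

With a weak spec (`strong` set to false), a task whose preferred tier has no free capacity may still be assigned to a MiddleManager in another tier. With a strong spec it waits in the pending queue instead.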

##### Autoscaler

Amazon's EC2 is currently the only supported autoscaler.
@@ -1127,6 +1184,7 @@ Middle managers pass their configurations down to their child peons. The MiddleM
|`druid.worker.ip`|The IP of the worker.|localhost|
|`druid.worker.version`|Version identifier for the MiddleManager.|0|
|`druid.worker.capacity`|Maximum number of tasks the MiddleManager can accept.|Number of available processors - 1|
|`druid.worker.tier`|A string to name the tier that the MiddleManager node belongs to.|`_default_worker_tier`|

#### Peon Processing

Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -477,7 +478,7 @@ private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo immutableW
private static ImmutableWorkerInfo createDummyWorker(String scheme, String host, int capacity, String version)
{
return new ImmutableWorkerInfo(
- new Worker(scheme, host, "-2", capacity, version),
+ new Worker(scheme, host, "-2", capacity, version, WorkerConfig.DEFAULT_TIER),
0,
new HashSet<>(),
new HashSet<>(),
Original file line number Diff line number Diff line change
@@ -489,7 +489,8 @@ private Worker toWorker(DiscoveryDruidNode node)
node.getDruidNode().getHostAndPortToUse(),
((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getIp(),
((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getCapacity(),
- ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getVersion()
+ ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getVersion(),
+ ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getTier()
);
}

Original file line number Diff line number Diff line change
@@ -178,7 +178,8 @@ public ImmutableWorkerInfo toImmutable()
worker.getHost(),
worker.getIp(),
worker.getCapacity(),
- ""
+ "",
+ worker.getTier()
);
}
w = disabledWorker;
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.overlord.setup;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;

import javax.annotation.Nullable;
import java.util.Objects;

public class EqualDistributionWithTierSpecWorkerSelectStrategy implements WorkerSelectStrategy
{
private final WorkerTierSpec workerTierSpec;

@JsonCreator
public EqualDistributionWithTierSpecWorkerSelectStrategy(
@JsonProperty("workerTierSpec") WorkerTierSpec workerTierSpec
)
{
this.workerTierSpec = workerTierSpec;
}

@JsonProperty
public WorkerTierSpec getWorkerTierSpec()
{
return workerTierSpec;
}

@Nullable
@Override
public ImmutableWorkerInfo findWorkerForTask(
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)
{
return WorkerSelectUtils.selectWorker(
task,
zkWorkers,
config,
workerTierSpec,
EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers
);
}

@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final EqualDistributionWithTierSpecWorkerSelectStrategy that = (EqualDistributionWithTierSpecWorkerSelectStrategy) o;
return Objects.equals(workerTierSpec, that.workerTierSpec);
}

@Override
public int hashCode()
{
return Objects.hash(workerTierSpec);
}

@Override
public String toString()
{
return "EqualDistributionWithTierSpecWorkerSelectStrategy{" +
"workerTierSpec=" + workerTierSpec +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -66,7 +66,7 @@ public ImmutableWorkerInfo findWorkerForTask(
);
}

- private static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
+ static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
{
return eligibleWorkers.values().stream().max(
Comparator.comparing(ImmutableWorkerInfo::getAvailableCapacity)
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.overlord.setup;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;

import javax.annotation.Nullable;
import java.util.Objects;

public class FillCapacityWithTierSpecWorkerSelectStrategy implements WorkerSelectStrategy
{
private final WorkerTierSpec workerTierSpec;

@JsonCreator
public FillCapacityWithTierSpecWorkerSelectStrategy(
@JsonProperty("workerTierSpec") WorkerTierSpec workerTierSpec
)
{
this.workerTierSpec = workerTierSpec;
}

@JsonProperty
public WorkerTierSpec getWorkerTierSpec()
{
return workerTierSpec;
}

@Nullable
@Override
public ImmutableWorkerInfo findWorkerForTask(
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)
{
return WorkerSelectUtils.selectWorker(
task,
zkWorkers,
config,
workerTierSpec,
FillCapacityWorkerSelectStrategy::selectFromEligibleWorkers
);
}

@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final FillCapacityWithTierSpecWorkerSelectStrategy that = (FillCapacityWithTierSpecWorkerSelectStrategy) o;
return Objects.equals(workerTierSpec, that.workerTierSpec);
}

@Override
public int hashCode()
{
return Objects.hash(workerTierSpec);
}

@Override
public String toString()
{
return "FillCapacityWithTierSpecWorkerSelectStrategy{" +
"workerTierSpec=" + workerTierSpec +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -64,7 +64,7 @@ public ImmutableWorkerInfo findWorkerForTask(
);
}

- private static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
+ static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
{
return eligibleWorkers.values().stream().max(
Comparator.comparing(ImmutableWorkerInfo::getCurrCapacityUsed)
Original file line number Diff line number Diff line change
@@ -38,7 +38,9 @@
@JsonSubTypes.Type(name = "fillCapacityWithAffinity", value = FillCapacityWithAffinityWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "equalDistribution", value = EqualDistributionWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "equalDistributionWithAffinity", value = EqualDistributionWithAffinityWorkerSelectStrategy.class),
- @JsonSubTypes.Type(name = "javascript", value = JavaScriptWorkerSelectStrategy.class)
+ @JsonSubTypes.Type(name = "javascript", value = JavaScriptWorkerSelectStrategy.class),
+ @JsonSubTypes.Type(name = "fillCapacityWithTierSpec", value = FillCapacityWithTierSpecWorkerSelectStrategy.class),
+ @JsonSubTypes.Type(name = "equalDistributionWithTierSpec", value = EqualDistributionWithTierSpecWorkerSelectStrategy.class)
})
@PublicApi
public interface WorkerSelectStrategy