Skip to content

Commit

Permalink
refactoring Selector class
Browse files Browse the repository at this point in the history
  • Loading branch information
YongGang committed Jun 12, 2024
1 parent 18432bf commit 57e2831
Show file tree
Hide file tree
Showing 11 changed files with 400 additions and 493 deletions.
59 changes: 23 additions & 36 deletions docs/development/extensions-contrib/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,9 @@ The Dynamic Pod Template Selection feature enhances the K8s extension by enablin
|Property|Description|Default|
|--------|-----------|-------|
|`TaskTypePodTemplateSelectStrategy`| This strategy selects pod templates based on task type for execution purposes, implementing the behavior that maps templates to specific task types. | true |
|`TaskPropertiesPodTemplateSelectStrategy`| This strategy evaluates a series of selectors, known as `templateSelectors`, which are aligned with potential task properties. | false |
|`SelectorBasedPodTemplateSelectStrategy`| This strategy evaluates a series of selectors, known as `selectors`, which are aligned with potential task properties. | false |

`TaskPropertiesPodTemplateSelectStrategy`, the strategy implementing this new feature, is based on conditional selectors `templateSelectors` that match against task properties. These selectors are ordered in the dynamic configuration, with the first selector given the highest priority during the evaluation process. This means that the selection process uses these ordered conditions to determine a task’s Pod template based on context tags and task fields. The first matching condition immediately determines the Pod template, thereby prioritizing certain configurations over others.
`SelectorBasedPodTemplateSelectStrategy`, the strategy implementing this new feature, is based on conditional `selectors` that match against top-level keys from the task payload. Currently, it supports matching based on task context tags, task type, and dataSource. These selectors are ordered in the dynamic configuration, with the first selector given the highest priority during the evaluation process. This means that the selection process uses these ordered conditions to determine a task’s Pod template. The first matching condition immediately determines the Pod template, thereby prioritizing certain configurations over others. If no selector matches, it will fall back to an optional `defaultKey` if configured; if there is still no match, it will use the `base` template.

Example Configuration:

Expand All @@ -236,42 +236,29 @@ We define two template keys in the configuration—`low-throughput` and `medium-
- Medium Throughput Template: If a task does not meet the low-throughput criteria, the system will then evaluate it against the next selector in order. In this example, if the task type is index_kafka, it will fall into the `medium-throughput` template.
```
{
"type":"default",
"podTemplateSelectStrategy":{
"type":"taskProperties",
"templateSelectors":[
{
"templateKey":"low-throughput",
"matcher":{
"type":"taskProperties",
"context.tags":{
"billingCategory":[
"streaming_ingestion"
]
},
"task":{
"datasource":[
"wikipedia"
]
}
}
},
{
"templateKey":"medium-throughput",
"matcher":{
"type":"taskProperties",
"task":{
"type":[
"index_kafka"
]
}
}
}
]
}
"type": "default",
"podTemplateSelectStrategy":
{
"type": "selectorBased",
"selectors": [
{
"selectionKey": "low-throughput",
"context.tags":
{
"billingCategory": ["streaming_ingestion"]
},
"dataSource": ["wikipedia"]
},
{
"selectionKey": "medium-throughput",
"type": ["index_kafka"]
}
],
"defaultKey"" "base"
}
}
```
Task specific pod templates can be specified as the runtime property `druid.indexer.runner.k8s.podTemplate.{template}: /path/to/taskSpecificPodSpec.yaml` where {template} is the matched `templateKey` of the `podTemplateSelectStrategy` i.e low-throughput.
Task specific pod templates can be specified as the runtime property `druid.indexer.runner.k8s.podTemplate.{template}: /path/to/taskSpecificPodSpec.yaml` where {template} is the matched `selectionKey` of the `podTemplateSelectStrategy` i.e low-throughput.

Similar to Overlord dynamic configuration, the following API endpoints are defined to retrieve and manage dynamic configurations of Pod Template Selection config:

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = TaskTypePodTemplateSelectStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "default", value = TaskTypePodTemplateSelectStrategy.class),
@JsonSubTypes.Type(name = "taskProperties", value = TaskPropertiesPodTemplateSelectStrategy.class),
@JsonSubTypes.Type(name = "selectorBased", value = SelectorBasedPodTemplateSelectStrategy.class),
})
public interface PodTemplateSelectStrategy
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.k8s.overlord.execution;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.query.DruidMetrics;

import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* Represents a condition-based selector that evaluates whether a given task meets specified criteria.
* The selector uses conditions defined on context tags and task fields to determine if a task matches.
*/
public class Selector
{
private final String selectionKey;
private final Map<String, Set<String>> cxtTagsConditions;
private final Set<String> taskTypeCondition;
private final Set<String> dataSourceCondition;

/**
* Creates a selector with specified conditions for context tags and task fields.
*
* @param selectionKey the identifier representing the outcome when a task matches the conditions
* @param cxtTagsConditions conditions on context tags
* @param taskTypeCondition conditions on task type
* @param dataSourceCondition conditions on task dataSource
*/
@JsonCreator
public Selector(
@JsonProperty("selectionKey") String selectionKey,
@JsonProperty("context.tags") Map<String, Set<String>> cxtTagsConditions,
@JsonProperty("type") Set<String> taskTypeCondition,
@JsonProperty("dataSource") Set<String> dataSourceCondition
)
{
this.selectionKey = selectionKey;
this.cxtTagsConditions = cxtTagsConditions;
this.taskTypeCondition = taskTypeCondition;
this.dataSourceCondition = dataSourceCondition;
}

/**
* Evaluates this selector against a given task.
*
* @param task the task to evaluate
* @return true if the task meets all the conditions specified by this selector, otherwise false
*/
public boolean evaluate(Task task)
{
boolean isMatch = true;
if (cxtTagsConditions != null) {
isMatch = cxtTagsConditions.entrySet().stream().allMatch(entry -> {
String tagKey = entry.getKey();
Set<String> tagValues = entry.getValue();
Map<String, Object> tags = task.getContextValue(DruidMetrics.TAGS);
if (tags == null || tags.isEmpty()) {
return false;
}
Object tagValue = tags.get(tagKey);

return tagValue == null ? false : tagValues.contains((String) tagValue);
});
}

if (isMatch && taskTypeCondition != null) {
isMatch = taskTypeCondition.contains(task.getType());
}

if (isMatch && dataSourceCondition != null) {
isMatch = dataSourceCondition.contains(task.getDataSource());
}

return isMatch;
}

@JsonProperty
public String getSelectionKey()
{
return selectionKey;
}

@JsonProperty("context.tags")
public Map<String, Set<String>> getCxtTagsConditions()
{
return cxtTagsConditions;
}

@JsonProperty("type")
public Set<String> getTaskTypeCondition()
{
return taskTypeCondition;
}

@JsonProperty("dataSource")
public Set<String> getDataSourceCondition()
{
return dataSourceCondition;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Selector selector = (Selector) o;
return Objects.equals(selectionKey, selector.selectionKey) && Objects.equals(
cxtTagsConditions,
selector.cxtTagsConditions
) && Objects.equals(taskTypeCondition, selector.taskTypeCondition) && Objects.equals(
dataSourceCondition,
selector.dataSourceCondition
);
}

@Override
public int hashCode()
{
return Objects.hash(selectionKey, cxtTagsConditions, taskTypeCondition, dataSourceCondition);
}

@Override
public String toString()
{
return "Selector{" +
"selectionKey=" + selectionKey +
", context.tags=" + cxtTagsConditions +
", type=" + taskTypeCondition +
", dataSource=" + dataSourceCondition +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.k8s.overlord.execution;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.fabric8.kubernetes.api.model.PodTemplate;
import org.apache.druid.indexing.common.task.Task;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Implements {@link PodTemplateSelectStrategy} by dynamically evaluating a series of selectors.
* Each selector corresponds to a potential task template key.
*/
public class SelectorBasedPodTemplateSelectStrategy implements PodTemplateSelectStrategy
{
@Nullable
private String defaultKey;
private List<Selector> selectors;

@JsonCreator
public SelectorBasedPodTemplateSelectStrategy(
@JsonProperty("selectors") List<Selector> selectors,
@JsonProperty("defaultKey") @Nullable String defaultKey
)
{
Preconditions.checkNotNull(selectors, "selectors");
this.selectors = selectors;
this.defaultKey = defaultKey;
}

/**
* Evaluates the provided task against the set selectors to determine its template.
*
* @param task the task to be checked
* @return the template if a selector matches, otherwise fallback to base template
*/
@Override
public PodTemplate getPodTemplateForTask(Task task, Map<String, PodTemplate> templates)
{
String templateKey = selectors.stream()
.filter(selector -> selector.evaluate(task))
.findFirst()
.map(Selector::getSelectionKey)
.orElse(defaultKey);

return templates.getOrDefault(templateKey, templates.get("base"));
}

@JsonProperty
public List<Selector> getSelectors()
{
return selectors;
}

@Nullable
@JsonProperty
public String getDefaultKey()
{
return defaultKey;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SelectorBasedPodTemplateSelectStrategy that = (SelectorBasedPodTemplateSelectStrategy) o;
return Objects.equals(defaultKey, that.defaultKey) && Objects.equals(selectors, that.selectors);
}

@Override
public int hashCode()
{
return Objects.hash(defaultKey, selectors);
}

@Override
public String toString()
{
return "SelectorBasedPodTemplateSelectStrategy{" +
"selectors=" + selectors +
", defaultKey=" + defaultKey +
'}';
}
}
Loading

0 comments on commit 57e2831

Please sign in to comment.