Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-functions] enhance kubernetes manifest customizer with default options #9445

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,20 @@ functionRuntimeFactoryConfigs:
## This class receives the customRuntimeOptions string and can customize details of how the runtime operates.
#runtimeCustomizerClassName: "org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer"

## This config will pass to RuntimeCustomizer's initialize function to do initializing.
#runtimeCustomizerConfig:
# extractLabels:
# extraLabel: value
# extraAnnotations:
# extraAnnotation: value
# nodeSelectorLabels:
# customLabel: value
# jobNamespace: namespace
# tolerations:
# - key: custom-key
# value: value
# effect: NoSchedule

## Config admin CLI
#configAdminCLI:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@
import com.google.gson.Gson;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.openapi.models.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -39,6 +45,7 @@
* modify (for example, a service account must have permissions in the specified jobNamespace)
*
*/
@Slf4j
public class BasicKubernetesManifestCustomizer implements KubernetesManifestCustomizer {

private static final String RESOURCE_CPU = "cpu";
Expand All @@ -48,7 +55,9 @@ public class BasicKubernetesManifestCustomizer implements KubernetesManifestCust
@Getter
@Setter
@NoArgsConstructor
static private class RuntimeOpts {
@AllArgsConstructor
@Builder(toBuilder = true)
static public class RuntimeOpts {
private String jobNamespace;
private String jobName;
private Map<String, String> extraLabels;
Expand All @@ -58,13 +67,25 @@ static private class RuntimeOpts {
private List<V1Toleration> tolerations;
}

@Getter
private RuntimeOpts runtimeOpts = new RuntimeOpts();

@Override
public void initialize(Map<String, Object> config) {
if (config != null) {
RuntimeOpts opts = ObjectMapperFactory.getThreadLocal().convertValue(config, RuntimeOpts.class);
if (opts != null) {
runtimeOpts = opts.toBuilder().build();
}
} else {
log.warn("initialize with null config");
}
}

@Override
public String customizeNamespace(Function.FunctionDetails funcDetails, String currentNamespace) {
RuntimeOpts opts = getOptsFromDetails(funcDetails);
opts = mergeRuntimeOpts(runtimeOpts, opts);
if (!StringUtils.isEmpty(opts.getJobNamespace())) {
return opts.getJobNamespace();
} else {
Expand All @@ -75,6 +96,7 @@ public String customizeNamespace(Function.FunctionDetails funcDetails, String cu
@Override
public String customizeName(Function.FunctionDetails funcDetails, String currentName) {
RuntimeOpts opts = getOptsFromDetails(funcDetails);
opts = mergeRuntimeOpts(runtimeOpts, opts);
if (!StringUtils.isEmpty(opts.getJobName())) {
return opts.getJobName();
} else {
Expand All @@ -85,24 +107,27 @@ public String customizeName(Function.FunctionDetails funcDetails, String current
@Override
public V1Service customizeService(Function.FunctionDetails funcDetails, V1Service service) {
RuntimeOpts opts = getOptsFromDetails(funcDetails);
opts = mergeRuntimeOpts(runtimeOpts, opts);
service.setMetadata(updateMeta(opts, service.getMetadata()));
return service;
}

@Override
public V1StatefulSet customizeStatefulSet(Function.FunctionDetails funcDetails, V1StatefulSet statefulSet) {
RuntimeOpts opts = getOptsFromDetails(funcDetails);
RuntimeOpts opts = mergeRuntimeOpts(runtimeOpts, getOptsFromDetails(funcDetails));
statefulSet.setMetadata(updateMeta(opts, statefulSet.getMetadata()));
V1PodTemplateSpec pt = statefulSet.getSpec().getTemplate();
pt.setMetadata(updateMeta(opts, pt.getMetadata()));
V1PodSpec ps = pt.getSpec();
if (opts.getNodeSelectorLabels() != null && opts.getNodeSelectorLabels().size() > 0) {
opts.getNodeSelectorLabels().forEach(ps::putNodeSelectorItem);
}
if (opts.getTolerations() != null && opts.getTolerations().size() > 0) {
opts.getTolerations().forEach(ps::addTolerationsItem);
if (ps != null) {
if (opts.getNodeSelectorLabels() != null && opts.getNodeSelectorLabels().size() > 0) {
opts.getNodeSelectorLabels().forEach(ps::putNodeSelectorItem);
}
if (opts.getTolerations() != null && opts.getTolerations().size() > 0) {
opts.getTolerations().forEach(ps::addTolerationsItem);
}
ps.getContainers().forEach(container -> updateContainerResources(container, opts));
}
ps.getContainers().forEach(container -> updateContainerResources(container, opts));
return statefulSet;
}

Expand All @@ -113,10 +138,10 @@ private void updateContainerResources(V1Container container, RuntimeOpts opts) {
Map<String, Quantity> limits = resourceRequirements.getLimits();
Map<String, Quantity> requests = resourceRequirements.getRequests();
for (String resource : RESOURCES) {
if (limits.containsKey(resource)) {
if (limits != null && limits.containsKey(resource)) {
containerResources.putLimitsItem(resource, limits.get(resource));
}
if (requests.containsKey(resource)) {
if (requests != null && requests.containsKey(resource)) {
containerResources.putRequestsItem(resource, requests.get(resource));
}
}
Expand All @@ -143,4 +168,76 @@ private V1ObjectMeta updateMeta(RuntimeOpts opts, V1ObjectMeta meta) {
return meta;
}

public static RuntimeOpts mergeRuntimeOpts(RuntimeOpts oriOpts, RuntimeOpts newOpts) {
RuntimeOpts mergedOpts = oriOpts.toBuilder().build();
if (mergedOpts.getExtraLabels() == null) {
mergedOpts.setExtraLabels(new HashMap<>());
}
if (mergedOpts.getExtraAnnotations() == null) {
mergedOpts.setExtraAnnotations(new HashMap<>());
}
if (mergedOpts.getNodeSelectorLabels() == null) {
mergedOpts.setNodeSelectorLabels(new HashMap<>());
}
if (mergedOpts.getTolerations() == null) {
mergedOpts.setTolerations(new ArrayList<>());
}
if (mergedOpts.getResourceRequirements() == null) {
mergedOpts.setResourceRequirements(new V1ResourceRequirements());
}

if (!StringUtils.isEmpty(newOpts.getJobName())) {
mergedOpts.setJobName(newOpts.getJobName());
}
if (!StringUtils.isEmpty(newOpts.getJobNamespace())) {
mergedOpts.setJobNamespace(newOpts.getJobNamespace());
}
if (newOpts.getExtraLabels() != null && !newOpts.getExtraLabels().isEmpty()) {
newOpts.getExtraLabels().forEach((key, labelsItem) -> {
if (!mergedOpts.getExtraLabels().containsKey(key)) {
log.debug("extra label {} has been changed to {}", key, labelsItem);
}
mergedOpts.getExtraLabels().put(key, labelsItem);
});
}
if (newOpts.getExtraAnnotations() != null && !newOpts.getExtraAnnotations().isEmpty()) {
newOpts.getExtraAnnotations().forEach((key, annotationsItem) -> {
if (!mergedOpts.getExtraAnnotations().containsKey(key)) {
log.debug("extra annotation {} has been changed to {}", key, annotationsItem);
}
mergedOpts.getExtraAnnotations().put(key, annotationsItem);
});
}
if (newOpts.getNodeSelectorLabels() != null && !newOpts.getNodeSelectorLabels().isEmpty()) {
newOpts.getNodeSelectorLabels().forEach((key, nodeSelectorItem) -> {
if (!mergedOpts.getNodeSelectorLabels().containsKey(key)) {
log.debug("node selector label {} has been changed to {}", key, nodeSelectorItem);
}
mergedOpts.getNodeSelectorLabels().put(key, nodeSelectorItem);
});
}

if (newOpts.getResourceRequirements() != null) {
V1ResourceRequirements mergedResourcesRequirements = mergedOpts.getResourceRequirements();
V1ResourceRequirements newResourcesRequirements = newOpts.getResourceRequirements();

Map<String, Quantity> limits = newResourcesRequirements.getLimits();
Map<String, Quantity> requests = newResourcesRequirements.getRequests();
for (String resource : RESOURCES) {
if (limits != null && limits.containsKey(resource)) {
mergedResourcesRequirements.putLimitsItem(resource, limits.get(resource));
}
if (requests != null && requests.containsKey(resource)) {
mergedResourcesRequirements.putRequestsItem(resource, requests.get(resource));
}
}
mergedOpts.setResourceRequirements(mergedResourcesRequirements);
}

if (newOpts.getTolerations() != null && !newOpts.getTolerations().isEmpty()) {
mergedOpts.getTolerations().addAll(newOpts.getTolerations());
}
return mergedOpts;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.runtime.kubernetes;

import com.google.gson.Gson;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.openapi.models.V1ResourceRequirements;
import io.kubernetes.client.openapi.models.V1Toleration;
import org.testng.annotations.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;

/**
* Unit test of {@link BasicKubernetesManifestCustomizerTest}.
*/
public class BasicKubernetesManifestCustomizerTest {

@Test
public void TestInitializeWithNullData() {
BasicKubernetesManifestCustomizer customizer = new BasicKubernetesManifestCustomizer();
customizer.initialize(null);
assertNotEquals(customizer.getRuntimeOpts(), null);
assertNull(customizer.getRuntimeOpts().getExtraLabels());
assertNull(customizer.getRuntimeOpts().getExtraAnnotations());
assertNull(customizer.getRuntimeOpts().getNodeSelectorLabels());
assertNull(customizer.getRuntimeOpts().getTolerations());
assertNull(customizer.getRuntimeOpts().getResourceRequirements());
}

@Test
public void TestInitializeWithData() {
BasicKubernetesManifestCustomizer customizer = new BasicKubernetesManifestCustomizer();
Map<String, Object> confs = new HashMap<>();
confs.put("jobNamespace", "custom-ns");
confs.put("jobName", "custom-name");
customizer.initialize(confs);
assertNotEquals(customizer.getRuntimeOpts(), null);
assertEquals(customizer.getRuntimeOpts().getJobName(), "custom-name");
assertEquals(customizer.getRuntimeOpts().getJobNamespace(), "custom-ns");
}

@Test
public void TestMergeRuntimeOpts() {
Map<String, Object> configs = new Gson().fromJson(KubernetesRuntimeTest.createRuntimeCustomizerConfig(), HashMap.class);
BasicKubernetesManifestCustomizer customizer = new BasicKubernetesManifestCustomizer();
customizer.initialize(configs);
BasicKubernetesManifestCustomizer.RuntimeOpts newOpts = new BasicKubernetesManifestCustomizer.RuntimeOpts();
newOpts.setJobName("merged-name");
newOpts.setTolerations(Collections.emptyList());
V1Toleration toleration = new V1Toleration();
toleration.setKey("merge-key");
toleration.setEffect("NoSchedule");
toleration.setOperator("Equal");
toleration.setTolerationSeconds(6000L);
newOpts.setTolerations(Collections.singletonList(toleration));
V1ResourceRequirements resourceRequirements = new V1ResourceRequirements();
resourceRequirements.putLimitsItem("cpu", new Quantity("20"));
resourceRequirements.putLimitsItem("memory", new Quantity("10240"));
newOpts.setResourceRequirements(resourceRequirements);
newOpts.setNodeSelectorLabels(Collections.singletonMap("disktype", "ssd"));
newOpts.setExtraAnnotations(Collections.singletonMap("functiontype", "sink"));
newOpts.setExtraLabels(Collections.singletonMap("functiontype", "sink"));
BasicKubernetesManifestCustomizer.RuntimeOpts mergedOpts = BasicKubernetesManifestCustomizer.mergeRuntimeOpts(
customizer.getRuntimeOpts(), newOpts);

assertEquals(mergedOpts.getJobName(), "merged-name");
assertEquals(mergedOpts.getTolerations().size(), 2);
assertEquals(mergedOpts.getExtraAnnotations().size(), 2);
assertEquals(mergedOpts.getExtraLabels().size(), 2);
assertEquals(mergedOpts.getNodeSelectorLabels().size(), 2);
assertEquals(mergedOpts.getResourceRequirements().getLimits().get("cpu").getNumber().intValue(), 20);
assertEquals(mergedOpts.getResourceRequirements().getLimits().get("memory").getNumber().intValue(), 10240);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
Expand All @@ -58,7 +57,7 @@
import static org.testng.Assert.fail;

/**
* Unit test of {@link ThreadRuntime}.
* Unit test of {@link KubernetesRuntimeFactoryTest}.
*/
public class KubernetesRuntimeFactoryTest {

Expand Down
Loading