Skip to content

Commit

Permalink
[pulsar-functions] enhance kubernetes manifest customizer with defaul…
Browse files Browse the repository at this point in the history
…t options (apache#9445)

The KubernetesManifestCustomizer was introduced by customizing the stateful set of running Pulsar Functions. but no default value was loaded from `functions_worker.yaml`.

Add load default runtime options in `BasicKubernetesManifestCustomizer`
Add unit tests

- [ ] Make sure that the change passes the CI checks.
  • Loading branch information
freeznet authored and eolivelli committed May 7, 2021
1 parent 186e63b commit 421fd82
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 51 deletions.
14 changes: 14 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,20 @@ functionRuntimeFactoryConfigs:
## This class receives the customRuntimeOptions string and can customize details of how the runtime operates.
#runtimeCustomizerClassName: "org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer"

## This config will pass to RuntimeCustomizer's initialize function to do initializing.
#runtimeCustomizerConfig:
# extractLabels:
# extraLabel: value
# extraAnnotations:
# extraAnnotation: value
# nodeSelectorLabels:
# customLabel: value
# jobNamespace: namespace
# tolerations:
# - key: custom-key
# value: value
# effect: NoSchedule

## Config admin CLI
#configAdminCLI:

Expand Down
4 changes: 0 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -483,11 +483,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<<<<<<< HEAD
<version>${commons-lang3.version}</version>
=======
<version>3.9</version>
>>>>>>> da22685e074... upgrade presto version to 334 and JDK11
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@
import com.google.gson.Gson;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.openapi.models.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -39,6 +45,7 @@
* modify (for example, a service account must have permissions in the specified jobNamespace)
*
*/
@Slf4j
public class BasicKubernetesManifestCustomizer implements KubernetesManifestCustomizer {

private static final String RESOURCE_CPU = "cpu";
Expand All @@ -48,7 +55,9 @@ public class BasicKubernetesManifestCustomizer implements KubernetesManifestCust
@Getter
@Setter
@NoArgsConstructor
static private class RuntimeOpts {
@AllArgsConstructor
@Builder(toBuilder = true)
static public class RuntimeOpts {
private String jobNamespace;
private String jobName;
private Map<String, String> extraLabels;
Expand All @@ -58,13 +67,25 @@ static private class RuntimeOpts {
private List<V1Toleration> tolerations;
}

@Getter
private RuntimeOpts runtimeOpts = new RuntimeOpts();

@Override
public void initialize(Map<String, Object> config) {
if (config != null) {
RuntimeOpts opts = ObjectMapperFactory.getThreadLocal().convertValue(config, RuntimeOpts.class);
if (opts != null) {
runtimeOpts = opts.toBuilder().build();
}
} else {
log.warn("initialize with null config");
}
}

@Override
public String customizeNamespace(Function.FunctionDetails funcDetails, String currentNamespace) {
RuntimeOpts opts = getOptsFromDetails(funcDetails);
opts = mergeRuntimeOpts(runtimeOpts, opts);
if (!StringUtils.isEmpty(opts.getJobNamespace())) {
return opts.getJobNamespace();
} else {
Expand All @@ -75,6 +96,7 @@ public String customizeNamespace(Function.FunctionDetails funcDetails, String cu
@Override
public String customizeName(Function.FunctionDetails funcDetails, String currentName) {
RuntimeOpts opts = getOptsFromDetails(funcDetails);
opts = mergeRuntimeOpts(runtimeOpts, opts);
if (!StringUtils.isEmpty(opts.getJobName())) {
return opts.getJobName();
} else {
Expand All @@ -85,24 +107,27 @@ public String customizeName(Function.FunctionDetails funcDetails, String current
@Override
public V1Service customizeService(Function.FunctionDetails funcDetails, V1Service service) {
RuntimeOpts opts = getOptsFromDetails(funcDetails);
opts = mergeRuntimeOpts(runtimeOpts, opts);
service.setMetadata(updateMeta(opts, service.getMetadata()));
return service;
}

@Override
public V1StatefulSet customizeStatefulSet(Function.FunctionDetails funcDetails, V1StatefulSet statefulSet) {
RuntimeOpts opts = getOptsFromDetails(funcDetails);
RuntimeOpts opts = mergeRuntimeOpts(runtimeOpts, getOptsFromDetails(funcDetails));
statefulSet.setMetadata(updateMeta(opts, statefulSet.getMetadata()));
V1PodTemplateSpec pt = statefulSet.getSpec().getTemplate();
pt.setMetadata(updateMeta(opts, pt.getMetadata()));
V1PodSpec ps = pt.getSpec();
if (opts.getNodeSelectorLabels() != null && opts.getNodeSelectorLabels().size() > 0) {
opts.getNodeSelectorLabels().forEach(ps::putNodeSelectorItem);
}
if (opts.getTolerations() != null && opts.getTolerations().size() > 0) {
opts.getTolerations().forEach(ps::addTolerationsItem);
if (ps != null) {
if (opts.getNodeSelectorLabels() != null && opts.getNodeSelectorLabels().size() > 0) {
opts.getNodeSelectorLabels().forEach(ps::putNodeSelectorItem);
}
if (opts.getTolerations() != null && opts.getTolerations().size() > 0) {
opts.getTolerations().forEach(ps::addTolerationsItem);
}
ps.getContainers().forEach(container -> updateContainerResources(container, opts));
}
ps.getContainers().forEach(container -> updateContainerResources(container, opts));
return statefulSet;
}

Expand All @@ -113,10 +138,10 @@ private void updateContainerResources(V1Container container, RuntimeOpts opts) {
Map<String, Quantity> limits = resourceRequirements.getLimits();
Map<String, Quantity> requests = resourceRequirements.getRequests();
for (String resource : RESOURCES) {
if (limits.containsKey(resource)) {
if (limits != null && limits.containsKey(resource)) {
containerResources.putLimitsItem(resource, limits.get(resource));
}
if (requests.containsKey(resource)) {
if (requests != null && requests.containsKey(resource)) {
containerResources.putRequestsItem(resource, requests.get(resource));
}
}
Expand All @@ -143,4 +168,76 @@ private V1ObjectMeta updateMeta(RuntimeOpts opts, V1ObjectMeta meta) {
return meta;
}

public static RuntimeOpts mergeRuntimeOpts(RuntimeOpts oriOpts, RuntimeOpts newOpts) {
RuntimeOpts mergedOpts = oriOpts.toBuilder().build();
if (mergedOpts.getExtraLabels() == null) {
mergedOpts.setExtraLabels(new HashMap<>());
}
if (mergedOpts.getExtraAnnotations() == null) {
mergedOpts.setExtraAnnotations(new HashMap<>());
}
if (mergedOpts.getNodeSelectorLabels() == null) {
mergedOpts.setNodeSelectorLabels(new HashMap<>());
}
if (mergedOpts.getTolerations() == null) {
mergedOpts.setTolerations(new ArrayList<>());
}
if (mergedOpts.getResourceRequirements() == null) {
mergedOpts.setResourceRequirements(new V1ResourceRequirements());
}

if (!StringUtils.isEmpty(newOpts.getJobName())) {
mergedOpts.setJobName(newOpts.getJobName());
}
if (!StringUtils.isEmpty(newOpts.getJobNamespace())) {
mergedOpts.setJobNamespace(newOpts.getJobNamespace());
}
if (newOpts.getExtraLabels() != null && !newOpts.getExtraLabels().isEmpty()) {
newOpts.getExtraLabels().forEach((key, labelsItem) -> {
if (!mergedOpts.getExtraLabels().containsKey(key)) {
log.debug("extra label {} has been changed to {}", key, labelsItem);
}
mergedOpts.getExtraLabels().put(key, labelsItem);
});
}
if (newOpts.getExtraAnnotations() != null && !newOpts.getExtraAnnotations().isEmpty()) {
newOpts.getExtraAnnotations().forEach((key, annotationsItem) -> {
if (!mergedOpts.getExtraAnnotations().containsKey(key)) {
log.debug("extra annotation {} has been changed to {}", key, annotationsItem);
}
mergedOpts.getExtraAnnotations().put(key, annotationsItem);
});
}
if (newOpts.getNodeSelectorLabels() != null && !newOpts.getNodeSelectorLabels().isEmpty()) {
newOpts.getNodeSelectorLabels().forEach((key, nodeSelectorItem) -> {
if (!mergedOpts.getNodeSelectorLabels().containsKey(key)) {
log.debug("node selector label {} has been changed to {}", key, nodeSelectorItem);
}
mergedOpts.getNodeSelectorLabels().put(key, nodeSelectorItem);
});
}

if (newOpts.getResourceRequirements() != null) {
V1ResourceRequirements mergedResourcesRequirements = mergedOpts.getResourceRequirements();
V1ResourceRequirements newResourcesRequirements = newOpts.getResourceRequirements();

Map<String, Quantity> limits = newResourcesRequirements.getLimits();
Map<String, Quantity> requests = newResourcesRequirements.getRequests();
for (String resource : RESOURCES) {
if (limits != null && limits.containsKey(resource)) {
mergedResourcesRequirements.putLimitsItem(resource, limits.get(resource));
}
if (requests != null && requests.containsKey(resource)) {
mergedResourcesRequirements.putRequestsItem(resource, requests.get(resource));
}
}
mergedOpts.setResourceRequirements(mergedResourcesRequirements);
}

if (newOpts.getTolerations() != null && !newOpts.getTolerations().isEmpty()) {
mergedOpts.getTolerations().addAll(newOpts.getTolerations());
}
return mergedOpts;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.runtime.kubernetes;

import com.google.gson.Gson;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.openapi.models.V1ResourceRequirements;
import io.kubernetes.client.openapi.models.V1Toleration;
import org.testng.annotations.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;

/**
* Unit test of {@link BasicKubernetesManifestCustomizerTest}.
*/
public class BasicKubernetesManifestCustomizerTest {

@Test
public void TestInitializeWithNullData() {
BasicKubernetesManifestCustomizer customizer = new BasicKubernetesManifestCustomizer();
customizer.initialize(null);
assertNotEquals(customizer.getRuntimeOpts(), null);
assertNull(customizer.getRuntimeOpts().getExtraLabels());
assertNull(customizer.getRuntimeOpts().getExtraAnnotations());
assertNull(customizer.getRuntimeOpts().getNodeSelectorLabels());
assertNull(customizer.getRuntimeOpts().getTolerations());
assertNull(customizer.getRuntimeOpts().getResourceRequirements());
}

@Test
public void TestInitializeWithData() {
BasicKubernetesManifestCustomizer customizer = new BasicKubernetesManifestCustomizer();
Map<String, Object> confs = new HashMap<>();
confs.put("jobNamespace", "custom-ns");
confs.put("jobName", "custom-name");
customizer.initialize(confs);
assertNotEquals(customizer.getRuntimeOpts(), null);
assertEquals(customizer.getRuntimeOpts().getJobName(), "custom-name");
assertEquals(customizer.getRuntimeOpts().getJobNamespace(), "custom-ns");
}

@Test
public void TestMergeRuntimeOpts() {
Map<String, Object> configs = new Gson().fromJson(KubernetesRuntimeTest.createRuntimeCustomizerConfig(), HashMap.class);
BasicKubernetesManifestCustomizer customizer = new BasicKubernetesManifestCustomizer();
customizer.initialize(configs);
BasicKubernetesManifestCustomizer.RuntimeOpts newOpts = new BasicKubernetesManifestCustomizer.RuntimeOpts();
newOpts.setJobName("merged-name");
newOpts.setTolerations(Collections.emptyList());
V1Toleration toleration = new V1Toleration();
toleration.setKey("merge-key");
toleration.setEffect("NoSchedule");
toleration.setOperator("Equal");
toleration.setTolerationSeconds(6000L);
newOpts.setTolerations(Collections.singletonList(toleration));
V1ResourceRequirements resourceRequirements = new V1ResourceRequirements();
resourceRequirements.putLimitsItem("cpu", new Quantity("20"));
resourceRequirements.putLimitsItem("memory", new Quantity("10240"));
newOpts.setResourceRequirements(resourceRequirements);
newOpts.setNodeSelectorLabels(Collections.singletonMap("disktype", "ssd"));
newOpts.setExtraAnnotations(Collections.singletonMap("functiontype", "sink"));
newOpts.setExtraLabels(Collections.singletonMap("functiontype", "sink"));
BasicKubernetesManifestCustomizer.RuntimeOpts mergedOpts = BasicKubernetesManifestCustomizer.mergeRuntimeOpts(
customizer.getRuntimeOpts(), newOpts);

assertEquals(mergedOpts.getJobName(), "merged-name");
assertEquals(mergedOpts.getTolerations().size(), 2);
assertEquals(mergedOpts.getExtraAnnotations().size(), 2);
assertEquals(mergedOpts.getExtraLabels().size(), 2);
assertEquals(mergedOpts.getNodeSelectorLabels().size(), 2);
assertEquals(mergedOpts.getResourceRequirements().getLimits().get("cpu").getNumber().intValue(), 20);
assertEquals(mergedOpts.getResourceRequirements().getLimits().get("memory").getNumber().intValue(), 10240);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
Expand All @@ -58,7 +57,7 @@
import static org.testng.Assert.fail;

/**
* Unit test of {@link ThreadRuntime}.
* Unit test of {@link KubernetesRuntimeFactoryTest}.
*/
public class KubernetesRuntimeFactoryTest {

Expand Down
Loading

0 comments on commit 421fd82

Please sign in to comment.