Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-27812] Support Dynamic Change of Watched Namespaces #254

Merged
merged 2 commits into from
Jun 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
<td>Boolean</td>
<td>Whether to enable on-the-fly config changes through the operator configmap.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.dynamic.namespaces.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Enables dynamic change of watched/monitored namespaces. Defaults to false</td>
</tr>
<tr>
<td><h5>kubernetes.operator.job.upgrade.ignore-pending-savepoint</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down Expand Up @@ -146,5 +152,11 @@
<td>Map</td>
<td>Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the session job artifacts. Expected format: headerKey1:headerValue1,headerKey2:headerValue2.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.watched.namespaces</h5></td>
<td style="word-wrap: break-word;">"JOSDK_ALL_NAMESPACES"</td>
<td>String</td>
<td>Comma separated list of namespaces the operator monitors for custom resources. Defaults to JOSDK_ALL_NAMESPACES</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,13 @@

package org.apache.flink.kubernetes.operator;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
Expand All @@ -49,15 +45,18 @@
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.RegisteredController;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Main Class for Flink native k8s operator. */
public class FlinkOperator {
Expand All @@ -69,16 +68,16 @@ public class FlinkOperator {
private final FlinkService flinkService;
private final FlinkConfigManager configManager;
private final Set<FlinkResourceValidator> validators;
private final Set<RegisteredController> registeredControllers = new HashSet<>();
private final MetricGroup metricGroup;

public FlinkOperator(@Nullable Configuration conf) {
this.client = new DefaultKubernetesClient();
this.configManager = conf != null ? new FlinkConfigManager(conf) : new FlinkConfigManager();
this.operator =
new Operator(
client,
getConfigurationServiceOverriderConsumer(
configManager.getOperatorConfiguration()));
this.configManager =
conf != null
? new FlinkConfigManager(conf) // For testing only
: new FlinkConfigManager(this::handleNamespaceChanges);
this.operator = new Operator(client, this::overrideOperatorConfigs);
this.flinkService = new FlinkService(client, configManager);
this.validators = ValidatorUtils.discoverValidators(configManager);
this.metricGroup =
Expand All @@ -88,20 +87,25 @@ public FlinkOperator(@Nullable Configuration conf) {
FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
}

@VisibleForTesting
protected static Consumer<ConfigurationServiceOverrider>
getConfigurationServiceOverriderConsumer(
FlinkOperatorConfiguration operatorConfiguration) {
return overrider -> {
int parallelism = operatorConfiguration.getReconcilerMaxParallelism();
if (parallelism == -1) {
LOG.info("Configuring operator with unbounded reconciliation thread pool.");
overrider.withExecutorService(Executors.newCachedThreadPool());
} else {
LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
overrider.withConcurrentReconciliationThreads(parallelism);
}
};
private void handleNamespaceChanges(Set<String> namespaces) {
registeredControllers.forEach(
controller -> {
if (controller.allowsNamespaceChanges()) {
LOG.info("Changing namespaces on {} to {}", controller, namespaces);
controller.changeNamespaces(namespaces);
}
});
}

private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) {
int parallelism = configManager.getOperatorConfiguration().getReconcilerMaxParallelism();
if (parallelism == -1) {
LOG.info("Configuring operator with unbounded reconciliation thread pool.");
overrider.withExecutorService(Executors.newCachedThreadPool());
} else {
LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
overrider.withConcurrentReconciliationThreads(parallelism);
}
}

private void registerDeploymentController() {
Expand All @@ -120,12 +124,7 @@ private void registerDeploymentController() {
observerFactory,
new MetricManager<>(metricGroup),
statusHelper);

FlinkControllerConfig<FlinkDeployment> controllerConfig =
new FlinkControllerConfig<>(
controller,
configManager.getOperatorConfiguration().getWatchedNamespaces());
operator.register(controller, controllerConfig);
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
}

private void registerSessionJobController() {
Expand All @@ -143,11 +142,20 @@ private void registerSessionJobController() {
new MetricManager<>(metricGroup),
statusHelper);

FlinkControllerConfig<FlinkSessionJob> controllerConfig =
new FlinkControllerConfig<>(
controller,
configManager.getOperatorConfiguration().getWatchedNamespaces());
operator.register(controller, controllerConfig);
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
}

private void overrideControllerConfigs(ControllerConfigurationOverrider<?> overrider) {
// TODO: https://github.com/java-operator-sdk/java-operator-sdk/issues/1259
String[] watchedNamespaces =
configManager
.getOperatorConfiguration()
.getWatchedNamespaces()
.toArray(String[]::new);
String fakeNs = UUID.randomUUID().toString();
overrider.settingNamespace(fakeNs);
overrider.addingNamespaces(watchedNamespaces);
overrider.removingNamespaces(fakeNs);
}

public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;

import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
Expand All @@ -38,16 +37,19 @@
import lombok.Builder;
import lombok.SneakyThrows;
import lombok.Value;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
Expand All @@ -61,15 +63,21 @@ public class FlinkConfigManager {
private volatile Configuration defaultConfig;
private volatile FlinkOperatorConfiguration operatorConfiguration;
private final AtomicLong defaultConfigVersion = new AtomicLong(0);

private final LoadingCache<Key, Configuration> cache;
private final Set<String> namespaces = EnvUtils.getWatchedNamespaces();
private final Consumer<Set<String>> namespaceListener;

public FlinkConfigManager() {
this(GlobalConfiguration.loadConfiguration());
@VisibleForTesting
public FlinkConfigManager(Configuration defaultConfig) {
this(defaultConfig, ns -> {});
}

public FlinkConfigManager(Configuration defaultConfig) {
public FlinkConfigManager(Consumer<Set<String>> namespaceListener) {
this(GlobalConfiguration.loadConfiguration(), namespaceListener);
}

public FlinkConfigManager(
Configuration defaultConfig, Consumer<Set<String>> namespaceListener) {
this.namespaceListener = namespaceListener;
Duration cacheTimeout =
defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT);
this.cache =
Expand Down Expand Up @@ -109,14 +117,22 @@ public Configuration getDefaultConfig() {

@VisibleForTesting
public void updateDefaultConfig(Configuration newConf) {
if (newConf.equals(defaultConfig)) {
if (ObjectUtils.allNotNull(this.defaultConfig, newConf)
&& this.defaultConfig.toMap().equals(newConf.toMap())) {
LOG.info("Default configuration did not change, nothing to do...");
return;
}

LOG.info("Updating default configuration to {}", newConf);
this.operatorConfiguration =
FlinkOperatorConfiguration.fromConfiguration(newConf, namespaces);
var oldNs =
Optional.ofNullable(this.operatorConfiguration)
.map(FlinkOperatorConfiguration::getWatchedNamespaces)
.orElse(Set.of());
this.operatorConfiguration = FlinkOperatorConfiguration.fromConfiguration(newConf);
var newNs = this.operatorConfiguration.getWatchedNamespaces();
if (this.operatorConfiguration.getDynamicNamespacesEnabled() && !oldNs.equals(newNs)) {
this.namespaceListener.accept(operatorConfiguration.getWatchedNamespaces());
}
this.defaultConfig = newConf.clone();
// We do not invalidate the cache to avoid deleting currently used temp files,
// simply bump the version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,31 @@
import lombok.Value;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Configuration class for operator. */
@Value
public class FlinkOperatorConfiguration {

private static final String NAMESPACES_SPLITTER_KEY = "\\s*,\\s*";

Duration reconcileInterval;
int reconcilerMaxParallelism;
Duration progressCheckInterval;
Duration restApiReadyDelay;
Duration flinkClientTimeout;
String flinkServiceHostOverride;
Set<String> watchedNamespaces;
Boolean dynamicNamespacesEnabled;
Duration flinkCancelJobTimeout;
Duration flinkShutdownClusterTimeout;
String artifactsBaseDir;
Integer savepointHistoryCountThreshold;
Duration savepointHistoryAgeThreshold;

public static FlinkOperatorConfiguration fromConfiguration(
Configuration operatorConfig, Set<String> watchedNamespaces) {
public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
Duration reconcileInterval =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_RECONCILER_RESCHEDULE_INTERVAL);
Expand Down Expand Up @@ -94,6 +98,19 @@ public static FlinkOperatorConfiguration fromConfiguration(
flinkServiceHostOverride = "localhost";
}

var watchedNamespaces =
new HashSet<>(
Arrays.asList(
operatorConfig
.get(
KubernetesOperatorConfigOptions
.OPERATOR_WATCHED_NAMESPACES)
.split(NAMESPACES_SPLITTER_KEY)));

boolean dynamicNamespacesEnabled =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_NAMESPACES_ENABLED);

return new FlinkOperatorConfiguration(
reconcileInterval,
reconcilerMaxParallelism,
Expand All @@ -102,6 +119,7 @@ public static FlinkOperatorConfiguration fromConfiguration(
flinkClientTimeout,
flinkServiceHostOverride,
watchedNamespaces,
dynamicNamespacesEnabled,
flinkCancelJobTimeout,
flinkShutdownClusterTimeout,
artifactsBaseDir,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.ConfigOptions;

import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.reconciler.Constants;

import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -187,4 +188,19 @@ public class KubernetesOperatorConfigOptions {
.withDescription(
"Interval at which periodic savepoints will be triggered. "
+ "The triggering schedule is not guaranteed, savepoints will be triggered as part of the regular reconcile loop.");

public static final ConfigOption<String> OPERATOR_WATCHED_NAMESPACES =
ConfigOptions.key("kubernetes.operator.watched.namespaces")
.stringType()
.defaultValue(Constants.WATCH_ALL_NAMESPACES)
.withDescription(
"Comma separated list of namespaces the operator monitors for custom resources. Defaults to "
+ Constants.WATCH_ALL_NAMESPACES);

public static final ConfigOption<Boolean> OPERATOR_DYNAMIC_NAMESPACES_ENABLED =
ConfigOptions.key("kubernetes.operator.dynamic.namespaces.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Enables dynamic change of watched/monitored namespaces. Defaults to false");
}

This file was deleted.

Loading