Skip to content

Commit

Permalink
MGDSTRM-9181: Have kas ingress controller utilise the openshift-route…
Browse files Browse the repository at this point in the history
…r's dynamic config manager. (#850)

* MGDSTRM-9181: Have kas ingress controller utilise the openshift-router's dynamic config manager.
* give fleetshard the responsibility to create blueprints for bootstrap/admin routes

Co-authored-by: Sam Barker <sam@quadrocket.co.uk>
  • Loading branch information
2 people authored and biswassri committed Jan 17, 2023
1 parent 3a23bdf commit 769c687
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 10 deletions.
4 changes: 4 additions & 0 deletions common/src/main/java/org/bf2/common/OperandUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ public class OperandUtils {
// Base key of the OpenShift router rate-limit annotations; the two constants below append a suffix to it.
public static final String OPENSHIFT_RATE_LIMIT_ANNOTATION = "haproxy.router.openshift.io/rate-limit-connections";
public static final String OPENSHIFT_RATE_LIMIT_ANNOTATION_CONCURRENT_TCP = OPENSHIFT_RATE_LIMIT_ANNOTATION + ".concurrent-tcp";
public static final String OPENSHIFT_RATE_LIMIT_ANNOTATION_TCP_RATE = OPENSHIFT_RATE_LIMIT_ANNOTATION + ".rate-tcp";
// OpenShift router "balance" annotation key and the "leastconn" value used with it
// for the external listener (bootstrap) annotations.
public static final String OPENSHIFT_INGRESS_BALANCE = "haproxy.router.openshift.io/balance";
public static final String OPENSHIFT_INGRESS_BALANCE_LEASTCONN = "leastconn";

public static final String STRIMZI_OPERATOR_NAME = "strimzi-cluster-operator";
public static final String FLEETSHARD_OPERATOR_NAME = "kas-fleetshard-operator";
public static final String MASTER_SECRET_NAME = "master-secret";
// Label key/value pair ("ingressType" = "sharded") put on routes, Kafka resources and blueprint routes.
public static final String INGRESS_TYPE = "ingressType";
public static final String SHARDED = "sharded";

/**
* Set the provided resource as owner of the resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.NodeList;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Quantity;
Expand All @@ -24,6 +26,10 @@
import io.fabric8.kubernetes.client.utils.CachedSingleThreadScheduler;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.fabric8.openshift.api.model.Route;
import io.fabric8.openshift.api.model.RouteBuilder;
import io.fabric8.openshift.api.model.RouteSpec;
import io.fabric8.openshift.api.model.TLSConfig;
import io.fabric8.openshift.api.model.TLSConfigBuilder;
import io.fabric8.openshift.api.model.operator.v1.ConfigBuilder;
import io.fabric8.openshift.api.model.operator.v1.IngressController;
import io.fabric8.openshift.api.model.operator.v1.IngressControllerBuilder;
Expand All @@ -37,6 +43,7 @@
import io.strimzi.api.kafka.model.KafkaClusterSpec;
import io.strimzi.api.kafka.model.listener.arraylistener.GenericKafkaListener;
import io.strimzi.api.kafka.model.listener.arraylistener.GenericKafkaListenerConfiguration;
import org.apache.commons.codec.binary.Base32;
import org.bf2.common.OperandUtils;
import org.bf2.common.ResourceInformer;
import org.bf2.common.ResourceInformerFactory;
Expand All @@ -54,6 +61,9 @@
import javax.inject.Inject;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -65,6 +75,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand All @@ -89,6 +100,7 @@ public class IngressControllerManager {

private static final String MAX_CONNECTIONS = "maxConnections";
private static final String RELOAD_INTERVAL = "reloadInterval";
private static final String DYNAMIC_CONFIG_MANAGER = "dynamicConfigManager";
private static final String UNSUPPORTED_CONFIG_OVERRIDES = "unsupportedConfigOverrides";
private static final String TUNING_OPTIONS = "tuningOptions";
protected static final String INGRESSCONTROLLER_LABEL = "ingresscontroller.operator.openshift.io/owning-ingresscontroller";
Expand Down Expand Up @@ -168,6 +180,12 @@ public class IngressControllerManager {
@ConfigProperty(name = "ingresscontroller.reload-interval-seconds")
Integer ingressReloadIntervalSeconds;

// Toggle for the router's dynamic config manager. Its string form is written to the ingress controller's
// unsupportedConfigOverrides (null is written as "false"), and blueprint routes are only maintained when
// this is TRUE.
@ConfigProperty(name = "ingresscontroller.dynamic-config-manager")
Boolean dynamicConfigManager;

// Namespace in which blueprint routes are created or replaced; when null no blueprint routes are maintained.
@ConfigProperty(name = "ingresscontroller.blueprint-namespace")
String blueprintRouteNamespace;

@ConfigProperty(name = "ingresscontroller.peak-throughput-percentage")
int peakThroughputPercentage;
@ConfigProperty(name = "ingresscontroller.peak-connection-percentage")
Expand Down Expand Up @@ -429,6 +447,66 @@ void reconcileIngressControllers() {
}
}

/**
 * Convenience overload that extracts the annotations and TLS configuration from an existing
 * route and delegates to the {@code Optional}-based variant.
 *
 * @param route             the route whose annotations and TLS settings the blueprint must match;
 *                          its metadata, annotations, spec and TLS section may each be {@code null}
 * @param blueprintBaseName prefix used when naming the blueprint route
 */
public void ensureBlueprintRouteMatching(Route route, String blueprintBaseName) {
    // Annotations are only available when the route carries metadata.
    Optional<Map<String, String>> routeAnnotations = Optional.empty();
    ObjectMeta metadata = route.getMetadata();
    if (metadata != null) {
        routeAnnotations = Optional.ofNullable(metadata.getAnnotations());
    }

    // Likewise, the TLS section is only available when the route has a spec.
    Optional<TLSConfig> routeTls = Optional.empty();
    RouteSpec spec = route.getSpec();
    if (spec != null) {
        routeTls = Optional.ofNullable(spec.getTls());
    }

    ensureBlueprintRouteMatching(routeAnnotations, routeTls, blueprintBaseName);
}

/**
 * Creates or replaces a blueprint route, in {@code blueprintRouteNamespace}, carrying the given
 * annotations and TLS configuration.
 * <p>
 * The route name (also used as its host) is {@code <blueprintBaseName>-<hash>-blueprint}, where the hash is
 * the lower-cased Base32 encoding of a SHA-1 digest over the annotations (sorted by key) and the TLS
 * termination, certificate, key and CA certificate. Equal inputs therefore always map to the same
 * blueprint route.
 * <p>
 * This is a no-op unless {@code dynamicConfigManager} is {@code TRUE} and a blueprint namespace is configured.
 *
 * @param annotations       annotations the blueprint must carry, if any
 * @param tlsConfig         TLS configuration the blueprint must carry, if any; the insecure edge termination
 *                          policy is not copied to the blueprint
 * @param blueprintBaseName prefix used when naming the blueprint route
 */
public void ensureBlueprintRouteMatching(Optional<Map<String, String>> annotations, Optional<TLSConfig> tlsConfig, String blueprintBaseName) {
    // see findMatchingBlueprint https://github.com/openshift/router/blob/master/pkg/router/template/configmanager/haproxy/manager.go#L817
    if (!Boolean.TRUE.equals(dynamicConfigManager) || blueprintRouteNamespace == null) {
        return;
    }

    // Sort the annotations so that the digest does not depend on the iteration order of the supplied map.
    var orderedAnnotations = annotations.map(TreeMap::new);

    final MessageDigest stableIdDigest;
    try {
        stableIdDigest = MessageDigest.getInstance("SHA-1");
    } catch (NoSuchAlgorithmException e) {
        throw new IllegalStateException("SHA-1 message digest is not available", e);
    }

    orderedAnnotations.ifPresent(m -> m.forEach((k, v) -> {
        stableIdDigest.update(String.valueOf(k).getBytes(StandardCharsets.UTF_8));
        stableIdDigest.update(String.valueOf(v).getBytes(StandardCharsets.UTF_8));
    }));

    // Hash the TLS fields with an explicit charset (as for the annotations) so that the resulting name
    // does not depend on the platform default encoding. Absent fields are skipped.
    tlsConfig.ifPresent(tls -> {
        String[] hashedFields = {tls.getTermination(), tls.getCertificate(), tls.getKey(), tls.getCaCertificate()};
        for (String field : hashedFields) {
            if (field != null) {
                stableIdDigest.update(field.getBytes(StandardCharsets.UTF_8));
            }
        }
    });

    // Locale.ROOT keeps the lower-casing locale independent (e.g. 'I' must become 'i' even under a Turkish
    // default locale), otherwise the value would not be usable as a resource name / host.
    var stable = new Base32().encodeToString(stableIdDigest.digest())
            .toLowerCase(java.util.Locale.ROOT)
            .replaceFirst("=$", "");
    var stableResourceName = String.format("%s-%s-blueprint", blueprintBaseName, stable);

    var blueprintRouteLabels = new HashMap<>(getRouteMatchLabels());
    blueprintRouteLabels.put(OperandUtils.INGRESS_TYPE, OperandUtils.SHARDED);
    blueprintRouteLabels.put("bf2.org/blueprint", "true");

    // strip InsecureEdgeTerminationPolicy from the placeholder, the operator doesn't consider it.
    var config = new TLSConfigBuilder(tlsConfig.orElseGet(TLSConfig::new)).withInsecureEdgeTerminationPolicy(null).build();
    var blueprintRoute = new RouteBuilder()
            .withNewMetadata()
                .withName(stableResourceName)
                .withLabels(blueprintRouteLabels)
                .withAnnotations(orderedAnnotations.orElse(null))
            .endMetadata()
            .withNewSpec()
                .withHost(stableResourceName)
                .withNewPort()
                    .withTargetPort(new IntOrString("unused"))
                .endPort()
                .withTls(config)
                .withNewTo()
                    .withKind("Service")
                    .withName("dummy")
                .endTo()
            .endSpec()
            .build();

    openShiftClient.routes().inNamespace(blueprintRouteNamespace).createOrReplace(blueprintRoute);
}

private void createOrEdit(IngressController expected, IngressController existing) {
String name = expected.getMetadata().getName();

Expand Down Expand Up @@ -550,11 +628,12 @@ private void buildIngressController(String name, String domain,
} else {
removeSpecProperty(spec, RELOAD_INTERVAL);
}
setSpecProperty(spec, UNSUPPORTED_CONFIG_OVERRIDES, DYNAMIC_CONFIG_MANAGER, dynamicConfigManager != null ? dynamicConfigManager.toString() : Boolean.FALSE.toString());
setSpecProperty(spec, TUNING_OPTIONS, MAX_CONNECTIONS, maxIngressConnections);
setSpecProperty(spec, UNSUPPORTED_CONFIG_OVERRIDES, MAX_CONNECTIONS, maxIngressConnections);

// on fabric8 6.1 we can convert back from the generic to the actual spec, on earlier versions we cannot
// because the unsupportOptions won't be preserved
// because the unsupportedOptions won't be preserved
builder.editSpec()
.withTuningOptions(Serialization.jsonMapper()
.convertValue(spec.get(TUNING_OPTIONS), IngressControllerTuningOptions.class))
Expand Down Expand Up @@ -704,4 +783,9 @@ List<String> getIngressContainerCommand() {
/**
 * @return the {@code requestMemory} value held by this manager, exactly as stored
 */
Optional<Quantity> getRequestMemory() {
return requestMemory;
}

/**
 * Exposes the configured blueprint route namespace; marked as intended for tests.
 *
 * @return the namespace in which blueprint routes are created, possibly {@code null}
 */
/* testing */ String getBlueprintRouteNamespace() {
return blueprintRouteNamespace;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ protected List<GenericKafkaListener> buildListeners(ManagedKafka managedKafka, i
GenericKafkaListenerConfigurationBuilder listenerConfigBuilder = new GenericKafkaListenerConfigurationBuilder()
.withBootstrap(new GenericKafkaListenerConfigurationBootstrapBuilder()
.withHost(managedKafka.getSpec().getEndpoint().getBootstrapServerHost())
.withAnnotations(Map.of("haproxy.router.openshift.io/balance", "leastconn"))
.withAnnotations(buildExternalListenerAnnotations(managedKafka))
.build()
)
.withBrokers(buildBrokerOverrides(managedKafka, replicas))
Expand Down Expand Up @@ -417,6 +417,10 @@ protected List<GenericKafkaListener> buildListeners(ManagedKafka managedKafka, i
);
}

/**
 * Supplies the annotations applied to the bootstrap configuration of the external listener.
 * This default contributes none.
 *
 * @param managedKafka the managed Kafka instance the listener is built for (unused here)
 * @return an empty, immutable map
 */
protected Map<String, String> buildExternalListenerAnnotations(ManagedKafka managedKafka) {
return Map.of();
}

protected List<GenericKafkaListenerConfigurationBroker> buildBrokerOverrides(ManagedKafka managedKafka, int replicas) {
List<GenericKafkaListenerConfigurationBroker> brokerOverrides = new ArrayList<>(replicas);
for (int i = 0; i < replicas; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ public void createOrUpdate(ManagedKafka managedKafka) {
if (openShiftClient != null) {
Route currentRoute = cachedRoute(managedKafka);
Route route = routeFrom(managedKafka, currentRoute);
if (ingressControllerManagerInstance.isResolvable()) {
ingressControllerManagerInstance.get().ensureBlueprintRouteMatching(route, "kafka-admin");
}

OperandUtils.createOrUpdate(openShiftClient.routes(), route);
}
Expand Down Expand Up @@ -385,7 +388,7 @@ private Map<String, String> buildAnnotations(ManagedKafka managedKafka) {

private Map<String, String> buildRouteLabels() {
Map<String, String> labels = OperandUtils.getDefaultLabels();
labels.put("ingressType", "sharded");
labels.put(OperandUtils.INGRESS_TYPE, OperandUtils.SHARDED);

if (ingressControllerManagerInstance.isResolvable()) {
labels.putAll(ingressControllerManagerInstance.get().getRouteMatchLabels());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.fabric8.kubernetes.api.model.TopologySpreadConstraintBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.openshift.api.model.TLSConfigBuilder;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.quarkus.arc.DefaultBean;
import io.strimzi.api.kafka.model.CruiseControlSpec;
Expand Down Expand Up @@ -207,6 +208,12 @@ public void createOrUpdate(ManagedKafka managedKafka) {

createOrUpdateIfNecessary(currentCruiseControlLoggingConfigMap, cruiseControlLoggingConfigMap);

if (ingressControllerManagerInstance.isResolvable()) {
ingressControllerManagerInstance.get().ensureBlueprintRouteMatching(Optional.ofNullable(buildExternalListenerAnnotations(managedKafka)),
Optional.of(new TLSConfigBuilder().withTermination("passthrough").build()),
"kafka-bootstrap");
}

super.createOrUpdate(managedKafka);
}

Expand Down Expand Up @@ -1134,7 +1141,7 @@ private Map<String, String> buildKafkaLabels(ManagedKafka managedKafka) {
Map<String, String> labels = OperandUtils.getDefaultLabels();
//this.strimziManager.changeStrimziVersion(managedKafka, this, labels);
Optional.ofNullable(managedKafka.getMetadata().getLabels()).ifPresent(labels::putAll);
labels.put("ingressType", "sharded");
labels.put(OperandUtils.INGRESS_TYPE, OperandUtils.SHARDED);
labels.put(this.strimziManager.getVersionLabel(), this.strimziManager.currentStrimziVersion(managedKafka));

if (ingressControllerManagerInstance.isResolvable()) {
Expand Down Expand Up @@ -1258,4 +1265,8 @@ public OperandReadiness getReadiness(ManagedKafka managedKafka) {
return super.getReadiness(managedKafka);
}

/**
 * Supplies the external listener annotations: the OpenShift router balance annotation set to
 * {@code leastconn}. The same map is used when ensuring the "kafka-bootstrap" blueprint route.
 *
 * @param managedKafka the managed Kafka instance the listener is built for (unused here)
 * @return an immutable single-entry map
 */
protected Map<String, String> buildExternalListenerAnnotations(ManagedKafka managedKafka) {
return Map.of(OperandUtils.OPENSHIFT_INGRESS_BALANCE, OperandUtils.OPENSHIFT_INGRESS_BALANCE_LEASTCONN);
}

}
6 changes: 5 additions & 1 deletion operator/src/main/kubernetes/kubernetes.yml
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,11 @@ spec:
- name: QUARKUS_PROFILE
value: prod
- name: INGRESSCONTROLLER_AZ_REPLICA_COUNT
value: "1"
value: "1"
- name: INGRESSCONTROLLER_BLUEPRINT_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
ports:
- containerPort: 8080
name: http
Expand Down
25 changes: 20 additions & 5 deletions operator/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,29 @@ ingresscontroller.max-ingress-connections=108000
# percentage of peak connections you actually need to meet
ingresscontroller.peak-connection-percentage=100

# Enables haproxy option contstats in the kas ingress so that stats are reported live rather than at connection close (RFE-3007 provide a proper mechanism to enable this option).
ingresscontroller.ingress-container-command=/usr/bin/bash,-c,awk '{print $0} /^defaults$/ {print \" option contstats\"}' < $TEMPLATE_FILE > /tmp/haproxy-config.template; exec /usr/bin/openshift-router --v=2 --template /tmp/haproxy-config.template
# Disconnect established connections after a haproxy reconfiguration (NE-879 should eliminate the need for this)
ingresscontroller.router-verbosity=2
# note that this variable is overridden using the downward API in the deployment.
ingresscontroller.blueprint-namespace=kas-fleetshard-operator
ingresscontroller.blueprint-selector=bf2.org/blueprint=true

# customizes the router operator's command line to enable specific options that can't currently be controlled from the CR.
ingresscontroller.ingress-container-command=/usr/bin/bash,-c,awk '{print $0} /^defaults$/ {print \" option contstats\"}' < $TEMPLATE_FILE > /tmp/haproxy-config.template; exec /usr/bin/openshift-router --v=${ingresscontroller.router-verbosity} --template /tmp/haproxy-config.template --commit-interval=${ingresscontroller.commit-interval} --blueprint-route-pool-size=${ingresscontroller.blueprint-route-pool-size} --max-dynamic-servers=${ingresscontroller.max-dynamic-servers} --blueprint-route-namespace=${ingresscontroller.blueprint-namespace} --blueprint-route-labels=${ingresscontroller.blueprint-selector}

# HA proxy dynamic config manager
ingresscontroller.dynamic-config-manager=true
# the following three variables are meaningful when the dynamic-config-manager=true
ingresscontroller.commit-interval=9223372036854775807ns
ingresscontroller.blueprint-route-pool-size=500
# The worst case is the bootstrap route. For 2SU, this needs to be 6. When we start supporting 3SU, we might want
# to switch to using "router.openshift.io/pool-size" on the bootstrap blueprint.
ingresscontroller.max-dynamic-servers=6

# Disconnect established connections after a haproxy reconfiguration event that *requires a restart*.
ingresscontroller.hard-stop-after=5s
# Coalesce up-to reloadInterval worth of haproxy updates. This prevents a flurry of haproxy restarts (1 per OpenShift Route) as each kafka
# instance is created (NE-879 should eliminate the need for this).
# Coalesce up-to reload-interval-seconds worth of haproxy reconfiguration events before restarting.
ingresscontroller.reload-interval-seconds=60


# external configuration injection through configmap
quarkus.kubernetes-config.enabled=true
quarkus.kubernetes-config.fail-on-missing-config=false
Expand Down
Loading

0 comments on commit 769c687

Please sign in to comment.