From e435d4259388ba337b384bb102958247bb2107fa Mon Sep 17 00:00:00 2001 From: Pavel Pscheidl Date: Tue, 10 Mar 2020 18:52:38 +0100 Subject: [PATCH] PUBDEV-6852 - Kubernetes support (#4370) * PUBDEV-6852 - Kubernetes support * Lookup constraints * Creating host-extracting pattern only once per lookup. * Improved cluster startup logging - environment variable values crucial for H2O cloud forming are logged. * Introduced the concept of EmbeddedConfigProvider as proposed by Michal Kurka * H2O-K8S module uses H2O logging. * EmbeddedConfigProvider uses full Class name with package included in getName method * Reverted accidental changes in H2O.java * Renamed H2OKubernetesEmbeddedConfigProvider to KubernetesEmbeddedConfigProvider * Removed SLF4J dependencies. Removed JAR task from h2o-k8s. * Test for KubernetesEmbeddedConfigProvider. * Documentation and class names changes. * K8S Readme.md * Fixed documentation typos * Documentation enhancements * Environment variable keys renaming. * H2O.getEmbeddedH2OConfig now uses map/orElse chain. * H2O K8S inside main assembly. Removed from H2OApp. * Container resource requests 4 Gigabytes of memory. Image name placeholder. * Warn on DNS Lookup naming exception. * Remove duplicate inclusion of h2o-k8s module in settings.gradle * KubernetesEmbeddedConfigProvider is active only on 'H2O_KUBERNETES_SERVICE_DNS' env var presence. * Fix 'Routess' typo * Do not rely on H2O.exit() to do System.exit in KubernetesEmbeddedConfig. --- build.gradle | 6 +- h2o-assemblies/main/build.gradle | 1 + h2o-core/src/main/java/water/H2O.java | 59 +++++- .../water/init/EmbeddedConfigProvider.java | 29 +++ h2o-k8s/README.md | 106 +++++++++++ h2o-k8s/build.gradle | 13 ++ .../water/k8s/KubernetesEmbeddedConfig.java | 58 ++++++ .../k8s/KubernetesEmbeddedConfigProvider.java | 89 +++++++++ .../k8s/lookup/ClusterSizeConstraint.java | 20 ++ .../water/k8s/lookup/KubernetesDnsLookup.java | 171 ++++++++++++++++++ .../water/k8s/lookup/KubernetesLookup.java | 19 ++ .../water/k8s/lookup/LookupConstraint.java | 17 ++ .../k8s/lookup/LookupConstraintsBuilder.java | 69 +++++++ .../water/k8s/lookup/TimeoutConstraint.java | 24 +++ .../water.init.EmbeddedConfigProvider | 1 + .../KubernetesEmbeddedConfigProviderTest.java | 39 ++++ .../lookup/LookupConstraintsBuilderTest.java | 75 ++++++++ settings.gradle | 1 + 18 files changed, 792 insertions(+), 5 deletions(-) create mode 100644 h2o-core/src/main/java/water/init/EmbeddedConfigProvider.java create mode 100644 h2o-k8s/README.md create mode 100644 h2o-k8s/build.gradle create mode 100644 h2o-k8s/src/main/java/water/k8s/KubernetesEmbeddedConfig.java create mode 100644 h2o-k8s/src/main/java/water/k8s/KubernetesEmbeddedConfigProvider.java create mode 100644 h2o-k8s/src/main/java/water/k8s/lookup/ClusterSizeConstraint.java create mode 100644 h2o-k8s/src/main/java/water/k8s/lookup/KubernetesDnsLookup.java create mode 100644 h2o-k8s/src/main/java/water/k8s/lookup/KubernetesLookup.java create mode 100644 h2o-k8s/src/main/java/water/k8s/lookup/LookupConstraint.java create mode 100644 h2o-k8s/src/main/java/water/k8s/lookup/LookupConstraintsBuilder.java create mode 100644 h2o-k8s/src/main/java/water/k8s/lookup/TimeoutConstraint.java create mode 100644 h2o-k8s/src/main/resources/META-INF/services/water.init.EmbeddedConfigProvider create mode 100644 h2o-k8s/src/test/java/water/k8s/KubernetesEmbeddedConfigProviderTest.java create mode 100644 h2o-k8s/src/test/java/water/k8s/lookup/LookupConstraintsBuilderTest.java diff --git a/build.gradle b/build.gradle index 99f5753baaee..dc83caa0c09c 100644 --- a/build.gradle +++ b/build.gradle @@ -94,7 +94,8 @@ ext { project(':h2o-hive'), project(':h2o-security'), project(':h2o-logger'), - project(':h2o-genmodel-extensions:jgrapht') + project(':h2o-genmodel-extensions:jgrapht'), + project(':h2o-k8s') ] javaProjects = [ @@ -128,7 +129,8 @@ ext { project(':h2o-hive'), project(':h2o-security'), project(':h2o-logger'), - project(':h2o-genmodel-extensions:jgrapht') + project(':h2o-genmodel-extensions:jgrapht'), + project(':h2o-k8s') ] scalaProjects = [ diff --git a/h2o-assemblies/main/build.gradle b/h2o-assemblies/main/build.gradle index 914f78b3e8c0..4da21fa800da 100644 --- a/h2o-assemblies/main/build.gradle +++ b/h2o-assemblies/main/build.gradle @@ -28,6 +28,7 @@ dependencies { compile project(":h2o-orc-parser") } compile project(":h2o-parquet-parser") + compile project(":h2o-k8s") compile "org.slf4j:slf4j-log4j12:1.7.10" } diff --git a/h2o-core/src/main/java/water/H2O.java b/h2o-core/src/main/java/water/H2O.java index a3e51119e231..76bc588c5c78 100644 --- a/h2o-core/src/main/java/water/H2O.java +++ b/h2o-core/src/main/java/water/H2O.java @@ -43,6 +43,7 @@ */ final public class H2O { public static final String DEFAULT_JKS_PASS = "h2oh2o"; + public static final int H2O_DEFAULT_PORT = 54321; //------------------------------------------------------------------------------------------------------------------- // Command-line argument parsing and help @@ -268,7 +269,7 @@ public static void printHelp() { public int port; /** -baseport=####; Port to start upward searching from. */ - public int baseport = 54321; + public int baseport = H2O_DEFAULT_PORT; /** -port_offset=####; Offset between the API(=web) port and the internal communication port; api_port + port_offset = h2o_port */ public int port_offset = 1; @@ -793,8 +794,60 @@ private static void validateArguments() { /** * Register embedded H2O configuration object with H2O instance. */ - public static void setEmbeddedH2OConfig(AbstractEmbeddedH2OConfig c) { embeddedH2OConfig = c; } - public static AbstractEmbeddedH2OConfig getEmbeddedH2OConfig() { return embeddedH2OConfig; } + public static void setEmbeddedH2OConfig(AbstractEmbeddedH2OConfig c) { + embeddedH2OConfig = c; + } + + /** + * Returns an instance of {@link AbstractEmbeddedH2OConfig}. The origin of the embedded config might be either + * from directly setting the embeddedH2OConfig field via setEmbeddedH2OConfig setter, or dynamically provided via + * service loader. Directly set {@link AbstractEmbeddedH2OConfig} is always prioritized. ServiceLoader lookup is only + * performed if no config is previously set. + *

+ * Result of first ServiceLoader lookup is also considered final - once a service is found, dynamic lookup is not + * performed any further. + * + * @return An instance of {@link AbstractEmbeddedH2OConfig}, if set or dynamically provided. Otherwise null + * @author Michal Kurka + */ + public static AbstractEmbeddedH2OConfig getEmbeddedH2OConfig() { + if (embeddedH2OConfig != null) { + return embeddedH2OConfig; + } + + embeddedH2OConfig = discoverEmbeddedConfigProvider() + .map(embeddedConfigProvider -> { + Log.info(String.format("Dynamically loaded '%s' as AbstractEmbeddedH2OConfigProvider.", embeddedConfigProvider.getName())); + return embeddedConfigProvider.getConfig(); + }).orElse(null); + + return embeddedH2OConfig; + } + + /** + * Uses {@link ServiceLoader} to discover active instances of {@link EmbeddedConfigProvider}. Only one provider + * may be active at a time. If more providers are detected, {@link IllegalStateException} is thrown. + * + * @return An {@link Optional} of {@link EmbeddedConfigProvider}, if a single active provider is found. Otherwise + * an empty optional. + * @throws IllegalStateException When there are multiple active instances {@link EmbeddedConfigProvider} discovered. + */ + private static Optional discoverEmbeddedConfigProvider() throws IllegalStateException { + final ServiceLoader configProviders = ServiceLoader.load(EmbeddedConfigProvider.class); + EmbeddedConfigProvider provider = null; + for (final EmbeddedConfigProvider candidateProvider : configProviders) { + candidateProvider.init(); + if (!candidateProvider.isActive()) + continue; + if (provider != null) { + throw new IllegalStateException("Multiple active EmbeddedH2OConfig providers: " + provider.getName() + + " and " + candidateProvider.getName() + " (possibly other as well)."); + } + provider = candidateProvider; + } + + return Optional.ofNullable(provider); + } /** * Tell the embedding software that this H2O instance belongs to diff --git a/h2o-core/src/main/java/water/init/EmbeddedConfigProvider.java b/h2o-core/src/main/java/water/init/EmbeddedConfigProvider.java new file mode 100644 index 000000000000..807c6eb67709 --- /dev/null +++ b/h2o-core/src/main/java/water/init/EmbeddedConfigProvider.java @@ -0,0 +1,29 @@ +package water.init; + +public interface EmbeddedConfigProvider { + + default String getName() { + return getClass().getName(); + } + + /** + * Provider initialization. Guaranteed to be called before any other method is called, including the`isActive` + * method. + */ + void init(); + + /** + * Whether the provider is active and should be used by H2O. + * + * @return True if H2O should use this {@link EmbeddedConfigProvider}, otherwise false. + */ + default boolean isActive() { + return false; + } + + /** + * @return An instance of {@link AbstractEmbeddedH2OConfig} configuration. Never null. + */ + AbstractEmbeddedH2OConfig getConfig(); + +} diff --git a/h2o-k8s/README.md b/h2o-k8s/README.md new file mode 100644 index 000000000000..952804a4e4b7 --- /dev/null +++ b/h2o-k8s/README.md @@ -0,0 +1,106 @@ +# H2O Kubernetes integration + +The integration of Kubernetes and H2O is possible via `water.k8s.KubernetesEmbeddedConfigProvider` - to be found +in this module. This implementation of `EmbeddedConfigProvider` is dynamically loaded on H2O start and remains inactive +unless H2O is running in a Docker container managed by Kubernetes is detected. + +## Running H2O in K8s - user's guide + +H2O Pods deployed on Kubernetes cluster require a +[headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services) +for H2O Node discovery. The headless service, instead of load-balancing incoming requests to the underlying +H2O pods, returns a set of adresses of all the underlying pods. It is therefore the responsibility of the K8S +cluster administrator to set-up the service correctly to cover H2O nodes only. + +### Creating the headless service +First, a headless service must be created on Kubernetes. + +```yaml +apiVersion: v1 +kind: Service +metadata: + name: h2o-service +spec: + type: ClusterIP + clusterIP: None + selector: + app: h2o-k8s + ports: + - protocol: TCP + port: 54321 +``` + +The `clusterIP: None` defines the service as headless. The `port: 54321` is the default H2O port. Users and client libraries +use this port to talk to the H2O cluster. + +The `app: h2o-k8s` setting is of **great importance**, as this is the name of the application with H2O pods inside. +Please make sure this setting corresponds to the name of H2O deployment name chosen. + +### Creating the H2O deployment + +It is **strongly recommended** to run H2O as a [Stateful set](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/) +on a Kubernetes cluster. Kubernetes assumes all the pods inside the cluster are stateful and does not attempt to restart +the individual pods on failure. Once a job is triggered on an H2O cluster, the cluster is locked and no additional nodes +can be added. Therefore, the cluster has to be restarted as a whole if required - which is a perfect fit for a StatefulSet. + + +```yaml +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: h2o-stateful-set + namespace: h2o-statefulset +spec: + serviceName: h2o-service + replicas: 3 + selector: + matchLabels: + app: h2o-k8s + template: + metadata: + labels: + app: h2o-k8s + spec: + terminationGracePeriodSeconds: 10 + containers: + - name: h2o-k8s + image: '' + resources: + requests: + memory: "4Gi" + ports: + - containerPort: 54321 + protocol: TCP + env: + - name: H2O_KUBERNETES_SERVICE_DNS + value: h2o-service.h2o-statefulset.svc.cluster.local + - name: H2O_NODE_LOOKUP_TIMEOUT + value: '180' + - name: H2O_NODE_EXPECTED_COUNT + value: '3' +``` +Besides standardized Kubernetes settings, like `replicas: 3` defining the number of pods with H2O instantiated, there are +several settings to pay attention to. + +The name of the application `app: h2o-k8s` must correspond to the name expected by the above-defined headless service in order +for the H2O node discovery to work. H2O communicates on port 54321, therefore `containerPort: 54321`must be exposed to +make it possible for the clients to connect. + +Environment variables: + +1. `H2O_KUBERNETES_SERVICE_DNS` - **[MANDATORY]** Crucial for the clustering to work. The format usually follows the + `..svc.cluster.local` pattern. This setting enables H2O node discovery via DNS. + It must be modified to match the name of the headless service created. Also, pay attention to the rest of the address + to match the specifics of your Kubernetes implementation. +1. `H2O_NODE_LOOKUP_TIMEOUT` - **[OPTIONAL]** Node lookup constraint. Time before the node lookup is ended. +1. `H2O_NODE_EXPECTED_COUNT` - **[OPTIONAL]** Node lookup constraint. Expected number of H2O pods to be discovered. + +If none of the optional lookup constraints is specified, a sensible default node lookup timeout will be set - currently +defaults to 3 minutes. If any of the lookup constraints are defined, the H2O node lookup is terminated on whichever +condition is met first. + +### Exposing H2O cluster + +Exposing the H2O cluster is a responsibility of the Kubernetes administrator. By default, an + [Ingress](https://kubernetes.io/docs/concepts/services-networking/ingress/) can be created. Different platforms offer + different capabilities, e.g. OpenShift offers [Routes](https://docs.openshift.com/container-platform/4.3/networking/routes/route-configuration.html). diff --git a/h2o-k8s/build.gradle b/h2o-k8s/build.gradle new file mode 100644 index 000000000000..1bc3107fc860 --- /dev/null +++ b/h2o-k8s/build.gradle @@ -0,0 +1,13 @@ +apply plugin: 'java' + +sourceCompatibility = 1.8 + +repositories { + mavenCentral() +} + +dependencies { + compile project(":h2o-core") + testCompile group: 'junit', name: 'junit', version: '4.12' + testCompile 'com.github.stefanbirkner:system-rules:1.19.0' +} diff --git a/h2o-k8s/src/main/java/water/k8s/KubernetesEmbeddedConfig.java b/h2o-k8s/src/main/java/water/k8s/KubernetesEmbeddedConfig.java new file mode 100644 index 000000000000..f4f0472272e9 --- /dev/null +++ b/h2o-k8s/src/main/java/water/k8s/KubernetesEmbeddedConfig.java @@ -0,0 +1,58 @@ +package water.k8s; + +import water.H2O; +import water.init.AbstractEmbeddedH2OConfig; +import water.util.Log; + +import java.net.InetAddress; +import java.util.Collection; + +public class KubernetesEmbeddedConfig extends AbstractEmbeddedH2OConfig { + + private final String flatfile; + + public KubernetesEmbeddedConfig(final Collection nodeIPs) { + this.flatfile = writeFlatFile(nodeIPs); + } + + private String writeFlatFile(final Collection nodeIPs) { + final StringBuilder flatFileBuilder = new StringBuilder(); + + nodeIPs.forEach(nodeIP -> { + flatFileBuilder.append(nodeIP); + flatFileBuilder.append(":"); + flatFileBuilder.append(H2O.H2O_DEFAULT_PORT); // All pods are expected to utilize the default H2O port + flatFileBuilder.append("\n"); + }); + + return flatFileBuilder.toString(); + } + + @Override + public void notifyAboutEmbeddedWebServerIpPort(InetAddress ip, int port) { + } + + @Override + public void notifyAboutCloudSize(InetAddress ip, int port, InetAddress leaderIp, int leaderPort, int size) { + Log.info(String.format("Created cluster of size %d, leader node IP is '%s'", size, leaderIp.toString())); + } + + @Override + public boolean providesFlatfile() { + return true; + } + + @Override + public String fetchFlatfile() { + return flatfile; + } + + @Override + public void exit(int status) { + System.exit(status); + } + + @Override + public void print() { + } +} diff --git a/h2o-k8s/src/main/java/water/k8s/KubernetesEmbeddedConfigProvider.java b/h2o-k8s/src/main/java/water/k8s/KubernetesEmbeddedConfigProvider.java new file mode 100644 index 000000000000..c9e738808e7b --- /dev/null +++ b/h2o-k8s/src/main/java/water/k8s/KubernetesEmbeddedConfigProvider.java @@ -0,0 +1,89 @@ +package water.k8s; + +import water.init.AbstractEmbeddedH2OConfig; +import water.init.EmbeddedConfigProvider; +import water.k8s.lookup.KubernetesDnsLookup; +import water.k8s.lookup.KubernetesLookup; +import water.k8s.lookup.LookupConstraintsBuilder; +import water.util.Log; + +import java.util.Collection; +import java.util.Optional; +import java.util.Set; +import java.util.regex.Pattern; + +/** + * A configuration provider for H2O running in Kubernetes cluster. It is able to detected H2O is being ran in K8S + * environment, otherwise remains inactive. + *

+ * Uses potentially multiple strategies to discover H2O Pods on a Kubernetes cluster. + */ +public class KubernetesEmbeddedConfigProvider implements EmbeddedConfigProvider { + + private static final String K8S_NODE_LOOKUP_TIMEOUT_KEY = "H2O_NODE_LOOKUP_TIMEOUT"; + private static final String K8S_DESIRED_CLUSTER_SIZE_KEY = "H2O_NODE_EXPECTED_COUNT"; + + private boolean runningOnKubernetes = false; + private KubernetesEmbeddedConfig kubernetesEmbeddedConfig; + + /** + * + * @return A Set of node addresses. The adresses are internal adresses/IPs to the Kubernetes cluster. + */ + private static final Optional> resolveInternalNodeIPs() { + final LookupConstraintsBuilder lookupConstraintsBuilder = new LookupConstraintsBuilder(); + + try { + final Integer timeoutSeconds = Integer.parseInt(System.getenv(K8S_NODE_LOOKUP_TIMEOUT_KEY)); + lookupConstraintsBuilder.withTimeoutSeconds(timeoutSeconds); + } catch (NumberFormatException e) { + Log.info(String.format("'%s' environment variable not set.", K8S_NODE_LOOKUP_TIMEOUT_KEY)); + } + + try { + final Integer desiredClusterSize = Integer.parseInt(System.getenv(K8S_DESIRED_CLUSTER_SIZE_KEY)); + lookupConstraintsBuilder.withDesiredClusterSize(desiredClusterSize); + } catch (NumberFormatException e) { + Log.info(String.format("'%s' environment variable not set.", K8S_DESIRED_CLUSTER_SIZE_KEY)); + } + + final KubernetesLookup kubernetesDnsDiscovery = KubernetesDnsLookup.fromH2ODefaults(); + return kubernetesDnsDiscovery.lookupNodes(lookupConstraintsBuilder.build()); + } + + @Override + public void init() { + runningOnKubernetes = isRunningOnKubernetes(); + + if (!runningOnKubernetes) { + return; // Do not initialize any configuration if H2O is not running in K8S-spawned container. + } + + Log.info("Initializing H2O Kubernetes cluster"); + final Collection nodeIPs = resolveInternalNodeIPs() + .orElseThrow(() -> new IllegalStateException("Unable to resolve Node IPs from DNS service.")); + + Log.info(String.format("Using the following pods to form H2O cluster: [%s]", + String.join(",", nodeIPs))); + + kubernetesEmbeddedConfig = new KubernetesEmbeddedConfig(nodeIPs); + } + + @Override + public boolean isActive() { + return runningOnKubernetes; + } + + @Override + public AbstractEmbeddedH2OConfig getConfig() { + return kubernetesEmbeddedConfig; + } + + /** + * @return True if there are environment variables indicating H2O is running inside a container managed by + * Kubernetes. Otherwise false. + */ + private boolean isRunningOnKubernetes() { + return KubernetesDnsLookup.isLookupPossible(); + } +} diff --git a/h2o-k8s/src/main/java/water/k8s/lookup/ClusterSizeConstraint.java b/h2o-k8s/src/main/java/water/k8s/lookup/ClusterSizeConstraint.java new file mode 100644 index 000000000000..22eeb4d95e40 --- /dev/null +++ b/h2o-k8s/src/main/java/water/k8s/lookup/ClusterSizeConstraint.java @@ -0,0 +1,20 @@ +package water.k8s.lookup; + +import java.util.Set; + +/** + * Constraint triggered when a pre-defined amount of pods is discovered. + */ +public class ClusterSizeConstraint implements LookupConstraint { + + private final int desiredClusterSize; + + public ClusterSizeConstraint(final int desiredClusterSize) { + this.desiredClusterSize = desiredClusterSize; + } + + @Override + public boolean isLookupEnded(final Set discoveredNodes) { + return discoveredNodes.size() == desiredClusterSize; + } +} diff --git a/h2o-k8s/src/main/java/water/k8s/lookup/KubernetesDnsLookup.java b/h2o-k8s/src/main/java/water/k8s/lookup/KubernetesDnsLookup.java new file mode 100644 index 000000000000..d74f6b2997d1 --- /dev/null +++ b/h2o-k8s/src/main/java/water/k8s/lookup/KubernetesDnsLookup.java @@ -0,0 +1,171 @@ +package water.k8s.lookup; + + +import water.util.Log; + +import javax.naming.Context; +import javax.naming.NamingEnumeration; +import javax.naming.NamingException; +import javax.naming.directory.Attribute; +import javax.naming.directory.Attributes; +import javax.naming.directory.DirContext; +import javax.naming.directory.InitialDirContext; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.*; +import java.util.regex.Pattern; + +/** + * Discovery strategy on a DNS cluster leveraging the DNS record of a Kubernetes headless service present in the cluster. + * Kubernetes headless services, instead of load-balancing the requests onto one of the underlying pods, return the + * addresses of all the pods covered by the headless service. + *

+ * Such pods can then be discovered via the above-mentioned DNS record. In order for H2O to know which service to query, + * it is mandatory for the K8S user to pass the name of the headless service to the H2O container, as follows: + * + *

+ * apiVersion: apps/v1
+ * kind: StatefulSet
+ * metadata:
+ *   name: h2o-stateful-set
+ *   namespace: h2o-statefulset
+ * spec:
+ *   serviceName: h2o-service
+ *   replicas: 3
+ *   selector:
+ *     matchLabels:
+ *       app: h2o-k8s
+ *   template:
+ *     metadata:
+ *       labels:
+ *         app: h2o-k8s
+ *     spec:
+ *       terminationGracePeriodSeconds: 10
+ *       containers:
+ *         - name: h2o-k8s
+ *           image: ''
+ *           resources:
+ *             requests:
+ *               memory: "4Gi"
+ *           ports:
+ *             - containerPort: 54321
+ *               protocol: TCP
+ *           env:
+ *           - name: H2O_KUBERNETES_SERVICE_DNS
+ *             value: h2o-service.h2o-statefulset.svc.cluster.local
+ *           - name: H2O_NODE_LOOKUP_TIMEOUT
+ *             value: '180'
+ *           - name: H2O_NODE_EXPECTED_COUNT
+ *             value: '3'
+ * 
+ */ +public class KubernetesDnsLookup implements KubernetesLookup { + + private static final String K8S_SERVICE_DNS_ENV_VAR_KEY = "H2O_KUBERNETES_SERVICE_DNS"; + private static final String DNS_TIMEOUT_DEFAULT = "30000"; // 30 seconds + private static final int ONE_SECOND = 1000; + private final String serviceDns; + private final DirContext dirContext; + + public KubernetesDnsLookup(final String serviceDns) { + this.serviceDns = serviceDns; + this.dirContext = initDirContext(); + } + + /** + * @return + * @throws IllegalStateException When the H2O-related kubernetes DNS service is not found + */ + public static KubernetesDnsLookup fromH2ODefaults() throws IllegalStateException { + final String dnsServiceName = System.getenv(K8S_SERVICE_DNS_ENV_VAR_KEY); + if (dnsServiceName == null) { + throw new IllegalStateException(String.format("DNS of H2O service not set. Please set the '%s' variable.", + K8S_SERVICE_DNS_ENV_VAR_KEY)); + } else if (dnsServiceName.trim().isEmpty()) { + throw new IllegalStateException(String.format("DNS Service '%s' name is invalid.", dnsServiceName)); + } + return new KubernetesDnsLookup(dnsServiceName); + } + + private static String extractHost(final String server, final Pattern extractHostPattern) { + String host = server.split(" ")[3]; + return extractHostPattern.matcher(host).replaceAll(""); + } + + /** + * Looks up H2O pods via configured K8S Stateless service DNS. Environment variable with key defined in + * H2O_K8S_SERVICE_DNS_KEY constant is used to obtain address of the DNS. The DNS is then queried for pods + * in the underlying service. It is the responsibility of the K8S cluster owner to set-up the service correctly to + * only provide correct adresses of pods with H2O active. If pods with no H2O running are supplied, the resulting + * flatfile may contain pod IPs with no H2O running as well. + * + * @param lookupConstraints Constraints to obey during lookup + * @return A {@link Set} of adresses of looked up nodes represented as String. The resulting set is never empty. + */ + public Optional> lookupNodes(final Collection lookupConstraints) { + final Set lookedUpNodes = new HashSet<>(); + + while (lookupConstraints.stream().allMatch(lookupStrategy -> !lookupStrategy.isLookupEnded(lookedUpNodes))) { + try { + dnsLookup(lookedUpNodes); + Thread.sleep(ONE_SECOND); + } catch (NamingException e) { + Log.warn(e.getMessage()); + continue; + } catch (InterruptedException e) { + Log.err(e); + return Optional.empty(); + } + } + + return Optional.of(lookedUpNodes); + } + + public static boolean isLookupPossible() { + return System.getenv() + .containsKey(KubernetesDnsLookup.K8S_SERVICE_DNS_ENV_VAR_KEY); + } + + /** + * Performs a single DNS lookup. Discovered nodes (their IPs respectively) are addded to the existing + * set of nodeIPs. + * + * @param nodeIPs A {@link Set} of nodes already discovered during previous lookups. + * @throws NamingException If the DNS under given name is unreachable / does not exist. + */ + private void dnsLookup(final Set nodeIPs) throws NamingException { + final Attributes attributes = dirContext.getAttributes(serviceDns, new String[]{"SRV"}); + final Attribute srvAttribute = attributes.get("srv"); + final Pattern extractHostPattern = Pattern.compile("\\\\.$"); + if (srvAttribute != null) { + final NamingEnumeration servers = srvAttribute.getAll(); + while (servers.hasMore()) { + final String server = (String) servers.next(); + final String serverHost = extractHost(server, extractHostPattern); + final InetAddress nodeIP; + try { + nodeIP = InetAddress.getByName(serverHost); + } catch (UnknownHostException e) { + Log.err("Unknown host for IP Address: " + serverHost); + continue; + } + if (nodeIPs.add(nodeIP.getHostAddress())) { + Log.info(String.format("New H2O pod with DNS record '%s' discovered.", nodeIP)); + } + } + servers.close(); + } + } + + private DirContext initDirContext() { + final Hashtable environment = new Hashtable<>(); + environment.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.dns.DnsContextFactory"); + environment.put(Context.PROVIDER_URL, "dns:"); + environment.put("com.sun.jndi.dns.timeout.initial", DNS_TIMEOUT_DEFAULT); + try { + return new InitialDirContext(environment); + } catch (NamingException e) { + throw new IllegalStateException("Error while initializing DirContext", e); + } + } +} diff --git a/h2o-k8s/src/main/java/water/k8s/lookup/KubernetesLookup.java b/h2o-k8s/src/main/java/water/k8s/lookup/KubernetesLookup.java new file mode 100644 index 000000000000..b41a9e325081 --- /dev/null +++ b/h2o-k8s/src/main/java/water/k8s/lookup/KubernetesLookup.java @@ -0,0 +1,19 @@ +package water.k8s.lookup; + +import water.k8s.lookup.LookupConstraint; + +import java.util.Collection; +import java.util.Optional; +import java.util.Set; + +public interface KubernetesLookup { + + /** + * Looks up H2O pods in K8S cluster. + * + * @param lookupConstraints Constraints to obey during lookup + * @return A {@link Set} of adresses of looked up nodes represented as String. If there are difficulties + * during node lookup, Optional.empty() is returned. + */ + Optional> lookupNodes(final Collection lookupConstraints); +} diff --git a/h2o-k8s/src/main/java/water/k8s/lookup/LookupConstraint.java b/h2o-k8s/src/main/java/water/k8s/lookup/LookupConstraint.java new file mode 100644 index 000000000000..c2e96f2dc601 --- /dev/null +++ b/h2o-k8s/src/main/java/water/k8s/lookup/LookupConstraint.java @@ -0,0 +1,17 @@ +package water.k8s.lookup; + +import java.util.Set; + +/** + * A constraint during Pod lookup in Kubernetes cluster. Each implementation represents a single rule to constraint + * the lookup with. + */ +public interface LookupConstraint { + + /** + * @param lookedUpNodes A set of unique string representations of the nodes discovered + * @return True if after the recent node discovery, the lookup should be ended. + */ + boolean isLookupEnded(final Set lookedUpNodes); + +} diff --git a/h2o-k8s/src/main/java/water/k8s/lookup/LookupConstraintsBuilder.java b/h2o-k8s/src/main/java/water/k8s/lookup/LookupConstraintsBuilder.java new file mode 100644 index 000000000000..ad0c2a9e84f4 --- /dev/null +++ b/h2o-k8s/src/main/java/water/k8s/lookup/LookupConstraintsBuilder.java @@ -0,0 +1,69 @@ +package water.k8s.lookup; + +import water.util.Log; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Builder for lookup constraints. For different input/configuration, this builder outputs the exact set of instances + * of {@link LookupConstraint} to meet user's requirements. + */ +public class LookupConstraintsBuilder { + private static final int K8S_DEFAULT_CLUSTERING_TIMEOUT_SECONDS = 180; + private Integer timeoutSeconds; + private Integer desiredClusterSize; + + public LookupConstraintsBuilder() { + this.timeoutSeconds = null; + this.desiredClusterSize = null; + } + + /** + * @param timeoutSeconds Timeout in seconds. Inserting a null value resets the timeout settings. + * @return The very instance of {@link LookupConstraintsBuilder} called (builder pattern). + */ + public LookupConstraintsBuilder withTimeoutSeconds(Integer timeoutSeconds) { + this.timeoutSeconds = timeoutSeconds; + return this; + } + + /** + * @param desiredClusterSize Desired amount of pods discovered. Inserting a null value resets the desired cluster + * size. + * @return The very instance of {@link LookupConstraintsBuilder} called (builder pattern). + */ + public LookupConstraintsBuilder withDesiredClusterSize(final int desiredClusterSize) { + this.desiredClusterSize = desiredClusterSize; + return this; + } + + /** + * Construct a never-empty collection of {@link LookupConstraint} instances. By guaranteeing the resulting collection + * to be never empty, it is ensured the H2O Node lookup on available pods will always end in a reasonably finite time. + * + * @return A {@link Collection} of {@link LookupConstraint}. The collection is never empty. + */ + public Collection build() { + + final List lookupConstraintList = new ArrayList<>(); + + // If there are no constraints set by the user via environment variables, use a sensible timeout. + if (timeoutSeconds == null && desiredClusterSize == null) { + Log.info(String.format("No H2O Node discovery constraints set. Using default timeout of %d seconds.", + K8S_DEFAULT_CLUSTERING_TIMEOUT_SECONDS)); + lookupConstraintList.add(new TimeoutConstraint(K8S_DEFAULT_CLUSTERING_TIMEOUT_SECONDS)); + } + + if (timeoutSeconds != null) { + Log.info(String.format("Timeout for node discovery is set to %d seconds.", timeoutSeconds)); + lookupConstraintList.add(new TimeoutConstraint(timeoutSeconds)); + } + if (desiredClusterSize != null) { + Log.info(String.format(String.format("Desired cluster size is set to %d nodes.", desiredClusterSize))); + lookupConstraintList.add(new ClusterSizeConstraint(desiredClusterSize)); + } + return lookupConstraintList; + } +} diff --git a/h2o-k8s/src/main/java/water/k8s/lookup/TimeoutConstraint.java b/h2o-k8s/src/main/java/water/k8s/lookup/TimeoutConstraint.java new file mode 100644 index 000000000000..7822189a4fe3 --- /dev/null +++ b/h2o-k8s/src/main/java/water/k8s/lookup/TimeoutConstraint.java @@ -0,0 +1,24 @@ +package water.k8s.lookup; + +import java.time.Duration; +import java.time.Instant; +import java.util.Set; + +/** + * Constraints triggered once the lookup takes a certain amount of time. + */ +public class TimeoutConstraint implements LookupConstraint { + + private final int timeoutSeconds; + private final Instant beginning; + + public TimeoutConstraint(final int timeoutSeconds) { + this.timeoutSeconds = timeoutSeconds; + beginning = Instant.now(); + } + + @Override + public boolean isLookupEnded(final Set discoveredNodes) { + return Duration.between(beginning, Instant.now()).getSeconds() >= timeoutSeconds; + } +} diff --git a/h2o-k8s/src/main/resources/META-INF/services/water.init.EmbeddedConfigProvider b/h2o-k8s/src/main/resources/META-INF/services/water.init.EmbeddedConfigProvider new file mode 100644 index 000000000000..7172c536dd3a --- /dev/null +++ b/h2o-k8s/src/main/resources/META-INF/services/water.init.EmbeddedConfigProvider @@ -0,0 +1 @@ +water.k8s.KubernetesEmbeddedConfigProvider diff --git a/h2o-k8s/src/test/java/water/k8s/KubernetesEmbeddedConfigProviderTest.java b/h2o-k8s/src/test/java/water/k8s/KubernetesEmbeddedConfigProviderTest.java new file mode 100644 index 000000000000..f5fbca271218 --- /dev/null +++ b/h2o-k8s/src/test/java/water/k8s/KubernetesEmbeddedConfigProviderTest.java @@ -0,0 +1,39 @@ +package water.k8s; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.contrib.java.lang.system.EnvironmentVariables; + +import static org.junit.Assert.*; + +public class KubernetesEmbeddedConfigProviderTest { + + @Rule + public EnvironmentVariables environmentVariables = new EnvironmentVariables(); + + private KubernetesEmbeddedConfigProvider kubernetesEmbeddedConfigProvider; + + @Before + public void beforeTest(){ + environmentVariables.set("H2O_KUBERNETES_SERVICE_DNS", "127.0.0.1"); + kubernetesEmbeddedConfigProvider = new KubernetesEmbeddedConfigProvider(); + } + + @Test + public void testActiveOnK8SEnvVariablesSet(){ + environmentVariables.set("KUBERNETES_SERVICE_HOST", "127.0.0.1"); + environmentVariables.set("H2O_KUBERNETES_SERVICE_DNS", "127.0.0.1"); + environmentVariables.set("H2O_NODE_LOOKUP_TIMEOUT", "1"); + + kubernetesEmbeddedConfigProvider.init(); + assertTrue(kubernetesEmbeddedConfigProvider.isActive()); + } + + @Test + public void testInactiveOnK8SEnvVariablesSet(){ + kubernetesEmbeddedConfigProvider.init(); + assertFalse(kubernetesEmbeddedConfigProvider.isActive()); + } + +} diff --git a/h2o-k8s/src/test/java/water/k8s/lookup/LookupConstraintsBuilderTest.java b/h2o-k8s/src/test/java/water/k8s/lookup/LookupConstraintsBuilderTest.java new file mode 100644 index 000000000000..9e7f646bbe7f --- /dev/null +++ b/h2o-k8s/src/test/java/water/k8s/lookup/LookupConstraintsBuilderTest.java @@ -0,0 +1,75 @@ +package water.k8s.lookup; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.*; + +public class LookupConstraintsBuilderTest { + + private LookupConstraintsBuilder lookupConstraintsBuilder; + private Set lookedUpNodes; + + @Before + public void beforeTest() { + lookupConstraintsBuilder = new LookupConstraintsBuilder(); + lookedUpNodes = new HashSet<>(); + } + + @Test + public void testTimeoutOnly() { + final Collection lookupStrategies = lookupConstraintsBuilder.withTimeoutSeconds(0) + .build(); + + assertEquals(1, lookupStrategies.size()); + assertTrue(lookupStrategies.stream().allMatch(lookupStrategy -> lookupStrategy instanceof TimeoutConstraint)); + assertTrue(lookupStrategies.stream().allMatch(strategy -> strategy.isLookupEnded(lookedUpNodes))); + } + + @Test + public void testTimeoutOnlyRunning() { + final Collection lookupStrategies = lookupConstraintsBuilder.withTimeoutSeconds(Integer.MAX_VALUE) + .build(); + + assertEquals(1, lookupStrategies.size()); + assertTrue(lookupStrategies.stream().allMatch(lookupStrategy -> lookupStrategy instanceof TimeoutConstraint)); + assertFalse(lookupStrategies.stream().allMatch(strategy -> strategy.isLookupEnded(lookedUpNodes))); + } + + @Test + public void testClusterSize() { + final Collection lookupStrategies = this.lookupConstraintsBuilder.withDesiredClusterSize(2) + .build(); + assertEquals(1, lookupStrategies.size()); + assertTrue(lookupStrategies.stream().allMatch(lookupStrategy -> lookupStrategy instanceof ClusterSizeConstraint)); + lookedUpNodes.add("ABCD"); + assertFalse(lookupStrategies.stream().allMatch(strategy -> strategy.isLookupEnded(lookedUpNodes))); + + lookedUpNodes.add("EFGE"); + assertTrue(lookupStrategies.stream().allMatch(strategy -> strategy.isLookupEnded(lookedUpNodes))); + } + + @Test + public void testTimeoutAndClusterSize() { + final Collection lookupStrategies = this.lookupConstraintsBuilder.withDesiredClusterSize(1) + .withTimeoutSeconds(1) + .build(); + assertEquals(2, lookupStrategies.size()); + assertEquals(1, lookupStrategies.stream().filter(lookupStrategy -> lookupStrategy instanceof TimeoutConstraint).count()); + assertEquals(1, lookupStrategies.stream().filter(lookupStrategy -> lookupStrategy instanceof ClusterSizeConstraint).count()); + } + + @Test + public void testNoConstraints() { + final Collection lookupStrategies = lookupConstraintsBuilder.build(); + + assertEquals(1, lookupStrategies.size()); + assertTrue(lookupStrategies.stream().allMatch(lookupStrategy -> lookupStrategy instanceof TimeoutConstraint)); + assertFalse(lookupStrategies.stream().allMatch(strategy -> strategy.isLookupEnded(lookedUpNodes))); + } + +} diff --git a/settings.gradle b/settings.gradle index 5afd9e8a4dec..b393ec915828 100644 --- a/settings.gradle +++ b/settings.gradle @@ -38,6 +38,7 @@ include 'h2o-ext-target-encoder' include 'h2o-security' include 'h2o-logger' include 'h2o-genmodel-extensions:jgrapht' +include 'h2o-k8s' // GRPC support if ("true".equals(System.getenv("H2O_BUILD_GRPC"))) {