From 254a83c8278f9bbdb3a69f3d6cebc26e06ce1b4c Mon Sep 17 00:00:00 2001 From: Zhou JIANG Date: Thu, 27 Feb 2025 16:36:50 -0800 Subject: [PATCH] [SPARK-51347] Enable Ingress and Service Support for Spark Driver --- docs/spark_custom_resources.md | 40 +++++ .../k8s/operator/spec/ApplicationSpec.java | 1 + .../spec/DriverServiceIngressSpec.java | 41 +++++ .../k8s/operator/SparkAppResourceSpec.java | 21 ++- .../operator/SparkAppSubmissionWorker.java | 11 +- .../utils/DriverServiceIngressUtils.java | 95 +++++++++++ .../operator/SparkAppResourceSpecTest.java | 3 +- .../utils/DriverServiceIngressUtilsTest.java | 157 ++++++++++++++++++ 8 files changed, 364 insertions(+), 5 deletions(-) create mode 100644 spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/DriverServiceIngressSpec.java create mode 100644 spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/utils/DriverServiceIngressUtils.java create mode 100644 spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/utils/DriverServiceIngressUtilsTest.java diff --git a/docs/spark_custom_resources.md b/docs/spark_custom_resources.md index aef33776..61a86046 100644 --- a/docs/spark_custom_resources.md +++ b/docs/spark_custom_resources.md @@ -99,6 +99,46 @@ Please be advised that Spark still overrides necessary pod configuration in both more details, refer [Spark doc](https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template). +## Enable Additional Ingress for Driver + +Operator may create [Ingress](https://kubernetes.io/docs/concepts/services-networking/ingress/) for +Spark driver of running applications on demand. 
For example, to expose Spark UI - which is by +default enabled on driver port 4040 - you may configure: + +```yaml +spec: + driverServiceIngressList: + - serviceMetadata: + name: "spark-ui-service" + serviceSpec: + ports: + - protocol: TCP + port: 80 + targetPort: 4040 + ingressMetadata: + name: "spark-ui-ingress" + annotations: + nginx.ingress.kubernetes.io/rewrite-target: / + ingressSpec: + ingressClassName: nginx-example + rules: + - http: + paths: + - path: "/" + pathType: Prefix + backend: + service: + name: spark-ui-service + port: + number: 80 +``` + +If `.serviceSpec.selector` is not provided, Spark Operator populates it with the driver labels. If +`.ingressSpec.rules` is not provided, Spark Operator also populates one default rule that routes +`/` to the first port of the associated Service. It's recommended to always provide the ingress +spec to make sure it's compatible with your +[IngressController](https://kubernetes.io/docs/concepts/services-networking/ingress-controllers/). 
+ ## Understanding Failure Types In addition to the general `Failed` state (that driver pod fails or driver container exits diff --git a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationSpec.java b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationSpec.java index a2510317..2a830f65 100644 --- a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationSpec.java +++ b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationSpec.java @@ -54,4 +54,5 @@ public class ApplicationSpec extends BaseSpec { protected BaseApplicationTemplateSpec driverSpec; protected BaseApplicationTemplateSpec executorSpec; + protected List<DriverServiceIngressSpec> driverServiceIngressList; } diff --git a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/DriverServiceIngressSpec.java b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/DriverServiceIngressSpec.java new file mode 100644 index 00000000..9712a652 --- /dev/null +++ b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/DriverServiceIngressSpec.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator.spec; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import io.fabric8.generator.annotation.Required; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.ServiceSpec; +import io.fabric8.kubernetes.api.model.networking.v1.IngressSpec; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Describes one Service / Ingress pair that the operator creates to expose the Spark driver. Only + * {@code ingressSpec} is optional. + * + * <p>Constructors are declared explicitly because {@code @Builder} alone leaves the class without + * the no-args constructor that Jackson needs for deserialization. + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class DriverServiceIngressSpec { + @Required protected ObjectMeta serviceMetadata; + @Required protected ServiceSpec serviceSpec; + + @Required protected ObjectMeta ingressMetadata; + protected IngressSpec ingressSpec; +} diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java index 101b29a9..43d931a5 100644 --- a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java +++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java @@ -20,7 +20,10 @@ package org.apache.spark.k8s.operator; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import scala.Tuple2; import scala.collection.immutable.HashMap; @@ -40,6 +43,8 @@ import org.apache.spark.deploy.k8s.KubernetesDriverSpec; import org.apache.spark.deploy.k8s.SparkPod; import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils; +import org.apache.spark.k8s.operator.spec.DriverServiceIngressSpec; +import org.apache.spark.k8s.operator.utils.DriverServiceIngressUtils; /** * Resembles resources that would be directly launched by operator. 
Based on resolved @@ -59,7 +64,9 @@ public class SparkAppResourceSpec { private final SparkAppDriverConf kubernetesDriverConf; public SparkAppResourceSpec( - SparkAppDriverConf kubernetesDriverConf, KubernetesDriverSpec kubernetesDriverSpec) { + SparkAppDriverConf kubernetesDriverConf, + KubernetesDriverSpec kubernetesDriverSpec, + List<DriverServiceIngressSpec> driverServiceIngressList) { this.kubernetesDriverConf = kubernetesDriverConf; String namespace = kubernetesDriverConf.sparkConf().get(Config.KUBERNETES_NAMESPACE().key()); Map confFilesMap = @@ -86,6 +93,7 @@ public SparkAppResourceSpec( this.driverResources.add( KubernetesClientUtils.buildConfigMap( kubernetesDriverConf.configMapNameDriver(), confFilesMap, new HashMap<>())); + this.driverResources.addAll(configureDriverServerIngress(sparkPod, driverServiceIngressList)); this.driverPreResources.forEach(r -> setNamespaceIfMissing(r, namespace)); this.driverResources.forEach(r -> setNamespaceIfMissing(r, namespace)); } @@ -126,4 +134,22 @@ private SparkPod addConfigMap(SparkPod pod, Map confFilesMap) { .build(); return new SparkPod(podWithConfigMapVolume, containerWithConfigMapVolume); } + + /** + * Builds the Service and Ingress resources requested for the driver. + * + * @param pod driver pod; its metadata is handed to {@link DriverServiceIngressUtils} + * @param driverServiceIngressList requested Service / Ingress pairs; may be null or empty + * @return the built resources of all pairs in one list; empty when nothing is requested + */ + private List<HasMetadata> configureDriverServerIngress( + SparkPod pod, List<DriverServiceIngressSpec> driverServiceIngressList) { + if (driverServiceIngressList == null || driverServiceIngressList.isEmpty()) { + return Collections.emptyList(); + } + return driverServiceIngressList.stream() + .map(spec -> DriverServiceIngressUtils.buildIngressService(spec, pod.pod().getMetadata())) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } } diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java index 31d243b8..7cf78d1f 100644 --- a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java +++ 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java @@ -20,6 +20,7 @@ package org.apache.spark.k8s.operator; import java.math.BigInteger; +import java.util.List; import java.util.Map; import scala.Option; @@ -37,6 +38,7 @@ import org.apache.spark.deploy.k8s.submit.PythonMainAppResource; import org.apache.spark.deploy.k8s.submit.RMainAppResource; import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.spec.DriverServiceIngressSpec; import org.apache.spark.k8s.operator.utils.ModelUtils; /** @@ -82,7 +84,7 @@ public class SparkAppSubmissionWorker { public SparkAppResourceSpec getResourceSpec( SparkApplication app, KubernetesClient client, Map confOverrides) { SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides); - return buildResourceSpec(appDriverConf, client); + return buildResourceSpec(appDriverConf, app.getSpec().getDriverServiceIngressList(), client); } protected SparkAppDriverConf buildDriverConf( @@ -127,11 +129,22 @@ protected SparkAppDriverConf buildDriverConf( } + /** + * Builds the resources that would be launched for the application. + * + * @param kubernetesDriverConf resolved driver conf + * @param driverServiceIngressList Service / Ingress pairs requested for the driver; may be null + * @param client Kubernetes client handed to the driver builder + * @return resource spec of the application + */ protected SparkAppResourceSpec buildResourceSpec( - SparkAppDriverConf kubernetesDriverConf, KubernetesClient client) { + SparkAppDriverConf kubernetesDriverConf, + List<DriverServiceIngressSpec> driverServiceIngressList, + KubernetesClient client) { KubernetesDriverBuilder builder = new KubernetesDriverBuilder(); KubernetesDriverSpec kubernetesDriverSpec = builder.buildFromFeatures(kubernetesDriverConf, client); - return new SparkAppResourceSpec(kubernetesDriverConf, kubernetesDriverSpec); + return new SparkAppResourceSpec( + kubernetesDriverConf, kubernetesDriverSpec, driverServiceIngressList); } /** diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/utils/DriverServiceIngressUtils.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/utils/DriverServiceIngressUtils.java new file mode 100644 index 00000000..48d16c98 --- /dev/null +++ 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/utils/DriverServiceIngressUtils.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.k8s.operator.utils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.ServiceBuilder; +import io.fabric8.kubernetes.api.model.networking.v1.Ingress; +import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder; +import io.fabric8.kubernetes.api.model.networking.v1.IngressSpec; +import io.fabric8.kubernetes.api.model.networking.v1.IngressSpecBuilder; + +import org.apache.spark.k8s.operator.spec.DriverServiceIngressSpec; + +/** Utility class that builds the Service and Ingress exposing a Spark driver. */ +public final class DriverServiceIngressUtils { + private DriverServiceIngressUtils() {} + + /** + * Builds the Service and the Ingress described by the given spec. + * + * @param spec user-provided metadata and specs of the Service and the Ingress + * @param driverPodMetaData metadata of the driver pod; provides the namespace and the default + * selector labels of the Service + * @return a list holding the Service followed by the Ingress
+ */ + public static List<HasMetadata> buildIngressService( + DriverServiceIngressSpec spec, ObjectMeta driverPodMetaData) { + List<HasMetadata> resources = new ArrayList<>(2); + Service service = buildService(spec, driverPodMetaData); + resources.add(service); + resources.add(buildIngress(spec, service)); + return resources; + } + + /** + * Builds the Service in the driver pod namespace. Driver pod labels are used as the selector + * when the spec does not provide one. + */ + private static Service buildService(DriverServiceIngressSpec spec, ObjectMeta driverPodMetaData) { + ObjectMeta serviceMeta = new ObjectMetaBuilder(spec.getServiceMetadata()).build(); + serviceMeta.setNamespace(driverPodMetaData.getNamespace()); + Map<String, String> selectors = spec.getServiceSpec().getSelector(); + if (selectors == null || selectors.isEmpty()) { + selectors = driverPodMetaData.getLabels(); + } + return new ServiceBuilder() + .withMetadata(serviceMeta) + .withNewSpecLike(spec.getServiceSpec()) + .withSelector(selectors) + .endSpec() + .build(); + } + + /** + * Builds the Ingress in the namespace of the given Service. If the spec has no rule and the + * Service has at least one port, a default rule routing "/" to the first Service port is added + * on top of the user-provided Ingress spec. + */ + private static Ingress buildIngress(DriverServiceIngressSpec spec, Service service) { + ObjectMeta metadata = new ObjectMetaBuilder(spec.getIngressMetadata()).build(); + // backends of an Ingress are resolved in its own namespace, so keep it next to the Service + metadata.setNamespace(service.getMetadata().getNamespace()); + IngressSpec ingressSpec = new IngressSpecBuilder(spec.getIngressSpec()).build(); + if ((ingressSpec.getRules() == null || ingressSpec.getRules().isEmpty()) + && service.getSpec().getPorts() != null + && !service.getSpec().getPorts().isEmpty()) { + // if no rule is provided, populate default path with backend to the associated service; + // start from the resolved spec so that ingressClassName / tls / defaultBackend are kept + ingressSpec = + new IngressSpecBuilder(ingressSpec) + .addNewRule() + .withNewHttp() + .addNewPath() + .withPath("/") + .withPathType("ImplementationSpecific") + .withNewBackend() + .withNewService() + .withName(service.getMetadata().getName()) + .withNewPort() + .withNumber(service.getSpec().getPorts().get(0).getPort()) + .endPort() + .endService() + .endBackend() + .endPath() + .endHttp() + .endRule() + .build(); + } + return new IngressBuilder().withMetadata(metadata).withSpec(ingressSpec).build(); + } +} diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java index 929f84e3..42e21068 100644 --- a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java +++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java @@ -69,7 +69,8 @@ void testDriverResourceIncludesConfigMap() { when(mockSpec.pod()).thenReturn(sparkPod); when(mockSpec.systemProperties()).thenReturn(new HashMap<>()); - SparkAppResourceSpec appResourceSpec = new SparkAppResourceSpec(mockConf, mockSpec); + SparkAppResourceSpec appResourceSpec = + new SparkAppResourceSpec(mockConf, mockSpec, Collections.emptyList()); Assertions.assertEquals(2, appResourceSpec.getDriverResources().size()); Assertions.assertEquals(1, appResourceSpec.getDriverPreResources().size()); diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/utils/DriverServiceIngressUtilsTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/utils/DriverServiceIngressUtilsTest.java new file mode 100644 index 00000000..34a97e81 --- /dev/null +++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/utils/DriverServiceIngressUtilsTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.k8s.operator.utils; + +import java.util.List; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.IntOrString; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.ServiceSpec; +import io.fabric8.kubernetes.api.model.ServiceSpecBuilder; +import io.fabric8.kubernetes.api.model.networking.v1.Ingress; +import io.fabric8.kubernetes.api.model.networking.v1.IngressRule; +import io.fabric8.kubernetes.api.model.networking.v1.IngressSpec; +import io.fabric8.kubernetes.api.model.networking.v1.IngressSpecBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import org.apache.spark.k8s.operator.spec.DriverServiceIngressSpec; + +/** Tests for {@link DriverServiceIngressUtils}. */ +class DriverServiceIngressUtilsTest { + @Test + void buildIngressService() { + ObjectMeta driverMeta = + new ObjectMetaBuilder().withName("foo-driver").addToLabels("foo", "bar").build(); + // no ingress spec is provided: one default rule backed by the service is expected + DriverServiceIngressSpec spec1 = buildSpecWithSinglePortService(); + List<HasMetadata> resources1 = DriverServiceIngressUtils.buildIngressService(spec1, driverMeta); + Assertions.assertEquals(2, resources1.size()); + Assertions.assertInstanceOf(Service.class, resources1.get(0)); + Assertions.assertInstanceOf(Ingress.class, resources1.get(1)); + Service service1 = (Service) resources1.get(0); + Assertions.assertEquals(driverMeta.getLabels(), service1.getSpec().getSelector()); + Ingress ingress1 = (Ingress) resources1.get(1); + Assertions.assertEquals(1, ingress1.getSpec().getRules().size()); + IngressRule ingressRule1 = ingress1.getSpec().getRules().get(0); + Assertions.assertEquals(1, ingressRule1.getHttp().getPaths().size()); + Assertions.assertEquals( + service1.getMetadata().getName(), 
+ ingressRule1.getHttp().getPaths().get(0).getBackend().getService().getName()); + Assertions.assertEquals( + service1.getSpec().getPorts().get(0).getPort(), + ingressRule1.getHttp().getPaths().get(0).getBackend().getService().getPort().getNumber()); + + // ingress spec is provided: it is expected to be used as is + DriverServiceIngressSpec spec2 = buildSpecWithIngressSpec(); + List<HasMetadata> resources2 = DriverServiceIngressUtils.buildIngressService(spec2, driverMeta); + Assertions.assertEquals(2, resources2.size()); + Assertions.assertInstanceOf(Service.class, resources2.get(0)); + Assertions.assertInstanceOf(Ingress.class, resources2.get(1)); + Service service2 = (Service) resources2.get(0); + Assertions.assertEquals(driverMeta.getLabels(), service2.getSpec().getSelector()); + Ingress ingress2 = (Ingress) resources2.get(1); + Assertions.assertEquals(spec2.getIngressSpec(), ingress2.getSpec()); + } + + /** Builds a spec with a single-port service and without ingress spec. */ + private DriverServiceIngressSpec buildSpecWithSinglePortService() { + ServiceSpec serviceSpec1 = + new ServiceSpecBuilder() + .addNewPort() + .withPort(80) + .withProtocol("TCP") + .withTargetPort(new IntOrString(4040)) + .endPort() + .withType("LoadBalancer") + .build(); + ObjectMeta serviceMeta1 = new ObjectMetaBuilder().withName("foo-service-1").build(); + ObjectMeta ingressMeta1 = new ObjectMetaBuilder().withName("foo-ingress-1").build(); + return DriverServiceIngressSpec.builder() + .serviceMetadata(serviceMeta1) + .serviceSpec(serviceSpec1) + .ingressMetadata(ingressMeta1) + .build(); + } + + /** Builds a spec with a two-port service and an ingress spec with two rules. */ + private DriverServiceIngressSpec buildSpecWithIngressSpec() { + ServiceSpec serviceSpec2 = + new ServiceSpecBuilder() + .addNewPort() + .withPort(19098) + .withProtocol("TCP") + .withTargetPort(new IntOrString(19098)) + .endPort() + .addNewPort() + .withPort(19099) + .withProtocol("TCP") + .withTargetPort(new IntOrString(19099)) + .endPort() + .withType("ClusterIP") + .build(); + IngressSpec ingressSpec2 = + new IngressSpecBuilder() + .withIngressClassName("nginx-example") + .addNewRule() + .withNewHttp() + .addNewPath() + 
.withPath("/testpath1") + .withPathType("Prefix") + .withNewBackend() + .withNewService() + .withName("foo-service-2") + .withNewPort() + .withNumber(19098) + .endPort() + .endService() + .endBackend() + .endPath() + .endHttp() + .endRule() + .addNewRule() + .withNewHttp() + .addNewPath() + .withPath("/testpath2") + .withPathType("Prefix") + .withNewBackend() + .withNewService() + .withName("foo-service-2") + .withNewPort() + .withNumber(19099) + .endPort() + .endService() + .endBackend() + .endPath() + .endHttp() + .endRule() + .build(); + ObjectMeta serviceMeta2 = new ObjectMetaBuilder().withName("foo-service-2").build(); + ObjectMeta ingressMeta2 = + new ObjectMetaBuilder() + .withName("foo-ingress-2") + .addToAnnotations("nginx.ingress.kubernetes.io/rewrite-target", "/") + .build(); + return DriverServiceIngressSpec.builder() + .serviceMetadata(serviceMeta2) + .serviceSpec(serviceSpec2) + .ingressMetadata(ingressMeta2) + .ingressSpec(ingressSpec2) + .build(); + } +}