Skip to content

Commit

Permalink
[KYUUBI #3385] Set executor pod name prefix if missing in spark on k8…
Browse files Browse the repository at this point in the history
…s case

### _Why are the changes needed?_

See more in #3590
For #3590 this PR is reverted, and author not reply for long time.
Fix this issue #3385 here.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3913 from zwangsheng/k8s/set_executor_pod_name_prefix.

Closes #3385

64b2c6b [zwangsheng] fix for review
b58ff3f [zwangsheng] add unit test
cb0ad9f [zwangsheng] fix
7a36292 [zwangsheng] init

Authored-by: zwangsheng <2213335496@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
  • Loading branch information
zwangsheng authored and pan3793 committed Dec 12, 2022
1 parent 4efd4d0 commit 46ffc7b
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package org.apache.kyuubi.engine.spark

import java.time.Instant
import java.util.{Locale, UUID}
import java.util.concurrent.{CountDownLatch, ScheduledExecutorService, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

import com.google.common.annotations.VisibleForTesting
import org.apache.spark.{ui, SparkConf}
import org.apache.spark.kyuubi.{SparkContextHelper, SparkSQLEngineEventListener, SparkSQLEngineListener}
import org.apache.spark.kyuubi.SparkUtilsHelper.getLocalDir
Expand Down Expand Up @@ -192,6 +194,13 @@ object SparkSQLEngine extends Logging {

if (Utils.isOnK8s) {
kyuubiConf.setIfMissing(FRONTEND_CONNECTION_URL_USE_HOSTNAME, false)

// https://github.com/apache/incubator-kyuubi/issues/3385
// Set unset executor pod prefix to prevent kubernetes pod length limit error
// due to the long app name
_sparkConf.setIfMissing(
"spark.kubernetes.executor.podNamePrefix",
generateExecutorPodNamePrefixForK8s(user))
}

// Set web ui port 0 to avoid port conflicts during non-k8s cluster mode
Expand Down Expand Up @@ -342,4 +351,23 @@ object SparkSQLEngine extends Logging {
// only spark driver pod will build with `SPARK_APPLICATION_ID` env.
Utils.isOnK8s && sys.env.contains("SPARK_APPLICATION_ID")
}

@VisibleForTesting
def generateExecutorPodNamePrefixForK8s(userName: String): String = {
val resolvedUserName =
userName.trim.toLowerCase(Locale.ROOT)
.replaceAll("[^a-z0-9\\-]", "-")
.replaceAll("-+", "-")
.replaceAll("^-", "")
val podNamePrefixWithUser = s"kyuubi-$resolvedUserName-${Instant.now().toEpochMilli}"
if (podNamePrefixWithUser.length <= EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH) {
podNamePrefixWithUser
} else {
s"kyuubi-${UUID.randomUUID()}"
}
}

// Kubernetes pod name max length - '-exec-' - Int.MAX_VALUE.length
// 253 - 10 - 6
val EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH = 237
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark

import org.apache.kyuubi.KyuubiFunSuite

class SparkSQLEngineSuite extends KyuubiFunSuite {

test("[KYUUBI #3385] generate executor pod name prefix with user or UUID") {
val userName1 = "/kyuubi_user+-*"
val executorPodNamePrefix1 = SparkSQLEngine.generateExecutorPodNamePrefixForK8s(userName1)
assert(executorPodNamePrefix1.contains("-kyuubi-user-"))

val userName2 = "LongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong" +
"LongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong" +
"LongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong" +
"LongLongLongLongLongLongLongLongLongLongLongLongName"
val executorPodNamePrefix2 = SparkSQLEngine.generateExecutorPodNamePrefixForK8s(userName2)
assert(!executorPodNamePrefix2.contains(userName2))
assert(executorPodNamePrefix2.length <= SparkSQLEngine.EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import org.apache.kyuubi.KyuubiFunSuite

trait WithKyuubiServerOnKubernetes extends KyuubiFunSuite {
protected def connectionConf: Map[String, String] = Map.empty
private val miniKubernetesClient: DefaultKubernetesClient = MiniKube.getKubernetesClient

lazy val miniKubernetesClient: DefaultKubernetesClient = MiniKube.getKubernetesClient
lazy val kyuubiPod: Pod = miniKubernetesClient.pods().withName("kyuubi-test").get()
lazy val kyuubiServerIp: String = kyuubiPod.getStatus.getPodIP
lazy val miniKubeIp: String = MiniKube.getIp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,21 @@ class KyuubiOnKubernetesWithClientSparkTestsSuite
super.connectionConf ++ Map(
"spark.submit.deployMode" -> "client",
"spark.driver.host" -> kyuubiServerIp,
"kyuubi.frontend.connection.url.use.hostname" -> "false")
"kyuubi.frontend.connection.url.use.hostname" -> "false",
"spark.kubernetes.executor.label.kyuubi-it-test" -> "client")
}

override protected def jdbcUrl: String = getJdbcUrl(connectionConf)

override protected lazy val user: String = "client"
override protected lazy val user: String = "kyuubi_user"

test("[KYUUBI #3385] Set executor pod name prefix if missing in spark on k8s case") {
miniKubernetesClient.pods().withLabel(
"spark.kubernetes.executor.label.kyuubi-it-test",
"client").list().getItems.forEach(pod => {
assert(pod.getMetadata.getName.contains("kyuubi-user"))
})
}
}

/**
Expand Down

0 comments on commit 46ffc7b

Please sign in to comment.