Skip to content

Commit eb4d289

Browse files
lightning-Lulysses-you
authored andcommitted
[KYUUBI #1936][FOLLOWUP] Send credentials when opening session and wait for completion
### _Why are the changes needed?_ Follow up #1936 ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2092 from lightning-L/kyuubi-1936. Closes #1936 2bb2c10 [Tianlin Liao] [KYUUBI #1936][FOLLOWUP] Send credentials when opening session and wait for completion Authored-by: Tianlin Liao <tiliao@ebay.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent ffb7a6f commit eb4d289

File tree

6 files changed

+77
-27
lines changed

6 files changed

+77
-27
lines changed

docs/deployment/settings.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ Key | Default | Meaning | Type | Since
163163
<code>kyuubi.credentials.hive.enabled</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Whether to renew Hive metastore delegation token</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.4.0</div>
164164
<code>kyuubi.credentials.renewal.interval</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1H</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>How often Kyuubi renews one user's delegation tokens</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
165165
<code>kyuubi.credentials.renewal.retry.wait</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>How long to wait before retrying to fetch new credentials after a failure.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
166+
<code>kyuubi.credentials.update.wait.timeout</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>How long to wait until credentials are ready.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.5.0</div>
166167

167168

168169
### Delegation

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,14 @@ object KyuubiConf {
246246
.checkValue(t => t > 0, "must be positive integer")
247247
.createWithDefault(Duration.ofMinutes(1).toMillis)
248248

249+
val CREDENTIALS_UPDATE_WAIT_TIMEOUT: ConfigEntry[Long] =
250+
buildConf("credentials.update.wait.timeout")
251+
.doc("How long to wait until credentials are ready.")
252+
.version("1.5.0")
253+
.timeConf
254+
.checkValue(t => t > 0, "must be positive integer")
255+
.createWithDefault(Duration.ofMinutes(1).toMillis)
256+
249257
val CREDENTIALS_HADOOP_FS_ENABLED: ConfigEntry[Boolean] =
250258
buildConf("credentials.hadoopfs.enabled")
251259
.doc("Whether to renew Hadoop filesystem delegation tokens")

kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ package org.apache.kyuubi.util
1919

2020
import java.util.concurrent.{LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
2121

22-
import org.apache.kyuubi.Logging
22+
import scala.concurrent.Awaitable
23+
import scala.concurrent.duration.Duration
24+
25+
import org.apache.kyuubi.{KyuubiException, Logging}
2326

2427
object ThreadUtils extends Logging {
2528

@@ -49,4 +52,16 @@ object ThreadUtils extends Logging {
4952
executor.allowCoreThreadTimeOut(true)
5053
executor
5154
}
55+
56+
def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
57+
try {
58+
// `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
59+
// See SPARK-13747.
60+
val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
61+
awaitable.result(atMost)(awaitPermission)
62+
} catch {
63+
case e: Exception =>
64+
throw new KyuubiException("Exception thrown in awaitResult: ", e)
65+
}
66+
}
5267
}

kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,15 @@
1717

1818
package org.apache.kyuubi.credentials
1919

20+
import java.util.concurrent.atomic.AtomicReference
21+
22+
import scala.concurrent.Future
23+
import scala.concurrent.duration.Duration
24+
2025
import org.apache.hadoop.security.Credentials
2126

2227
import org.apache.kyuubi.credentials.CredentialsRef.UNSET_EPOCH
23-
import org.apache.kyuubi.util.KyuubiHadoopUtils
28+
import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
2429

2530
class CredentialsRef(appUser: String) {
2631

@@ -29,6 +34,12 @@ class CredentialsRef(appUser: String) {
2934

3035
private var encodedCredentials: String = _
3136

37+
private val credentialFuture = new AtomicReference[Future[Unit]]()
38+
39+
def setFuture(future: Future[Unit]): Unit = {
40+
credentialFuture.set(future)
41+
}
42+
3243
def getEpoch: Long = epoch
3344

3445
def getAppUser: String = appUser
@@ -42,6 +53,10 @@ class CredentialsRef(appUser: String) {
4253
epoch += 1
4354
}
4455

56+
def waitUntilReady(timeout: Duration): Unit = {
57+
Option(credentialFuture.get).foreach(ThreadUtils.awaitResult(_, timeout))
58+
}
59+
4560
}
4661

4762
object CredentialsRef {

kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@ import java.util.ServiceLoader
2121
import java.util.concurrent._
2222

2323
import scala.collection.mutable
24+
import scala.concurrent.Future
25+
import scala.concurrent.Promise
26+
import scala.concurrent.duration.Duration
2427
import scala.util.{Failure, Success, Try}
2528

2629
import org.apache.hadoop.conf.Configuration
2730
import org.apache.hadoop.security.Credentials
2831

29-
import org.apache.kyuubi.{KyuubiException, Logging}
32+
import org.apache.kyuubi.Logging
3033
import org.apache.kyuubi.config.KyuubiConf
3134
import org.apache.kyuubi.config.KyuubiConf._
3235
import org.apache.kyuubi.service.AbstractService
@@ -84,6 +87,7 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
8487
private var providers: Map[String, HadoopDelegationTokenProvider] = _
8588
private var renewalInterval: Long = _
8689
private var renewalRetryWait: Long = _
90+
private var credentialsWaitTimeout: Long = _
8791
private var hadoopConf: Configuration = _
8892

8993
private[credentials] var renewalExecutor: Option[ScheduledExecutorService] = None
@@ -111,6 +115,7 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
111115

112116
renewalInterval = conf.get(CREDENTIALS_RENEWAL_INTERVAL)
113117
renewalRetryWait = conf.get(CREDENTIALS_RENEWAL_RETRY_WAIT)
118+
credentialsWaitTimeout = conf.get(CREDENTIALS_UPDATE_WAIT_TIMEOUT)
114119
super.initialize(conf)
115120
}
116121

@@ -147,12 +152,12 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
147152
sessionId: String,
148153
appUser: String,
149154
send: String => Unit,
150-
onetime: Boolean = false): Unit = {
155+
waitUntilCredentialsReady: Boolean = false): Unit = {
151156
if (renewalExecutor.isEmpty) {
152157
return
153158
}
154159

155-
val userRef = getOrCreateUserCredentialsRef(appUser, onetime)
160+
val userRef = getOrCreateUserCredentialsRef(appUser, waitUntilCredentialsReady)
156161
val sessionEpoch = getSessionCredentialsEpoch(sessionId)
157162

158163
if (userRef.getEpoch > sessionEpoch) {
@@ -184,19 +189,19 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
184189
// Visible for testing.
185190
private[credentials] def getOrCreateUserCredentialsRef(
186191
appUser: String,
187-
onetime: Boolean = false): CredentialsRef = {
192+
waitUntilCredentialsReady: Boolean = false): CredentialsRef = {
188193
val ref = userCredentialsRefMap.computeIfAbsent(
189194
appUser,
190195
appUser => {
191196
val ref = new CredentialsRef(appUser)
192-
scheduleRenewal(ref, 0, onetime)
197+
val credentialsFuture: Future[Unit] = scheduleRenewal(ref, 0, waitUntilCredentialsReady)
198+
ref.setFuture(credentialsFuture)
193199
info(s"Created CredentialsRef for user $appUser and scheduled a renewal task")
194200
ref
195201
})
196202

197-
// schedule renewal task when encodedCredentials are invalid
198-
if (onetime && ref.getEncodedCredentials == null) {
199-
scheduleRenewal(ref, 0, onetime)
203+
if (waitUntilCredentialsReady) {
204+
ref.waitUntilReady(Duration(credentialsWaitTimeout, TimeUnit.MILLISECONDS))
200205
}
201206

202207
ref
@@ -212,17 +217,24 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
212217
providers.contains(serviceName)
213218
}
214219

220+
private def updateCredentials(userRef: CredentialsRef): Unit = {
221+
val creds = new Credentials()
222+
providers.values
223+
.foreach(_.obtainDelegationTokens(userRef.getAppUser, creds))
224+
userRef.updateCredentials(creds)
225+
}
226+
215227
private def scheduleRenewal(
216228
userRef: CredentialsRef,
217229
delay: Long,
218-
waitCompletion: Boolean = false): Unit = {
230+
waitUntilCredentialsReady: Boolean = false): Future[Unit] = {
231+
val promise = Promise[Unit]()
232+
219233
val renewalTask = new Runnable {
220234
override def run(): Unit = {
221235
try {
222-
val creds = new Credentials()
223-
providers.values
224-
.foreach(_.obtainDelegationTokens(userRef.getAppUser, creds))
225-
userRef.updateCredentials(creds)
236+
promise.trySuccess(updateCredentials(userRef))
237+
226238
scheduleRenewal(userRef, renewalInterval)
227239
} catch {
228240
case _: InterruptedException =>
@@ -233,24 +245,19 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
233245
s" $renewalRetryWait ms",
234246
e)
235247
scheduleRenewal(userRef, renewalRetryWait)
236-
// throw exception when one-time execution fails,
237-
// so that client side can be aware of this
238-
if (waitCompletion) {
239-
throw new KyuubiException(s"One-time execution failed for token update task " +
240-
s"for ${userRef.getAppUser}")
248+
if (waitUntilCredentialsReady) {
249+
promise.tryFailure(e)
241250
}
242251
}
243252
}
244253
}
245254

246-
if (waitCompletion) {
247-
renewalTask.run()
248-
} else {
249-
renewalExecutor.foreach { executor =>
250-
info(s"Scheduling renewal in $delay ms.")
251-
executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
252-
}
255+
renewalExecutor.foreach { executor =>
256+
info(s"Scheduling renewal in $delay ms.")
257+
executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
253258
}
259+
260+
promise.future
254261
}
255262

256263
}

kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ class HadoopCredentialsManagerSuite extends KyuubiFunSuite {
9999
withStartedManager(kyuubiConf) { manager =>
100100
val userRef = manager.getOrCreateUserCredentialsRef(appUser, true)
101101
assert(userRef.getEpoch == 0)
102+
103+
eventually(timeout(1100.milliseconds), interval(100.milliseconds)) {
104+
assert(userRef.getEpoch == 1)
105+
}
102106
}
103107
}
104108

0 commit comments

Comments
 (0)