Skip to content

Commit 15aba5d

Browse files
lightning-Lulysses-you
authored andcommitted
[KYUUBI #1936] Send credentials when opening session and wait for completion
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> To close #1936 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2059 from lightning-L/kyuubi-1936. Closes #1936 e9c83ef [Tianlin Liao] [KYUUBI #1936] send credentials when opening session and wait for completion Authored-by: Tianlin Liao <tiliao@ebay.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org> (cherry picked from commit 8e983a1) Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent 4565b21 commit 15aba5d

File tree

3 files changed

+69
-12
lines changed

3 files changed

+69
-12
lines changed

kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import scala.util.{Failure, Success, Try}
2626
import org.apache.hadoop.conf.Configuration
2727
import org.apache.hadoop.security.Credentials
2828

29-
import org.apache.kyuubi.Logging
29+
import org.apache.kyuubi.{KyuubiException, Logging}
3030
import org.apache.kyuubi.config.KyuubiConf
3131
import org.apache.kyuubi.config.KyuubiConf._
3232
import org.apache.kyuubi.service.AbstractService
@@ -146,12 +146,13 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
146146
def sendCredentialsIfNeeded(
147147
sessionId: String,
148148
appUser: String,
149-
send: String => Unit): Unit = {
149+
send: String => Unit,
150+
onetime: Boolean = false): Unit = {
150151
if (renewalExecutor.isEmpty) {
151152
return
152153
}
153154

154-
val userRef = getOrCreateUserCredentialsRef(appUser)
155+
val userRef = getOrCreateUserCredentialsRef(appUser, onetime)
155156
val sessionEpoch = getSessionCredentialsEpoch(sessionId)
156157

157158
if (userRef.getEpoch > sessionEpoch) {
@@ -181,16 +182,26 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
181182
}
182183

183184
// Visible for testing.
184-
private[credentials] def getOrCreateUserCredentialsRef(appUser: String): CredentialsRef =
185-
userCredentialsRefMap.computeIfAbsent(
185+
private[credentials] def getOrCreateUserCredentialsRef(
186+
appUser: String,
187+
onetime: Boolean = false): CredentialsRef = {
188+
val ref = userCredentialsRefMap.computeIfAbsent(
186189
appUser,
187190
appUser => {
188191
val ref = new CredentialsRef(appUser)
189-
scheduleRenewal(ref, 0)
192+
scheduleRenewal(ref, 0, onetime)
190193
info(s"Created CredentialsRef for user $appUser and scheduled a renewal task")
191194
ref
192195
})
193196

197+
// schedule renewal task when encodedCredentials are invalid
198+
if (onetime && ref.getEncodedCredentials == null) {
199+
scheduleRenewal(ref, 0, onetime)
200+
}
201+
202+
ref
203+
}
204+
194205
// Visible for testing.
195206
private[credentials] def getSessionCredentialsEpoch(sessionId: String): Long = {
196207
sessionCredentialsEpochMap.getOrDefault(sessionId, CredentialsRef.UNSET_EPOCH)
@@ -201,7 +212,10 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
201212
providers.contains(serviceName)
202213
}
203214

204-
private def scheduleRenewal(userRef: CredentialsRef, delay: Long): Unit = {
215+
private def scheduleRenewal(
216+
userRef: CredentialsRef,
217+
delay: Long,
218+
waitCompletion: Boolean = false): Unit = {
205219
val renewalTask = new Runnable {
206220
override def run(): Unit = {
207221
try {
@@ -219,13 +233,23 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
219233
s" $renewalRetryWait ms",
220234
e)
221235
scheduleRenewal(userRef, renewalRetryWait)
236+
// throw exception when one-time execution fails,
237+
// so that client side can be aware of this
238+
if (waitCompletion) {
239+
throw new KyuubiException(s"One-time execution failed for token update task " +
240+
s"for ${userRef.getAppUser}")
241+
}
222242
}
223243
}
224244
}
225245

226-
renewalExecutor.foreach { executor =>
227-
info(s"Scheduling renewal in $delay ms.")
228-
executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
246+
if (waitCompletion) {
247+
renewalTask.run()
248+
} else {
249+
renewalExecutor.foreach { executor =>
250+
info(s"Scheduling renewal in $delay ms.")
251+
executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
252+
}
229253
}
230254
}
231255

kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.kyuubi.operation
1919

2020
import org.apache.kyuubi.operation.log.OperationLog
21-
import org.apache.kyuubi.session.KyuubiSessionImpl
21+
import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager}
2222

2323
class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Boolean)
2424
extends KyuubiOperation(OperationType.UNKNOWN_OPERATION, session) {
@@ -49,6 +49,7 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool
4949
setState(OperationState.RUNNING)
5050
try {
5151
session.openEngineSession(getOperationLog)
52+
renewEngineCredentials()
5253
setState(OperationState.FINISHED)
5354
} catch onError()
5455
}
@@ -59,4 +60,18 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool
5960

6061
if (!shouldRunAsync) getBackgroundHandle.get()
6162
}
63+
64+
private def renewEngineCredentials(): Unit = {
65+
val sessionManager = session.sessionManager.asInstanceOf[KyuubiSessionManager]
66+
try {
67+
sessionManager.credentialsManager.sendCredentialsIfNeeded(
68+
session.handle.identifier.toString,
69+
session.user,
70+
client.sendCredentials,
71+
true)
72+
} catch {
73+
case e: Exception =>
74+
error(s"Failed to renew engine credentials when launching engine", e)
75+
}
76+
}
6277
}

kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
2424
import org.apache.hadoop.security.Credentials
2525
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
2626

27-
import org.apache.kyuubi.KyuubiFunSuite
27+
import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite}
2828
import org.apache.kyuubi.config.KyuubiConf
2929

3030
class HadoopCredentialsManagerSuite extends KyuubiFunSuite {
@@ -93,6 +93,24 @@ class HadoopCredentialsManagerSuite extends KyuubiFunSuite {
9393
}
9494
}
9595

96+
test("execute credentials renewal task and wait for completion") {
97+
val kyuubiConf = new KyuubiConf(false)
98+
.set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)
99+
withStartedManager(kyuubiConf) { manager =>
100+
val userRef = manager.getOrCreateUserCredentialsRef(appUser, true)
101+
assert(userRef.getEpoch == 0)
102+
}
103+
}
104+
105+
test("throw exception when credential renewal fails") {
106+
val kyuubiConf = new KyuubiConf(false)
107+
.set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)
108+
withStartedManager(kyuubiConf) { manager =>
109+
UnstableDelegationTokenProvider.throwException = true
110+
assertThrows[KyuubiException](manager.getOrCreateUserCredentialsRef(appUser, true))
111+
}
112+
}
113+
96114
test("schedule credentials renewal retry when failed") {
97115
val kyuubiConf = new KyuubiConf(false)
98116
.set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)

0 commit comments

Comments
 (0)