Skip to content

Commit 6c4a8b0

Browse files
zhouyifan279pan3793
authored andcommitted
[KYUUBI #3219] Error renew delegation tokens: Unknown version of delegation token 8
### _Why are the changes needed?_ Spark SQL Engine failed to get issue date from some DelegationTokenIdentifiers, such as OzoneTokenIdentifier. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #3225 from zhouyifan279/3219. Closes #3219 0ffdcf9 [zhouyifan279] [KYUUBI #3219] Error renew delegation tokens: Unknown version of delegation token 8 c5a37a0 [zhouyifan279] [KYUUBI #3219] Error renew delegation tokens: Unknown version of delegation token 8 594c5a0 [zhouyifan279] [KYUUBI #3219] Error renew delegation tokens: Unknown version of delegation token 8 cbbafd5 [zhouyifan279] [KYUUBI #3219] Error renew delegation tokens: Unknown version of delegation token 8 394308e [zhouyifan279] [KYUUBI #3219] Error renew delegation tokens: Unknown version of delegation token 8 Authored-by: zhouyifan279 <zhouyifan279@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent e847ab3 commit 6c4a8b0

File tree

3 files changed

+91
-14
lines changed

3 files changed

+91
-14
lines changed

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,7 @@ object SparkTBinaryFrontendService extends Logging {
141141
}
142142
.map(_._2)
143143
newToken.foreach { token =>
144-
if (KyuubiHadoopUtils.getTokenIssueDate(token) >
145-
KyuubiHadoopUtils.getTokenIssueDate(oldAliasAndToken.get._2)) {
144+
if (compareIssueDate(token, oldAliasAndToken.get._2) > 0) {
146145
updateCreds.addToken(oldAliasAndToken.get._1, token)
147146
} else {
148147
warn(s"Ignore Hive token with earlier issue date: $token")
@@ -166,8 +165,7 @@ object SparkTBinaryFrontendService extends Logging {
166165
tokens.foreach { case (alias, newToken) =>
167166
val oldToken = oldCreds.getToken(alias)
168167
if (oldToken != null) {
169-
if (KyuubiHadoopUtils.getTokenIssueDate(newToken) >
170-
KyuubiHadoopUtils.getTokenIssueDate(oldToken)) {
168+
if (compareIssueDate(newToken, oldToken) > 0) {
171169
updateCreds.addToken(alias, newToken)
172170
} else {
173171
warn(s"Ignore token with earlier issue date: $newToken")
@@ -177,4 +175,16 @@ object SparkTBinaryFrontendService extends Logging {
177175
}
178176
}
179177
}
178+
179+
private def compareIssueDate(
180+
newToken: Token[_ <: TokenIdentifier],
181+
oldToken: Token[_ <: TokenIdentifier]): Int = {
182+
val newDate = KyuubiHadoopUtils.getTokenIssueDate(newToken)
183+
val oldDate = KyuubiHadoopUtils.getTokenIssueDate(oldToken)
184+
if (newDate.isDefined && oldDate.isDefined && newDate.get <= oldDate.get) {
185+
-1
186+
} else {
187+
1
188+
}
189+
}
180190
}

kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,20 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
2121
import java.util.{Base64, Map => JMap}
2222

2323
import scala.collection.JavaConverters._
24+
import scala.util.{Failure, Success, Try}
2425

2526
import org.apache.hadoop.conf.Configuration
2627
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
2728
import org.apache.hadoop.io.Text
2829
import org.apache.hadoop.security.{Credentials, SecurityUtil, UserGroupInformation}
2930
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
31+
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
3032
import org.apache.hadoop.yarn.conf.YarnConfiguration
3133

34+
import org.apache.kyuubi.Logging
3235
import org.apache.kyuubi.config.KyuubiConf
3336

34-
object KyuubiHadoopUtils {
37+
object KyuubiHadoopUtils extends Logging {
3538

3639
private val subjectField =
3740
classOf[UserGroupInformation].getDeclaredField("subject")
@@ -85,13 +88,25 @@ object KyuubiHadoopUtils {
8588
.toMap
8689
}
8790

88-
def getTokenIssueDate(token: Token[_ <: TokenIdentifier]): Long = {
89-
// It is safe to deserialize any token identifier to hdfs `DelegationTokenIdentifier`
90-
// as all token identifiers have the same binary format.
91-
val tokenIdentifier = new DelegationTokenIdentifier
92-
val buf = new ByteArrayInputStream(token.getIdentifier)
93-
val in = new DataInputStream(buf)
94-
tokenIdentifier.readFields(in)
95-
tokenIdentifier.getIssueDate
91+
def getTokenIssueDate(token: Token[_ <: TokenIdentifier]): Option[Long] = {
92+
token.decodeIdentifier() match {
93+
case tokenIdent: AbstractDelegationTokenIdentifier =>
94+
Some(tokenIdent.getIssueDate)
95+
case null =>
96+
// TokenIdentifiers not found in ServiceLoader
97+
val tokenIdentifier = new DelegationTokenIdentifier
98+
val buf = new ByteArrayInputStream(token.getIdentifier)
99+
val in = new DataInputStream(buf)
100+
Try(tokenIdentifier.readFields(in)) match {
101+
case Success(_) =>
102+
Some(tokenIdentifier.getIssueDate)
103+
case Failure(e) =>
104+
warn(s"Can not decode identifier of token $token", e)
105+
None
106+
}
107+
case tokenIdent =>
108+
debug(s"Unsupported TokenIdentifier kind: ${tokenIdent.getKind}")
109+
None
110+
}
96111
}
97112
}

kyuubi-common/src/test/scala/org/apache/kyuubi/util/KyuubiHadoopUtilsSuite.scala

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,17 @@
1717

1818
package org.apache.kyuubi.util
1919

20+
import java.io.{DataInput, DataOutput}
2021
import java.util.stream.StreamSupport
2122

2223
import scala.util.Random
2324

25+
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier
26+
import org.apache.hadoop.hdfs.security.token.delegation.{DelegationTokenIdentifier => HDFSTokenIdent}
2427
import org.apache.hadoop.io.Text
2528
import org.apache.hadoop.security.Credentials
26-
import org.apache.hadoop.security.token.Token
29+
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
30+
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
2731

2832
import org.apache.kyuubi.KyuubiFunSuite
2933
import org.apache.kyuubi.config.KyuubiConf
@@ -75,4 +79,52 @@ class KyuubiHadoopUtilsSuite extends KyuubiFunSuite {
7579
assert(StreamSupport.stream(hadoopConf.spliterator(), false)
7680
.noneMatch(entry => entry.getKey.startsWith("hadoop") || entry.getKey.startsWith("fs")))
7781
}
82+
83+
test("get token issue date") {
84+
val issueDate = System.currentTimeMillis()
85+
86+
def checkIssueDate(tokenIdent: TokenIdentifier, expected: Option[Long]): Unit = {
87+
val hdfsToken = new Token[HDFSTokenIdent]()
88+
hdfsToken.setKind(tokenIdent.getKind)
89+
hdfsToken.setID(tokenIdent.getBytes)
90+
assert(KyuubiHadoopUtils.getTokenIssueDate(hdfsToken) == expected)
91+
}
92+
93+
// DelegationTokenIdentifier found in ServiceLoader
94+
// Such as HDFS_DELEGATION_TOKEN, OzoneToken
95+
val hdfsTokenIdent = new HDFSTokenIdent()
96+
hdfsTokenIdent.setIssueDate(issueDate)
97+
checkIssueDate(hdfsTokenIdent, Some(issueDate))
98+
99+
// TokenIdentifier with no issue date found in ServiceLoader
100+
val blockTokenIdent = new BlockTokenIdentifier()
101+
checkIssueDate(blockTokenIdent, None)
102+
103+
// DelegationTokenIdentifier not found in ServiceLoader
104+
// Such as HIVE_DELEGATION_TOKEN
105+
val testTokenIdent = new TestDelegationTokenIdentifier()
106+
testTokenIdent.setIssueDate(issueDate)
107+
checkIssueDate(testTokenIdent, Some(issueDate))
108+
109+
// DelegationTokenIdentifier with custom binary format and not found in ServiceLoader
110+
val testTokenIdent2 = new TestDelegationTokenIdentifier2()
111+
testTokenIdent2.setIssueDate(issueDate)
112+
checkIssueDate(testTokenIdent2, None)
113+
}
114+
}
115+
116+
private class TestDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
117+
override def getKind: Text = new Text("KYUUBI_TOKEN_NOT_IN_SERVICE_LOADER")
118+
}
119+
120+
private class TestDelegationTokenIdentifier2 extends AbstractDelegationTokenIdentifier {
121+
override def getKind: Text = new Text("KYUUBI_TOKEN_OVERRIDE_WRITE")
122+
123+
override def write(out: DataOutput): Unit = {
124+
out.writeLong(getIssueDate)
125+
}
126+
127+
override def readFields(in: DataInput): Unit = {
128+
setIssueDate(in.readLong())
129+
}
78130
}

0 commit comments

Comments
 (0)