Skip to content

Commit c17dd9d

Browse files
zhouyifan279ulysses-you
authored and committed
[KYUUBI #1731][FEATURE][FOLLOWUP] Use validated HadoopFs URI in HadoopFsDelegationTokenProvider#obtainDelegationTokens
### _Why are the changes needed?_ If core-site.xml contains a FileSystem URI whose FileSystem implementation class is not present, the following error will appear repeatedly in the Kyuubi log after any user connects to Kyuubi and executes any SQL: ``` 22/01/12 19:45:08 WARN credentials.HadoopFsDelegationTokenProvider: Failed to get Hadoop FileSystem instance by URI: alluxio://localhost java.lang.RuntimeException: java.lang.ClassNotFoundException: Class alluxio.hadoop.FileSystem not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2667) at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466) at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider$.$anonfun$hadoopFSsToAccess$2(HadoopFsDelegationTokenProvider.scala:100) at scala.util.Try$.apply(Try.scala:213) at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider$.$anonfun$hadoopFSsToAccess$1(HadoopFsDelegationTokenProvider.scala:100) at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293) at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108) at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider$.hadoopFSsToAccess(HadoopFsDelegationTokenProvider.scala:99) 
at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.$anonfun$obtainDelegationTokens$1(HadoopFsDelegationTokenProvider.scala:62) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider$$anon$1.run(HadoopFsDelegationTokenProvider.scala:125) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878) at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider$.doAsProxyUser(HadoopFsDelegationTokenProvider.scala:124) at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.obtainDelegationTokens(HadoopFsDelegationTokenProvider.scala:60) at org.apache.kyuubi.credentials.HadoopCredentialsManager$$anon$1.$anonfun$run$1(HadoopCredentialsManager.scala:210) at org.apache.kyuubi.credentials.HadoopCredentialsManager$$anon$1.$anonfun$run$1$adapted(HadoopCredentialsManager.scala:210) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:214) at org.apache.kyuubi.credentials.HadoopCredentialsManager$$anon$1.run(HadoopCredentialsManager.scala:210) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: 
java.lang.ClassNotFoundException: Class alluxio.hadoop.FileSystem not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2571) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2665) ... 40 more ``` ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #1739 from zhouyifan279/1731. Closes #1731 387036d [zhouyifan279] [KYUUBI #1731][FEATURE] Kyuubi server should not fail to start when Hadoop FileSystem class not found 6d80b8e [zhouyifan279] [KYUUBI #1731][FEATURE] Kyuubi server should not fail to start when Hadoop FileSystem class not found Authored-by: zhouyifan279 <zhouyifan279@gmail.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent a533b2f commit c17dd9d

File tree

2 files changed

+31
-46
lines changed

2 files changed

+31
-46
lines changed

kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala

Lines changed: 26 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -18,48 +18,52 @@
1818
package org.apache.kyuubi.credentials
1919

2020
import java.lang.reflect.UndeclaredThrowableException
21+
import java.net.URI
2122
import java.security.PrivilegedExceptionAction
2223

2324
import scala.collection.JavaConverters._
24-
import scala.util.{Failure, Success, Try}
2525

2626
import org.apache.hadoop.conf.Configuration
27-
import org.apache.hadoop.fs.{FileSystem, Path}
27+
import org.apache.hadoop.fs.FileSystem
2828
import org.apache.hadoop.hdfs.HdfsConfiguration
2929
import org.apache.hadoop.security.{Credentials, SecurityUtil, UserGroupInformation}
3030
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
3131

3232
import org.apache.kyuubi.Logging
3333
import org.apache.kyuubi.config.KyuubiConf
34-
import org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.{disableFsCache, doAsProxyUser}
34+
import org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.{doAsProxyUser, validatedFsUris}
3535
import org.apache.kyuubi.util.KyuubiHadoopUtils
3636

3737
class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging {
3838

3939
private var tokenRequired: Boolean = _
4040
private var hadoopConf: Configuration = _
4141
private var kyuubiConf: KyuubiConf = _
42+
private var fsUris: Seq[URI] = _
4243

4344
override val serviceName: String = "hadoopfs"
4445

4546
override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf): Unit = {
4647
this.tokenRequired =
4748
SecurityUtil.getAuthenticationMethod(hadoopConf) != AuthenticationMethod.SIMPLE
4849

49-
// FileSystem objects are cached in FileSystem.CACHE by a composite key.
50-
// The UserGroupInformation object used to create it is part of that key.
51-
// If cache is enabled, new FileSystem objects are created and cached at every method
52-
// invocation.
53-
this.hadoopConf = disableFsCache(kyuubiConf, new HdfsConfiguration(hadoopConf))
5450
this.kyuubiConf = kyuubiConf
51+
this.fsUris = validatedFsUris(kyuubiConf, hadoopConf)
52+
this.hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf, loadDefaults = false)
53+
54+
// Using HdfsConfiguration to ensure hdfs-site.xml is loaded
55+
new HdfsConfiguration(hadoopConf).iterator().asScala.foreach(e =>
56+
this.hadoopConf.set(e.getKey, e.getValue))
57+
// Disable FileSystem cache as its size grows at each invocation of #obtainDelegationTokens
58+
this.fsUris.foreach(uri =>
59+
this.hadoopConf.setBoolean(s"fs.${uri.getScheme}.impl.disable.cache", true))
5560
}
5661

5762
override def delegationTokensRequired(): Boolean = tokenRequired
5863

5964
override def obtainDelegationTokens(owner: String, creds: Credentials): Unit = {
6065
doAsProxyUser(owner) {
61-
val fileSystems =
62-
HadoopFsDelegationTokenProvider.hadoopFSsToAccess(kyuubiConf, hadoopConf)
66+
val fileSystems = fsUris.map(FileSystem.get(_, hadoopConf)).toSet
6367

6468
try {
6569
// Renewer is not needed. But setting a renewer can avoid potential NPE.
@@ -80,39 +84,19 @@ class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with
8084

8185
object HadoopFsDelegationTokenProvider extends Logging {
8286

83-
def disableFsCache(kyuubiConf: KyuubiConf, hadoopConf: Configuration): Configuration = {
84-
// Avoid unnecessary disk io by not loading default resources
85-
val newConf = KyuubiHadoopUtils.newHadoopConf(
86-
kyuubiConf,
87-
loadDefaults = false)
88-
89-
hadoopConf.iterator().asScala.foreach(e => newConf.set(e.getKey, e.getValue))
90-
91-
hadoopFSsToAccess(kyuubiConf, hadoopConf)
92-
.foreach(fs => newConf.setBoolean(s"fs.${fs.getScheme}.impl.disable.cache", true))
93-
newConf
94-
}
95-
96-
def hadoopFSsToAccess(kyuubiConf: KyuubiConf, hadoopConf: Configuration): Set[FileSystem] = {
97-
val filesystemsToAccess = kyuubiConf
98-
.get(KyuubiConf.CREDENTIALS_HADOOP_FS_URIS)
99-
.flatMap { uri =>
100-
Try(new Path(uri).getFileSystem(hadoopConf)) match {
101-
case Success(value) =>
102-
Some(value)
103-
case Failure(e) =>
104-
warn(s"Failed to get Hadoop FileSystem instance by URI: $uri", e)
105-
None
106-
}
87+
def validatedFsUris(kyuubiConf: KyuubiConf, hadoopConf: Configuration): Seq[URI] = {
88+
val uris = kyuubiConf.get(KyuubiConf.CREDENTIALS_HADOOP_FS_URIS) :+
89+
hadoopConf.get("fs.defaultFS", "file:///")
90+
uris.flatMap { str =>
91+
try {
92+
val uri = URI.create(str)
93+
FileSystem.get(uri, hadoopConf)
94+
Some(uri)
95+
} catch {
96+
case e: Throwable =>
97+
warn(s"Failed to get Hadoop FileSystem instance by URI: $str", e)
98+
None
10799
}
108-
.toSet
109-
110-
Try(FileSystem.get(hadoopConf)) match {
111-
case Success(value) =>
112-
filesystemsToAccess + value
113-
case Failure(e) =>
114-
warn(s"Failed to get default Hadoop FileSystem instance", e)
115-
filesystemsToAccess
116100
}
117101
}
118102

kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,6 @@ package org.apache.kyuubi.credentials
1919

2020
import org.apache.hadoop.conf.Configuration
2121
import org.apache.hadoop.fs.FileSystem
22-
import org.apache.hadoop.hdfs.DistributedFileSystem
2322
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
2423
import org.apache.hadoop.io.Text
2524
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -73,9 +72,11 @@ class HadoopFsDelegationTokenProviderSuite extends WithSecuredDFSService {
7372
KyuubiConf.CREDENTIALS_HADOOP_FS_URIS,
7473
Seq("unknown://kyuubi", hdfsUri))
7574

76-
val fileSystems = HadoopFsDelegationTokenProvider.hadoopFSsToAccess(kyuubiConf, hdfsConf)
77-
assert(fileSystems.size == 1)
78-
assert(fileSystems.head.isInstanceOf[DistributedFileSystem])
75+
val uris = HadoopFsDelegationTokenProvider.validatedFsUris(kyuubiConf, hdfsConf)
76+
assert(uris.size == 1)
77+
assert(uris.head.toString == hdfsUri)
78+
79+
new HadoopFsDelegationTokenProvider().initialize(hdfsConf, kyuubiConf)
7980
}
8081
}
8182
}

0 commit comments

Comments
 (0)