Skip to content

Commit

Permalink
[KYUUBI #1737] [WIP][KYUUBI #1731][FEATURE] Kyuubi server should not …
Browse files Browse the repository at this point in the history
…fail to start when Hadoop FileSystem class not found

### _Why are the changes needed?_
Kyuubi server fails to start when Hadoop FileSystem class not found.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1737 from zhouyifan279/1731.

Closes #1737

Closes #1731

477ca37 [zhouyifan279] [KYUUBI #1731][FEATURE] Kyuubi server should not fail to start when Hadoop FileSystem class not found

Authored-by: zhouyifan279 <zhouyifan279@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
  • Loading branch information
zhouyifan279 authored and ulysses-you committed Jan 12, 2022
1 parent c1a6cd5 commit a550ac7
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
Expand Up @@ -21,6 +21,7 @@ import java.lang.reflect.UndeclaredThrowableException
import java.security.PrivilegedExceptionAction

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
Expand Down Expand Up @@ -77,7 +78,7 @@ class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with

}

object HadoopFsDelegationTokenProvider {
object HadoopFsDelegationTokenProvider extends Logging {

def disableFsCache(kyuubiConf: KyuubiConf, hadoopConf: Configuration): Configuration = {
// Avoid unnecessary disk io by not loading default resources
Expand All @@ -93,13 +94,26 @@ object HadoopFsDelegationTokenProvider {
}

def hadoopFSsToAccess(kyuubiConf: KyuubiConf, hadoopConf: Configuration): Set[FileSystem] = {
val defaultFS = FileSystem.get(hadoopConf)
val filesystemsToAccess = kyuubiConf
.get(KyuubiConf.CREDENTIALS_HADOOP_FS_URIS)
.map(new Path(_).getFileSystem(hadoopConf))
.flatMap { uri =>
Try(new Path(uri).getFileSystem(hadoopConf)) match {
case Success(value) =>
Some(value)
case Failure(e) =>
warn(s"Failed to get Hadoop FileSystem instance by URI: $uri", e)
None
}
}
.toSet

filesystemsToAccess + defaultFS
Try(FileSystem.get(hadoopConf)) match {
case Success(value) =>
filesystemsToAccess + value
case Failure(e) =>
warn(s"Failed to get default Hadoop FileSystem instance", e)
filesystemsToAccess
}
}

def doAsProxyUser[T](proxyUser: String)(f: => T): T = {
Expand Down
Expand Up @@ -17,7 +17,9 @@

package org.apache.kyuubi.credentials

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
Expand Down Expand Up @@ -57,4 +59,23 @@ class HadoopFsDelegationTokenProviderSuite extends WithSecuredDFSService {
}
}

test("FileSystem implementation class not found") {
tryWithSecurityEnabled {
UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)

val hdfsConf = new Configuration(getHadoopConf)
val hdfsUri = hdfsConf.get("fs.defaultFS")
hdfsConf.set("fs.defaultFS", "unknown://kyuubi")
hdfsConf.set("fs.unknown.impl", "unknown.hadoop.FileSystem")

val kyuubiConf = new KyuubiConf(false)
kyuubiConf.set(
KyuubiConf.CREDENTIALS_HADOOP_FS_URIS,
Seq("unknown://kyuubi", hdfsUri))

val fileSystems = HadoopFsDelegationTokenProvider.hadoopFSsToAccess(kyuubiConf, hdfsConf)
assert(fileSystems.size == 1)
assert(fileSystems.head.isInstanceOf[DistributedFileSystem])
}
}
}

0 comments on commit a550ac7

Please sign in to comment.