Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ object NativeMemoryManager {
rl,
ConfigUtil.serialize(
GlutenConfig
.getNativeSessionConf(backendName, GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)))
.getNativeSessionConf(backendName, GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs))
.asJava)
)
spillers.append(new Spiller() {
override def spill(self: MemoryTarget, phase: Spiller.Phase, size: Long): Long = phase match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ object Runtime {
nmm.getHandle(),
ConfigUtil.serialize(
(GlutenConfig
.getNativeSessionConf(backendName, GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs))
.asScala ++ extraConf.asScala).asJava)
.getNativeSessionConf(
backendName,
GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)) ++ extraConf.asScala).asJava)
)

private val released: AtomicBoolean = new AtomicBoolean(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import org.apache.gluten.shuffle.SupportsColumnarShuffle
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf}

import com.google.common.collect.ImmutableList
import org.apache.hadoop.security.UserGroupInformation

import java.util
import java.util.Locale

import scala.collection.JavaConverters._
import scala.collection.mutable

trait ShuffleWriterType {
val name: String
Expand Down Expand Up @@ -519,18 +519,13 @@ object GlutenConfig extends ConfigRegistry {
"spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan"
)

/**
* Get dynamic configs.
*
* TODO: Improve the get native conf logic.
*/
def getNativeSessionConf(
backendName: String,
conf: Map[String, String]): util.Map[String, String] = {
val nativeConfMap = new util.HashMap[String, String]()
nativeConfMap.putAll(conf.filter(e => nativeKeys.contains(e._1)).asJava)
/** Get dynamic configs. */
def getNativeSessionConf(backendName: String, conf: Map[String, String]): Map[String, String] = {
val nativeConfMap = mutable.Map[String, String](conf.filter {
case (key, _) => nativeKeys.contains(key)
}.toSeq: _*)

val keyWithDefault = ImmutableList.of(
Seq(
(SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValueString),
(SQLConf.IGNORE_MISSING_FILES.key, SQLConf.IGNORE_MISSING_FILES.defaultValueString),
(
Expand All @@ -546,19 +541,17 @@ object GlutenConfig extends ConfigRegistry {
(SQLConf.MAP_KEY_DEDUP_POLICY.key, SQLConf.MAP_KEY_DEDUP_POLICY.defaultValueString),
(SQLConf.SESSION_LOCAL_TIMEZONE.key, SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString),
(SQLConf.ANSI_ENABLED.key, SQLConf.ANSI_ENABLED.defaultValueString)
).foreach { case (k, defaultValue) => nativeConfMap.put(k, conf.getOrElse(k, defaultValue)) }

Seq(
(SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE, ByteUnit.BYTE, (v: Long) => v.toString),
(SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE, ByteUnit.BYTE, (v: Long) => v.toString),
(SPARK_SHUFFLE_FILE_BUFFER, ByteUnit.KiB, (v: Long) => (v * 1024).toString)
)
keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
GlutenConfigUtil.mapByteConfValue(
conf,
SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE,
ByteUnit.BYTE)(
v => nativeConfMap.put(SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE, v.toString))
GlutenConfigUtil.mapByteConfValue(
conf,
SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE,
ByteUnit.BYTE)(v => nativeConfMap.put(SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE, v.toString))
GlutenConfigUtil.mapByteConfValue(conf, SPARK_SHUFFLE_FILE_BUFFER, ByteUnit.KiB)(
v => nativeConfMap.put(SPARK_SHUFFLE_FILE_BUFFER, (v * 1024).toString))
.foreach {
case (k, unit, f) =>
GlutenConfigUtil.mapByteConfValue(conf, k, unit)(v => nativeConfMap.put(k, f(v)))
}

conf
.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key)
Expand All @@ -570,8 +563,11 @@ object GlutenConfig extends ConfigRegistry {
// Backend's dynamic session conf only.
val confPrefix = prefixOf(backendName)
conf
.filter(entry => entry._1.startsWith(confPrefix) && !SQLConf.isStaticConfigKey(entry._1))
.foreach(entry => nativeConfMap.put(entry._1, entry._2))
.filter {
case (k, _) =>
k.startsWith(confPrefix) && !SQLConf.isStaticConfigKey(k)
}
.foreach { case (k, v) => nativeConfMap.put(k, v) }

// Pass the latest tokens to native
nativeConfMap.put(
Expand All @@ -584,23 +580,21 @@ object GlutenConfig extends ConfigRegistry {
UserGroupInformation.getCurrentUser.getUserName)

// return
nativeConfMap
nativeConfMap.toMap
}

/**
* Get static and dynamic configs. Some configs are dynamic in Spark but static in Gluten; these
* are used to construct the HiveConnector, which is intended to be reused in Velox.
*
* TODO: Improve the get native conf logic.
*/
def getNativeBackendConf(
backendName: String,
conf: scala.collection.Map[String, String]): util.Map[String, String] = {

val nativeConfMap = new util.HashMap[String, String]()
val nativeConfMap = mutable.HashMap.empty[String, String]

// Configs that fall back to a default value when not set in `conf`.
val keyWithDefault = ImmutableList.of(
Seq(
(SPARK_S3_CONNECTION_SSL_ENABLED, "false"),
(SPARK_S3_PATH_STYLE_ACCESS, "true"),
(SPARK_S3_USE_INSTANCE_CREDENTIALS, "false"),
Expand All @@ -627,8 +621,7 @@ object GlutenConfig extends ConfigRegistry {
("spark.gluten.velox.s3UseProxyFromEnv", "false"),
("spark.gluten.velox.s3PayloadSigningPolicy", "Never"),
(SQLConf.SESSION_LOCAL_TIMEZONE.key, SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString)
)
keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
).foreach { case (k, defaultValue) => nativeConfMap.put(k, conf.getOrElse(k, defaultValue)) }

val keys = Set(
DEBUG_ENABLED.key,
Expand All @@ -645,35 +638,24 @@ object GlutenConfig extends ConfigRegistry {
SQLConf.LEGACY_STATISTICAL_AGGREGATE.key,
COLUMNAR_CUDF_ENABLED.key
)
nativeConfMap.putAll(conf.filter(e => keys.contains(e._1)).asJava)

val confPrefix = prefixOf(backendName)
conf
.filter(_._1.startsWith(confPrefix))
.foreach(entry => nativeConfMap.put(entry._1, entry._2))

// put in all S3 configs
conf
.filter(_._1.startsWith(HADOOP_PREFIX + S3A_PREFIX))
.foreach(entry => nativeConfMap.put(entry._1, entry._2))

// handle ABFS config
conf
.filter(_._1.startsWith(HADOOP_PREFIX + ABFS_PREFIX))
.foreach(entry => nativeConfMap.put(entry._1, entry._2))
nativeConfMap ++= conf.filter { case (k, _) => keys.contains(k) }

// put in all GCS configs
conf
.filter(_._1.startsWith(HADOOP_PREFIX + GCS_PREFIX))
.foreach(entry => nativeConfMap.put(entry._1, entry._2))

// put in all gluten velox configs
val confPrefix = prefixOf(backendName)
val s3Prefix = HADOOP_PREFIX + S3A_PREFIX
val azurePrefix = HADOOP_PREFIX + ABFS_PREFIX
val gsPrefix = HADOOP_PREFIX + GCS_PREFIX
val backendPrefix = s"spark.gluten.$backendName"
conf
.filter(_._1.startsWith(s"spark.gluten.$backendName"))
.foreach(entry => nativeConfMap.put(entry._1, entry._2))
.filter {
case (k, _) =>
k.startsWith(confPrefix) || k.startsWith(s3Prefix) || k.startsWith(azurePrefix) || k
.startsWith(gsPrefix) || k.startsWith(backendPrefix)
}
.foreach { case (k, v) => nativeConfMap.put(k, v) }

// return
nativeConfMap
nativeConfMap.asJava
}

val GLUTEN_ENABLED = GlutenCoreConfig.GLUTEN_ENABLED
Expand Down