[SPARK-31037][SQL] refine AQE config names
### What changes were proposed in this pull request?

When introducing AQE to others, I find the config names a bit incoherent and hard to use.
This PR refines the config names (a brief usage sketch follows the list):
1. remove the "shuffle" prefix. AQE is all about shuffles, so there is no need to repeat the prefix in every config name.
2. `targetPostShuffleInputSize` is obscure; rename it to `advisoryPartitionSizeInBytes`.
3. `reducePostShufflePartitions` doesn't match the actual optimization; rename the config to `coalescePartitions` (and the rule to `CoalesceShufflePartitions`).
4. `minNumPostShufflePartitions` is obscure; rename it to `minPartitionNum` under the `coalescePartitions` namespace.
5. `maxNumPostShufflePartitions` is confusing because of the word "max"; rename it to `initialPartitionNum`.
6. `skewedJoinOptimization` is too verbose. Skew join is well-known terminology in the database area, so we can just say `skewJoin`.
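For reference, a minimal sketch of a session configured with the refined names. The keys come straight from the diff below; the values are only illustrative, not recommended settings.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: the refined AQE config names introduced by this PR.
// Values are arbitrary examples, not tuned recommendations.
val spark = SparkSession.builder()
  .appName("aqe-refined-config-names")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
  .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "200")
  .config("spark.sql.adaptive.skewJoin.enabled", "true")
  .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "10")
  .config("spark.sql.adaptive.localShuffleReader.enabled", "true")
  .getOrCreate()
```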

### Why are the changes needed?

Make the config names easy to understand.

### Does this PR introduce any user-facing change?

Yes. It deprecates the config `spark.sql.adaptive.shuffle.targetPostShuffleInputSize` in favor of the new `spark.sql.adaptive.advisoryPartitionSizeInBytes`; the old key is kept as a fallback, so jobs that still set it keep working.
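Because the new entry is declared with `.fallbackConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)` in the diff below, reading the new key should pick up a value set only under the deprecated key. A hedged sketch of that behavior, assuming a running `spark` session (e.g. in spark-shell); the exact warning logged for the deprecated key is not shown here:

```scala
// Sketch: the deprecated key should still feed the new config via the fallback wiring in this PR.
spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "128MB")
// With the new key unset, it should resolve to the value supplied through the deprecated key.
val advisorySize = spark.conf.get("spark.sql.adaptive.advisoryPartitionSizeInBytes")
println(s"advisoryPartitionSizeInBytes resolves to: $advisorySize")
```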

### How was this patch tested?

N/A (config renames only; the existing suites updated in this PR exercise the new names).

Closes #27793 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan committed Mar 5, 2020
1 parent 2257ce2 commit ba86524
Showing 10 changed files with 121 additions and 112 deletions.
@@ -380,8 +380,17 @@ object SQLConf {
.checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
.createWithDefault(200)

val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
.internal()
.doc("(Deprecated since Spark 3.0)")
.version("1.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64MB")

val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
.doc("When true, enable adaptive query execution.")
.doc("When true, enable adaptive query execution, which re-optimizes the query plan in the " +
"middle of query execution, based on accurate runtime statistics.")
.version("1.6.0")
.booleanConf
.createWithDefault(false)
@@ -390,99 +399,99 @@ object SQLConf {
.internal()
.doc("Adaptive query execution is skipped when the query does not have exchanges or " +
"sub-queries. By setting this config to true (together with " +
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' enabled), Spark will force apply adaptive query " +
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' set to true), Spark will force apply adaptive query " +
"execution for all supported queries.")
.version("3.0.0")
.booleanConf
.createWithDefault(false)

val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions")
.doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables reducing " +
"the number of post-shuffle partitions based on map output statistics.")
val ADVISORY_PARTITION_SIZE_IN_BYTES =
buildConf("spark.sql.adaptive.advisoryPartitionSizeInBytes")
.doc("The advisory size in bytes of the shuffle partition during adaptive optimization " +
s"(when ${ADAPTIVE_EXECUTION_ENABLED.key} is true). It takes effect when Spark " +
"coalesces small shuffle partitions or splits skewed shuffle partition.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)
.fallbackConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)

val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED =
buildConf("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch")
.doc("Whether to fetch the continuous shuffle blocks in batch. Instead of fetching blocks " +
"one by one, fetching continuous shuffle blocks for the same map task in batch can " +
"reduce IO and improve performance. Note, multiple continuous blocks exist in single " +
s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled, this feature also depends " +
"on a relocatable serializer, the concatenation support codec in use and the new version " +
"shuffle fetch protocol.")
val COALESCE_PARTITIONS_ENABLED =
buildConf("spark.sql.adaptive.coalescePartitions.enabled")
.doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark will coalesce " +
"contiguous shuffle partitions according to the target size (specified by " +
s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'), to avoid too many small tasks.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)

val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
buildConf("spark.sql.adaptive.shuffle.minNumPostShufflePartitions")
.doc("The advisory minimum number of post-shuffle partitions used when " +
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
.doc("The minimum number of shuffle partitions after coalescing. This configuration only " +
s"has an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
.version("3.0.0")
.intConf
.checkValue(_ > 0, "The minimum shuffle partition number " +
"must be a positive integer.")
.checkValue(_ > 0, "The minimum number of partitions must be positive.")
.createWithDefault(1)

val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
.doc("The target post-shuffle input size in bytes of a task. This configuration only has " +
s"an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
.version("1.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64MB")

val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
buildConf("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions")
.doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
"This is used as the initial number of pre-shuffle partitions. By default it equals to " +
"spark.sql.shuffle.partitions. This configuration only has an effect when " +
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
val COALESCE_PARTITIONS_INITIAL_PARTITION_NUM =
buildConf("spark.sql.adaptive.coalescePartitions.initialPartitionNum")
.doc("The initial number of shuffle partitions before coalescing. By default it equals to " +
s"${SHUFFLE_PARTITIONS.key}. This configuration only has an effect when " +
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and '${COALESCE_PARTITIONS_ENABLED.key}' " +
"are both true.")
.version("3.0.0")
.intConf
.checkValue(_ > 0, "The maximum shuffle partition number " +
"must be a positive integer.")
.checkValue(_ > 0, "The initial number of partitions must be positive.")
.createOptional

val FETCH_SHUFFLE_BLOCKS_IN_BATCH =
buildConf("spark.sql.adaptive.fetchShuffleBlocksInBatch")
.internal()
.doc("Whether to fetch the contiguous shuffle blocks in batch. Instead of fetching blocks " +
"one by one, fetching contiguous shuffle blocks for the same map task in batch can " +
"reduce IO and improve performance. Note, multiple contiguous blocks exist in single " +
s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true. This feature also depends " +
"on a relocatable serializer, the concatenation support codec in use and the new version " +
"shuffle fetch protocol.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)

val LOCAL_SHUFFLE_READER_ENABLED =
buildConf("spark.sql.adaptive.shuffle.localShuffleReader.enabled")
.doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables the " +
"optimization of converting the shuffle reader to local shuffle reader for the shuffle " +
"exchange of the broadcast hash join in probe side.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)
buildConf("spark.sql.adaptive.localShuffleReader.enabled")
.doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark tries to use local " +
"shuffle reader to read the shuffle data when the shuffle partitioning is not needed, " +
"for example, after converting sort-merge join to broadcast-hash join.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)

val ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED =
buildConf("spark.sql.adaptive.skewedJoinOptimization.enabled")
.doc("When true and adaptive execution is enabled, a skewed join is automatically handled at " +
"runtime.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)
val SKEW_JOIN_ENABLED =
buildConf("spark.sql.adaptive.skewJoin.enabled")
.doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark dynamically " +
"handles skew in sort-merge join by splitting (and replicating if needed) skewed " +
"partitions.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)

val ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR =
buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionFactor")
.doc("A partition is considered as a skewed partition if its size is larger than" +
" this factor multiple the median partition size and also larger than " +
s" ${SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key}")
val SKEW_JOIN_SKEWED_PARTITION_FACTOR =
buildConf("spark.sql.adaptive.skewJoin.skewedPartitionFactor")
.doc("A partition is considered as skewed if its size is larger than this factor " +
"multiplying the median partition size and also larger than " +
s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'")
.version("3.0.0")
.intConf
.checkValue(_ > 0, "The skew factor must be positive.")
.createWithDefault(10)

val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
.internal()
.doc("The relation with a non-empty partition ratio lower than this config will not be " +
"considered as the build side of a broadcast-hash join in adaptive execution regardless " +
"of its size.This configuration only has an effect when " +
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled.")
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' is true.")
.version("3.0.0")
.doubleConf
.checkValue(_ >= 0, "The non-empty partition ratio must be positive number.")
@@ -2504,7 +2513,9 @@ object SQLConf {
DeprecatedConfig(ARROW_EXECUTION_ENABLED.key, "3.0",
s"Use '${ARROW_PYSPARK_EXECUTION_ENABLED.key}' instead of it."),
DeprecatedConfig(ARROW_FALLBACK_ENABLED.key, "3.0",
s"Use '${ARROW_PYSPARK_FALLBACK_ENABLED.key}' instead of it.")
s"Use '${ARROW_PYSPARK_FALLBACK_ENABLED.key}' instead of it."),
DeprecatedConfig(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "3.0",
s"Use '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' instead of it.")
)

Map(configs.map { cfg => cfg.key -> cfg } : _*)
@@ -2665,19 +2676,17 @@ class SQLConf extends Serializable with Logging {

def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

def targetPostShuffleInputSize: Long = getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)

def fetchShuffleBlocksInBatchEnabled: Boolean = getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED)
def fetchShuffleBlocksInBatch: Boolean = getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH)

def nonEmptyPartitionRatioForBroadcastJoin: Double =
getConf(NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN)

def reducePostShufflePartitionsEnabled: Boolean = getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
def coalesceShufflePartitionsEnabled: Boolean = getConf(COALESCE_PARTITIONS_ENABLED)

def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
def minShufflePartitionNum: Int = getConf(COALESCE_PARTITIONS_MIN_PARTITION_NUM)

def maxNumPostShufflePartitions: Int =
getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
def initialShufflePartitionNum: Int =
getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(numShufflePartitions)

def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)

@@ -127,7 +127,7 @@ class ShuffledRowRDD(
Array.tabulate(dependency.partitioner.numPartitions)(i => CoalescedPartitionSpec(i, i + 1)))
}

if (SQLConf.get.fetchShuffleBlocksInBatchEnabled) {
if (SQLConf.get.fetchShuffleBlocksInBatch) {
dependency.rdd.context.setLocalProperty(
SortShuffleManager.FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED_KEY, "true")
}
@@ -91,7 +91,7 @@ case class AdaptiveSparkPlanExec(
// before 'ReduceNumShufflePartitions', as the skewed partition handled
// in 'OptimizeSkewedJoin' rule, should be omitted in 'ReduceNumShufflePartitions'.
OptimizeSkewedJoin(conf),
ReduceNumShufflePartitions(conf),
CoalesceShufflePartitions(conf),
// The rule of 'OptimizeLocalShuffleReader' need to make use of the 'partitionStartIndices'
// in 'ReduceNumShufflePartitions' rule. So it must be after 'ReduceNumShufflePartitions' rule.
OptimizeLocalShuffleReader(conf),
@@ -23,14 +23,14 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf

/**
* A rule to reduce the post shuffle partitions based on the map output statistics, which can
* A rule to coalesce the shuffle partitions based on the map output statistics, which can
* avoid many small reduce tasks that hurt performance.
*/
case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
import ReduceNumShufflePartitions._
case class CoalesceShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
import CoalesceShufflePartitions._

override def apply(plan: SparkPlan): SparkPlan = {
if (!conf.reducePostShufflePartitionsEnabled) {
if (!conf.coalesceShufflePartitionsEnabled) {
return plan
}
if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])
@@ -70,8 +70,8 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
validMetrics.toArray,
firstPartitionIndex = 0,
lastPartitionIndex = distinctNumPreShufflePartitions.head,
advisoryTargetSize = conf.targetPostShuffleInputSize,
minNumPartitions = conf.minNumPostShufflePartitions)
advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
minNumPartitions = conf.minShufflePartitionNum)
// This transformation adds new nodes, so we must use `transformUp` here.
val stageIds = shuffleStages.map(_.id).toSet
plan.transformUp {
@@ -88,6 +88,6 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
}
}

object ReduceNumShufflePartitions {
object CoalesceShufflePartitions {
val COALESCED_SHUFFLE_READER_DESCRIPTION = "coalesced"
}
@@ -64,11 +64,11 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
/**
* A partition is considered as a skewed partition if its size is larger than the median
* partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
* SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.
* ADVISORY_PARTITION_SIZE_IN_BYTES.
*/
private def isSkewed(size: Long, medianSize: Long): Boolean = {
size > medianSize * conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
size > conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
size > conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
}

private def medianSize(stats: MapOutputStatistics): Long = {
@@ -87,7 +87,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
* target post-shuffle partition size if avg size is smaller than it.
*/
private def targetSize(stats: MapOutputStatistics, medianSize: Long): Long = {
val targetPostShuffleSize = conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
val targetPostShuffleSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
val nonSkewSizes = stats.bytesByPartitionId.filterNot(isSkewed(_, medianSize))
// It's impossible that all the partitions are skewed, as we use median size to define skew.
assert(nonSkewSizes.nonEmpty)
@@ -271,7 +271,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
rightStats: MapOutputStatistics,
nonSkewPartitionIndices: Seq[Int]): Seq[ShufflePartitionSpec] = {
assert(nonSkewPartitionIndices.nonEmpty)
val shouldCoalesce = conf.getConf(SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
val shouldCoalesce = conf.getConf(SQLConf.COALESCE_PARTITIONS_ENABLED)
if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) {
nonSkewPartitionIndices.map(i => CoalescedPartitionSpec(i, i + 1))
} else {
@@ -280,7 +280,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
firstPartitionIndex = nonSkewPartitionIndices.head,
// `lastPartitionIndex` is exclusive.
lastPartitionIndex = nonSkewPartitionIndices.last + 1,
advisoryTargetSize = conf.targetPostShuffleInputSize)
advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES))
}
}

@@ -300,7 +300,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
}

override def apply(plan: SparkPlan): SparkPlan = {
if (!conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED)) {
if (!conf.getConf(SQLConf.SKEW_JOIN_ENABLED)) {
return plan
}

@@ -36,8 +36,8 @@ import org.apache.spark.sql.internal.SQLConf
*/
case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
private def defaultNumPreShufflePartitions: Int =
if (conf.adaptiveExecutionEnabled && conf.reducePostShufflePartitionsEnabled) {
conf.maxNumPostShufflePartitions
if (conf.adaptiveExecutionEnabled && conf.coalesceShufflePartitionsEnabled) {
conf.initialShufflePartitionNum
} else {
conf.numShufflePartitions
}
@@ -23,13 +23,13 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql._
import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.COALESCED_SHUFFLE_READER_DESCRIPTION
import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec
import org.apache.spark.sql.execution.adaptive.ReduceNumShufflePartitions.COALESCED_SHUFFLE_READER_DESCRIPTION
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf

class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll {
class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll {

private var originalActiveSparkSession: Option[SparkSession] = _
private var originalInstantiatedSparkSession: Option[SparkSession] = _
@@ -65,17 +65,17 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
.setAppName("test")
.set(UI_ENABLED, false)
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
.set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
.set(SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key, "5")
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
.set(
SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key,
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key,
targetPostShuffleInputSize.toString)
minNumPostShufflePartitions match {
case Some(numPartitions) =>
sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, numPartitions.toString)
sparkConf.set(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key, numPartitions.toString)
case None =>
sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "1")
sparkConf.set(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key, "1")
}

val spark = SparkSession.builder()