Skip to content

Commit 7b24ee9

Browse files
wForgetpan3793
authored andcommitted
[KYUUBI #2543] Add maxPartitionBytes configuration for TPC-DS connector
### _Why are the changes needed?_ close #2543 ### _How was this patch tested?_ - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2882 from wForget/KYUUBI-2543-2. Closes #2543 14fa002 [Wang Zhen] fix 386c92d [wforget] comments 8c1fcea [wforget] [KYUUBI-2543] Add `maxPartitionBytes` configuration Lead-authored-by: wforget <643348094@qq.com> Co-authored-by: Wang Zhen <wangzhen07@qiyi.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 1ea245d commit 7b24ee9

File tree

6 files changed

+269
-6
lines changed

6 files changed

+269
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.spark.connector.common
19+
20+
import java.util
21+
22+
import org.apache.spark.sql.RuntimeConfig
23+
24+
/**
 * Resolves a single configuration value from up to three sources: runtime options,
 * Spark session configurations and table properties.
 *
 * The lookup priority is: options > sessionConfigs > properties. A source that is `null`
 * is skipped, as is a source for which no key name has been registered on the parser.
 *
 * @param options runtime options
 * @param sessionConfigs spark session configurations
 * @param properties table properties
 */
case class SparkConfParser(
    options: util.Map[String, String],
    sessionConfigs: RuntimeConfig,
    properties: util.Map[String, String]) {

  def booleanConf(): BooleanConfParser = new BooleanConfParser()
  def intConf(): IntConfParser = new IntConfParser()
  def longConf(): LongConfParser = new LongConfParser()
  def doubleConf(): DoubleConfParser = new DoubleConfParser()
  def stringConf(): StringConfParser = new StringConfParser()

  /** Parser that converts the raw value with `String.toBoolean`. */
  class BooleanConfParser extends ConfParser[Boolean] {
    override protected def conversion(value: String): Boolean = value.toBoolean
  }

  /** Parser that converts the raw value with `String.toInt`. */
  class IntConfParser extends ConfParser[Int] {
    override protected def conversion(value: String): Int = value.toInt
  }

  /** Parser that converts the raw value with `String.toLong`. */
  class LongConfParser extends ConfParser[Long] {
    override protected def conversion(value: String): Long = value.toLong
  }

  /** Parser that converts the raw value with `String.toDouble`. */
  class DoubleConfParser extends ConfParser[Double] {
    override protected def conversion(value: String): Double = value.toDouble
  }

  /** Parser that returns the raw value unchanged. */
  class StringConfParser extends ConfParser[String] {
    override protected def conversion(value: String): String = value
  }

  /**
   * Builder-style parser for one configuration entry. Register the key name to use in each
   * source (and optionally a default), then call [[parse]] or [[parseOptional]].
   *
   * @tparam T type of the parsed value
   */
  abstract class ConfParser[T]() {
    private var optionName: Option[String] = None
    private var sessionConfName: Option[String] = None
    private var tablePropertyName: Option[String] = None
    // Named differently from the `defaultValue(value)` setter so the two are not overloads.
    private var defaultOpt: Option[T] = None

    /** Sets the key to look up in the runtime options. */
    def option(name: String): ConfParser[T] = {
      this.optionName = Some(name)
      this
    }

    /** Sets the key to look up in the Spark session configurations. */
    def sessionConf(name: String): ConfParser[T] = {
      this.sessionConfName = Some(name)
      this
    }

    /** Sets the key to look up in the table properties. */
    def tableProperty(name: String): ConfParser[T] = {
      this.tablePropertyName = Some(name)
      this
    }

    /** Sets the value returned when none of the sources provides one. */
    def defaultValue(value: T): ConfParser[T] = {
      this.defaultOpt = Some(value)
      this
    }

    /** Converts the raw string found in one of the sources into `T`. */
    protected def conversion(value: String): T

    /**
     * Resolves the value, falling back to the default.
     *
     * A default must have been registered via `defaultValue(...)`; this is checked with
     * `assert` before any source is consulted.
     *
     * @return the converted value of the highest-priority source that defines the key,
     *         or the default value
     */
    def parse(): T = {
      assert(defaultOpt.isDefined, "Default value cannot be empty.")
      // Only reachable without a default if assertions are elided at compile time.
      parseOptional().getOrElse(
        throw new NoSuchElementException("Default value cannot be empty."))
    }

    /**
     * Resolves the value without requiring a default.
     *
     * @return the converted value of the highest-priority source that defines the key,
     *         otherwise the default value if one was registered, otherwise `None`
     */
    def parseOptional(): Option[T] = {
      // Each lookup is a `def` so that lower-priority sources are only consulted when the
      // higher-priority ones yield nothing (`orElse` takes its argument by name).
      def fromOptions: Option[String] =
        if (options == null) None
        else optionName.flatMap(name => Option(options.get(name)))

      def fromSessionConfigs: Option[String] =
        if (sessionConfigs == null) None
        else sessionConfName.flatMap(name => sessionConfigs.getOption(name))

      def fromProperties: Option[String] =
        if (properties == null) None
        else tablePropertyName.flatMap(name => Option(properties.get(name)))

      fromOptions
        .orElse(fromSessionConfigs)
        .orElse(fromProperties)
        // Guards against a source handing back `Some(null)`.
        .filter(_ != null)
        .map(conversion)
        .orElse(defaultOpt)
    }
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.spark.connector.common
19+
20+
// scalastyle:off anyfunsuite
21+
import scala.collection.JavaConverters._
22+
23+
import org.apache.spark.sql.util.CaseInsensitiveStringMap
24+
import org.scalatest.BeforeAndAfterAll
25+
import org.scalatest.funsuite.AnyFunSuite
26+
27+
class SparkConfParserSuite extends AnyFunSuite with BeforeAndAfterAll {
  // scalastyle:on anyfunsuite

  // Shared fixture, built once in `beforeAll`. Session configurations are left as `null`,
  // so only runtime options and table properties take part in the lookups below.
  private var confParser: SparkConfParser = null

  override protected def beforeAll(): Unit = {
    super.beforeAll()
    val options = new CaseInsensitiveStringMap(Map(
      "key1" -> "111",
      "optKey1" -> "optValue1",
      "booleanKey" -> "false",
      "intKey" -> "10",
      "longKey" -> String.valueOf(Long.MaxValue),
      "doubleKey" -> "1.1").asJava)
    val properties = Map(
      "key1" -> "333",
      "propertyKey1" -> "propertyValue1")
    confParser = SparkConfParser(options, null, properties.asJava)
  }

  test("parse options config") {
    assert(confParser.stringConf().option("optKey1").defaultValue("test").parse() === "optValue1")
    assert(confParser.booleanConf().option("booleanKey").defaultValue(true).parse() === false)
    assert(confParser.intConf().option("intKey").defaultValue(0).parse() === 10)
    assert(confParser.longConf().option("longKey").defaultValue(0).parse() === Long.MaxValue)
    assert(confParser.doubleConf().option("doubleKey").defaultValue(0.0).parse() === 1.1)
  }

  test("parse properties config") {
    // "key1" exists in both sources: the runtime option (111) wins over the property (333).
    assert(confParser.intConf().option("key1")
      .tableProperty("key1")
      .defaultValue(0).parse() === 111)
    // "propertyKey1" exists only as a table property.
    assert(confParser.stringConf()
      .option("propertyKey1")
      .tableProperty("propertyKey1")
      .defaultValue("test").parse() === "propertyValue1")
  }

  test("fall back to the default value when no source defines the key") {
    assert(confParser.intConf()
      .option("missingKey")
      .sessionConf("missingKey")
      .tableProperty("missingKey")
      .defaultValue(42).parse() === 42)
    assert(confParser.stringConf().defaultValue("test").parse() === "test")
  }

  test("parseOptional does not require a default value") {
    assert(confParser.stringConf().option("optKey1").parseOptional() === Some("optValue1"))
    assert(confParser.stringConf().option("missingKey").parseOptional() === None)
    assert(confParser.intConf().option("missingKey").defaultValue(1).parseOptional() === Some(1))
  }

  test("parse fails when no default value is set") {
    intercept[AssertionError] {
      confParser.stringConf().option("optKey1").parse()
    }
  }
}

extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSBatchScan.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,20 @@ case class TPCDSTableChuck(table: String, scale: Double, parallelism: Int, index
3636
class TPCDSBatchScan(
3737
@transient table: Table,
3838
scale: Double,
39-
schema: StructType) extends ScanBuilder
39+
schema: StructType,
40+
scanConf: TPCDSBatchScanConf) extends ScanBuilder
4041
with SupportsReportStatistics with Batch with Serializable {
4142

4243
private val _sizeInBytes: Long = TPCDSStatisticsUtils.sizeInBytes(table, scale)
4344
private val _numRows: Long = TPCDSStatisticsUtils.numRows(table, scale)
4445

45-
private val rowCountPerTask: Int = 1000000
46-
46+
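// Tables with fewer than 1000000 rows are not parallelized;
47+
// that limit is applied in `io.trino.tpcds.Parallel#splitWork`.
// NOTE(review): `_sizeInBytes / scanConf.maxPartitionBytes` below is Long / Long, so the
// quotient is already truncated before `.ceil` runs and the partition count is effectively
// rounded down. Convert to Double before dividing if rounding up is intended (the expected
// value in the `test maxPartitionBytes` case uses the same formula and must change with it).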
4748
private val parallelism: Int =
4849
if (table.isSmall) 1
4950
else math.max(
5051
SparkSession.active.sparkContext.defaultParallelism,
51-
(_numRows / rowCountPerTask.toDouble).ceil.toInt)
52+
(_sizeInBytes / scanConf.maxPartitionBytes).ceil.toInt)
5253

5354
override def build: Scan = this
5455

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.spark.connector.tpcds
19+
20+
import org.apache.spark.sql.SparkSession
21+
import org.apache.spark.sql.connector.catalog.Table
22+
import org.apache.spark.sql.util.CaseInsensitiveStringMap
23+
24+
import org.apache.kyuubi.spark.connector.common.SparkConfParser
25+
import org.apache.kyuubi.spark.connector.tpcds.TPCDSBatchScanConf._
26+
27+
/**
 * Read configuration of a TPC-DS batch scan, resolved from the scan options, the session
 * configurations of `spark` and the properties of `table` (in that priority order).
 *
 * @param spark session whose runtime configuration is consulted
 * @param table table whose properties are consulted
 * @param options options passed to the scan
 */
case class TPCDSBatchScanConf(
    spark: SparkSession,
    table: Table,
    options: CaseInsensitiveStringMap) {

  private val confParser: SparkConfParser =
    SparkConfParser(options, spark.conf, table.properties())

  /**
   * Value of `maxPartitionBytes`, read from the option `maxPartitionBytes`, the session
   * configuration `spark.connector.tpcds.read.maxPartitionBytes` or the table property
   * `maxPartitionBytes`, defaulting to `MAX_PARTITION_BYTES_DEFAULT`.
   *
   * The value is used as a divisor when the scan computes its parallelism, so it must be
   * strictly positive; an `IllegalArgumentException` is thrown on first access otherwise.
   */
  lazy val maxPartitionBytes: Long = {
    val bytes = confParser.longConf()
      .option(MAX_PARTITION_BYTES_CONF)
      .sessionConf(s"$TPCDS_CONNECTOR_READ_CONF_PREFIX.$MAX_PARTITION_BYTES_CONF")
      .tableProperty(MAX_PARTITION_BYTES_CONF)
      .defaultValue(MAX_PARTITION_BYTES_DEFAULT)
      .parse()
    require(bytes > 0, s"$MAX_PARTITION_BYTES_CONF must be positive, but got $bytes")
    bytes
  }

}
43+
44+
/** Configuration keys and defaults used by [[TPCDSBatchScanConf]]. */
object TPCDSBatchScanConf {

  /** Prefix prepended (with a `.`) to a key name to form its session configuration name. */
  val TPCDS_CONNECTOR_READ_CONF_PREFIX: String = "spark.connector.tpcds.read"

  /** Key name of the `maxPartitionBytes` setting. */
  val MAX_PARTITION_BYTES_CONF: String = "maxPartitionBytes"

  /** Default for `maxPartitionBytes`: 128 MiB. */
  val MAX_PARTITION_BYTES_DEFAULT: Long = 128L * 1024 * 1024
}

extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
2525
import io.trino.tpcds.Table
2626
import io.trino.tpcds.column._
2727
import io.trino.tpcds.column.ColumnType.Base._
28+
import org.apache.spark.sql.SparkSession
2829
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table => SparkTable, TableCapability}
2930
import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
3031
import org.apache.spark.sql.connector.read.ScanBuilder
@@ -47,6 +48,8 @@ class TPCDSTable(tbl: String, scale: Double, options: CaseInsensitiveStringMap)
4748

4849
val tpcdsTable: Table = Table.getTable(tbl)
4950

51+
lazy val spark: SparkSession = SparkSession.active
52+
5053
override def name: String = s"${TPCDSSchemaUtils.dbName(scale)}.$tbl"
5154

5255
override def toString: String = s"TPCDSTable($name)"
@@ -77,8 +80,10 @@ class TPCDSTable(tbl: String, scale: Double, options: CaseInsensitiveStringMap)
7780
override def capabilities(): util.Set[TableCapability] =
7881
Set(TableCapability.BATCH_READ).asJava
7982

80-
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
81-
new TPCDSBatchScan(tpcdsTable, scale, schema)
83+
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
84+
val scanConf = TPCDSBatchScanConf(spark, this, options)
85+
new TPCDSBatchScan(tpcdsTable, scale, schema, scanConf)
86+
}
8287

8388
def toSparkDataType(tpcdsType: ColumnType): DataType = {
8489
(tpcdsType.getBase, tpcdsType.getPrecision.asScala, tpcdsType.getScale.asScala) match {

extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTableSuite.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ import io.trino.tpcds.Table
2121
import io.trino.tpcds.generator.CallCenterGeneratorColumn
2222
import org.apache.spark.SparkConf
2323
import org.apache.spark.sql.SparkSession
24+
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
2425

2526
import org.apache.kyuubi.KyuubiFunSuite
2627
import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession
28+
import org.apache.kyuubi.spark.connector.tpcds.TPCDSBatchScanConf.{MAX_PARTITION_BYTES_CONF, TPCDS_CONNECTOR_READ_CONF_PREFIX}
2729

2830
class TPCDSTableSuite extends KyuubiFunSuite {
2931

@@ -120,4 +122,29 @@ class TPCDSTableSuite extends KyuubiFunSuite {
120122
CallCenterGeneratorColumn.CC_CALL_CENTER_SK.getGlobalColumnNumber)
121123
}
122124
}
125+
126+
test("test maxPartitionBytes") {
127+
val maxPartitionBytes: Long = 1 * 1024 * 1024L
128+
val sparkConf = new SparkConf().setMaster("local[*]")
129+
.set("spark.ui.enabled", "false")
130+
.set("spark.sql.catalogImplementation", "in-memory")
131+
.set("spark.sql.catalog.tpcds", classOf[TPCDSCatalog].getName)
132+
.set(
133+
s"$TPCDS_CONNECTOR_READ_CONF_PREFIX.$MAX_PARTITION_BYTES_CONF",
134+
String.valueOf(maxPartitionBytes))
135+
withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
136+
val tableName = "catalog_returns"
137+
val table = Table.getTable(tableName)
138+
val scale = 100
139+
val df = spark.sql(s"select * from tpcds.sf$scale.$tableName")
140+
val scan = df.queryExecution.executedPlan.collectFirst {
141+
case scanExec: BatchScanExec if scanExec.scan.isInstanceOf[TPCDSBatchScan] =>
142+
scanExec.scan.asInstanceOf[TPCDSBatchScan]
143+
}
144+
assert(scan.isDefined)
145+
val expected =
146+
(TPCDSStatisticsUtils.sizeInBytes(table, scale) / maxPartitionBytes).ceil.toInt
147+
assert(scan.get.planInputPartitions.size == expected)
148+
}
149+
}
123150
}

0 commit comments

Comments
 (0)