Skip to content

Commit c9cc9b7

Browse files
committed
[KYUUBI #3209] Support configure TPC-H connector in runtime
### _Why are the changes needed?_ This PR proposes to introduce `TPCHConf` to support configuring the TPC-H connector at runtime, just like the TPC-DS connector. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #3209 from pan3793/tpch-conf. Closes #3209 af45166 [Cheng Pan] nit bbc1cba [Cheng Pan] Support configure TPC-H connector in runtime Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 875fedd commit c9cc9b7

File tree

6 files changed

+97
-31
lines changed

6 files changed

+97
-31
lines changed

extensions/spark/kyuubi-spark-connector-tpch/src/main/scala/org/apache/kyuubi/spark/connector/tpch/TPCHBatchScan.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,20 +42,19 @@ case class TPCHTableChuck(table: String, scale: Double, parallelism: Int, index:
4242
class TPCHBatchScan(
4343
@transient table: TpchTable[_],
4444
scale: Double,
45-
schema: StructType) extends ScanBuilder
45+
schema: StructType,
46+
readConf: TPCHReadConf) extends ScanBuilder
4647
with SupportsReportStatistics with Batch with Serializable {
4748

4849
private val _sizeInBytes: Long = TPCHStatisticsUtils.sizeInBytes(table, scale)
4950

5051
private val _numRows: Long = TPCHStatisticsUtils.numRows(table, scale)
5152

52-
private val rowCountPerTask: Int = 1000000
53-
5453
private val parallelism: Int =
5554
if (table.equals(TpchTable.NATION) || table.equals(TpchTable.REGION)) 1
5655
else math.max(
5756
SparkSession.active.sparkContext.defaultParallelism,
58-
(_numRows / rowCountPerTask.toDouble).ceil.toInt)
57+
(_numRows / readConf.maxPartitionBytes).ceil.toInt)
5958

6059
override def build: Scan = this
6160

extensions/spark/kyuubi-spark-connector-tpch/src/main/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalog.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.util
2222
import scala.collection.JavaConverters._
2323

2424
import org.apache.spark.internal.Logging
25+
import org.apache.spark.sql.SparkSession
2526
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
2627
import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table => SparkTable, TableCatalog, TableChange}
2728
import org.apache.spark.sql.connector.expressions.Transform
@@ -34,17 +35,16 @@ class TPCHCatalog extends TableCatalog with SupportsNamespaces with Logging {
3435

3536
val tables: Array[String] = TPCHSchemaUtils.BASE_TABLES.map(_.getTableName)
3637

37-
var options: CaseInsensitiveStringMap = _
38+
var tpchConf: TPCHConf = _
3839

3940
var _name: String = _
4041

4142
override def name: String = _name
4243

4344
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
4445
this._name = name
45-
this.options = options
46-
val uncheckedExcludeDatabases = options.getOrDefault("excludeDatabases", "")
47-
.split(",").map(_.toLowerCase.trim).filter(_.nonEmpty)
46+
this.tpchConf = TPCHConf(SparkSession.active, options)
47+
val uncheckedExcludeDatabases = tpchConf.excludeDatabases
4848
val invalidExcludeDatabases = uncheckedExcludeDatabases diff TPCHSchemaUtils.DATABASES
4949
if (invalidExcludeDatabases.nonEmpty) {
5050
logWarning(
@@ -64,7 +64,7 @@ class TPCHCatalog extends TableCatalog with SupportsNamespaces with Logging {
6464
override def loadTable(ident: Identifier): SparkTable = (ident.namespace, ident.name) match {
6565
case (Array(db), table) if (databases contains db) && tables.contains(table.toLowerCase) =>
6666
val scale = TPCHSchemaUtils.scale(db)
67-
new TPCHTable(table.toLowerCase, scale, options)
67+
new TPCHTable(table.toLowerCase, scale, tpchConf)
6868
case (_, _) => throw new NoSuchTableException(ident)
6969
}
7070

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.spark.connector.tpch
19+
20+
import org.apache.spark.sql.SparkSession
21+
import org.apache.spark.sql.connector.catalog.Table
22+
import org.apache.spark.sql.util.CaseInsensitiveStringMap
23+
24+
import org.apache.kyuubi.spark.connector.common.SparkConfParser
25+
import org.apache.kyuubi.spark.connector.tpch.TPCHConf._
26+
27+
/**
 * Catalog-level configuration for the TPC-H connector, resolved from the
 * catalog options and the active Spark session conf via [[SparkConfParser]].
 */
case class TPCHConf(spark: SparkSession, options: CaseInsensitiveStringMap) {

  // No table properties at catalog scope, hence the null third argument.
  private val parser: SparkConfParser = SparkConfParser(options, spark.conf, null)

  /**
   * Databases to hide from this catalog. Parsed from the comma-separated
   * `excludeDatabases` option; entries are trimmed, lower-cased, and blanks
   * are dropped. Empty when the option is absent.
   */
  lazy val excludeDatabases: Array[String] =
    parser.stringConf()
      .option(EXCLUDE_DATABASES)
      .parseOptional()
      .map { raw =>
        raw.split(",").map(_.toLowerCase.trim).filter(_.nonEmpty)
      }
      .getOrElse(Array.empty)

  // When true, use CHAR/VARCHAR column types; otherwise use STRING.
  lazy val useAnsiStringType: Boolean =
    parser.booleanConf()
      .option(USE_ANSI_STRING_TYPE)
      .sessionConf(s"$TPCH_CONNECTOR_CONF_PREFIX.$USE_ANSI_STRING_TYPE")
      .defaultValue(USE_ANSI_STRING_TYPE_DEFAULT)
      .parse()
}
44+
45+
/**
 * Read-path configuration for a single TPC-H scan, resolved from the scan
 * options, the active Spark session conf, and the table's properties via
 * [[SparkConfParser]].
 */
case class TPCHReadConf(
    spark: SparkSession,
    table: Table,
    options: CaseInsensitiveStringMap) {

  private val parser: SparkConfParser =
    SparkConfParser(options, spark.conf, table.properties)

  // Size cap used when planning read partitions; looked up as a scan option,
  // a session conf, or a table property, with a built-in default.
  lazy val maxPartitionBytes: Long = {
    val qualifiedKey = s"$TPCH_CONNECTOR_READ_CONF_PREFIX.$MAX_PARTITION_BYTES_CONF"
    parser.longConf()
      .option(MAX_PARTITION_BYTES_CONF)
      .sessionConf(qualifiedKey)
      .tableProperty(qualifiedKey)
      .defaultValue(MAX_PARTITION_BYTES_DEFAULT)
      .parse()
  }
}
60+
61+
/** Configuration keys and defaults shared by [[TPCHConf]] and [[TPCHReadConf]]. */
object TPCHConf {

  /** Catalog option: comma-separated list of databases to exclude. */
  val EXCLUDE_DATABASES = "excludeDatabases"

  /** Session-conf prefix for connector-wide settings. */
  val TPCH_CONNECTOR_CONF_PREFIX = "spark.connector.tpch"

  /** When enabled, VARCHAR columns use CHAR/VARCHAR types instead of STRING. */
  val USE_ANSI_STRING_TYPE = "useAnsiStringType"
  val USE_ANSI_STRING_TYPE_DEFAULT = false

  /** Session-conf / table-property prefix for read-path settings. */
  val TPCH_CONNECTOR_READ_CONF_PREFIX = s"$TPCH_CONNECTOR_CONF_PREFIX.read"

  /** Key for the per-partition size cap. */
  val MAX_PARTITION_BYTES_CONF = "maxPartitionBytes"

  /** Default partition size cap: 128 MiB. */
  val MAX_PARTITION_BYTES_DEFAULT: Long = 128L * 1024 * 1024
}

extensions/spark/kyuubi-spark-connector-tpch/src/main/scala/org/apache/kyuubi/spark/connector/tpch/TPCHStatisticsUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import io.trino.tpch.TpchTable._
2222

2323
import org.apache.kyuubi.spark.connector.tpch.TPCHSchemaUtils.{normalize, SCALES}
2424

25-
// https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v3.0.0.pdf
25+
// https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v3.0.1.pdf
2626
// Page 88 Table 3: Estimated Database Size
2727
object TPCHStatisticsUtils {
2828

extensions/spark/kyuubi-spark-connector-tpch/src/main/scala/org/apache/kyuubi/spark/connector/tpch/TPCHTable.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,15 @@ import scala.collection.convert.ImplicitConversions.`list asScalaBuffer`
2525

2626
import io.trino.tpch.{TpchColumnType, TpchEntity, TpchTable}
2727
import io.trino.tpch.TpchColumnType.Base._
28+
import org.apache.spark.sql.SparkSession
2829
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table => SparkTable, TableCapability}
2930
import org.apache.spark.sql.connector.read.ScanBuilder
3031
import org.apache.spark.sql.types._
3132
import org.apache.spark.sql.util.CaseInsensitiveStringMap
3233

33-
class TPCHTable(tbl: String, scale: Double, options: CaseInsensitiveStringMap)
34+
class TPCHTable(tbl: String, scale: Double, tpchConf: TPCHConf)
3435
extends SparkTable with SupportsRead {
3536

36-
// When true, use CHAR VARCHAR; otherwise use STRING
37-
val useAnsiStringType: Boolean = options.getBoolean("useAnsiStringType", false)
38-
3937
val tpchTable: TpchTable[_] = TpchTable.getTable(tbl)
4038

4139
override def name: String = s"${TPCHSchemaUtils.dbName(scale)}.$tbl"
@@ -53,7 +51,7 @@ class TPCHTable(tbl: String, scale: Double, options: CaseInsensitiveStringMap)
5351
Set(TableCapability.BATCH_READ).asJava
5452

5553
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
56-
new TPCHBatchScan(tpchTable, scale, schema)
54+
new TPCHBatchScan(tpchTable, scale, schema, TPCHReadConf(SparkSession.active, this, options))
5755
}
5856

5957
def toSparkDataType(tpchType: TpchColumnType): DataType = {
@@ -63,7 +61,7 @@ class TPCHTable(tbl: String, scale: Double, options: CaseInsensitiveStringMap)
6361
case (DOUBLE, None, None) => DoubleType
6462
case (DATE, None, None) => DateType
6563
case (VARCHAR, Some(precision), None) =>
66-
if (useAnsiStringType) VarcharType(precision.toInt) else StringType
64+
if (tpchConf.useAnsiStringType) VarcharType(precision.toInt) else StringType
6765
case (t, po, so) =>
6866
throw new IllegalArgumentException(s"Unsupported TPC-H type: ($t, $po, $so)")
6967
}

extensions/spark/kyuubi-spark-connector-tpch/src/test/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalogSuite.scala

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,20 @@ import org.apache.kyuubi.spark.connector.common.SparkUtils
2727

2828
class TPCHCatalogSuite extends KyuubiFunSuite {
2929

30-
protected lazy val spark: SparkSession = {
31-
SparkSession.builder()
32-
.master("local[*]")
33-
.config("spark.ui.enabled", "false")
34-
.config("spark.sql.catalogImplementation", "in-memory")
35-
.config("spark.sql.catalog.tpch", classOf[TPCHCatalog].getName)
36-
.config("spark.sql.cbo.enabled", "true")
37-
.config("spark.sql.cbo.planStats.enabled", "true")
38-
.getOrCreate()
39-
}
40-
4130
test("get catalog name") {
42-
val catalog = new TPCHCatalog
43-
val catalogName = "test"
44-
catalog.initialize(catalogName, CaseInsensitiveStringMap.empty())
45-
assert(catalog._name == catalogName)
31+
val sparkConf = new SparkConf()
32+
.setMaster("local[*]")
33+
.set("spark.ui.enabled", "false")
34+
.set("spark.sql.catalogImplementation", "in-memory")
35+
.set("spark.sql.catalog.tpch", classOf[TPCHCatalog].getName)
36+
.set("spark.sql.cbo.enabled", "true")
37+
.set("spark.sql.cbo.planStats.enabled", "true")
38+
withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
39+
val catalog = new TPCHCatalog
40+
val catalogName = "test"
41+
catalog.initialize(catalogName, CaseInsensitiveStringMap.empty())
42+
assert(catalog._name == catalogName)
43+
}
4644
}
4745

4846
test("supports namespaces") {

0 commit comments

Comments
 (0)