Skip to content

Commit 9e2aaff

Browse files
wangjunbo authored and pan3793 committed
[KYUUBI #2984] Refactor TPCDS configurations using SparkConfParser
### _Why are the changes needed?_

close #2984

### _How was this patch tested?_

- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #2988 from Kwafoor/master.

Closes #2984

d4f761d [wangjunbo] [KYUUBI-2984]Refactor TPCDS configurations using SparkConfParser
5525084 [wangjunbo] [KYUUBI-2984]Refactor TPCDS configurations using SparkConfParser
293afc3 [wangjunbo] [KYUUBI-2984]Refactor TPCDS configurations using SparkConfParser
4ce8ba3 [wangjunbo] [KYUUBI-2984]Refactor TPCDS configurations using SparkConfParser

Authored-by: wangjunbo <wangjunbo@qiyi.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 4b8dc79 commit 9e2aaff

File tree

2 files changed

+59
-15
lines changed

2 files changed

+59
-15
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.spark.connector.tpcds
19+
20+
import org.apache.spark.sql.SparkSession
21+
import org.apache.spark.sql.util.CaseInsensitiveStringMap
22+
23+
import org.apache.kyuubi.spark.connector.common.SparkConfParser
24+
import org.apache.kyuubi.spark.connector.tpcds.TPCDSConf._
25+
26+
/**
 * Resolved settings of the TPC-DS connector.
 *
 * Every setting is read through a [[SparkConfParser]] built from the supplied `options` and the
 * session's runtime conf (`spark.conf`). How those two sources are prioritised is decided by the
 * parser, not by this class.
 *
 * @param spark   session whose runtime conf is handed to the parser
 * @param options options map handed to the parser
 */
case class TPCDSConf(spark: SparkSession, options: CaseInsensitiveStringMap) {

  // The parser's third argument is passed as null, unchanged from the original call.
  private val confParser: SparkConfParser = SparkConfParser(options, spark.conf, null)

  /**
   * Parses one boolean setting that is registered under the same `key` both as an option and
   * as a session conf, falling back to `fallback` as the parser's default value.
   */
  private def booleanSetting(key: String, fallback: Boolean): Boolean =
    confParser.booleanConf()
      .option(key)
      .sessionConf(key)
      .defaultValue(fallback)
      .parse()

  // When true, use CHAR VARCHAR; otherwise use STRING
  lazy val useAnsiStringType: Boolean =
    booleanSetting(USE_ANSI_STRING_TYPE, USE_ANSI_STRING_TYPE_DEFAULT)

  // TPC-DS v2.6.0 (09-26-2017) replaced two occurrences of "c_last_review_date" with
  // "c_last_review_date_sk" to be consistent with Table 2-14 (Customer Table Column
  // Definitions) in section 2.4.7 of the specification (fogbugz 2046).
  //
  // https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-ds_v3.2.0.pdf
  lazy val useTableSchema_2_6: Boolean =
    booleanSetting(USE_TABLE_SCHEMA_2_6, USE_TABLE_SCHEMA_2_6_DEFAULT)
}
47+
48+
/**
 * Setting keys of the TPC-DS connector together with their default values.
 *
 * Each key is used verbatim both as an option name and as a session conf name.
 */
object TPCDSConf {
  // Key of the switch between CHAR/VARCHAR (true) and STRING (false) column types.
  val USE_ANSI_STRING_TYPE: String = "useAnsiStringType"
  // Fallback used when the key above is not set anywhere.
  val USE_ANSI_STRING_TYPE_DEFAULT: Boolean = false

  // Key of the switch selecting the TPC-DS v2.6.0 table schema.
  val USE_TABLE_SCHEMA_2_6: String = "useTableSchema_2_6"
  // Fallback used when the key above is not set anywhere.
  val USE_TABLE_SCHEMA_2_6_DEFAULT: Boolean = true
}

extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,12 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
3535
class TPCDSTable(tbl: String, scale: Double, options: CaseInsensitiveStringMap)
3636
extends SparkTable with SupportsRead {
3737

38-
// When true, use CHAR VARCHAR; otherwise use STRING
39-
val useAnsiStringType: Boolean = options.getBoolean("useAnsiStringType", false)
40-
41-
// 09-26-2017 v2.6.0
42-
// Replaced two occurrences of "c_last_review_date" with "c_last_review_date_sk" to be consistent
43-
// with Table 2-14 (Customer Table Column Definitions) in section 2.4.7 of the specification
44-
// (fogbugz 2046).
45-
//
46-
// https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-ds_v3.2.0.pdf
47-
val useTableSchema_2_6: Boolean = options.getBoolean("useTableSchema_2_6", true)
48-
4938
val tpcdsTable: Table = Table.getTable(tbl)
5039

5140
lazy val spark: SparkSession = SparkSession.active
5241

42+
lazy val tpcdsConf: TPCDSConf = TPCDSConf(spark, options);
43+
5344
override def name: String = s"${TPCDSSchemaUtils.dbName(scale)}.$tbl"
5445

5546
override def toString: String = s"TPCDSTable($name)"
@@ -67,14 +58,14 @@ class TPCDSTable(tbl: String, scale: Double, options: CaseInsensitiveStringMap)
6758
// Like: io.trino.tpcds.row.CallCenterRow.getValues
6859
val index = TPCDSSchemaUtils.reviseNullColumnIndex(tpcdsTable, i)
6960
StructField(
70-
TPCDSSchemaUtils.reviseColumnName(c, useTableSchema_2_6),
61+
TPCDSSchemaUtils.reviseColumnName(c, tpcdsConf.useTableSchema_2_6),
7162
toSparkDataType(c.getType),
7263
nullable(index))
7364
})
7465
}
7566

7667
override def partitioning: Array[Transform] = TPCDSSchemaUtils
77-
.tablePartitionColumnNames(tpcdsTable, useTableSchema_2_6)
68+
.tablePartitionColumnNames(tpcdsTable, tpcdsConf.useTableSchema_2_6)
7869
.map { Expressions.identity }
7970

8071
override def capabilities(): util.Set[TableCapability] =
@@ -92,9 +83,9 @@ class TPCDSTable(tbl: String, scale: Double, options: CaseInsensitiveStringMap)
9283
case (DATE, None, None) => DateType
9384
case (DECIMAL, Some(precision), Some(scale)) => DecimalType(precision, scale)
9485
case (VARCHAR, Some(precision), None) =>
95-
if (useAnsiStringType) VarcharType(precision) else StringType
86+
if (tpcdsConf.useAnsiStringType) VarcharType(precision) else StringType
9687
case (CHAR, Some(precision), None) =>
97-
if (useAnsiStringType) CharType(precision) else StringType
88+
if (tpcdsConf.useAnsiStringType) CharType(precision) else StringType
9889
case (t, po, so) =>
9990
throw new IllegalArgumentException(s"Unsupported TPC-DS type: ($t, $po, $so)")
10091
}

0 commit comments

Comments
 (0)