Skip to content

Commit dbe315e

Browse files
jiaoqingbopan3793
authored and committed
[KYUUBI #2664] Kyuubi Spark TPC-H Connector - SupportsReportStatistics
### _Why are the changes needed?_ fix #2664 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2740 from jiaoqingbo/kyuubi-2664. Closes #2664 ebac8ad [jiaoqingbo] fix ut failed 6532cc5 [jiaoqingbo] [KYUUBI #2664] Kyuubi Spark TPC-H Connector - SupportsReportStatistics Authored-by: jiaoqingbo <1178404354@qq.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent ff1d7ec commit dbe315e

File tree

4 files changed

+117
-3
lines changed

4 files changed

+117
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.spark.connector.tpch
19+
20+
import org.apache.spark.SPARK_VERSION
21+
22+
object SparkUtils {

  /**
   * Given a Kyuubi/Spark/Hive version string,
   * return the (major version number, minor version number).
   * E.g., for 2.0.1-SNAPSHOT, return (2, 0).
   *
   * @throws IllegalArgumentException if the string does not start with "major.minor"
   */
  def majorMinorVersion(version: String): (Int, Int) = {
    val versionPattern = """^(\d+)\.(\d+)(\..*)?$""".r
    versionPattern.findFirstMatchIn(version) match {
      case Some(matched) => (matched.group(1).toInt, matched.group(2).toInt)
      case None =>
        throw new IllegalArgumentException(s"Tried to parse '$version' as a project" +
          " version string, but it could not find the major and minor version numbers.")
    }
  }

  /**
   * Given a Kyuubi/Spark/Hive version string, return the major version number.
   * E.g., for 2.0.1-SNAPSHOT, return 2.
   */
  def majorVersion(version: String): Int = majorMinorVersion(version)._1

  /**
   * Given a Kyuubi/Spark/Hive version string, return the minor version number.
   * E.g., for 2.0.1-SNAPSHOT, return 0.
   */
  def minorVersion(version: String): Int = majorMinorVersion(version)._2

  /** True if the runtime Spark (major, minor) version is <= the given version's. */
  def isSparkVersionAtMost(ver: String): Boolean = {
    val (runtimeMajor, runtimeMinor) = majorMinorVersion(SPARK_VERSION)
    val (targetMajor, targetMinor) = majorMinorVersion(ver)
    runtimeMajor < targetMajor ||
      (runtimeMajor == targetMajor && runtimeMinor <= targetMinor)
  }

  /** True if the runtime Spark (major, minor) version is >= the given version's. */
  def isSparkVersionAtLeast(ver: String): Boolean = {
    val (runtimeMajor, runtimeMinor) = majorMinorVersion(SPARK_VERSION)
    val (targetMajor, targetMinor) = majorMinorVersion(ver)
    runtimeMajor > targetMajor ||
      (runtimeMajor == targetMajor && runtimeMinor >= targetMinor)
  }

  /** True if the runtime Spark (major, minor) version equals the given version's. */
  def isSparkVersionEqualTo(ver: String): Boolean =
    majorMinorVersion(SPARK_VERSION) == majorMinorVersion(ver)
}

extensions/spark/kyuubi-spark-connector-tpch/src/main/scala/org/apache/kyuubi/spark/connector/tpch/TPCHBatchScan.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.kyuubi.spark.connector.tpch
1919

2020
import java.time.LocalDate
2121
import java.time.format.DateTimeFormatter
22+
import java.util.OptionalLong
2223

2324
import scala.collection.mutable.ArrayBuffer
2425

@@ -38,7 +39,9 @@ class TPCHBatchScan(
3839
@transient table: TpchTable[_],
3940
scale: Int,
4041
schema: StructType) extends ScanBuilder
41-
with Scan with Batch with Serializable {
42+
with SupportsReportStatistics with Batch with Serializable {
43+
44+
private val _sizeInBytes: Long = TPCHStatisticsUtils.sizeInBytes(table, scale)
4245

4346
private val _numRows: Long = TPCHStatisticsUtils.numRows(table, scale)
4447

@@ -69,6 +72,11 @@ class TPCHBatchScan(
6972
new TPCHPartitionReader(chuck.table, chuck.scale, chuck.parallelism, chuck.index, schema)
7073
}
7174

75+
override def estimateStatistics: Statistics = new Statistics {
76+
override def sizeInBytes: OptionalLong = OptionalLong.of(_sizeInBytes)
77+
override def numRows: OptionalLong = OptionalLong.of(_numRows)
78+
}
79+
7280
}
7381

7482
class TPCHPartitionReader(

extensions/spark/kyuubi-spark-connector-tpch/src/main/scala/org/apache/kyuubi/spark/connector/tpch/TPCHStatisticsUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ object TPCHStatisticsUtils {
4646
case (PART_SUPPLIER, scale) => 800000L * scale
4747
case (SUPPLIER, scale) => 10000L * scale
4848
case (NATION, _) => 25L
49-
case (REGION, _) => 25L
49+
case (REGION, _) => 5L
5050
}
5151
}
5252

extensions/spark/kyuubi-spark-connector-tpch/src/test/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalogSuite.scala

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.kyuubi.spark.connector.tpch
1919

20-
import org.apache.spark.sql.SparkSession
20+
import org.apache.spark.sql.{AnalysisException, SparkSession}
2121

2222
import org.apache.kyuubi.KyuubiFunSuite
2323

@@ -51,6 +51,34 @@ class TPCHCatalogSuite extends KyuubiFunSuite {
5151
assert(spark.table("tpch.sf1.region").count === 5)
5252
}
5353

54+
test("tpch.sf1 stats") {
55+
def assertStats(tableName: String, sizeInBytes: BigInt, rowCount: BigInt): Unit = {
56+
val stats = spark.table(tableName).queryExecution.analyzed.stats
57+
assert(stats.sizeInBytes == sizeInBytes)
58+
// stats.rowCount only has value after SPARK-33954
59+
if (SparkUtils.isSparkVersionAtLeast("3.2")) {
60+
assert(stats.rowCount.contains(rowCount), tableName)
61+
}
62+
}
63+
64+
assertStats("tpch.sf1.customer", 26850000, 150000)
65+
assertStats("tpch.sf1.orders", 156000000, 1500000)
66+
assertStats("tpch.sf1.lineitem", 672136080, 6001215)
67+
assertStats("tpch.sf1.part", 31000000, 200000)
68+
assertStats("tpch.sf1.partsupp", 115200000, 800000)
69+
assertStats("tpch.sf1.supplier", 1590000, 10000)
70+
assertStats("tpch.sf1.nation", 3200, 25)
71+
assertStats("tpch.sf1.region", 620, 5)
72+
73+
}
74+
75+
test("nonexistent table") {
76+
val exception = intercept[AnalysisException] {
77+
spark.table("tpch.sf1.nonexistent_table")
78+
}
79+
assert(exception.message === "Table or view not found: tpch.sf1.nonexistent_table")
80+
}
81+
5482
override def afterAll(): Unit = {
5583
super.afterAll()
5684
spark.stop()

0 commit comments

Comments (0)