Skip to content

Commit 9375329

Browse files
wForgetpan3793
authored andcommitted
[KYUUBI #2541] Set nullable in table schema
### _Why are the changes needed?_ close #2541 ### _How was this patch tested?_ - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2635 from wForget/KYUUBI-2541. Closes #2541 3ec56f7 [wforget] fix d1a6766 [wforget] refactor and comments 84ccafa [wforget] add comment and rename method 4b8af0e [wforget] revise column index 105e9ad [wforget] [KYUUBI-2541] Set nullable in table schema Authored-by: wforget <643348094@qq.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 0c5b0d1 commit 9375329

File tree

4 files changed

+542
-6
lines changed

4 files changed

+542
-6
lines changed

extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import java.util
2222
import scala.collection.JavaConverters._
2323
import scala.collection.immutable
2424

25-
import io.trino.tpcds.Table
2625
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
2726
import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table => SparkTable, TableCatalog, TableChange}
2827
import org.apache.spark.sql.connector.expressions.Transform
@@ -31,8 +30,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
3130

3231
class TPCDSCatalog extends TableCatalog with SupportsNamespaces {
3332

34-
val tables: Array[String] = Table.getBaseTables.asScala
35-
.map(_.getName).filterNot(_ == "dbgen_version").toArray
33+
val tables: Array[String] = TPCDSTableUtils.BASE_TABLES.map(_.getName)
3634

3735
val scales: Array[Int] = Array(0, 1, 10, 100, 300, 1000, 3000, 10000, 30000, 100000)
3836

extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,18 @@ class TPCDSTable(tbl: String, scale: Int, options: CaseInsensitiveStringMap)
6161
override def toString: String = s"TPCDSTable($name)"
6262

6363
override def schema: StructType = {
64-
// TODO tpcdsTable.notNullBitMap does not correct, set nullable follows
65-
// https://tpc.org/TPC_Documents_Current_Versions/pdf/TPC-DS_v3.2.0.pdf
64+
def nullable(index: Int): Boolean = {
65+
val bitMask = 1L << index
66+
(bitMask & ~tpcdsTable.getNotNullBitMap) != 0
67+
}
6668
StructType(
6769
tpcdsTable.getColumns.zipWithIndex.map { case (c, i) =>
68-
StructField(reviseColumnName(c), toSparkDataType(c.getType))
70+
// Because the order of `GeneratorColumn` and `Column` of some tables is inconsistent,
71+
// we need to revise the index of null column, in order to be consistent
72+
// with the calculation of null column in the getValues method of Row.
73+
// Like: io.trino.tpcds.row.CallCenterRow.getValues
74+
val index = TPCDSTableUtils.reviseNullColumnIndex(tpcdsTable, i)
75+
StructField(reviseColumnName(c), toSparkDataType(c.getType), nullable(index))
6976
})
7077
}
7178

0 commit comments

Comments
 (0)