From e5b74b0c850ac7929b09e1c9ce55d40df996b9b1 Mon Sep 17 00:00:00 2001 From: waywtdcc Date: Wed, 22 Nov 2023 16:55:06 +0800 Subject: [PATCH 1/3] [Spark] Report DecimalType precision as COLUMN_SIZE in SchemaHelper.getColumnSize --- .../apache/kyuubi/engine/spark/schema/SchemaHelper.scala | 7 ++++++- .../engine/spark/operation/SparkOperationSuite.scala | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala index 8db46e2b7f4..b066ca53378 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala @@ -142,7 +142,12 @@ object SchemaHelper { .contains(dt.getClass.getSimpleName) => Some(dt.defaultSize) case dt @ (BooleanType | _: NumericType | DateType | TimestampType | CalendarIntervalType | NullType) => - Some(dt.defaultSize) + // decimal type + if (dt.isInstanceOf[DecimalType]) { + Some(dt.asInstanceOf[DecimalType].precision) + } else { + Some(dt.defaultSize) + } case StructType(fields) => val sizeArr = fields.map(f => getColumnSize(f.dataType)) if (sizeArr.contains(None)) { diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala index adab0231d63..167a5e9eefe 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala @@ -154,6 +154,7 @@ class SparkOperationSuite extends WithSparkSQLEngine with
HiveMetadataTests with val colSize = rowSet.getInt(COLUMN_SIZE) schema(pos).dataType match { case StringType | BinaryType | _: ArrayType | _: MapType => assert(colSize === 0) + case d: DecimalType => assert(colSize === d.precision) case StructType(fields) if fields.length == 1 => assert(colSize === 0) case o => assert(colSize === o.defaultSize) } From 4286354c2d5ad6991d12cb0dc82b27409fd5d209 Mon Sep 17 00:00:00 2001 From: waywtdcc Date: Thu, 23 Nov 2023 08:18:13 +0800 Subject: [PATCH 2/3] [Spark] Simplify DecimalType handling with a dedicated match case in SchemaHelper.getColumnSize --- .../apache/kyuubi/engine/spark/schema/SchemaHelper.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala index b066ca53378..ebd7caca613 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala @@ -140,14 +140,11 @@ object SchemaHelper { case dt if Array(TIMESTAMP_NTZ, DAY_TIME_INTERVAL, YEAR_MONTH_INTERVAL) .contains(dt.getClass.getSimpleName) => Some(dt.defaultSize) + case dt @ (_: DecimalType) => + Some(dt.precision) case dt @ (BooleanType | _: NumericType | DateType | TimestampType | CalendarIntervalType | NullType) => - // decimal type - if (dt.isInstanceOf[DecimalType]) { - Some(dt.asInstanceOf[DecimalType].precision) - } else { - Some(dt.defaultSize) - } + Some(dt.defaultSize) case StructType(fields) => val sizeArr = fields.map(f => getColumnSize(f.dataType)) if (sizeArr.contains(None)) { From 2d288f5aa002b7609063b63469d07ae1bfcf5a05 Mon Sep 17 00:00:00 2001 From: waywtdcc Date: Mon, 27 Nov 2023 10:43:48 +0800 Subject: [PATCH 3/3] [Spark] Fix the inaccurate reporting of
COLUMN_SIZE in the decimal field jdbc of spark engine --- .../org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala index ebd7caca613..3beab47a5db 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala @@ -140,7 +140,7 @@ object SchemaHelper { case dt if Array(TIMESTAMP_NTZ, DAY_TIME_INTERVAL, YEAR_MONTH_INTERVAL) .contains(dt.getClass.getSimpleName) => Some(dt.defaultSize) - case dt @ (_: DecimalType) => + case dt: DecimalType => Some(dt.precision) case dt @ (BooleanType | _: NumericType | DateType | TimestampType | CalendarIntervalType | NullType) =>