From 7f83f0d6999a1fd8cb67b5515ad598c8ed6a7ef2 Mon Sep 17 00:00:00 2001 From: Ankita Victor Date: Thu, 25 Apr 2024 23:00:59 +0530 Subject: [PATCH 1/5] Add array_size --- .../org/apache/gluten/utils/CHExpressionUtil.scala | 1 + .../execution/ScalarFunctionsValidateSuite.scala | 12 ++++++++++++ .../gluten/expression/ExpressionMappings.scala | 1 + .../apache/gluten/expression/ExpressionNames.scala | 1 + 4 files changed, 15 insertions(+) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala index 3663ef07a7ac..0538470c9713 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala @@ -175,6 +175,7 @@ object CHExpressionUtil { ARRAY_EXCEPT -> DefaultValidator(), ARRAY_REPEAT -> DefaultValidator(), ARRAY_REMOVE -> DefaultValidator(), + ARRAY_SIZE -> DefaultValidator(), DATE_FROM_UNIX_DATE -> DefaultValidator(), UNIX_DATE -> DefaultValidator(), MONOTONICALLY_INCREASING_ID -> DefaultValidator(), diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala index b1f01537dc48..2011ca79b177 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala @@ -821,4 +821,16 @@ class ScalarFunctionsValidateSuite extends FunctionsValidateTest { } } + test("test array_size") { + runQueryAndCompare("select array_size(array())") { + checkGlutenOperatorMatch[ProjectExecTransformer] + } + runQueryAndCompare("select array_size(array(true))") { + checkGlutenOperatorMatch[ProjectExecTransformer] + } + runQueryAndCompare("select array_size(array(2, 1))") { + checkGlutenOperatorMatch[ProjectExecTransformer] + } + } + } diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala index 6be5b0f9bb58..70e2cbd02a7d 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala @@ -230,6 +230,7 @@ object ExpressionMappings { Sig[ArrayExcept](ARRAY_EXCEPT), Sig[ArrayRepeat](ARRAY_REPEAT), Sig[ArrayRemove](ARRAY_REMOVE), + Sig[ArraySize](ARRAY_SIZE), Sig[ArrayFilter](FILTER), Sig[ArrayForAll](FORALL), Sig[ArrayExists](EXISTS), diff --git a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala index 44384be72f61..713ea7a516fc 100644 --- a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala +++ b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala @@ -246,6 +246,7 @@ object ExpressionNames { final val ARRAY_EXCEPT = "array_except" final val ARRAY_REPEAT = "array_repeat" final val ARRAY_REMOVE = "array_remove" + final val ARRAY_SIZE = "array_size" final val FILTER = "filter" final val FORALL = "forall" final val EXISTS = "exists" From d6886c1b96bf75569bebf3e6cdba6a1b3abc8464 Mon Sep 17 00:00:00 2001 From: Ankita Victor Date: Thu, 25 Apr 2024 23:50:05 +0530 Subject: [PATCH 2/5] Remove array_size from 3.2 --- .../apache/gluten/expression/ExpressionMappings.scala | 1 - .../apache/gluten/sql/shims/spark33/Spark33Shims.scala | 3 ++- .../apache/gluten/sql/shims/spark34/Spark34Shims.scala | 3 ++- .../apache/gluten/sql/shims/spark35/Spark35Shims.scala | 9 ++++----- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala index 70e2cbd02a7d..6be5b0f9bb58 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala @@ -230,7 +230,6 @@ object ExpressionMappings { Sig[ArrayExcept](ARRAY_EXCEPT), Sig[ArrayRepeat](ARRAY_REPEAT), Sig[ArrayRemove](ARRAY_REMOVE), - Sig[ArraySize](ARRAY_SIZE), Sig[ArrayFilter](FILTER), Sig[ArrayForAll](FORALL), Sig[ArrayExists](EXISTS), diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala index d264bd1acc55..2515db51470c 100644 --- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala +++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala @@ -69,7 +69,8 @@ class Spark33Shims extends SparkShims { Sig[Empty2Null](ExpressionNames.EMPTY2NULL), Sig[TimestampAdd](TIMESTAMP_ADD), Sig[RoundFloor](FLOOR), - Sig[RoundCeil](CEIL) + Sig[RoundCeil](CEIL), + Sig[ArraySize](ARRAY_SIZE) ) } diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala index 53d7d7f8f8b2..39cf578d6e50 100644 --- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala @@ -70,7 +70,8 @@ class Spark34Shims extends SparkShims { Sig[Sec](ExpressionNames.SEC), Sig[Csc](ExpressionNames.CSC), Sig[KnownNullable](KNOWN_NULLABLE), - Sig[Empty2Null](ExpressionNames.EMPTY2NULL) + Sig[Empty2Null](ExpressionNames.EMPTY2NULL), + Sig[ArraySize](ARRAY_SIZE) ) } diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala index f78d4ca6b52d..04ec8a0b362f 100644 --- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala @@ -16,9 +16,9 @@ */ package org.apache.gluten.sql.shims.spark35 +import org.apache.gluten.expression.ExpressionNames.ARRAY_SIZE import org.apache.gluten.expression.{ExpressionNames, Sig} import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims} - import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.io.FileCommitProtocol @@ -29,7 +29,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, InternalRow} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{BloomFilterAggregate, RegrIntercept, RegrR2, RegrReplacement, RegrSlope, RegrSXY, TypedImperativeAggregate} +import org.apache.spark.sql.catalyst.expressions.aggregate.{BloomFilterAggregate, RegrIntercept, RegrR2, RegrReplacement, RegrSXY, RegrSlope, TypedImperativeAggregate} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, KeyGroupedPartitioning, Partitioning} import org.apache.spark.sql.catalyst.rules.Rule @@ -48,12 +48,10 @@ import org.apache.spark.sql.execution.window.{WindowGroupLimitExec, WindowGroupL import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.{BlockId, BlockManagerId} - import org.apache.hadoop.fs.{FileStatus, Path} import java.time.ZoneOffset import java.util.{HashMap => JHashMap, Map => JMap} - import scala.reflect.ClassTag class Spark35Shims extends SparkShims { @@ -70,7 +68,8 @@ class Spark35Shims extends SparkShims { Sig[SplitPart](ExpressionNames.SPLIT_PART), Sig[Sec](ExpressionNames.SEC), Sig[Csc](ExpressionNames.CSC), - Sig[Empty2Null](ExpressionNames.EMPTY2NULL)) + Sig[Empty2Null](ExpressionNames.EMPTY2NULL), + Sig[ArraySize](ARRAY_SIZE)) } override def aggregateExpressionMappings: Seq[Sig] = { From 294f74815302b1b27a80d426bd6b0b990d369644 Mon Sep 17 00:00:00 2001 From: Ankita Victor Date: Mon, 29 Apr 2024 15:42:22 +0530 Subject: [PATCH 3/5] Fix format --- .../apache/gluten/sql/shims/spark33/Spark33Shims.scala | 2 +- .../apache/gluten/sql/shims/spark34/Spark34Shims.scala | 2 +- .../apache/gluten/sql/shims/spark35/Spark35Shims.scala | 10 +++++++--- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala index 2515db51470c..a712e602f3db 100644 --- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala +++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala @@ -18,7 +18,7 @@ package org.apache.gluten.sql.shims.spark33 import org.apache.gluten.execution.datasource.GlutenParquetWriterInjects import org.apache.gluten.expression.{ExpressionNames, Sig} -import org.apache.gluten.expression.ExpressionNames.{CEIL, FLOOR, KNOWN_NULLABLE, TIMESTAMP_ADD} +import org.apache.gluten.expression.ExpressionNames.{ARRAY_SIZE, CEIL, FLOOR, KNOWN_NULLABLE, TIMESTAMP_ADD} import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims} import org.apache.spark._ diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala index 39cf578d6e50..8c4e9a62cd8a 100644 --- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala @@ -17,7 +17,7 @@ package org.apache.gluten.sql.shims.spark34 import org.apache.gluten.expression.{ExpressionNames, Sig} -import org.apache.gluten.expression.ExpressionNames.KNOWN_NULLABLE +import org.apache.gluten.expression.ExpressionNames.{ARRAY_SIZE, KNOWN_NULLABLE} import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims} import org.apache.spark._ diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala index 04ec8a0b362f..f1c957f21701 100644 --- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala @@ -16,9 +16,10 @@ */ package org.apache.gluten.sql.shims.spark35 -import org.apache.gluten.expression.ExpressionNames.ARRAY_SIZE import org.apache.gluten.expression.{ExpressionNames, Sig} +import org.apache.gluten.expression.ExpressionNames.ARRAY_SIZE import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims} + import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.io.FileCommitProtocol @@ -29,7 +30,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, InternalRow} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{BloomFilterAggregate, RegrIntercept, RegrR2, RegrReplacement, RegrSXY, RegrSlope, TypedImperativeAggregate} +import org.apache.spark.sql.catalyst.expressions.aggregate.{BloomFilterAggregate, RegrIntercept, RegrR2, RegrReplacement, RegrSlope, RegrSXY, TypedImperativeAggregate} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, KeyGroupedPartitioning, Partitioning} import org.apache.spark.sql.catalyst.rules.Rule @@ -48,10 +49,12 @@ import org.apache.spark.sql.execution.window.{WindowGroupLimitExec, WindowGroupL import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.{BlockId, BlockManagerId} + import org.apache.hadoop.fs.{FileStatus, Path} import java.time.ZoneOffset import java.util.{HashMap => JHashMap, Map => JMap} + import scala.reflect.ClassTag class Spark35Shims extends SparkShims { @@ -69,7 +72,8 @@ class Spark35Shims extends SparkShims { Sig[Sec](ExpressionNames.SEC), Sig[Csc](ExpressionNames.CSC), Sig[Empty2Null](ExpressionNames.EMPTY2NULL), - Sig[ArraySize](ARRAY_SIZE)) + Sig[ArraySize](ARRAY_SIZE) + ) } override def aggregateExpressionMappings: Seq[Sig] = { From b72757ed73d75f918490023f6a7cef78a2163c6b Mon Sep 17 00:00:00 2001 From: Ankita Victor Date: Mon, 29 Apr 2024 19:10:15 +0530 Subject: [PATCH 4/5] Update test --- .../ScalarFunctionsValidateSuite.scala | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala index 73c7c0121a84..24f3f23a5d61 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.gluten.execution +import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.types._ import java.sql.Timestamp @@ -827,15 +828,19 @@ class ScalarFunctionsValidateSuite extends FunctionsValidateTest { } test("test array_size") { - runQueryAndCompare("select array_size(array())") { - checkGlutenOperatorMatch[ProjectExecTransformer] - } - runQueryAndCompare("select array_size(array(true))") { - checkGlutenOperatorMatch[ProjectExecTransformer] - } - runQueryAndCompare("select array_size(array(2, 1))") { - checkGlutenOperatorMatch[ProjectExecTransformer] + if (!SparkShimLoader.getSparkVersion.startsWith("3.2")) { + withTempPath { path => + Seq[Seq[Integer]](Seq(1, null, 5, 4), Seq(5, -1, 8, 9, -7, 2), Seq.empty, null) + .toDF("value") + .write + .parquet(path.getCanonicalPath) + + spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("array_tbl") + + runQueryAndCompare("select array_size(value) as res from array_tbl;") { + checkGlutenOperatorMatch[ProjectExecTransformer] + } + } } } - } From 98afa37454189b04dcf34e0f8cacd000fbff8a5f Mon Sep 17 00:00:00 2001 From: Ankita Victor Date: Mon, 29 Apr 2024 19:35:25 +0530 Subject: [PATCH 5/5] Fix format --- .../ScalarFunctionsValidateSuite.scala | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala index 24f3f23a5d61..655da20bf711 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala @@ -17,6 +17,7 @@ package org.apache.gluten.execution import org.apache.gluten.sql.shims.SparkShimLoader + import org.apache.spark.sql.types._ import java.sql.Timestamp @@ -829,17 +830,18 @@ class ScalarFunctionsValidateSuite extends FunctionsValidateTest { test("test array_size") { if (!SparkShimLoader.getSparkVersion.startsWith("3.2")) { - withTempPath { path => - Seq[Seq[Integer]](Seq(1, null, 5, 4), Seq(5, -1, 8, 9, -7, 2), Seq.empty, null) - .toDF("value") - .write - .parquet(path.getCanonicalPath) - - spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("array_tbl") - - runQueryAndCompare("select array_size(value) as res from array_tbl;") { - checkGlutenOperatorMatch[ProjectExecTransformer] - } + withTempPath { + path => + Seq[Seq[Integer]](Seq(1, null, 5, 4), Seq(5, -1, 8, 9, -7, 2), Seq.empty, null) + .toDF("value") + .write + .parquet(path.getCanonicalPath) + + spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("array_tbl") + + runQueryAndCompare("select array_size(value) as res from array_tbl;") { + checkGlutenOperatorMatch[ProjectExecTransformer] + } } } }