[SPARK-47102][SQL] Add the COLLATION_ENABLED config flag #45285

Closed · wants to merge 19 commits · changes from 13 commits
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -3854,6 +3854,11 @@
"Catalog <catalogName> does not support <operation>."
]
},
"COLLATION" : {
"message" : [
"Collation is not yet supported."
]
},
"COMBINATION_QUERY_RESULT_CLAUSES" : {
"message" : [
"Combination of ORDER BY/SORT BY/DISTRIBUTE BY/CLUSTER BY."
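For reference, a minimal sketch of how the new sub-class surfaces to callers (a hypothetical check, not part of this diff; assumes a test context with ScalaTest's intercept, the flag set to false, and the top-level wording supplied by the UNSUPPORTED_FEATURE parent class):

// Hypothetical illustration of the error class added above:
val e = intercept[AnalysisException] {
  spark.sql("SELECT 'abc' COLLATE 'UNICODE_CI'")
}
assert(e.getErrorClass == "UNSUPPORTED_FEATURE.COLLATION")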
4 changes: 4 additions & 0 deletions docs/sql-error-conditions-unsupported-feature-error-class.md
@@ -58,6 +58,10 @@ The ANALYZE TABLE command does not support views.

Catalog `<catalogName>` does not support `<operation>`.

## COLLATION

Collation is not yet supported.

## COMBINATION_QUERY_RESULT_CLAUSES

Combination of ORDER BY/SORT BY/DISTRIBUTE BY/CLUSTER BY.
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@ExpressionDescription(
@@ -40,6 +41,12 @@ import org.apache.spark.sql.types._
group = "string_funcs")
object CollateExpressionBuilder extends ExpressionBuilder {
override def build(funcName: String, expressions: Seq[Expression]): Expression = {
// We need to throw collationNotEnabledError before unexpectedNullError
// and nonFoldableArgumentError, as we do not want the user to see
// misleading messages suggesting that collation is enabled.
if (!SQLConf.get.collationEnabled) {
throw QueryCompilationErrors.collationNotEnabledError()
}
expressions match {
case Seq(e: Expression, collationExpr: Expression) =>
(collationExpr.dataType, collationExpr.foldable) match {
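The guard in the builder fires before any argument validation, so with the flag off users always see the collation error first. A rough spark-shell sketch of the resulting behavior (assumes a local `spark` session):

// With the flag off, collate() fails fast at analysis time:
spark.conf.set("spark.sql.collation.enabled", "false")
spark.sql("SELECT collate('abc', 'UNICODE_CI')") // throws UNSUPPORTED_FEATURE.COLLATION
// With the flag on, the same call resolves normally:
spark.conf.set("spark.sql.collation.enabled", "true")
spark.sql("SELECT collate('abc', 'UNICODE_CI')").show()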
@@ -2190,6 +2190,13 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
Collate(expression(ctx.primaryExpression), collationName)
}

override def visitCollateClause(ctx: CollateClauseContext): String = withOrigin(ctx) {
if (!SQLConf.get.collationEnabled) {
throw QueryCompilationErrors.collationNotEnabledError()
}
string(visitStringLit(ctx.stringLit))
}

/**
* Create a [[Cast]] expression.
*/
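The parser hook above gates the COLLATE clause itself, so the error also fires for statements that never reach the expression builder. A sketch of the gated paths, mirroring the tests added below (assumes the flag is off):

// Both fail with UNSUPPORTED_FEATURE.COLLATION before the collation
// name is even validated:
spark.sql("CREATE TABLE t(col STRING COLLATE 'UNICODE_CI') USING parquet")
spark.sql("SELECT 'aaa' COLLATE 'UNICODE_CI'")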
@@ -297,6 +297,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with CompilationErrors {
)
}

def collationNotEnabledError(): Throwable = {
new AnalysisException(
errorClass = "UNSUPPORTED_FEATURE.COLLATION",
messageParameters = Map.empty)
}

def unresolvedUsingColForJoinError(
colName: String, suggestion: String, side: String): Throwable = {
new AnalysisException(
@@ -753,6 +753,14 @@ object SQLConf {
.checkValue(_ > 0, "The initial number of partitions must be positive.")
.createOptional

lazy val COLLATION_ENABLED =
buildConf("spark.sql.collation.enabled")
.doc("Collations feature is under development and its use should be done under this" +
"feature flag.")
.version("4.0.0")
.booleanConf
.createWithDefault(Utils.isTesting)

val FETCH_SHUFFLE_BLOCKS_IN_BATCH =
buildConf("spark.sql.adaptive.fetchShuffleBlocksInBatch")
.internal()
@@ -4962,6 +4970,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
}
}

def collationEnabled: Boolean = getConf(COLLATION_ENABLED)

def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

def adaptiveExecutionLogLevel: String = getConf(ADAPTIVE_EXECUTION_LOG_LEVEL)
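Since COLLATION_ENABLED is a regular SQL conf defaulting to Utils.isTesting (on under tests, off for end users), it can be flipped per session. A minimal sketch (key name as defined above):

// Per-session toggle via SQL...
spark.sql("SET spark.sql.collation.enabled=true")
// ...or programmatically through the runtime conf:
spark.conf.set(SQLConf.COLLATION_ENABLED.key, "true")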
@@ -22,7 +22,7 @@ import java.util.Locale
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
import org.apache.spark.sql.catalyst.expressions.{Collate, Collation, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
@@ -32,6 +32,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.types.{AtomicType, StructType, VariantType}
import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
@@ -592,3 +593,21 @@ case class QualifyLocationWithWarehouse(catalog: SessionCatalog) extends Rule[LogicalPlan] {
c.copy(tableDesc = newTable)
}
}

object CollationCheck extends (LogicalPlan => Unit) {
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case operator: LogicalPlan =>
operator.expressions.foreach(_.foreach(
e =>
if (isCollationExpression(e) && !SQLConf.get.collationEnabled) {
throw QueryCompilationErrors.collationNotEnabledError()
}
)
)
}
}

private def isCollationExpression(expression: Expression): Boolean =
expression.isInstanceOf[Collation] || expression.isInstanceOf[Collate]
}
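CollationCheck runs as an extended analysis check, so it also catches Collate and Collation nodes that enter the plan without going through the parser or the expression builder. A behaviorally equivalent one-expression sketch, not the PR's code (assumes TreeNode.exists as available in this codebase):

// Fail if any expression anywhere in the plan is a collation node while
// the flag is off; exists() short-circuits the traversal on first match.
def checkNoCollation(plan: LogicalPlan): Unit = {
  if (!SQLConf.get.collationEnabled &&
      plan.exists(_.expressions.exists(_.exists(isCollationExpression)))) {
    throw QueryCompilationErrors.collationNotEnabledError()
  }
}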
@@ -223,6 +223,7 @@ abstract class BaseSessionStateBuilder(
HiveOnlyCheck +:
TableCapabilityCheck +:
CommandCheck +:
CollationCheck +:
customCheckRules
}

@@ -964,6 +964,37 @@ class QueryCompilationErrorsSuite
"className" -> "org.apache.spark.sql.catalyst.expressions.UnsafeRow"))
}

test("SPARK-47102: the collation feature is off without collate builder call") {
withSQLConf(SQLConf.COLLATION_ENABLED.key -> "false") {
Seq(
"CREATE TABLE t(col STRING COLLATE 'UNICODE_CI') USING parquet",
"CREATE TABLE t(col STRING COLLATE 'UNKNOWN_COLLATION_STRING') USING parquet",
"SELECT 'aaa' COLLATE 'UNICODE_CI'",
"select collation('aaa')"
).foreach { sqlText =>
checkError(
exception = intercept[AnalysisException](sql(sqlText)),
errorClass = "UNSUPPORTED_FEATURE.COLLATION")
}
}
}

test("SPARK-47102: the collation feature is off with collate builder call") {
withSQLConf(SQLConf.COLLATION_ENABLED.key -> "false") {
Seq(
"SELECT collate('aaa', 'UNICODE_CI')",
"SELECT collate('aaa', 'UNKNOWN_COLLATION_STRING')"
).foreach { sqlText =>
checkError(
exception = intercept[AnalysisException](sql(sqlText)),
errorClass = "UNSUPPORTED_FEATURE.COLLATION",
parameters = Map.empty,
context = ExpectedContext(
fragment = sqlText.substring(7), start = 7, stop = sqlText.length - 1))
}
}
}

test("INTERNAL_ERROR: Convert unsupported data type from Spark to Parquet") {
val converter = new SparkToParquetSchemaConverter
val dummyDataType = new DataType {
5 changes: 4 additions & 1 deletion sql/gen-sql-functions-docs.py
@@ -19,6 +19,7 @@
import os
import re
from collections import namedtuple
from pyspark.conf import SparkConf

# To avoid adding a new direct dependency, we import markdown from within mkdocs.
from mkdocs.structure.pages import markdown
@@ -239,7 +240,9 @@ def generate_functions_examples_html(jvm, jspark, html_output_dir):


if __name__ == "__main__":
jvm = launch_gateway().jvm
conf = SparkConf()
conf.set("spark.sql.collation.enabled", "true")
jvm = launch_gateway(conf=conf).jvm
jspark = jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate()
jspark.sparkContext().setLogLevel("ERROR") # Make it less noisy.
spark_root_dir = os.path.dirname(os.path.dirname(__file__))
@@ -113,6 +113,7 @@ class HiveSessionStateBuilder(
PreReadCheck +:
TableCapabilityCheck +:
CommandCheck +:
CollationCheck +:
customCheckRules
}
