Skip to content

Commit c32b64a

Browse files
dejankrak-dbstefankandic
authored andcommitted
[SPARK-51067][SQL] Revert session level collation for DML queries and apply object level collation for DDL queries
### What changes were proposed in this pull request? This PR is a partial revert of the original PR #48962 that introduced the resolution of default session level collation for DDL and DML queries. The part that is reverted is the default collation resolution for DML queries, whereas the part that is kept is the default collation resolution for DDL queries, which is required to apply the object level collation that was introduced as part of PR #49084. As part of this logic, object level collation is now applied to DDL queries accordingly, with the main logic implemented in ResolveDefaultStringTypes.stringTypeForDDLCommand() method. ### Why are the changes needed? As there were some unresolved technical issues when attempting to merge the functionality from PR #48962 on Delta side, due to its effect on DML queries, it was decided to pause this functionality for now, thus partially reverting unused parts for maintaining a cleaner code moving forward. Also, this is inline with customer feedback where object level collation is much more requested functionality, so the focus is to introduce the resolution of object level collation for DDL queries instead, allowing the collation to be specified per table or view on their creation or modification, with propagating the default collation specified to subsequent queries on top of those entities. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests that cover the collations functionality, as well adding new dedicated tests for applying object level collation to the underlying columns. ### Was this patch authored or co-authored using generative AI tooling? No Closes #49772 from dejankrak-db/revert-session-collations. Lead-authored-by: Dejan Krakovic <dejan.krakovic@databricks.com> Co-authored-by: Stefan Kandic <stefan.kandic@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit e92e12a) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 3228435 commit c32b64a

File tree

36 files changed

+192
-6014
lines changed

36 files changed

+192
-6014
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2308,16 +2308,6 @@
23082308
"The value '<confValue>' in the config \"<confName>\" is invalid."
23092309
],
23102310
"subClass" : {
2311-
"DEFAULT_COLLATION" : {
2312-
"message" : [
2313-
"Cannot resolve the given default collation. Suggested valid collation names: ['<proposals>']?"
2314-
]
2315-
},
2316-
"DEFAULT_COLLATION_NOT_SUPPORTED" : {
2317-
"message" : [
2318-
"Setting default session collation other than UTF8_BINARY is currently not supported."
2319-
]
2320-
},
23212311
"TIME_ZONE" : {
23222312
"message" : [
23232313
"Cannot resolve the given timezone."

connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaOfAvro.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,14 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, Li
2323
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
2424
import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
2525
import org.apache.spark.sql.errors.QueryCompilationErrors
26-
import org.apache.spark.sql.internal.SQLConf
27-
import org.apache.spark.sql.types.{DataType, ObjectType}
26+
import org.apache.spark.sql.types.{DataType, ObjectType, StringType}
2827

2928
private[sql] case class SchemaOfAvro(
3029
jsonFormatSchema: String,
3130
options: Map[String, String])
3231
extends LeafExpression with RuntimeReplaceable {
3332

34-
override def dataType: DataType = SQLConf.get.defaultStringType
33+
override def dataType: DataType = StringType
3534

3635
override def nullable: Boolean = false
3736

sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -342,8 +342,7 @@ statement
342342
;
343343

344344
setResetStatement
345-
: SET COLLATION collationName=identifier #setCollation
346-
| SET ROLE .*? #failSetRole
345+
: SET ROLE .*? #failSetRole
347346
| SET TIME ZONE interval #setTimeZone
348347
| SET TIME ZONE timezone #setTimeZone
349348
| SET TIME ZONE .*? #setTimeZone

sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import java.util.TimeZone
2020

2121
import scala.util.Try
2222

23-
import org.apache.spark.sql.types.{AtomicType, StringType, TimestampType}
23+
import org.apache.spark.sql.types.{AtomicType, TimestampType}
2424
import org.apache.spark.util.SparkClassUtils
2525

2626
/**
@@ -44,7 +44,6 @@ private[sql] trait SqlApiConf {
4444
def datetimeJava8ApiEnabled: Boolean
4545
def sessionLocalTimeZone: String
4646
def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value
47-
def defaultStringType: StringType
4847
def stackTracesInDataFrameContext: Int
4948
def dataFrameQueryContextEnabled: Boolean
5049
def legacyAllowUntypedScalaUDFs: Boolean
@@ -61,7 +60,6 @@ private[sql] object SqlApiConf {
6160
val LOCAL_RELATION_CACHE_THRESHOLD_KEY: String = {
6261
SqlApiConfHelper.LOCAL_RELATION_CACHE_THRESHOLD_KEY
6362
}
64-
val DEFAULT_COLLATION: String = SqlApiConfHelper.DEFAULT_COLLATION
6563

6664
def get: SqlApiConf = SqlApiConfHelper.getConfGetter.get()()
6765

@@ -87,7 +85,6 @@ private[sql] object DefaultSqlApiConf extends SqlApiConf {
8785
override def datetimeJava8ApiEnabled: Boolean = false
8886
override def sessionLocalTimeZone: String = TimeZone.getDefault.getID
8987
override def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED
90-
override def defaultStringType: StringType = StringType
9188
override def stackTracesInDataFrameContext: Int = 1
9289
override def dataFrameQueryContextEnabled: Boolean = true
9390
override def legacyAllowUntypedScalaUDFs: Boolean = false

sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConfHelper.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ private[sql] object SqlApiConfHelper {
3232
val CASE_SENSITIVE_KEY: String = "spark.sql.caseSensitive"
3333
val SESSION_LOCAL_TIMEZONE_KEY: String = "spark.sql.session.timeZone"
3434
val LOCAL_RELATION_CACHE_THRESHOLD_KEY: String = "spark.sql.session.localRelationCacheThreshold"
35-
val DEFAULT_COLLATION: String = "spark.sql.session.collation.default"
3635
val ARROW_EXECUTION_USE_LARGE_VAR_TYPES = "spark.sql.execution.arrow.useLargeVarTypes"
3736

3837
val confGetter: AtomicReference[() => SqlApiConf] = {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
378378
ResolveAliases ::
379379
ResolveSubquery ::
380380
ResolveSubqueryColumnAliases ::
381-
ResolveDefaultStringTypes ::
381+
ResolveDDLCommandStringTypes ::
382382
ResolveWindowOrder ::
383383
ResolveWindowFrame ::
384384
ResolveNaturalAndUsingJoin ::

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CollationTypeCoercion.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -246,10 +246,6 @@ object CollationTypeCoercion {
246246
if (!expressions.exists(e => SchemaUtils.hasNonUTF8BinaryCollation(e.dataType))) {
247247
// if there are no collated types we don't need to do anything
248248
return None
249-
} else if (ResolveDefaultStringTypes.needsResolution(expressions)) {
250-
// if any of the strings types are still not resolved
251-
// we need to wait for them to be resolved first
252-
return None
253249
}
254250

255251
val collationContextWinner = expressions.foldLeft(findCollationContext(expressions.head)) {
Lines changed: 39 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -18,87 +18,57 @@
1818
package org.apache.spark.sql.catalyst.analysis
1919

2020
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal}
21-
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterViewAs, ColumnDefinition, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, V1CreateTablePlan, V2CreateTablePlan}
22-
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
21+
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterTableCommand, AlterViewAs, ColumnDefinition, CreateTable, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, V2CreateTablePlan}
22+
import org.apache.spark.sql.catalyst.rules.Rule
23+
import org.apache.spark.sql.connector.catalog.TableCatalog
2324
import org.apache.spark.sql.types.{DataType, StringType}
2425

2526
/**
26-
* Resolves default string types in queries and commands. For queries, the default string type is
27-
* determined by the session's default string type. For DDL, the default string type is the
28-
* default type of the object (table -> schema -> catalog). However, this is not implemented yet.
29-
* So, we will just use UTF8_BINARY for now.
27+
* Resolves string types in DDL commands, where the string type inherits the
28+
* collation from the corresponding object (table/view -> schema -> catalog).
3029
*/
31-
object ResolveDefaultStringTypes extends Rule[LogicalPlan] {
30+
object ResolveDDLCommandStringTypes extends Rule[LogicalPlan] {
3231
def apply(plan: LogicalPlan): LogicalPlan = {
33-
val newPlan = apply0(plan)
34-
if (plan.ne(newPlan)) {
35-
// Due to how tree transformations work and StringType object being equal to
36-
// StringType("UTF8_BINARY"), we need to transform the plan twice
37-
// to ensure the correct results for occurrences of default string type.
38-
val finalPlan = apply0(newPlan)
39-
RuleExecutor.forceAdditionalIteration(finalPlan)
40-
finalPlan
41-
} else {
42-
newPlan
43-
}
44-
}
45-
46-
private def apply0(plan: LogicalPlan): LogicalPlan = {
4732
if (isDDLCommand(plan)) {
4833
transformDDL(plan)
4934
} else {
50-
transformPlan(plan, sessionDefaultStringType)
35+
// For non-DDL commands no need to do any further resolution of string types
36+
plan
5137
}
5238
}
5339

54-
/**
55-
* Returns whether any of the given `plan` needs to have its
56-
* default string type resolved.
57-
*/
58-
def needsResolution(plan: LogicalPlan): Boolean = {
59-
if (!isDDLCommand(plan) && isDefaultSessionCollationUsed) {
60-
return false
40+
/** Default collation used, if object level collation is not provided */
41+
private def defaultCollation: String = "UTF8_BINARY"
42+
43+
/** Returns the string type that should be used in a given DDL command */
44+
private def stringTypeForDDLCommand(table: LogicalPlan): StringType = {
45+
table match {
46+
case createTable: CreateTable if createTable.tableSpec.collation.isDefined =>
47+
StringType(createTable.tableSpec.collation.get)
48+
case createView: CreateView if createView.collation.isDefined =>
49+
StringType(createView.collation.get)
50+
case alterTable: AlterTableCommand if alterTable.table.resolved =>
51+
val collation = Option(alterTable
52+
.table.asInstanceOf[ResolvedTable]
53+
.table.properties.get(TableCatalog.PROP_COLLATION))
54+
if (collation.isDefined) {
55+
StringType(collation.get)
56+
} else {
57+
StringType(defaultCollation)
58+
}
59+
case _ => StringType(defaultCollation)
6160
}
62-
63-
plan.exists(node => needsResolution(node.expressions))
64-
}
65-
66-
/**
67-
* Returns whether any of the given `expressions` needs to have its
68-
* default string type resolved.
69-
*/
70-
def needsResolution(expressions: Seq[Expression]): Boolean = {
71-
expressions.exists(needsResolution)
7261
}
7362

74-
/**
75-
* Returns whether the given `expression` needs to have its
76-
* default string type resolved.
77-
*/
78-
def needsResolution(expression: Expression): Boolean = {
79-
expression.exists(e => transformExpression.isDefinedAt(e))
80-
}
81-
82-
private def isDefaultSessionCollationUsed: Boolean = conf.defaultStringType == StringType
83-
84-
/**
85-
* Returns the default string type that should be used in a given DDL command (for now always
86-
* UTF8_BINARY).
87-
*/
88-
private def stringTypeForDDLCommand(table: LogicalPlan): StringType =
89-
StringType("UTF8_BINARY")
90-
91-
/** Returns the session default string type */
92-
private def sessionDefaultStringType: StringType =
93-
StringType(conf.defaultStringType.collationId)
94-
9563
private def isDDLCommand(plan: LogicalPlan): Boolean = plan exists {
9664
case _: AddColumns | _: ReplaceColumns | _: AlterColumns => true
9765
case _ => isCreateOrAlterPlan(plan)
9866
}
9967

10068
private def isCreateOrAlterPlan(plan: LogicalPlan): Boolean = plan match {
101-
case _: V1CreateTablePlan | _: V2CreateTablePlan | _: CreateView | _: AlterViewAs => true
69+
// For CREATE TABLE, only v2 CREATE TABLE command is supported.
70+
// Also, table DEFAULT COLLATION cannot be specified through CREATE TABLE AS SELECT command.
71+
case _: V2CreateTablePlan | _: CreateView | _: AlterViewAs => true
10272
case _ => false
10373
}
10474

@@ -155,22 +125,22 @@ object ResolveDefaultStringTypes extends Rule[LogicalPlan] {
155125
dataType.existsRecursively(isDefaultStringType)
156126

157127
private def isDefaultStringType(dataType: DataType): Boolean = {
128+
// STRING (without explicit collation) is considered default string type.
129+
// STRING COLLATE <collation_name> (with explicit collation) is not considered
130+
// default string type even when explicit collation is UTF8_BINARY (default collation).
158131
dataType match {
159-
case st: StringType =>
160-
// should only return true for StringType object and not StringType("UTF8_BINARY")
161-
st.eq(StringType) || st.isInstanceOf[TemporaryStringType]
132+
// should only return true for StringType object and not for StringType("UTF8_BINARY")
133+
case st: StringType => st.eq(StringType)
162134
case _ => false
163135
}
164136
}
165137

166138
private def replaceDefaultStringType(dataType: DataType, newType: StringType): DataType = {
139+
// Should replace STRING with the new type.
140+
// Should not replace STRING COLLATE UTF8_BINARY, as that is explicit collation.
167141
dataType.transformRecursively {
168142
case currentType: StringType if isDefaultStringType(currentType) =>
169-
if (currentType == newType) {
170-
TemporaryStringType()
171-
} else {
172-
newType
173-
}
143+
newType
174144
}
175145
}
176146

@@ -186,7 +156,3 @@ object ResolveDefaultStringTypes extends Rule[LogicalPlan] {
186156
}
187157
}
188158
}
189-
190-
case class TemporaryStringType() extends StringType(1) {
191-
override def toString: String = s"TemporaryStringType($collationId)"
192-
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,8 @@ import org.apache.spark.sql.catalyst.trees.AlwaysProcess
2929
object ResolveInlineTables extends Rule[LogicalPlan] with EvalHelper {
3030
override def apply(plan: LogicalPlan): LogicalPlan = {
3131
plan.resolveOperatorsWithPruning(AlwaysProcess.fn, ruleId) {
32-
case table: UnresolvedInlineTable if canResolveTable(table) =>
32+
case table: UnresolvedInlineTable if table.expressionsResolved =>
3333
EvaluateUnresolvedInlineTable.evaluateUnresolvedInlineTable(table)
3434
}
3535
}
36-
37-
private def canResolveTable(table: UnresolvedInlineTable): Boolean = {
38-
table.expressionsResolved && !ResolveDefaultStringTypes.needsResolution(table)
39-
}
4036
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1467,5 +1467,5 @@ case class MultiCommutativeOp(
14671467
* Trait for expressions whose data type should be a default string type.
14681468
*/
14691469
trait DefaultStringProducingExpression extends Expression {
1470-
override def dataType: DataType = SQLConf.get.defaultStringType
1470+
override def dataType: DataType = StringType
14711471
}

0 commit comments

Comments
 (0)