Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
7b7fdb1
Addressing merge conflicts
dejankrak-db Feb 3, 2025
fcca8b1
Fixing dependencies
dejankrak-db Feb 3, 2025
61659b5
Reintroducing default string type resolution
dejankrak-db Feb 3, 2025
60dee86
Including analyzer rule
dejankrak-db Feb 3, 2025
0b822a5
Reintroducing parts of the previous resolution logic
dejankrak-db Feb 3, 2025
f096e52
Reintroducing remaining changes from the original PR that should remain
dejankrak-db Feb 3, 2025
0295f03
Fix indentation
dejankrak-db Feb 3, 2025
6e050df
Minor indentation fix
dejankrak-db Feb 3, 2025
6026281
Align brackets
dejankrak-db Feb 3, 2025
1b49e57
Remove string resolution clause in CollationTypeCoercion
dejankrak-db Feb 4, 2025
4649b2b
Revert
dejankrak-db Feb 7, 2025
e95325d
Revert
dejankrak-db Feb 7, 2025
8b60e26
Revert
dejankrak-db Feb 7, 2025
86dd157
Revert
dejankrak-db Feb 7, 2025
5a8f898
Revert
dejankrak-db Feb 7, 2025
724496d
Revert
dejankrak-db Feb 7, 2025
c2d11c8
Revert
dejankrak-db Feb 7, 2025
1dab4f6
Revert
dejankrak-db Feb 7, 2025
ec9a59d
Revert
dejankrak-db Feb 7, 2025
8ee3481
Revert
dejankrak-db Feb 7, 2025
d387e9e
initial
stefankandic Feb 4, 2025
0783ec5
initial
stefankandic Feb 4, 2025
aa1223a
initial
stefankandic Feb 4, 2025
0637483
initial
stefankandic Feb 4, 2025
c430d8c
fix default suite
stefankandic Feb 4, 2025
c1e86be
fix hll test
stefankandic Feb 4, 2025
8e94a68
Merge with latest master
dejankrak-db Feb 7, 2025
a99c407
Merge branch 'apache:master' into revert-session-collations
dejankrak-db Feb 7, 2025
1d90304
Implementing string type resolution for DDL commands using object lev…
dejankrak-db Feb 9, 2025
9e4300e
Fix describe table suite test case expected result
dejankrak-db Feb 9, 2025
86d446c
Fixing and extending tests
dejankrak-db Feb 9, 2025
6c254f1
Removing DML collation resolution and only leaving DDL collation res…
dejankrak-db Feb 10, 2025
aca4ac2
Update sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/anal…
dejankrak-db Feb 10, 2025
0109ded
Merge branch 'apache:master' into revert-session-collations
dejankrak-db Feb 10, 2025
eba555f
Adding clarifying comments and further tests
dejankrak-db Feb 11, 2025
33a0b74
Remove force additional iteration logic as it is not used or needed a…
dejankrak-db Feb 12, 2025
50a0520
Removing V1CreateTablePlan as it currently does not support table def…
dejankrak-db Feb 13, 2025
7173488
Merge branch 'apache:master' into revert-session-collations
dejankrak-db Feb 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -2315,16 +2315,6 @@
"The value '<confValue>' in the config \"<confName>\" is invalid."
],
"subClass" : {
"DEFAULT_COLLATION" : {
"message" : [
"Cannot resolve the given default collation. Suggested valid collation names: ['<proposals>']?"
]
},
"DEFAULT_COLLATION_NOT_SUPPORTED" : {
"message" : [
"Setting default session collation other than UTF8_BINARY is currently not supported."
]
},
"TIME_ZONE" : {
"message" : [
"Cannot resolve the given timezone."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, Li
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, ObjectType}
import org.apache.spark.sql.types.{DataType, ObjectType, StringType}

private[sql] case class SchemaOfAvro(
jsonFormatSchema: String,
options: Map[String, String])
extends LeafExpression with RuntimeReplaceable {

override def dataType: DataType = SQLConf.get.defaultStringType
override def dataType: DataType = StringType

override def nullable: Boolean = false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,7 @@ statement
;

setResetStatement
: SET COLLATION collationName=identifier #setCollation
| SET ROLE .*? #failSetRole
: SET ROLE .*? #failSetRole
| SET TIME ZONE interval #setTimeZone
| SET TIME ZONE timezone #setTimeZone
| SET TIME ZONE .*? #setTimeZone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.util.TimeZone

import scala.util.Try

import org.apache.spark.sql.types.{AtomicType, StringType, TimestampType}
import org.apache.spark.sql.types.{AtomicType, TimestampType}
import org.apache.spark.util.SparkClassUtils

/**
Expand All @@ -44,7 +44,6 @@ private[sql] trait SqlApiConf {
def datetimeJava8ApiEnabled: Boolean
def sessionLocalTimeZone: String
def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value
def defaultStringType: StringType
def stackTracesInDataFrameContext: Int
def dataFrameQueryContextEnabled: Boolean
def legacyAllowUntypedScalaUDFs: Boolean
Expand All @@ -61,7 +60,6 @@ private[sql] object SqlApiConf {
val LOCAL_RELATION_CACHE_THRESHOLD_KEY: String = {
SqlApiConfHelper.LOCAL_RELATION_CACHE_THRESHOLD_KEY
}
val DEFAULT_COLLATION: String = SqlApiConfHelper.DEFAULT_COLLATION

def get: SqlApiConf = SqlApiConfHelper.getConfGetter.get()()

Expand All @@ -87,7 +85,6 @@ private[sql] object DefaultSqlApiConf extends SqlApiConf {
override def datetimeJava8ApiEnabled: Boolean = false
override def sessionLocalTimeZone: String = TimeZone.getDefault.getID
override def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED
override def defaultStringType: StringType = StringType
override def stackTracesInDataFrameContext: Int = 1
override def dataFrameQueryContextEnabled: Boolean = true
override def legacyAllowUntypedScalaUDFs: Boolean = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ private[sql] object SqlApiConfHelper {
val CASE_SENSITIVE_KEY: String = "spark.sql.caseSensitive"
val SESSION_LOCAL_TIMEZONE_KEY: String = "spark.sql.session.timeZone"
val LOCAL_RELATION_CACHE_THRESHOLD_KEY: String = "spark.sql.session.localRelationCacheThreshold"
val DEFAULT_COLLATION: String = "spark.sql.session.collation.default"
val ARROW_EXECUTION_USE_LARGE_VAR_TYPES = "spark.sql.execution.arrow.useLargeVarTypes"

val confGetter: AtomicReference[() => SqlApiConf] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
ResolveAliases ::
ResolveSubquery ::
ResolveSubqueryColumnAliases ::
ResolveDefaultStringTypes ::
ResolveDDLCommandStringTypes ::
ResolveWindowOrder ::
ResolveWindowFrame ::
ResolveNaturalAndUsingJoin ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,6 @@ object CollationTypeCoercion {
if (!expressions.exists(e => SchemaUtils.hasNonUTF8BinaryCollation(e.dataType))) {
// if there are no collated types we don't need to do anything
return None
} else if (ResolveDefaultStringTypes.needsResolution(expressions)) {
// if any of the strings types are still not resolved
// we need to wait for them to be resolved first
return None
}

val collationContextWinner = expressions.foldLeft(findCollationContext(expressions.head)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,87 +18,57 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterViewAs, ColumnDefinition, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, V1CreateTablePlan, V2CreateTablePlan}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterTableCommand, AlterViewAs, ColumnDefinition, CreateTable, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, V2CreateTablePlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.types.{DataType, StringType}

/**
* Resolves default string types in queries and commands. For queries, the default string type is
* determined by the session's default string type. For DDL, the default string type is the
* default type of the object (table -> schema -> catalog). However, this is not implemented yet.
* So, we will just use UTF8_BINARY for now.
* Resolves string types in DDL commands, where the string type inherits the
* collation from the corresponding object (table/view -> schema -> catalog).
*/
object ResolveDefaultStringTypes extends Rule[LogicalPlan] {
object ResolveDDLCommandStringTypes extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
val newPlan = apply0(plan)
if (plan.ne(newPlan)) {
// Due to how tree transformations work and StringType object being equal to
// StringType("UTF8_BINARY"), we need to transform the plan twice
// to ensure the correct results for occurrences of default string type.
val finalPlan = apply0(newPlan)
RuleExecutor.forceAdditionalIteration(finalPlan)
finalPlan
} else {
newPlan
}
}

private def apply0(plan: LogicalPlan): LogicalPlan = {
if (isDDLCommand(plan)) {
transformDDL(plan)
} else {
transformPlan(plan, sessionDefaultStringType)
// For non-DDL commands no need to do any further resolution of string types
plan
}
}

/**
* Returns whether any of the given `plan` needs to have its
* default string type resolved.
*/
def needsResolution(plan: LogicalPlan): Boolean = {
if (!isDDLCommand(plan) && isDefaultSessionCollationUsed) {
return false
/** Default collation used when no object-level collation is provided */
private def defaultCollation: String = "UTF8_BINARY"

/** Returns the string type that should be used in a given DDL command */
private def stringTypeForDDLCommand(table: LogicalPlan): StringType = {
table match {
case createTable: CreateTable if createTable.tableSpec.collation.isDefined =>
StringType(createTable.tableSpec.collation.get)
case createView: CreateView if createView.collation.isDefined =>
StringType(createView.collation.get)
case alterTable: AlterTableCommand if alterTable.table.resolved =>
val collation = Option(alterTable
.table.asInstanceOf[ResolvedTable]
.table.properties.get(TableCatalog.PROP_COLLATION))
if (collation.isDefined) {
StringType(collation.get)
} else {
StringType(defaultCollation)
}
case _ => StringType(defaultCollation)
}

plan.exists(node => needsResolution(node.expressions))
}

/**
* Returns whether any of the given `expressions` needs to have its
* default string type resolved.
*/
def needsResolution(expressions: Seq[Expression]): Boolean = {
expressions.exists(needsResolution)
}

/**
* Returns whether the given `expression` needs to have its
* default string type resolved.
*/
def needsResolution(expression: Expression): Boolean = {
expression.exists(e => transformExpression.isDefinedAt(e))
}

private def isDefaultSessionCollationUsed: Boolean = conf.defaultStringType == StringType

/**
* Returns the default string type that should be used in a given DDL command (for now always
* UTF8_BINARY).
*/
private def stringTypeForDDLCommand(table: LogicalPlan): StringType =
StringType("UTF8_BINARY")

/** Returns the session default string type */
private def sessionDefaultStringType: StringType =
StringType(conf.defaultStringType.collationId)

private def isDDLCommand(plan: LogicalPlan): Boolean = plan exists {
case _: AddColumns | _: ReplaceColumns | _: AlterColumns => true
case _ => isCreateOrAlterPlan(plan)
}

private def isCreateOrAlterPlan(plan: LogicalPlan): Boolean = plan match {
case _: V1CreateTablePlan | _: V2CreateTablePlan | _: CreateView | _: AlterViewAs => true
// For CREATE TABLE, only the v2 CREATE TABLE command is supported.
// Also, a table DEFAULT COLLATION cannot be specified through the CREATE TABLE AS SELECT command.
case _: V2CreateTablePlan | _: CreateView | _: AlterViewAs => true
case _ => false
}

Expand Down Expand Up @@ -155,22 +125,22 @@ object ResolveDefaultStringTypes extends Rule[LogicalPlan] {
dataType.existsRecursively(isDefaultStringType)

private def isDefaultStringType(dataType: DataType): Boolean = {
// STRING (without explicit collation) is considered default string type.
// STRING COLLATE <collation_name> (with explicit collation) is not considered
// default string type even when explicit collation is UTF8_BINARY (default collation).
dataType match {
case st: StringType =>
// should only return true for StringType object and not StringType("UTF8_BINARY")
st.eq(StringType) || st.isInstanceOf[TemporaryStringType]
// should only return true for StringType object and not for StringType("UTF8_BINARY")
case st: StringType => st.eq(StringType)
case _ => false
}
}

private def replaceDefaultStringType(dataType: DataType, newType: StringType): DataType = {
// Should replace STRING with the new type.
// Should not replace STRING COLLATE UTF8_BINARY, as that is explicit collation.
dataType.transformRecursively {
case currentType: StringType if isDefaultStringType(currentType) =>
if (currentType == newType) {
TemporaryStringType()
} else {
newType
}
newType
}
}

Expand All @@ -186,7 +156,3 @@ object ResolveDefaultStringTypes extends Rule[LogicalPlan] {
}
}
}

case class TemporaryStringType() extends StringType(1) {
override def toString: String = s"TemporaryStringType($collationId)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,8 @@ import org.apache.spark.sql.catalyst.trees.AlwaysProcess
object ResolveInlineTables extends Rule[LogicalPlan] with EvalHelper {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperatorsWithPruning(AlwaysProcess.fn, ruleId) {
case table: UnresolvedInlineTable if canResolveTable(table) =>
case table: UnresolvedInlineTable if table.expressionsResolved =>
EvaluateUnresolvedInlineTable.evaluateUnresolvedInlineTable(table)
}
}

private def canResolveTable(table: UnresolvedInlineTable): Boolean = {
table.expressionsResolved && !ResolveDefaultStringTypes.needsResolution(table)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1467,5 +1467,5 @@ case class MultiCommutativeOp(
* Trait for expressions whose data type should be a default string type.
*/
trait DefaultStringProducingExpression extends Expression {
override def dataType: DataType = SQLConf.get.defaultStringType
override def dataType: DataType = StringType
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ case class CreateArray(children: Seq[Expression], useStringTypeWhenEmpty: Boolea

private val defaultElementType: DataType = {
if (useStringTypeWhenEmpty) {
SQLConf.get.defaultStringType
StringType
} else {
NullType
}
Expand Down Expand Up @@ -196,7 +196,7 @@ case class CreateMap(children: Seq[Expression], useStringTypeWhenEmpty: Boolean)

private val defaultElementType: DataType = {
if (useStringTypeWhenEmpty) {
SQLConf.get.defaultStringType
StringType
} else {
NullType
}
Expand Down Expand Up @@ -354,7 +354,7 @@ case class MapFromArrays(left: Expression, right: Expression)
case object NamePlaceholder extends LeafExpression with Unevaluable {
override lazy val resolved: Boolean = false
override def nullable: Boolean = false
override def dataType: DataType = SQLConf.get.defaultStringType
override def dataType: DataType = StringType
override def prettyName: String = "NamePlaceholder"
override def toString: String = prettyName
}
Expand Down Expand Up @@ -565,14 +565,14 @@ case class StringToMap(text: Expression, pairDelim: Expression, keyValueDelim: E
extends TernaryExpression with ExpectsInputTypes {
override def nullIntolerant: Boolean = true
def this(child: Expression, pairDelim: Expression) = {
this(child, pairDelim, Literal.create(":", SQLConf.get.defaultStringType))
this(child, pairDelim, Literal.create(":", StringType))
}

def this(child: Expression) = {
this(
child,
Literal.create(",", SQLConf.get.defaultStringType),
Literal.create(":", SQLConf.get.defaultStringType))
Literal.create(",", StringType),
Literal.create(":", StringType))
}

override def stateful: Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUt
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonGenerator, JacksonParser, JsonInferSchema, JSONOptions}
import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, FailureSafeParser, MapData, PermissiveMode}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType, VariantType}
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StringType, StructField, StructType, VariantType}
import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -234,7 +233,7 @@ case class SchemaOfJsonEvaluator(options: Map[String, String]) {
.getOrElse(ArrayType(StructType(Nil), containsNull = at.containsNull))
case other: DataType =>
jsonInferSchema.canonicalizeType(other, jsonOptions).getOrElse(
SQLConf.get.defaultStringType)
StringType)
}
}

Expand Down
Loading