[SPARK-34528][SQL] Explicitly name fields in structs of a catalog view #31639

Closed · wants to merge 1 commit
@@ -35,15 +35,15 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionInfo, ImplicitCastInputTypes, UpCast}
+import org.apache.spark.sql.catalyst.expressions.{Alias, ArrayTransform, CreateStruct, Expression, ExpressionInfo, ImplicitCastInputTypes, LambdaFunction, NamedExpression, TransformKeys, TransformValues, Unevaluable, UnresolvedNamedLambdaVariable, UpCast}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, Metadata, StructType}
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils}
import org.apache.spark.util.Utils

@@ -875,12 +875,98 @@ class SessionCatalog(
    // safe) and Alias (to respect user-specified view column names) according to the view schema
    // in the catalog.
    val projectList = viewColumnNames.zip(metadata.schema).map { case (name, field) =>
-     Alias(UpCast(UnresolvedAttribute.quoted(name), field.dataType), field.name)(
-       explicitMetadata = Some(field.metadata))
+     createNamedExpr(Seq(), name, field.name, field.dataType,
+       Some(field.metadata), inLambda = false)
    }
    View(desc = metadata, isTempView = isTempView, child = Project(projectList, parsedPlan))
  }

  /**
   * Wraps the expression produced by `createExpr` in an `Alias` so the column keeps the
   * user-specified name and metadata from the view schema in the catalog.
   */
  private def createNamedExpr(
      parent: Seq[String],
      name: String,
      fieldName: String,
      dataType: DataType,
      metadata: Option[Metadata],
      inLambda: Boolean): NamedExpression = {
    Alias(createExpr(parent, name, dataType, metadata, inLambda),
      fieldName)(explicitMetadata = metadata)
  }

  /**
   * Recursively rebuilds the expression for a view column so that nested field names from
   * the catalog schema are applied explicitly: structs are reconstructed with
   * `CreateStruct`, arrays and maps that contain structs are rewritten with
   * `ArrayTransform`, `TransformKeys` and `TransformValues`, and every leaf is wrapped
   * in `UpCast`.
   */
  private def createExpr(
      parent: Seq[String],
      name: String,
      dataType: DataType,
      metadata: Option[Metadata],
      inLambda: Boolean): Expression = {
    val key = UnresolvedNamedLambdaVariable(Seq("key"))
    val value = UnresolvedNamedLambdaVariable(Seq("value"))
    dataType match {
      case structType: StructType => CreateStruct.create(structType.map { subField =>
        createNamedExpr(parent :+ name, subField.name, subField.name,
          subField.dataType, Some(subField.metadata), inLambda)
      })
      case arrayType: ArrayType => if (needToBeExplode(arrayType)) {
        ArrayTransform(
          UnresolvedAttribute(parent :+ name),
          LambdaFunction(
            createExpr(Seq(), "value", arrayType.elementType, metadata, inLambda = true),
            Seq(value)))
      } else {
        upCast(parent :+ name, inLambda, dataType)
      }
      case mapType: MapType => if (needToBeExplode(mapType.keyType) &&
          needToBeExplode(mapType.valueType)) {
        TransformValues(
          TransformKeys(
            UnresolvedAttribute(parent :+ name),
            LambdaFunction(createExpr(Seq(), "key", mapType.keyType, metadata, inLambda = true),
              Seq(key, value))),
          LambdaFunction(createExpr(Seq(), "value", mapType.valueType, metadata, inLambda = true),
            Seq(key, value)))
      } else if (needToBeExplode(mapType.keyType)) {
        TransformKeys(
          UnresolvedAttribute(parent :+ name),
          LambdaFunction(createExpr(Seq(), "key", mapType.keyType, metadata, inLambda = true),
            Seq(key, value)))
      } else if (needToBeExplode(mapType.valueType)) {
        TransformValues(
          UnresolvedAttribute(parent :+ name),
          LambdaFunction(createExpr(Seq(), "value", mapType.valueType, metadata, inLambda = true),
            Seq(key, value)))
      } else {
        upCast(parent :+ name, inLambda, dataType)
      }
      case _ => upCast(parent :+ name, inLambda, dataType)
    }
  }

  /**
   * Upcasts a leaf value to the type recorded in the view schema. Inside a lambda the
   * value is referenced through its lambda variable instead of a column attribute.
   */
  private def upCast(
      nameParts: Seq[String],
      inLambda: Boolean,
      dataType: DataType): Unevaluable = {
    UpCast(if (inLambda) {
      UnresolvedNamedLambdaVariable(nameParts)
    } else {
      UnresolvedAttribute(nameParts)
    }, dataType)
  }

  /**
   * Returns true if the type contains a struct at any nesting level, i.e. the column has
   * nested field names that must be re-applied explicitly.
   */
  private def needToBeExplode(dataType: DataType): Boolean = {
    dataType match {
      case _: StructType => true
      case arrayType: ArrayType => needToBeExplode(arrayType.elementType)
      case mapType: MapType => needToBeExplode(mapType.keyType) ||
        needToBeExplode(mapType.valueType)
      case _ => false
    }
  }
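
  // Illustration (editor's sketch, not part of the patch): for a view column `s` of type
  // struct<a:int,b:string>, the helpers above expand to roughly
  //   Alias(CreateStruct(Seq(
  //     Alias(UpCast(UnresolvedAttribute(Seq("s", "a")), IntegerType), "a")(),
  //     Alias(UpCast(UnresolvedAttribute(Seq("s", "b")), StringType), "b")())), "s")()
  // so the field names recorded in the catalog survive view resolution even if the
  // underlying query renames the struct's fields.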

  def lookupTempView(table: String): Option[SubqueryAlias] = {
    val formattedTable = formatTableName(table)
    getTempView(formattedTable).map { view =>
@@ -31,6 +31,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils

class InMemorySessionCatalogSuite extends SessionCatalogSuite {
  protected val utils = new CatalogTestUtils {
@@ -642,13 +643,100 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
  }

  private def getViewPlan(metadata: CatalogTable): LogicalPlan = {
    import org.apache.spark.sql.catalyst.dsl.expressions._
    val projectList = metadata.schema.map { field =>
-     UpCast(field.name.attr, field.dataType).as(field.name)
+     createNamedExpr(Seq(), field.name, field.dataType, inLambda = false)
    }
    Project(projectList, CatalystSqlParser.parsePlan(metadata.viewText.get))
  }
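
  // The helpers below mirror the expression construction added to SessionCatalog so the
  // expected view plan can be written with the catalyst expressions DSL.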

  private def createNamedExpr(
      parent: Seq[String],
      fieldName: String,
      fieldDataType: DataType,
      inLambda: Boolean): NamedExpression = {
    import org.apache.spark.sql.catalyst.dsl.expressions._
    createExpr(parent, fieldName, fieldDataType, inLambda).as(fieldName)
  }

  private def createExpr(
      parent: Seq[String],
      fieldName: String,
      fieldDataType: DataType,
      inLambda: Boolean): Expression = {
    import org.apache.spark.sql.catalyst.dsl.expressions._
    val key = UnresolvedNamedLambdaVariable(Seq("key"))
    val value = UnresolvedNamedLambdaVariable(Seq("value"))
    fieldDataType match {
      case structType: StructType => CreateStruct.create(structType.map { subField =>
        createNamedExpr(parent :+ fieldName, subField.name, subField.dataType, inLambda)
      })
      case arrayType: ArrayType => if (needToBeExplode(arrayType)) {
        ArrayTransform(
          (parent :+ fieldName).mkString(".").attr,
          LambdaFunction(createExpr(Seq(), "value", arrayType.elementType, inLambda = true),
            Seq(value)))
      } else {
        upCast(parent :+ fieldName, fieldDataType, inLambda)
      }
      case mapType: MapType => if (needToBeExplode(mapType.keyType) &&
          needToBeExplode(mapType.valueType)) {
        TransformValues(
          TransformKeys(
            (parent :+ fieldName).mkString(".").attr,
            LambdaFunction(createExpr(Seq(), "key", mapType.keyType, inLambda = true),
              Seq(key, value))),
          LambdaFunction(createExpr(Seq(), "value", mapType.valueType, inLambda = true),
            Seq(key, value)))
      } else if (needToBeExplode(mapType.keyType)) {
        TransformKeys(
          (parent :+ fieldName).mkString(".").attr,
          LambdaFunction(createExpr(Seq(), "key", mapType.keyType, inLambda = true),
            Seq(key, value)))
      } else if (needToBeExplode(mapType.valueType)) {
        TransformValues(
          (parent :+ fieldName).mkString(".").attr,
          LambdaFunction(createExpr(Seq(), "value", mapType.valueType, inLambda = true),
            Seq(key, value)))
      } else {
        upCast(parent :+ fieldName, fieldDataType, inLambda)
      }
      case _ => upCast(parent :+ fieldName, fieldDataType, inLambda)
    }
  }

  private def upCast(
      nameParts: Seq[String],
      fieldDataType: DataType,
      inLambda: Boolean): Expression = {
    import org.apache.spark.sql.catalyst.dsl.expressions._
    UpCast(
      if (inLambda) {
        UnresolvedNamedLambdaVariable(nameParts)
      } else {
        nameParts.mkString(".").attr
      }, fieldDataType)
  }

  private def needToBeExplode(dataType: DataType): Boolean = {
    dataType match {
      case _: StructType => true
      case arrayType: ArrayType => needToBeExplode(arrayType.elementType)
      case mapType: MapType => needToBeExplode(mapType.keyType) ||
        needToBeExplode(mapType.valueType)
      case _ => false
    }
  }
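
  // Illustration (editor's sketch, not part of the patch): for a column of type
  // map<string, struct<c:long, d:date>>, the expected expression built above is roughly
  // equivalent to the SQL
  //   transform_values(m, (key, value) ->
  //     named_struct('c', CAST(value.c AS BIGINT), 'd', CAST(value.d AS DATE))) AS m
  // where `m` is a hypothetical column name.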

test("look up view relation") {
withBasicCatalog { catalog =>
val props = CatalogTable.catalogAndNamespaceToProps("cat1", Seq("ns1"))
@@ -686,6 +774,63 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
    }
  }


test("SPARK-34528: named explicitly field in struct of a view") {
withEmptyCatalog { catalog =>
val databaseName = "default"
val tableName = "complex_table"
val viewName = "view_table"
val subSubSchema = new StructType()
.add("c", "long")
.add("d", "date")
val subSchema = new StructType()
.add("col1", "int")
.add("col2", "string")
.add("a", "int")
.add("b", "string")
.add("subComplex", subSubSchema)
.add("subMatrix", new ArrayType(
new ArrayType(new MapType(StringType, IntegerType, valueContainsNull = true),
containsNull = true), containsNull = true)
)
.add("subComplexMatrix", new ArrayType(
new ArrayType(subSubSchema, containsNull = true), containsNull = true)
)
.add("subMap", new MapType(StringType, new ArrayType(IntegerType, containsNull = true)
, valueContainsNull = true))
.add("subComplexMap", new MapType(subSubSchema, StringType, valueContainsNull = true))
val schema = new StructType()
.add("id", "int")
.add("complex", subSchema)
.add("array", new ArrayType(IntegerType, containsNull = true))
.add("complexArray", new ArrayType(subSchema, containsNull = true))
.add("map", new MapType(StringType, IntegerType, valueContainsNull = true))
.add("complexMap", new MapType(StringType, subSchema, valueContainsNull = true))

      val complexTable = CatalogTable(
        identifier = TableIdentifier(tableName, Some(databaseName)),
        tableType = CatalogTableType.EXTERNAL,
        storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().toURI)),
        schema = schema,
        provider = Some(defaultProvider))
      catalog.createTable(complexTable, ignoreIfExists = false)
      val view = CatalogTable(
        identifier = TableIdentifier(viewName, Some(databaseName)),
        tableType = CatalogTableType.VIEW,
        storage = CatalogStorageFormat.empty,
        schema = schema,
        viewText = Some(s"SELECT * FROM $tableName"))
      catalog.createTable(view, ignoreIfExists = false)
      val viewMetadata = catalog.externalCatalog.getTable(databaseName, viewName)
      val viewPlan = View(desc = viewMetadata, isTempView = false,
        child = getViewPlan(viewMetadata))

      comparePlans(catalog.lookupRelation(TableIdentifier(viewName, Some(databaseName))),
        SubqueryAlias(Seq(CatalogManager.SESSION_CATALOG_NAME, databaseName, viewName), viewPlan))
    }
  }

test("table exists") {
withBasicCatalog { catalog =>
assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2"))))
@@ -205,7 +205,7 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
checkAnswer(spark.table("v"), Row(Row("a", 1)) :: Nil)

spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `b`, 1 AS b) q1")
spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `c`, 1 AS b) q1")

Member:

Ah, I see. Since we don't allow duplicate names at the top level, it seems we need to follow that in this case, too. cc: @cloud-fan

scala> spark.sql("CREATE VIEW v1 AS SELECT 'a' AS `a`, 1 AS b")
scala> spark.sql("ALTER VIEW v1 AS SELECT 'a' AS `b`, 1 AS b")
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the view definition: `b`

Author:

So you want me to add this test?

Member:

This is out of scope for this PR. Do you want to work on it? If so, please feel free to file a JIRA for it.

val df = spark.table("v")
assert("q1".equals(df.schema.fields(0).name))
checkAnswer(df, Row(Row("a", 1)) :: Nil)