[SPARK-29358][SQL] Make unionByName optionally fill missing columns with nulls #28996

Closed · wants to merge 6 commits · Changes from 4 commits
40 changes: 35 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2030,7 +2030,22 @@ class Dataset[T] private[sql](
* @group typedrel
* @since 2.3.0
*/
def unionByName(other: Dataset[T]): Dataset[T] = withSetOperator {
def unionByName(other: Dataset[T]): Dataset[T] = unionByName(other, false)

/**
* Returns a new Dataset containing union of rows in this Dataset and another Dataset.
*
* The difference between this function and [[union]] is that this function
* resolves columns by name (not by position).
*
 * When the parameter `allowMissingColumns` is `true`, this function allows a
 * different set of column names between the two Datasets. Missing columns on
 * either side are filled with null values.

Member: It's worth documenting order sensitivity a little more. Previously it was simple because the result follows the schema of the original set (the left side). With the new option, the missing columns appended at the end are determined by `other` (the right side).

Member Author: Good advice.
*
Member: Could you add an illustrative example like lines 2016 ~ 2029, @viirya?

Member Author: Okay.
* @group typedrel
* @since 3.1.0
*/
def unionByName(other: Dataset[T], allowMissingColumns: Boolean): Dataset[T] = withSetOperator {
Member: Do we have a JIRA to add the corresponding API for Python?

Member: This is a good beginner task for new contributors.

Member Author: I should create a follow-up PR for Python and R, but it is okay as a beginner task too.

Member: I filed SPARK-32798 and SPARK-32799.
// Check column name duplication
val resolver = sparkSession.sessionState.analyzer.resolver
val leftOutputAttrs = logicalPlan.output
@@ -2048,19 +2063,34 @@ class Dataset[T] private[sql](
// Builds a project list for `other` based on `logicalPlan` output names
val rightProjectList = leftOutputAttrs.map { lattr =>
rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) }.getOrElse {
throw new AnalysisException(
s"""Cannot resolve column name "${lattr.name}" among """ +
s"""(${rightOutputAttrs.map(_.name).mkString(", ")})""")
if (allowMissingColumns) {
Contributor: Does it work with nested columns?

Member Author: No, currently it doesn't.

Contributor (@cloud-fan, Jul 13, 2020): I think the major problem here is that we put the by-name logic in the API method, not in the Analyzer. Shall we add two boolean parameters (byName and allowMissingCol) to Union, and move the by-name logic to the type coercion rules?

Member Author: Ok. I will do it in another PR.

Member: @cloud-fan, unionByName (and the by-name logic) has been here since Apache Spark 2.3.0. Shall we proceed with that refactoring suggestion as a separate JIRA?

Contributor: Yeah, it's better to have a new JIRA.

Member: Thanks, @cloud-fan.

Alias(Literal(null, lattr.dataType), lattr.name)()
} else {
throw new AnalysisException(
s"""Cannot resolve column name "${lattr.name}" among """ +
s"""(${rightOutputAttrs.map(_.name).mkString(", ")})""")
}
}
}

// Delegates failure checks to `CheckAnalysis`
val notFoundAttrs = rightOutputAttrs.diff(rightProjectList)
val rightChild = Project(rightProjectList ++ notFoundAttrs, other.logicalPlan)

// Builds a project for `logicalPlan` based on `other` output names, if allowing
// missing columns.
val leftChild = if (allowMissingColumns) {
val missingAttrs = notFoundAttrs.map { attr =>
Alias(Literal(null, attr.dataType), attr.name)()
}
Project(leftOutputAttrs ++ missingAttrs, logicalPlan)
} else {
logicalPlan
}

// This breaks caching, but it's usually ok because it addresses a very specific use case:
// using union to union many files or partitions.
CombineUnions(Union(logicalPlan, rightChild))
CombineUnions(Union(leftChild, rightChild))
}

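The reviewers asked for an illustrative example of the new semantics. Below is a minimal, hedged usage sketch (my illustration, not code from this PR) mirroring the test case further down; it assumes a local SparkSession. It shows that the result keeps the left-hand schema order and appends columns found only on the right at the end, which is the order sensitivity discussed above.

import org.apache.spark.sql.SparkSession

object UnionByNameExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("unionByNameExample")
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq((1, 2)).toDF("a", "c")
    val df2 = Seq((3, 4, 5)).toDF("a", "b", "c")

    // Left-to-right: the result keeps df1's schema ("a", "c") and appends the
    // right-only column "b"; df1's rows get null for the missing "b".
    df1.unionByName(df2, true).show()
    // +---+---+----+
    // |  a|  c|   b|
    // +---+---+----+
    // |  1|  2|null|
    // |  3|  5|   4|
    // +---+---+----+

    // Right-to-left: the column order follows df2 ("a", "b", "c") instead.
    df2.unionByName(df1, true).show()
    // +---+----+---+
    // |  a|   b|  c|
    // +---+----+---+
    // |  3|   4|  5|
    // |  1|null|  2|
    // +---+----+---+

    spark.stop()
  }
}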
19 changes: 19 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
@@ -506,4 +506,23 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession {
check(lit(2).cast("int"), $"c" === 2, Seq(Row(1, 1, 2, 0), Row(1, 1, 2, 1)))
check(lit(2).cast("int"), $"c" =!= 2, Seq())
}

test("SPARK-29358: Make unionByName optionally fill missing columns with nulls") {
var df1 = Seq(1, 2, 3).toDF("a")
var df2 = Seq(3, 1, 2).toDF("b")
val df3 = Seq(2, 3, 1).toDF("c")
val unionDf = df1.unionByName(df2.unionByName(df3, true), true)
checkAnswer(unionDf,
Row(1, null, null) :: Row(2, null, null) :: Row(3, null, null) :: // df1
Row(null, 3, null) :: Row(null, 1, null) :: Row(null, 2, null) :: // df2
Row(null, null, 2) :: Row(null, null, 3) :: Row(null, null, 1) :: Nil // df3
)

df1 = Seq((1, 2)).toDF("a", "c")
df2 = Seq((3, 4, 5)).toDF("a", "b", "c")
checkAnswer(df1.unionByName(df2, true),
Row(1, 2, null) :: Row(3, 5, 4) :: Nil)
checkAnswer(df2.unionByName(df1, true),
Row(3, 4, 5) :: Row(1, null, 2) :: Nil)
Member: @viirya, can we have both case-sensitive and case-insensitive test coverage?

Member Author: Sure.

}
}
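Following up on the case-sensitivity request above, here is a hedged sketch of what such coverage could look like (my illustration, not part of this diff). It assumes the suite's withSQLConf helper and an import of org.apache.spark.sql.internal.SQLConf; the expected rows follow from the null-filling logic in Dataset.scala above.

test("SPARK-29358: unionByName with allowMissingColumns and case sensitivity (sketch)") {
  // Case-insensitive (the default): "a"/"A" and "B"/"b" resolve to the same
  // columns, so nothing is missing and no nulls are introduced.
  withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
    val df1 = Seq((1, 2)).toDF("a", "B")
    val df2 = Seq((3, 4)).toDF("A", "b")
    checkAnswer(df1.unionByName(df2, true), Row(1, 2) :: Row(3, 4) :: Nil)
  }
  // Case-sensitive: no names match, so each column is missing on one side and
  // the result carries all four columns, null-filled crosswise.
  withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
    val df1 = Seq((1, 2)).toDF("a", "B")
    val df2 = Seq((3, 4)).toDF("A", "b")
    checkAnswer(df1.unionByName(df2, true),
      Row(1, 2, null, null) :: Row(null, null, 3, 4) :: Nil)
  }
}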