[SPARK-42151][SQL] Align UPDATE assignments with table attributes #40308

Closed · wants to merge 4 commits

Conversation

aokolnychyi (Contributor) commented Mar 6, 2023

What changes were proposed in this pull request?

This PR adds a rule to align UPDATE assignments with table attributes.

Why are the changes needed?

These changes are needed so that we can rewrite UPDATE statements into executable plans for tables that support row-level operations. In particular, our row-level mutation framework assumes Spark is responsible for building an updated version of each affected row, which is then passed back to the data source.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

This PR comes with tests.

/**
* A rule that aligns assignments in row-level operations.
*
* Note that this rule must be run after resolving default values but before rewriting row-level
Contributor Author

We need to think about a reliable way to check if default values have been resolved. Right now, it simply relies on the order of rules, which is fragile. Ideas are welcome.

case u: UpdateTable if u.resolved && !u.aligned && shouldAlign(u.table) =>
  val newTable = u.table.transform {
    case r: DataSourceV2Relation =>
      validateStoreAssignmentPolicy()
Contributor Author

I follow what we do for V2 inserts.
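For context, a rough sketch of the kind of check V2 inserts perform; the helper name matches the excerpt above, but the exact exception and message used in the PR may differ:

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy

// The LEGACY store assignment policy is not allowed for V2 tables, so reject it up front.
def validateStoreAssignmentPolicy(): Unit = {
  if (SQLConf.get.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY) {
    throw new IllegalArgumentException(
      "LEGACY store assignment policy is disallowed in Spark data source V2")
  }
}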

* @param assignments assignments to align
* @return aligned assignments that match table columns
*/
def alignAssignments(
Contributor Author

The logic in this method tries to follow the by-name resolution we have for V2 tables.

aokolnychyi (Contributor Author)

cc @huaxingao @cloud-fan @dongjoon-hyun @sunchao @viirya @gengliangwang

errors: Seq[String]): Throwable = {

new AnalysisException(
errorClass = "DATATYPE_MISMATCH.INVALID_ROW_LEVEL_OPERATION_ASSIGNMENTS",
Contributor Author

I am using DATATYPE_MISMATCH as it seems appropriate.


private case class ColumnUpdate(ref: Seq[String], expr: Expression)

/**
Contributor Author

@cloud-fan, this doc gives a bit more detail about why this PR is a prerequisite for rewriting UPDATEs. Let me know if this makes sense!

object AlignRowLevelCommandAssignments extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case u: UpdateTable if !u.skipSchemaResolution && u.resolved && !u.aligned =>
Contributor Author

We can apply this rule only if the table implements SupportsRowLevelOperations, if that feels safer?

Member

What happens if the table doesn't implement SupportsRowLevelOperations?

Contributor Author

I plan to ignore such statements when rewriting UPDATEs into executable plans, like we do today for DELETE. This would allow data sources to inject their own handling.

Comment on lines 39 to 45
* This method processes and reorders given assignments so that each target column gets
* an expression it should be set to. If a column does not have a matching assignment,
* it will be set to its current value. For example, if one passes table attributes c1, c2
* and an assignment c2 = 1, this method will return c1 = c1, c2 = 1.
* <p>
* This method also handles updates to nested columns. If there is an assignment to a particular
* nested field, this method will construct a new struct with one field updated preserving other
* fields that have not been modified. For example, if one passes table attributes c1, c2
* where c2 is a struct with fields n1 and n2 and an assignment c2.n2 = 1, this method will
* return c1 = c1, c2 = struct(c2.n1, 1).
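For illustration, a minimal self-contained sketch (plain Scala with hypothetical types, not the Catalyst implementation in this PR) that mimics the alignment semantics described in the doc above:

sealed trait SimpleType
case object SimpleInt extends SimpleType
case class SimpleStruct(fields: Seq[(String, SimpleType)]) extends SimpleType

case class SimpleColumn(name: String, dataType: SimpleType)
// An assignment key is a column path, e.g. Seq("c2", "n2") for c2.n2.
case class SimpleAssignment(key: Seq[String], value: String)

def align(cols: Seq[SimpleColumn], assignments: Seq[SimpleAssignment]): Seq[(String, String)] = {
  def alignPath(path: Seq[String], tpe: SimpleType): String =
    assignments.find(_.key == path) match {
      case Some(a) => a.value                                    // exact assignment wins
      case None => tpe match {
        case SimpleStruct(fields) if assignments.exists(_.key.startsWith(path)) =>
          // a nested field is updated: rebuild the struct, keeping untouched fields as-is
          fields.map { case (n, t) => alignPath(path :+ n, t) }.mkString("struct(", ", ", ")")
        case _ => path.mkString(".")                             // untouched column keeps its value
      }
    }
  cols.map(c => c.name -> alignPath(Seq(c.name), c.dataType))
}

// Example matching the doc above:
//   align(Seq(SimpleColumn("c1", SimpleInt),
//             SimpleColumn("c2", SimpleStruct(Seq("n1" -> SimpleInt, "n2" -> SimpleInt)))),
//         Seq(SimpleAssignment(Seq("c2", "n2"), "1")))
//   == Seq("c1" -> "c1", "c2" -> "struct(c2.n1, 1)")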
Contributor

hmm, do we expect a data source that can directly update an inner field? For such data sources, this is a regression.

Contributor

Thinking about this more, I think this is required by the row-level operation framework so we have no choice. Data sources can skip it (skipSchemaResolution returns true) and use a more advanced implementation if they can.
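As a hedged illustration of the opt-out path (based on existing V2 behavior where DataSourceV2Relation reports skipSchemaResolution for tables with the ACCEPT_ANY_SCHEMA capability, not on this PR's diff):

import java.util
import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
import org.apache.spark.sql.types.StructType

// A table that declares ACCEPT_ANY_SCHEMA, which makes skipSchemaResolution return true
// for its relation and lets the source keep its own assignment handling.
class AcceptAnySchemaTable extends Table {
  override def name(): String = "accept_any_schema_table"
  override def schema(): StructType = new StructType()
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.ACCEPT_ANY_SCHEMA)
}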

Contributor Author

Correct, the existing row-level APIs assume Spark is responsible for building an updated version of the row. That should work for Delta, Iceberg, Hudi, Hive ACID.

Once there is another use case, we should be able to extend the framework to cover it.

Contributor Author

Let me also know if you think we should only apply this to implementations of SupportsRowLevelOperations.
Here is the original question.

toRef(child)
case GetStructField(child, _, Some(name)) =>
toRef(child) :+ name
case other: ExtractValue =>
Contributor

how about GetArrayStructFields?

Contributor

For ALTER COLUMN we support a special syntax to reference any inner field, for example, array_col.element.field1, map_col.key.field2, etc. Shall we support this syntax in UPDATE as well? The related code is in StructType.findNestedField

Contributor Author

We should eventually. This PR doesn't support updating arrays or maps, though. I wanted to work on that later and unblock further row-level operation development. For now, I throw an exception and support only nested fields in structs.

Contributor Author

Actually, I can add support for those expressions here but fail temporarily in the rewrite logic.

@@ -319,6 +319,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
ResolveRandomSeed ::
ResolveBinaryArithmetic ::
ResolveUnion ::
AlignRowLevelCommandAssignments ::
Member

Maybe add a comment that this rule's position in the rule order cannot be changed for now.

Contributor Author

Added a comment above.

}
}

test("align assignments (structs)") {
Contributor

There should be a test case for "UPDATE nested_struct_table SET s.n_i = 1" that ensures the struct s.n_s is preserved as a whole instead of recursing and generating assignments for each of its children.

This is important if s.n_s contains null values: the assignments must be (s.n_i = 1, s.n_s = s.n_s), not (s.n_i = 1, s.n_s.dn_i = s.n_s.dn_i, s.n_s.dn_l = s.n_s.dn_l), so that s.n_s is still null after the update.

Contributor Author

Will make sure there is a test case for this.

cloud-fan pushed a commit that referenced this pull request Apr 5, 2023
### What changes were proposed in this pull request?

This PR migrates `TableOutputResolver` to use runtime NOT NULL checks instead of checking type compatibility during the analysis phase.

### Why are the changes needed?

These changes are needed per discussion that happened [here](#40308 (comment)).

### Does this PR introduce _any_ user-facing change?

Nullability exceptions will be thrown at runtime (instead of during analysis), but there is no API change.

### How was this patch tested?

This PR comes with tests.

Closes #40655 from aokolnychyi/spark-42855-v2.

Authored-by: aokolnychyi <aokolnychyi@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@@ -3334,9 +3337,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
v2Write
}

case u: UpdateTable if !u.skipSchemaResolution && u.resolved =>
Contributor Author

I have removed this resolution but the logic is the same: runtime null checks, varchar/char length checks, etc.

fail(s"Unexpected assignments: $assignments")
}

val sql4 = "UPDATE nested_struct_table SET s.n_i = 1"
Contributor Author

@johanl-db, here is the test we talked about. If you have time to contribute any other tests or to check that the alignment logic works for Delta, that would be great!

aokolnychyi (Contributor Author)

Failures don't seem to be related.

dongjoon-hyun (Member) left a comment

Thank you for updating PR, @aokolnychyi .

attr.withDataType(CharVarcharUtils.getRawType(attr.metadata).getOrElse(attr.dataType))
}

private def applyUpdates(
Contributor

A few ideas to make the code more robust (a rough sketch of the grouping in idea 2 follows this list):

  1. I think it's better to operate on the resolved column expressions, instead of turning the expression back into Seq[String].
  2. Given the parser rule for the UPDATE command, the column expression can only be an AttributeReference or an access into (an array of) struct fields. We can group by expr.references.head to get a map from each AttributeReference to its column expressions and the corresponding update expressions.
  3. We validate the map we got in step 2: for each top-level column, its expressions must be of the same tree height (to avoid updating both 'a.b' and 'a.b.c'), and must be different from each other.
  4. Now it's easy to build the new update expressions: for each top-level column, if it doesn't have a match in the map, use the actual column value as the update expression, else ... (same algorithm below)
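A hedged sketch of the grouping in idea 2, operating on Catalyst expressions directly; the helper name is illustrative, not code from this PR:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.Assignment

// For UPDATE, each resolved key is an AttributeReference or a chain of struct field accesses,
// so it references exactly one top-level attribute.
def groupByTopLevelColumn(assignments: Seq[Assignment]): Map[Attribute, Seq[Assignment]] =
  assignments.groupBy(_.key.references.head)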

Contributor Author

it's better to operate on the resolved column expressions

I agree, let's see if we can avoid the conversion to references.

We validate the map we got in step 2: for each top-level column, its expressions must be of the same tree height (to avoid updating both 'a.b' and 'a.b.c')

Could you elaborate a bit on how you see the tree height check? Like adding a separate method for computing expression height? What about cases where it is OK to have different expression heights, like 'a.b.n1' and 'a.c' where a, b, c are all structs?

Contributor

ah you are right, we can't simply check the tree height. I think a better way is to use an ExpressionSet to make sure these column expressions have no duplication.

Contributor Author

Using ExpressionSet to detect duplicate assignments to a.b.c and a.b.c would be easy. What about cases like a.b and a.b.c where we assign a value to a struct and its field at the same time? Are you thinking of recursively adding all subparts of each column key to ExpressionSet? For instance, we would need to add a.b, a.b.c, a.b.c.d to ExpressionSet for a.b.c.d?

Contributor Author

We could probably build an ExpressionSet for each update key per top-level attribute and check that the intersection across all ExpressionSets is empty. Let me know if that's similar to what you thought.
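A hedged sketch of that overlap check using semanticEquals over key prefixes instead of a literal ExpressionSet intersection; the helper names are illustrative:

import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField}

// The key itself plus every enclosing struct access, e.g. a.b.c -> Seq(a.b.c, a.b, a).
def prefixes(key: Expression): Seq[Expression] = key match {
  case g: GetStructField => key +: prefixes(g.child)
  case other => Seq(other)
}

// Two keys conflict if they are the same column (a.b.c vs a.b.c) or if one is a prefix of
// the other (a.b vs a.b.c); a.b.c and a.b.d do not conflict.
def conflicts(key1: Expression, key2: Expression): Boolean =
  prefixes(key1).exists(_.semanticEquals(key2)) || prefixes(key2).exists(_.semanticEquals(key1))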

Contributor

yea this SGTM.

Contributor Author

@cloud-fan, any ideas on how to avoid deconstructing keys into Seq[String] when applying a set of assignments to a top-level attribute? The problem is that we recurse top to bottom in applyUpdates, whereas assignment.key is a set of nested GetStructField calls with the outer expression referring to the leaf column.

I can see ways to perform the validation without converting keys to Seq[String] but I don't see an easy way to avoid that in applyUpdates.

cloud-fan (Contributor) commented Apr 14, 2023

I'm thinking about this

def alignAssignments(
    assignments: Seq[Assignment],
    attrs: Seq[Attribute]): Seq[Assignment] = {
  // use ExpressionSet to check assignments have no duplication
  ...
  attrs.map { attr =>
    Assignment(attr, applyUpdates(assignments, attr))
  }
}

def applyUpdates(
    assignments: Seq[Assignment],
    col: Expression): Expression = {
  val (exactAssignments, others) = assignments.partition { assignment =>
    assignment.key.semanticEquals(col)
  }
  val relatedAssignments = others.filter { assignment =>
    assignment.key.exists(_.semanticEquals(col))
  }
  assert(exactAssignments.length <= 1)
  if (exactAssignments.nonEmpty) {
    if (relatedAssignments.nonEmpty) fail...
    exactAssignments.head.value
  } else {
    if (relatedAssignments.isEmpty) {
      col
    } else {
      assert(col.dataType.isInstanceOf[StructType])
      val structType = col.dataType.asInstanceOf[StructType]
      CreateNamedStruct(structType.fields.zipWithIndex.toSeq.flatMap { case (field, ordinal) =>
        Literal(field.name) ::
          applyUpdates(relatedAssignments, GetStructField(col, ordinal, Some(field.name))) :: Nil
      })
    }
  }
}

Contributor Author

Perfect, I forgot about exists. Thanks!

assignment.key.exists(_.semanticEquals(colExpr))
}

if (exactAssignments.size > 1) {
Contributor Author

@cloud-fan, I've changed the approach to avoid deconstructing references. However, I decided to keep the validation while recursing instead of doing it in a separate step as we discussed. When I tried to implement that idea, it turned out to be pretty involved with lots of edge cases. For instance, we can't have multiple assignments with the same key, but multiple keys can reference the same top-level field: a.b.c and a.b.d are allowed, while a.b and a.b.c are not.

It felt easier to validate while recursing, similar to TableOutputResolver.

Let me know what you think.

@@ -778,6 +781,7 @@ case class Assignment(key: Expression, value: Expression) extends Expression
override def dataType: DataType = throw new UnresolvedException("nullable")
override def left: Expression = key
override def right: Expression = value
override def sql: String = s"${key.sql} = ${value.sql}"
Contributor Author

Added this for better error messages.

private def requiresAlignment(table: LogicalPlan): Boolean = {
  EliminateSubqueryAliases(table) match {
    case r: NamedRelation if r.skipSchemaResolution => false
    case DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _) => true
Contributor Author

@viirya, I decided not to align assignments if tables don't extend SupportsRowLevelOperations. That way, data sources using their own implementations won't be affected. They can still use AssignmentUtils.

}
}

private def resolveAssignments(p: LogicalPlan): LogicalPlan = {
Contributor Author

Copied from ResolveOutputRelation to preserve the existing behavior for data sources that rely on custom implementations.

aokolnychyi (Contributor Author) commented Apr 17, 2023

Failures in streaming tests don't seem related.

cloud-fan (Contributor)

thanks, merging to master!

cloud-fan closed this in 1c057f5 on Apr 18, 2023
aokolnychyi (Contributor Author)

Thanks for reviewing, @cloud-fan @huaxingao @dongjoon-hyun @viirya @johanl-db!

viirya pushed a commit to viirya/spark-1 that referenced this pull request Oct 19, 2023

Cherry pick of the `TableOutputResolver` runtime NOT NULL check change described above (closes apache#40655 from aokolnychyi/spark-42855-v2; cherry picked from commit 4ad55b6).

Authored-by: aokolnychyi <aokolnychyi@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
viirya pushed a commit to viirya/spark-1 that referenced this pull request Oct 19, 2023

Cherry pick of this PR (closes apache#40308 from aokolnychyi/spark-42151-v2; cherry picked from commit 1c057f5). See the PR description above.

Authored-by: aokolnychyi <aokolnychyi@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>