
[HUDI-6464] Spark SQL Merge Into for pkless tables#9083

Merged
nsivabalan merged 20 commits into apache:master from jonvex:mit_add_filters
Jul 7, 2023

Conversation

Contributor

@jonvex jonvex commented Jun 29, 2023

Change Logs

Tables with a primary key must now join on all primary key columns. They can additionally join on the partition path columns, which is recommended if the table is not using a global index.

Tables without a primary key can join on any columns in the table. If multiple source table rows match a single target table row, the precombine field is used to pick the winner if it is set; otherwise, the behavior is nondeterministic. To improve performance, the Hudi meta columns are retained after the join, so that the index lookup and key generation can be skipped.
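To make the matching rules concrete, here is a minimal, hypothetical Python sketch (not Hudi code) of the semantics described above: source and target rows join on arbitrary columns, and when several source rows match the same target row, the one with the highest precombine value wins:

```python
# Hypothetical sketch of pkless MERGE INTO semantics; all names are illustrative.
def merge_into(target, source, join_cols, precombine="ts"):
    """Return the merged table as a list of dicts."""
    key = lambda row: tuple(row[c] for c in join_cols)
    # Dedupe source rows per join key, keeping the highest precombine value.
    best = {}
    for row in source:
        k = key(row)
        if k not in best or row[precombine] > best[k][precombine]:
            best[k] = row
    merged = []
    matched_keys = set()
    for row in target:
        k = key(row)
        if k in best:            # WHEN MATCHED THEN UPDATE SET *
            merged.append(dict(best[k]))
            matched_keys.add(k)
        else:
            merged.append(dict(row))
    for k, row in best.items():  # WHEN NOT MATCHED THEN INSERT *
        if k not in matched_keys:
            merged.append(dict(row))
    return merged

target = [{"id": 1, "val": "a", "ts": 1}, {"id": 2, "val": "b", "ts": 1}]
source = [{"id": 1, "val": "x", "ts": 2},
          {"id": 1, "val": "y", "ts": 3},   # same join key, higher ts: wins
          {"id": 3, "val": "c", "ts": 1}]
result = merge_into(target, source, join_cols=["id"])
# id 1 is updated to the ts=3 source row, id 2 is untouched, id 3 is inserted
```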

NOTE: case-insensitive column name recognition no longer works.

Impact

Allows usage of the MERGE INTO SQL feature for pkless tables.
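For illustration, a hedged sketch of the usage this enables (table and column names here are hypothetical, not taken from this PR):

```sql
-- Hypothetical pkless Hudi table: created without a primaryKey tblproperty,
-- so MERGE INTO may join on any columns of the table.
MERGE INTO target_pkless AS t
USING source_changes AS s
ON t.order_id = s.order_id AND t.region = s.region
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
```

For a table that does have a primary key, the ON clause must instead cover all primary key columns, ideally plus the partition path columns when a global index is not in use.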

Risk level (write none, low medium or high below)

Medium. Additional tests have been written.

Documentation Update

Release notes

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@jonvex jonvex changed the title Mit add filters PKLess Merge Into Jun 29, 2023
@jonvex jonvex changed the title PKLess Merge Into [HUDI-6464] Spark SQL Merge Into for pkless tables Jul 1, 2023
@jonvex jonvex marked this pull request as ready for review July 1, 2023 02:33
@nsivabalan nsivabalan added release-0.14.0 priority:blocker Production down; release blocker labels Jul 2, 2023
@nsivabalan
Contributor

Yet to review tests, but you can start addressing the source code comments in the meantime.

@jonvex jonvex requested a review from nsivabalan July 2, 2023 16:38

@Override
public boolean isGlobal() {
return false;
Contributor

If the global version is to be implemented, does the user simply need to set a config so that we return true here for the global index? Since the record location is already known from the meta column, how does the global/non-global part come into play here?

Contributor Author

I believe it comes into play when changing the partition: https://issues.apache.org/jira/browse/HUDI-6471

Comment on lines +207 to +213
/**
* Calls fail analysis on
*
*/
def failAnalysisForMIT(a: Attribute, cols: String): Unit = {}

def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, condition: Option[Expression], hint: String): LogicalPlan
Contributor

nit: Put these into HoodieCatalystPlansUtils?


val hoodieKey = new HoodieKey(recordKey, partitionPath)
val instantTime: Option[String] = if (isPrepped) {
val instantTime: Option[String] = if (isPrepped || mergeIntoWrites) {
Contributor

Is the commit time pre-populated based on the current commit (not the commit time from the meta columns in the existing files) for MIT records?

Contributor Author

I think it might be changed at write time?



/**
* NOTE TO USERS: YOU SHOULD NOT SET THIS AS YOUR KEYGENERATOR
Contributor

Should we add validation on the key generator in a follow-up PR, so that SqlKeyGenerator and MergeIntoKeyGenerator cannot be set by the user?

Contributor Author

We could

// NOTE: This rule adjusts [[LogicalRelation]]s resolving into Hudi tables such that
// meta-fields are not affecting the resolution of the target columns to be updated by Spark.
// meta-fields are not affecting the resolution of the target columns to be updated by Spark (Except in the
// case of MergeInto. We leave the meta columns on the target table, and use other means to ensure resolution)
Contributor

Minor: my understanding is that for a pkless table, we need the record key, partition path, and filename from the meta columns. The other meta columns can be pruned. I assume that if we only keep those three, Spark does not read the others, which is a small improvement we could make.

Contributor Author

It makes aligning the schema more difficult, so it could be done in a follow-up.


import org.apache.hudi.{HoodieSparkUtils, ScalaAssertionSupport}

class TestMergeIntoTable3 extends HoodieSparkSqlTestBase with ScalaAssertionSupport {
Contributor

It's OK to add a new test class. I think Siva's point is that, instead of naming it with numbers, we should name it with readability in mind, something like TestMergeIntoWithNonRecordKeyField.

Comment on lines +142 to +143
//
////
Contributor

Docs to add? Could you check all the places where the comment is empty?

Comment on lines +151 to +202
case _ =>
val newMatchedActions = m.matchedActions.map {
case DeleteAction(deleteCondition) =>
val resolvedDeleteCondition = deleteCondition.map(
resolveExpressionByPlanChildren(_, m))
DeleteAction(resolvedDeleteCondition)
case UpdateAction(updateCondition, assignments) =>
val resolvedUpdateCondition = updateCondition.map(
resolveExpressionByPlanChildren(_, m))
UpdateAction(
resolvedUpdateCondition,
// The update value can access columns from both target and source tables.
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = false))
case UpdateStarAction(updateCondition) =>
////Hudi change: filter out meta fields
//
val assignments = targetTable.output.filter(a => !isMetaField(a.name)).map { attr =>
Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
}
//
////
UpdateAction(
updateCondition.map(resolveExpressionByPlanChildren(_, m)),
// For UPDATE *, the value must from source table.
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
case o => o
}
val newNotMatchedActions = m.notMatchedActions.map {
case InsertAction(insertCondition, assignments) =>
// The insert action is used when not matched, so its condition and value can only
// access columns from the source table.
val resolvedInsertCondition = insertCondition.map(
resolveExpressionByPlanChildren(_, Project(Nil, m.sourceTable)))
InsertAction(
resolvedInsertCondition,
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
case InsertStarAction(insertCondition) =>
// The insert action is used when not matched, so its condition and value can only
// access columns from the source table.
val resolvedInsertCondition = insertCondition.map(
resolveExpressionByPlanChildren(_, Project(Nil, m.sourceTable)))
////Hudi change: filter out meta fields
//
val assignments = targetTable.output.filter(a => !isMetaField(a.name)).map { attr =>
Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
}
//
////
InsertAction(
resolvedInsertCondition,
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
case o => o
Contributor

Most of the code here is similar to Spark's ResolveReferences. Could you make a note of this and add docs before case mO@MatchMergeIntoTable(targetTableO, sourceTableO, _) to summarize the custom changes?

Contributor

Also, is the code copied from Spark 3.2? Any difference among Spark 3.2, 3.3, and 3.4?

Contributor Author

I marked the custom changes by surrounding them in
////
//
changes
//
////

Contributor Author

Spark 3.2 and 3.3 are the same. In a follow-up we may want to use the Spark 3.4 code.

*
* PLEASE REFRAIN MAKING ANY CHANGES TO THIS CODE UNLESS ABSOLUTELY NECESSARY
*/
object HoodieSpark30Analysis {
Contributor

@yihua yihua Jul 3, 2023

For Spark 3.0 and 3.1, have you checked whether the code here is different from Spark's ResolveReferences? Given that we introduce the custom rule here, we should still match the implementation of ResolveReferences in the corresponding Spark version, except for the custom logic you added.


public class HoodieInternalProxyIndex extends HoodieIndex<Object, Object> {

/**
Contributor

Can we move the docs to L29?

@nsivabalan
Contributor

Let's wait for all GitHub Actions to succeed before we land.

Contributor

@yihua yihua left a comment

LGTM on the core functionality. Let's address comments in a follow-up PR.

@hudi-bot
Collaborator

hudi-bot commented Jul 7, 2023

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build


Labels

priority:blocker Production down; release blocker release-0.14.0

Projects

Status: ✅ Done

Development

Successfully merging this pull request may close these issues.

4 participants