Skip to content

Commit

Permalink
[MINOR] Minor changes around Spark 3.3 support (#6231)
Browse files Browse the repository at this point in the history
Co-authored-by: Shawn Chang <yxchang@amazon.com>
  • Loading branch information
CTTY and Shawn Chang committed Jul 28, 2022
1 parent ea1fbc7 commit 70b5cf6
Show file tree
Hide file tree
Showing 7 changed files with 7 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ trait SparkAdapter extends Serializable {
* Extract condition in [[DeleteFromTable]]
* SPARK-38626 condition is no longer Option in Spark 3.3
*/
def extractCondition(deleteFromTable: Command): Expression
def extractDeleteCondition(deleteFromTable: Command): Expression

/**
* Get parseQuery from ExtendedSqlParser, only for Spark 3.3+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
// Resolve Delete Table
case dft @ DeleteFromTable(table, condition)
if sparkAdapter.isHoodieTable(table, sparkSession) && table.resolved =>
val resolveExpression = resolveExpressionFrom(table, None)_
val resolveExpression = resolveExpressionFrom(table, None)(_)
sparkAdapter.resolveDeleteFromTable(dft, resolveExpression)

// Append the meta field to the insert query to walk through the validate for the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Hoodie

// Remove meta fields from the data frame
var df = removeMetaFields(Dataset.ofRows(sparkSession, table))
// SPARK-38626 DeleteFromTable.condition is changed from Option[Expression] to Expression in Spark 3.3
val condition = sparkAdapter.extractCondition(deleteTable)
val condition = sparkAdapter.extractDeleteCondition(deleteTable)
if (condition != null) df = df.filter(Column(condition))

val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class Spark2Adapter extends SparkAdapter {
DeleteFromTable(deleteFromTableCommand.table, resolvedCondition)
}

override def extractCondition(deleteFromTable: Command): Expression = {
override def extractDeleteCondition(deleteFromTable: Command): Expression = {
deleteFromTable.asInstanceOf[DeleteFromTable].condition.getOrElse(null)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
DeleteFromTable(deleteFromTableCommand.table, resolvedCondition)
}

override def extractCondition(deleteFromTable: Command): Expression = {
override def extractDeleteCondition(deleteFromTable: Command): Expression = {
deleteFromTable.asInstanceOf[DeleteFromTable].condition.getOrElse(null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
DeleteFromTable(deleteFromTableCommand.table, resolvedCondition)
}

override def extractCondition(deleteFromTable: Command): Expression = {
override def extractDeleteCondition(deleteFromTable: Command): Expression = {
deleteFromTable.asInstanceOf[DeleteFromTable].condition.getOrElse(null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
DeleteFromTable(deleteFromTableCommand.table, resolveExpression(deleteFromTableCommand.condition))
}

override def extractCondition(deleteFromTable: Command): Expression = {
override def extractDeleteCondition(deleteFromTable: Command): Expression = {
deleteFromTable.asInstanceOf[DeleteFromTable].condition
}

Expand Down

0 comments on commit 70b5cf6

Please sign in to comment.