[HUDI-6963] Fix class conflict of CreateIndex from Spark3.3 #9895

Merged (2 commits) on Oct 27, 2023
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.internal.SQLConf

trait HoodieCatalystPlansUtils {
@@ -79,6 +78,25 @@ trait HoodieCatalystPlansUtils {
*/
def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan, Expression)]

/**
* Decomposes [[MatchCreateIndex]] into its arguments with accommodation.
*/
def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])]

/**
* Decomposes [[MatchDropIndex]] into its arguments with accommodation.
*/
def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan, String, Boolean)]

/**
* Decomposes [[MatchShowIndexes]] into its arguments with accommodation.
*/
def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])]

/**
* Decomposes [[MatchRefreshIndex]] into its arguments with accommodation.
*/
def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)]

/**
* Spark requires file formats to append the partition path fields to the end of the schema.
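Note (illustrative, not part of this diff): a Spark 3.3+ adapter can satisfy unapplyCreateIndex by matching Spark's own CreateIndex logical plan, the class that clashed with Hudi's former CreateIndex. The sketch below assumes Spark 3.3's plan shape, CreateIndex(table, indexName, indexType, ignoreIfExists, columns, properties) with each column key exposing name: Seq[String]; the actual adapter code in this PR may differ.

import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, LogicalPlan}

object Spark33CreateIndexMatcherSketch {
  // Map Spark's CreateIndex plan into the version-agnostic tuple the trait above expects.
  def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] =
    plan match {
      // Assumed Spark 3.3 shape; each column key is assumed to expose `name: Seq[String]`,
      // so nested fields survive as name paths.
      case CreateIndex(table, indexName, indexType, ignoreIfExists, columns, properties) =>
        Some((table, indexName, indexType, ignoreIfExists,
          columns.map { case (field, opts) => (field.name, opts) }, properties))
      case _ => None
    }
}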
@@ -48,13 +48,6 @@
statement
: compactionStatement #compactionCommand
| CALL multipartIdentifier callArgumentList? #call
| CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
tableIdentifier (USING indexType=identifier)?
LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
(OPTIONS indexOptions=propertyList)? #createIndex
| DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier #dropIndex
| SHOW INDEXES (FROM | IN) TABLE? tableIdentifier #showIndexes
| REFRESH INDEX identifier ON TABLE? tableIdentifier #refreshIndex
| .*? #passThrough
;

@@ -110,14 +103,6 @@
| MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral
;

multipartIdentifierPropertyList
: multipartIdentifierProperty (COMMA multipartIdentifierProperty)*
;

multipartIdentifierProperty
: multipartIdentifier (OPTIONS options=propertyList)?
;

multipartIdentifier
: parts+=identifier ('.' parts+=identifier)*
;
@@ -135,51 +120,13 @@
nonReserved
: CALL
| COMPACTION
| CREATE
| DROP
| EXISTS
| FROM
| IN
| INDEX
| INDEXES
| IF
Contributor:
Do we still need some of these tokens for other SQL statements?

Contributor (author):
No, I checked and only removed the unneeded ones.

| LIMIT
| NOT
| ON
| OPTIONS
| REFRESH
| RUN
| SCHEDULE
| SHOW
| TABLE
| USING
;

propertyList
: LEFT_PAREN property (COMMA property)* RIGHT_PAREN
;

property
: key=propertyKey (EQ? value=propertyValue)?
;

propertyKey
: identifier (DOT identifier)*
| STRING
;

propertyValue
: INTEGER_VALUE
| DECIMAL_VALUE
| booleanValue
| STRING
;

LEFT_PAREN: '(';
RIGHT_PAREN: ')';
COMMA: ',';
DOT: '.';

ALL: 'ALL';
AT: 'AT';
CALL: 'CALL';
@@ -195,21 +142,6 @@
FALSE: 'FALSE';
INTERVAL: 'INTERVAL';
TO: 'TO';
CREATE: 'CREATE';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
IF: 'IF';
NOT: 'NOT';
EXISTS: 'EXISTS';
TABLE: 'TABLE';
USING: 'USING';
OPTIONS: 'OPTIONS';
DROP: 'DROP';
FROM: 'FROM';
IN: 'IN';
REFRESH: 'REFRESH';

EQ: '=' | '==';

PLUS: '+';
MINUS: '-';
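Note (illustrative, not part of this diff): the statements the removed rules used to parse look like the following. Table, index, and column names are made up, and whether these now go through Spark 3.3's built-in grammar or a version-specific Hudi extension is an adapter detail outside this sketch.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Index DDL previously covered by the removed grammar rules (names are made up).
spark.sql("CREATE INDEX IF NOT EXISTS idx_city ON TABLE hudi_tbl USING lucene (city)")
spark.sql("SHOW INDEXES FROM hudi_tbl")
spark.sql("REFRESH INDEX idx_city ON hudi_tbl")
spark.sql("DROP INDEX IF EXISTS idx_city ON hudi_tbl")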
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isMetaField, removeMetaFields}
import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchCreateTableLike, MatchInsertIntoStatement, MatchMergeIntoTable, ResolvesToHudiTable, sparkAdapter}
import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchCreateIndex, MatchCreateTableLike, MatchDropIndex, MatchInsertIntoStatement, MatchMergeIntoTable, MatchRefreshIndex, MatchShowIndexes, ResolvesToHudiTable, sparkAdapter}
import org.apache.spark.sql.hudi.command._
import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedures, Procedure, ProcedureArgs}
import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -354,6 +354,26 @@ object HoodieAnalysis extends SparkAdapterSupport {
sparkAdapter.getCatalystPlanUtils.unapplyCreateTableLikeCommand(plan)
}

private[sql] object MatchCreateIndex {
def unapply(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] =
sparkAdapter.getCatalystPlanUtils.unapplyCreateIndex(plan)
}

private[sql] object MatchDropIndex {
def unapply(plan: LogicalPlan): Option[(LogicalPlan, String, Boolean)] =
sparkAdapter.getCatalystPlanUtils.unapplyDropIndex(plan)
}

private[sql] object MatchShowIndexes {
def unapply(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] =
sparkAdapter.getCatalystPlanUtils.unapplyShowIndexes(plan)
}

private[sql] object MatchRefreshIndex {
def unapply(plan: LogicalPlan): Option[(LogicalPlan, String)] =
sparkAdapter.getCatalystPlanUtils.unapplyRefreshIndex(plan)
}

private[sql] def failAnalysis(msg: String): Nothing = {
throw new AnalysisException(msg)
}
@@ -442,21 +462,20 @@ case class ResolveImplementations() extends Rule[LogicalPlan] {
}

// Convert to CreateIndexCommand
case ci @ CreateIndex(plan @ ResolvesToHudiTable(table), indexName, indexType, ignoreIfExists, columns, options, output) =>
// TODO need to resolve columns
CreateIndexCommand(table, indexName, indexType, ignoreIfExists, columns, options, output)
case ci @ MatchCreateIndex(plan @ ResolvesToHudiTable(table), indexName, indexType, ignoreIfExists, columns, options) if ci.resolved =>
CreateIndexCommand(table, indexName, indexType, ignoreIfExists, columns, options)

// Convert to DropIndexCommand
case di @ DropIndex(plan @ ResolvesToHudiTable(table), indexName, ignoreIfNotExists, output) if di.resolved =>
DropIndexCommand(table, indexName, ignoreIfNotExists, output)
case di @ MatchDropIndex(plan @ ResolvesToHudiTable(table), indexName, ignoreIfNotExists) if di.resolved =>
DropIndexCommand(table, indexName, ignoreIfNotExists)

// Convert to ShowIndexesCommand
case si @ ShowIndexes(plan @ ResolvesToHudiTable(table), output) if si.resolved =>
case si @ MatchShowIndexes(plan @ ResolvesToHudiTable(table), output) if si.resolved =>
ShowIndexesCommand(table, output)

// Convert to RefreshIndexCommand
case ri @ RefreshIndex(plan @ ResolvesToHudiTable(table), indexName, output) if ri.resolved =>
RefreshIndexCommand(table, indexName, output)
case ri @ MatchRefreshIndex(plan @ ResolvesToHudiTable(table), indexName) if ri.resolved =>
RefreshIndexCommand(table, indexName)

case _ => plan
}
@@ -19,10 +19,9 @@

package org.apache.spark.sql.hudi.command

import com.fasterxml.jackson.annotation.{JsonAutoDetect, PropertyAccessor}
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.JsonUtils
import org.apache.hudi.secondary.index.SecondaryIndexManager
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -32,23 +31,21 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation
import org.apache.spark.sql.{Row, SparkSession}

import java.util

import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter}

case class CreateIndexCommand(table: CatalogTable,
indexName: String,
indexType: String,
ignoreIfExists: Boolean,
columns: Seq[(Attribute, Map[String, String])],
options: Map[String, String],
override val output: Seq[Attribute]) extends IndexBaseCommand {
columns: Seq[(Seq[String], Map[String, String])],
options: Map[String, String]) extends IndexBaseCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val tableId = table.identifier
val metaClient = createHoodieTableMetaClient(tableId, sparkSession)
val columnsMap: java.util.LinkedHashMap[String, java.util.Map[String, String]] =
new util.LinkedHashMap[String, java.util.Map[String, String]]()
columns.map(c => columnsMap.put(c._1.name, c._2.asJava))
columns.map(c => columnsMap.put(c._1.mkString("."), c._2.asJava))
Contributor:
Why change this? For nested fields?

Contributor (author):
The column name is now a Seq[String] instead of an Attribute (an UnresolvedAttribute, to be more specific), since the column names are now resolved along the lines of:

case (u: UnresolvedFieldName, prop) => resolveFieldNames(cmd.table, u.name, u) -> prop

UnresolvedAttribute is no longer needed, so Seq[String] is used here to represent a column name.

Contributor:
Got it.

SecondaryIndexManager.getInstance().create(
metaClient, indexName, indexType, ignoreIfExists, columnsMap, options.asJava)
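Note (illustrative, not part of this diff): with the new column shape each name is kept as a path, so a nested field becomes a dotted key in the LinkedHashMap handed to SecondaryIndexManager. Field and option names below are made up.

import scala.collection.JavaConverters.mapAsJavaMapConverter

// Each column is a name path plus per-column options; nested fields become dotted keys.
val columns: Seq[(Seq[String], Map[String, String])] =
  Seq(Seq("city") -> Map.empty[String, String],
      Seq("address", "zip") -> Map("order" -> "asc"))

val columnsMap = new java.util.LinkedHashMap[String, java.util.Map[String, String]]()
columns.foreach(c => columnsMap.put(c._1.mkString("."), c._2.asJava))
// columnsMap keys: "city" and "address.zip"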
@@ -65,8 +62,7 @@ case class CreateIndexCommand(table: CatalogTable,

case class DropIndexCommand(table: CatalogTable,
indexName: String,
ignoreIfNotExists: Boolean,
override val output: Seq[Attribute]) extends IndexBaseCommand {
ignoreIfNotExists: Boolean) extends IndexBaseCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val tableId = table.identifier
@@ -90,7 +86,7 @@ case class ShowIndexesCommand(table: CatalogTable,
val metaClient = createHoodieTableMetaClient(table.identifier, sparkSession)
val secondaryIndexes = SecondaryIndexManager.getInstance().show(metaClient)

val mapper = getObjectMapper
val mapper = JsonUtils.getObjectMapper
toScalaOption(secondaryIndexes).map(x =>
x.asScala.map(i => {
val colOptions =
@@ -100,18 +96,10 @@
i.getIndexType.name().toLowerCase, colOptions, options)
}).toSeq).getOrElse(Seq.empty[Row])
}

protected def getObjectMapper: ObjectMapper = {
val mapper = new ObjectMapper
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
mapper
}
}

case class RefreshIndexCommand(table: CatalogTable,
indexName: String,
override val output: Seq[Attribute]) extends IndexBaseCommand {
indexName: String) extends IndexBaseCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val metaClient = createHoodieTableMetaClient(table.identifier, sparkSession)