[HUDI-2811] [MINOR] add descriptions and unify the spark version's methods
YannByron committed Dec 15, 2021
1 parent 9607f5c commit 38fc71d
Showing 13 changed files with 37 additions and 23 deletions.
@@ -47,8 +47,14 @@ import scala.collection.JavaConverters.asScalaBufferConverter
 
 object HoodieSparkUtils extends SparkAdapterSupport {
 
   def isSpark2: Boolean = SPARK_VERSION.startsWith("2.")
 
+  def isSpark3: Boolean = SPARK_VERSION.startsWith("3.")
+
+  def isSpark3_0: Boolean = SPARK_VERSION.startsWith("3.0")
+
+  def isSpark3_2: Boolean = SPARK_VERSION.startsWith("3.2")
+
   def getMetaSchema: StructType = {
     StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
       StructField(col, StringType, nullable = true)
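These predicates give call sites a single place to branch on the Spark version instead of string-matching SPARK_VERSION locally. A minimal usage sketch (the predicate names match the diff; the selection logic is illustrative only, loosely mirroring the HoodieSparkSqlWriter hunk below):

import org.apache.hudi.HoodieSparkUtils

// Illustrative only: dispatch on the shared predicates rather than calling
// SPARK_VERSION.startsWith(...) at every call site.
val bulkInsertWriteFormat =
  if (HoodieSparkUtils.isSpark2) "org.apache.hudi.internal"
  else "org.apache.hudi.spark3.internal"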
@@ -52,7 +52,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
-import org.apache.spark.{SPARK_VERSION, SparkContext}
+import org.apache.spark.SparkContext
 
 import java.util.Properties
 
@@ -466,13 +466,13 @@ object HoodieSparkSqlWriter {
     } else {
       HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsertWithoutMetaFields(df)
     }
-    if (SPARK_VERSION.startsWith("2.")) {
+    if (HoodieSparkUtils.isSpark2) {
       hoodieDF.write.format("org.apache.hudi.internal")
         .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
         .options(params)
         .mode(SaveMode.Append)
         .save()
-    } else if (SPARK_VERSION.startsWith("3.")) {
+    } else if (HoodieSparkUtils.isSpark3) {
       hoodieDF.write.format("org.apache.hudi.spark3.internal")
         .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
         .option(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL)
@@ -19,6 +19,8 @@ package org.apache.spark.sql.avro
 
 import org.apache.avro.Schema
 
+import org.apache.hudi.HoodieSparkUtils
+
 import org.apache.spark.sql.types.DataType
 
 /**
@@ -27,7 +29,7 @@ import org.apache.spark.sql.types.DataType
  */
 case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
 
-  private val avroDeserializer = if (org.apache.spark.SPARK_VERSION.startsWith("3.2")) {
+  private val avroDeserializer = if (HoodieSparkUtils.isSpark3_2) {
     // SPARK-34404: As of Spark 3.2, AvroDeserializer no longer has a constructor taking
     // only Schema and DataType arguments, so use reflection to obtain an instance.
     val constructor = classOf[AvroDeserializer].getConstructor(classOf[Schema], classOf[DataType], classOf[String])
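For reference, a hedged sketch of what the reflective path amounts to on Spark 3.2. The three-argument constructor lookup is taken from the diff above; passing "CORRECTED" as the datetime rebase mode is an assumption for illustration, not something this diff shows, and (like the Hudi shim) the code would need to live in the org.apache.spark.sql.avro package:

import org.apache.avro.Schema
import org.apache.spark.sql.avro.AvroDeserializer
import org.apache.spark.sql.types.DataType

// Sketch only: resolve the Spark 3.2 three-argument constructor at runtime and
// instantiate it; the "CORRECTED" rebase mode is an assumption, not from the diff.
def newDeserializer(avroType: Schema, catalystType: DataType): AvroDeserializer = {
  val ctor = classOf[AvroDeserializer].getConstructor(
    classOf[Schema], classOf[DataType], classOf[String])
  ctor.newInstance(avroType, catalystType, "CORRECTED")
}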
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.catalyst.trees
 
+/**
+ * Similar to `LeafLike` in Spark3.2.
+ */
 trait HoodieLeafLike[T <: TreeNode[T]] { self: TreeNode[T] =>
 
   override final def children: Seq[T] = Nil
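The hunk is truncated after children. Assuming HoodieLeafLike mirrors Spark 3.2's LeafLike, the remaining members would be along these lines (a sketch, not the verbatim file contents):

// Sketch only, assuming the trait mirrors Spark 3.2's LeafLike: with no
// children, the child-rewriting hooks can degenerate to identity.
final def mapChildren(f: T => T): T = this.asInstanceOf[T]

final def withNewChildrenInternal(newChildren: IndexedSeq[T]): T = this.asInstanceOf[T]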
@@ -31,7 +31,7 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
-import org.apache.spark.SPARK_VERSION
+
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -282,8 +282,6 @@ object HoodieSqlUtils extends SparkAdapterSupport {
       .filterKeys(_.startsWith("hoodie."))
   }
 
-  def isSpark3: Boolean = SPARK_VERSION.startsWith("3.")
-
   def isEnableHive(sparkSession: SparkSession): Boolean =
     "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
 
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hudi.analysis
 
 import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
-import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
 
 import scala.collection.JavaConverters._

@@ -140,7 +140,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[LogicalPlan] {
       // We can do this because, in the normal case, updating or setting Hudi's
       // meta fields in a SQL statement is not allowed: they are system fields
       // whose values cannot be set by the user.
-      if (HoodieSqlUtils.isSpark3) {
+      if (HoodieSparkUtils.isSpark3) {
         val assignmentFieldNames = assignments.map(_.key).map {
           case attr: AttributeReference =>
             attr.name
@@ -21,4 +21,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.HoodieLeafLike
 import org.apache.spark.sql.execution.command.RunnableCommand
 
+/**
+ * Similar to `LeafRunnableCommand` in Spark3.2. By mixing in `HoodieLeafLike`,
+ * `HoodieLeafRunnableCommand` saves subclasses of `RunnableCommand` from having
+ * to override the `withNewChildrenInternal` method repeatedly.
+ */
 trait HoodieLeafRunnableCommand extends RunnableCommand with HoodieLeafLike[LogicalPlan]
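A hedged illustration of the intended use (the command name and body here are hypothetical):

import org.apache.spark.sql.{Row, SparkSession}

// Hypothetical command: extending HoodieLeafRunnableCommand pulls in the
// leaf-node members from HoodieLeafLike (children = Nil and the final
// child-rewriting no-ops), so the class only has to implement run().
case class ExampleHoodieCommand() extends HoodieLeafRunnableCommand {
  override def run(sparkSession: SparkSession): Seq[Row] = Seq.empty
}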
@@ -19,7 +19,7 @@ package org.apache.hudi
 
 import org.apache.hudi.HoodieSparkUtils.convertToCatalystExpressions
 import org.apache.hudi.HoodieSparkUtils.convertToCatalystExpression
-import org.apache.spark.SPARK_VERSION
+
 import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith}
 import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType}
 import org.junit.jupiter.api.Assertions.assertEquals
@@ -71,7 +71,7 @@ class TestConvertFilterToCatalystExpression {
   private def checkConvertFilter(filter: Filter, expectExpression: String): Unit = {
     // [SPARK-25769][SPARK-34636][SPARK-34626][SQL] sql method in UnresolvedAttribute,
     // AttributeReference and Alias don't quote qualified names properly
-    val removeQuotesIfNeed = if (expectExpression != null && SPARK_VERSION.startsWith("3.2")) {
+    val removeQuotesIfNeed = if (expectExpression != null && HoodieSparkUtils.isSpark3_2) {
       expectExpression.replace("`", "")
     } else {
       expectExpression
@@ -88,7 +88,7 @@
   private def checkConvertFilters(filters: Array[Filter], expectExpression: String): Unit = {
     // [SPARK-25769][SPARK-34636][SPARK-34626][SQL] sql method in UnresolvedAttribute,
     // AttributeReference and Alias don't quote qualified names properly
-    val removeQuotesIfNeed = if (expectExpression != null && SPARK_VERSION.startsWith("3.2")) {
+    val removeQuotesIfNeed = if (expectExpression != null && HoodieSparkUtils.isSpark3_2) {
       expectExpression.replace("`", "")
     } else {
       expectExpression
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.spark.sql.Row
 
@@ -352,7 +353,7 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase {
         | when not matched and flag = '1' then insert *
         |""".stripMargin
 
-    if (HoodieSqlUtils.isSpark3) {
+    if (HoodieSparkUtils.isSpark3) {
       checkExceptionContain(mergeSql)("Columns aliases are not allowed in MERGE")
     } else {
       spark.sql(mergeSql)
@@ -17,6 +17,7 @@
 
 package org.apache.hudi.spark3.internal;
 
+import org.apache.hudi.HoodieSparkUtils;
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoStatement;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.apache.spark.sql.catalyst.util.DateFormatter;
@@ -31,10 +32,10 @@
 
 public class ReflectUtil {
 
-  public static InsertIntoStatement createInsertInto(boolean isSpark30, LogicalPlan table, Map<String, Option<String>> partition, Seq<String> userSpecifiedCols,
+  public static InsertIntoStatement createInsertInto(LogicalPlan table, Map<String, Option<String>> partition, Seq<String> userSpecifiedCols,
                                                      LogicalPlan query, boolean overwrite, boolean ifPartitionNotExists) {
     try {
-      if (isSpark30) {
+      if (HoodieSparkUtils.isSpark3_0()) {
         Constructor<InsertIntoStatement> constructor = InsertIntoStatement.class.getConstructor(
             LogicalPlan.class, Map.class, LogicalPlan.class, boolean.class, boolean.class);
         return constructor.newInstance(table, partition, query, overwrite, ifPartitionNotExists);
@@ -48,10 +49,10 @@ public static InsertIntoStatement createInsertInto(boolean isSpark30, LogicalPlan table, Map<String, Option<String>> partition, Seq<String> userSpecifiedCols,
     }
   }
 
-  public static DateFormatter getDateFormatter(String sparkVersion, ZoneId zoneId) {
+  public static DateFormatter getDateFormatter(ZoneId zoneId) {
     try {
       ClassLoader loader = Thread.currentThread().getContextClassLoader();
-      if (sparkVersion.startsWith("3.2")) {
+      if (HoodieSparkUtils.isSpark3_2()) {
         Class clazz = loader.loadClass(DateFormatter.class.getName());
         Method applyMethod = clazz.getDeclaredMethod("apply");
         applyMethod.setAccessible(true);
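With the version probe folded into ReflectUtil, callers stop threading a Spark version string through. A minimal sketch of the updated call, matching the Spark3ParsePartitionUtil hunk below (the zone value here is illustrative):

import java.time.ZoneId
import org.apache.hudi.spark3.internal.ReflectUtil

// The caller no longer passes SPARK_VERSION; ReflectUtil now consults
// HoodieSparkUtils.isSpark3_2() internally to pick the DateFormatter factory.
val dateFormatter = ReflectUtil.getDateFormatter(ZoneId.systemDefault())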
@@ -20,8 +20,8 @@ package org.apache.spark.sql.adapter
 import org.apache.hudi.Spark3RowSerDe
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.spark3.internal.ReflectUtil
-import org.apache.spark.SPARK_VERSION
-import org.apache.spark.sql.{Row, SparkSession}
+
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
@@ -79,7 +79,7 @@ class Spark3Adapter extends SparkAdapter {
 
   override def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
       query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan = {
-    ReflectUtil.createInsertInto(SPARK_VERSION.startsWith("3.0"), table, partition, Seq.empty[String], query, overwrite, ifPartitionNotExists)
+    ReflectUtil.createInsertInto(table, partition, Seq.empty[String], query, overwrite, ifPartitionNotExists)
   }
 
   override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = {
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
 import org.apache.hudi.spark3.internal.ReflectUtil
 
-import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
@@ -54,7 +53,7 @@ class Spark3ParsePartitionUtil(conf: SQLConf) extends SparkParsePartitionUtil {
       basePaths: Set[Path],
       userSpecifiedDataTypes: Map[String, DataType],
       timeZone: TimeZone): InternalRow = {
-    val dateFormatter = ReflectUtil.getDateFormatter(SPARK_VERSION, timeZone.toZoneId)
+    val dateFormatter = ReflectUtil.getDateFormatter(timeZone.toZoneId)
     val timestampFormatter = TimestampFormatter(timestampPartitionPattern,
       timeZone.toZoneId, isParsing = true)
 
@@ -40,7 +40,6 @@ public void testDataSourceWriterExtraCommitMetadata() throws Exception {
     InsertIntoStatement statement = (InsertIntoStatement) spark.sessionState().sqlParser().parsePlan(insertIntoSql);
 
     InsertIntoStatement newStatment = ReflectUtil.createInsertInto(
-        spark.version().startsWith("3.0"),
         statement.table(),
         statement.partitionSpec(),
         scala.collection.immutable.List.empty(),
