
[HUDI-4081][HUDI-4472] Addressing Spark SQL vs Spark DS performance gap #6213

Merged: 26 commits merged on Jul 28, 2022

Changes from all commits (26 commits)
3d55f22
Tidying up
Jul 25, 2022
47e86f3
Avoid unnecessary RDD dereferencing of the `Dataset`
Jul 25, 2022
ed0b9c9
Tidying up
Jul 25, 2022
c5829c9
Cleaned up query output alignment seq to do proper validations and avo…
Jul 25, 2022
d6c34b0
Relaxed ordering requirement
Jul 25, 2022
4be35f1
Fixing compilation
Jul 25, 2022
0188b77
Fixed validating sequence to properly assert whether query output con…
Jul 25, 2022
023be63
Fixed partition-spec assertion;
Jul 25, 2022
a8b0a25
Fixing tests
Jul 25, 2022
27bb52f
Fixed invalid ref
Jul 25, 2022
225d037
Duct-tape the issue of incorrect schema handling in `HoodieSparkSqlWr…
Jul 26, 2022
c7f2688
Revert back to relative-order based matching (no name lookup)
Jul 26, 2022
ec3cbfb
Simplify schema reconciliation, schema evolution handling
Jul 26, 2022
b2f5007
Properly reconcile nullability attributes
Jul 26, 2022
193fa73
Rebased `InsertIntoHoodieTableCommand` to rely on Spark's `TableSchem…
Jul 26, 2022
813ea6f
Fixed tests
Jul 26, 2022
f7dae90
Extracted query output resolving/reshaping into `HoodieCatalystPlanUt…
Jul 26, 2022
83bec46
Fixed tests
Jul 27, 2022
f37753a
Added new method `TableSchemaResolver#getTableLatestAvroSchema` to re…
Jul 27, 2022
52a46e8
Fixing tests
Jul 27, 2022
3fa6184
Tidying up
Jul 27, 2022
e089df9
Refactored schema handling in `HoodieSparkSqlWriter` to make sure all…
Jul 27, 2022
0088598
Reverting changes in `TableSchemaResolver`
Jul 27, 2022
40b432e
Fixed tests
Jul 27, 2022
d098342
Fixed more tests (for Spark 2)
Jul 27, 2022
4a639a4
Tidying up
Jul 28, 2022
@@ -19,12 +19,33 @@ package org.apache.spark.sql

import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf

trait HoodieCatalystPlansUtils {

/**
* Resolves the output of the provided [[query]] against the [[expected]] list of [[Attribute]]s,
* and returns a new (reshaped) instance of the [[LogicalPlan]]
*
* @param tableName used purely to make error output (if any) more human-readable
* @param expected list of attributes the query's output has to adhere to
* @param query query whose output has to be reshaped
* @param byName whether the matching should occur by name or positionally
* @param conf instance of [[SQLConf]]
* @return [[LogicalPlan]] whose output is aligned to match that of [[expected]]
*/
def resolveOutputColumns(tableName: String,
expected: Seq[Attribute],
query: LogicalPlan,
byName: Boolean,
conf: SQLConf): LogicalPlan

/**
* Instantiates an [[Explain]] command
*/
def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan

/**
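As a reference for how this new trait method can be fulfilled, below is a minimal sketch of a Spark 3.x adapter that delegates to Spark's own catalyst resolver. It assumes Spark 3's `org.apache.spark.sql.catalyst.analysis.TableOutputResolver` exposes `resolveOutputColumns` with this exact shape; the wrapper object name `OutputResolutionSketch` is made up for illustration and is not the PR's actual adapter class.

```scala
import org.apache.spark.sql.catalyst.analysis.TableOutputResolver
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.internal.SQLConf

object OutputResolutionSketch {
  def resolveOutputColumns(tableName: String,
                           expected: Seq[Attribute],
                           query: LogicalPlan,
                           byName: Boolean,
                           conf: SQLConf): LogicalPlan =
    // Spark validates arity, data types and nullability of the query output
    // against `expected` and injects the casts/aliases needed to line them up
    TableOutputResolver.resolveOutputColumns(tableName, expected, query, byName, conf)
}
```

Keeping the method behind `HoodieCatalystPlansUtils` lets each Spark-version-specific adapter pick whatever resolution mechanism is available in that Spark release.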
@@ -316,6 +316,9 @@ private MessageType convertAvroSchemaToParquet(Schema schema) {
* @param oldSchema Older schema to check.
* @param newSchema Newer schema to check.
* @return True if the schema validation is successful
*
* TODO revisit this method: it's implemented incorrectly, as it might apply different criteria
* to the top-level record and to nested records (for example, when a nested record is contained within an array)
*/
public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) {
if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == Schema.Type.RECORD) {
@@ -366,13 +369,31 @@ public static boolean isSchemaCompatible(String oldSchema, String newSchema) {
return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema));
}

/**
* Returns the table's latest Avro {@link Schema} iff the table is non-empty (i.e. there is at least
* a single commit)
*
* This method differs from {@link #getTableAvroSchema(boolean)} in that it won't fall back
* to the schema the table was created with
*/
public Option<Schema> getTableAvroSchemaFromLatestCommit(boolean includeMetadataFields) throws Exception {
if (metaClient.isTimelineNonEmpty()) {
return Option.of(getTableAvroSchemaInternal(includeMetadataFields, Option.empty()));
}

return Option.empty();
}

/**
* Get the latest schema, either from the incoming schema or the table schema.
* @param writeSchema incoming batch's write schema.
* @param convertTableSchemaToAddNamespace {@code true} if the table schema needs to be converted, {@code false} otherwise.
* @param converterFn converter function to be called over the table schema (for example, to add a namespace). Each caller can decide if any conversion is required.
* @return the latest schema.
*
* @deprecated will be removed (HUDI-4472)
*/
@Deprecated
public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAddNamespace,
Function1<Schema, Schema> converterFn) {
Schema latestSchema = writeSchema;
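A minimal usage sketch of the new `getTableAvroSchemaFromLatestCommit`, mirroring how `HoodieSparkSqlWriter.getLatestTableSchema` consumes it further down in this PR; the wrapper object and parameter names here are illustrative, not part of the change.

```scala
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

object LatestCommitSchemaSketch {
  def latestCommittedSchema(hadoopConf: Configuration, basePath: String): Option[Schema] = {
    val metaClient = HoodieTableMetaClient.builder
      .setConf(hadoopConf)
      .setBasePath(basePath)
      .build()
    // Some(schema) only if the table has at least one commit; unlike
    // getTableAvroSchema(boolean), there is no fallback to the schema the
    // table was created with
    toScalaOption(new TableSchemaResolver(metaClient).getTableAvroSchemaFromLatestCommit(false))
  }
}
```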
@@ -22,7 +22,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
@@ -31,7 +31,7 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, StringUtils}
import org.apache.hudi.common.util.{CommitUtils, Functions, StringUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
@@ -72,8 +72,7 @@ object HoodieSparkSqlWriter {
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty
)
asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty)
: (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {

@@ -241,39 +240,49 @@ object HoodieSparkSqlWriter {
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
val lastestSchema = getLatestTableSchema(fs, basePath, sparkContext, schema)

// TODO(HUDI-4472) revisit and simplify schema handling
val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
val latestTableSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(sourceSchema)

val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
if (reconcileSchema && parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
&& internalSchemaOpt.isEmpty) {
// force apply full schema evolution.
internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(schema))
}
if (reconcileSchema) {
schema = lastestSchema
}
if (internalSchemaOpt.isDefined) {
// Apply schema evolution.
val mergedSparkSchema = if (!reconcileSchema) {
AvroConversionUtils.convertAvroSchemaToStructType(AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, lastestSchema))

val writerSchema: Schema =
if (reconcileSchema) {
// In case we need to reconcile the schema and schema evolution is enabled,
// we will force-apply schema evolution to the writer's schema
if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))
}

if (internalSchemaOpt.isDefined) {
// Apply schema evolution, by auto-merging write schema and read schema
val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getName)
} else if (TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
[Inline review thread]
Contributor: So essentially, this schema reconciliation is not handled well in the schema evolution code path? Maybe you can file a tracking ticket.
Author: There's one linked on top of this code section.

// In case schema reconciliation is enabled and the source and latest table schemas
// are compatible (as defined by [[TableSchemaResolver#isSchemaCompatible]]), then we will
// pick the latest table's schema as the writer's schema
latestTableSchema
} else {
// Otherwise fallback to original source's schema
sourceSchema
}
} else {
// Auto merge write schema and read schema.
val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema, internalSchemaOpt.get)
AvroConversionUtils.convertAvroSchemaToStructType(AvroInternalSchemaConverter.convert(mergedInternalSchema, lastestSchema.getName))
// In case reconciliation is disabled, we still have to do a (minor) reconciliation
// of nullability attributes, making sure the schema of the incoming batch is in line with
// the data already committed in the table
AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
[Inline review thread]
Contributor: Notice: the Avro Schema created by AvroSchemaEvolutionUtils.canonicalizeColumnNullability has a different namespace than the schema created by AvroConversionUtils.convertStructTypeToAvroSchema, and Avro 1.8.2 is namespace-sensitive. In the original logic we would eventually call AvroConversionUtils.convertStructTypeToAvroSchema to maintain namespace consistency.

Contributor: @alexeykudinkin: prior to this patch, when reconcileSchema was disabled and no schema evolution was enabled, where did we do canonicalizeColumnNullability? Would you mind pointing me to the code path?

Author (alexeykudinkin, Jul 28, 2022): That's a good call-out @xiarixiaoyao. Do you think it might still be an issue even if all our tests are green (for Spark 2.4)?

Member: "In the original logic, we will eventually call AvroConversionUtils.convertStructTypeToAvroSchema to maintain namespace consistency." @alexeykudinkin, do we have a UT to ensure the namespace stays consistent?

Author: What I meant is that all of our Spark writing is proxied through this code, so if there were issues due to namespacing in Avro 1.8 (Spark 2.4) we would learn about them pretty fast.

Member: OK, that sounds fair. I'm more curious about what different namespaces we might get; by right, Hudi should dominate the namespace with some constant like hoodie_namespace, and then a UT should cover this.

Author: We might get an auto-generated namespace when we convert the incoming StructType into a Schema. Frankly, it's of little variance given how many times we do back-and-forth conversions; we just need to make sure it's not breaking anything.

Contributor: @alexeykudinkin, I have encountered this problem before, so in the original code logic I eventually added AvroConversionUtils.convertStructTypeToAvroSchema to maintain the consistency of the namespace; let me check it again with the new branch code. Another thing: at line 281 we register a writerSchema to Kryo (call it schema1), and at line 175 in HoodieSparkUtils we use another schema to convert InternalRow to an Avro record (call it schema2). schema1 and schema2 have different namespaces, and I'm not sure if this will affect performance.

}
schema = AvroConversionUtils.convertStructTypeToAvroSchema(mergedSparkSchema, structName, nameSpace)
}

if (reconcileSchema && internalSchemaOpt.isEmpty) {
schema = lastestSchema
}
validateSchemaForHoodieIsDeleted(schema)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
validateSchemaForHoodieIsDeleted(writerSchema)
sparkContext.getConf.registerAvroSchemas(writerSchema)
log.info(s"Registered avro schema : ${writerSchema.toString(true)}")

// Convert to RDD[HoodieRecord]
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
org.apache.hudi.common.util.Option.of(schema))
org.apache.hudi.common.util.Option.of(writerSchema))
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
operation.equals(WriteOperationType.UPSERT) ||
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
@@ -295,10 +304,10 @@ object HoodieSparkSqlWriter {
hoodieRecord
}).toJavaRDD()

val writeSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, schema) else schema
val writerDataSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) else writerSchema
// Create a HoodieWriteClient & issue the write.

val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path,
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path,
tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]

@@ -388,14 +397,18 @@ object HoodieSparkSqlWriter {
* @param schema incoming record's schema.
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
*/
def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext, schema: Schema): Schema = {
var latestSchema: Schema = schema
def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[Schema] = {
if (FSUtils.isTableExists(basePath.toString, fs)) {
val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
val tableMetaClient = HoodieTableMetaClient.builder
.setConf(sparkContext.hadoopConfiguration)
.setBasePath(basePath.toString)
.build()
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
latestSchema = tableSchemaResolver.getLatestSchema(schema, false, null)

toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
} else {
None
}
latestSchema
}

def registerKryoClassesAndGetGenericRecords(tblName: String, sparkContext: SparkContext, df: Dataset[Row],
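Because the new writer-schema selection above is interleaved with removed lines and review comments, here is a condensed restatement of the decision flow as a standalone function. This is a sketch: the function name `deduceWriterSchema` and the `org.apache.hudi.internal.schema.*` import paths are assumptions on my part, while the variable names and calls follow the diff.

```scala
import org.apache.avro.Schema
import org.apache.hudi.common.table.TableSchemaResolver
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils

object WriterSchemaSketch {
  def deduceWriterSchema(sourceSchema: Schema,
                         latestTableSchema: Schema,
                         internalSchemaOpt: Option[InternalSchema],
                         reconcileSchema: Boolean): Schema =
    if (reconcileSchema) {
      if (internalSchemaOpt.isDefined) {
        // Schema evolution: auto-merge the incoming (write) schema into the
        // table's internal (read) schema
        AvroInternalSchemaConverter.convert(
          AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get),
          latestTableSchema.getName)
      } else if (TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
        // Compatible schemas: keep the table's latest committed schema
        latestTableSchema
      } else {
        // Incompatible schemas: fall back to the incoming batch's schema
        sourceSchema
      }
    } else {
      // Reconciliation disabled: only align nullability with the data already committed
      AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
    }
}
```

Note that the diff also force-seeds `internalSchemaOpt` from `sourceSchema` when reconciliation and schema evolution are both enabled but no evolved schema exists yet; that mutation is left out of this sketch.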
@@ -317,8 +317,8 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
def castIfNeeded(child: Expression, dataType: DataType, conf: SQLConf): Expression = {
child match {
case Literal(nul, NullType) => Literal(nul, dataType)
case _ => if (child.dataType != dataType)
Cast(child, dataType, Option(conf.sessionLocalTimeZone)) else child
case expr if child.dataType != dataType => Cast(expr, dataType, Option(conf.sessionLocalTimeZone))
case _ => child
}
}

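The rewritten `castIfNeeded` above now handles NULL literals, mismatched types, and matching types as three explicit cases. A small usage sketch follows (it assumes `HoodieSqlCommonUtils` is importable from `org.apache.spark.sql.hudi`, as in this source tree):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.castIfNeeded
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, LongType, NullType}

object CastIfNeededSketch extends App {
  val conf = new SQLConf()
  val id = AttributeReference("id", IntegerType)()

  // NULL literal: re-typed directly, no Cast node is produced
  val nullAsLong: Expression = castIfNeeded(Literal(null, NullType), LongType, conf)

  // Mismatched type: wrapped in a Cast carrying the session-local time zone
  val idAsLong: Expression = castIfNeeded(id, LongType, conf)

  // Matching type: returned unchanged
  val idUnchanged: Expression = castIfNeeded(id, IntegerType, conf)
}
```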