
[HUDI-7378] Fix Spark SQL DML with custom key generator #10615

Merged

Conversation

yihua
Contributor

@yihua yihua commented Feb 4, 2024

Change Logs

Before this PR, Spark SQL DML does not work for a partitioned table with CustomKeyGenerator. CustomKeyGenerator requires the write config of the partition fields (hoodie.datasource.write.partitionpath.field) to be in the format field:type (e.g., ts:timestamp,segment:simple). However, the table config hoodie.table.partition.fields in hoodie.properties only stores the field names (e.g., ts,segment). The Spark SQL writer automatically picks up the partition field names from hoodie.table.partition.fields in hoodie.properties to set hoodie.datasource.write.partitionpath.field, which does not work with CustomKeyGenerator (errors attached below). In addition, hoodie.datasource.write.partitionpath.field cannot be overridden from either a SQL statement or the catalog table properties (stored in catalogProperties of HoodieCatalogTable).

This PR fixes Spark SQL DML with CustomKeyGenerator by allowing ALTER TABLE or CREATE TABLE to set hoodie.datasource.write.partitionpath.field, overriding the write config of the partition fields so that CustomKeyGenerator functions properly. The table config hoodie.table.partition.fields in hoodie.properties keeps the same format. The syntax to override the write config of the partition fields looks like:

CREATE TABLE table_name USING HUDI
location '/table/path'
TBLPROPERTIES (
  hoodie.datasource.write.partitionpath.field = 'ts:timestamp,segment:simple'
)
---
ALTER TABLE table_name
SET TBLPROPERTIES (
  hoodie.datasource.write.partitionpath.field = 'ts:timestamp,segment:simple'
)

After adding the write config of the partition path fields to the catalog table, the table looks like this in the Spark catalog:

spark-sql (default)> DESCRIBE TABLE formatted h0;
24/04/12 13:59:53 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
_hoodie_commit_time 	string              	                    
_hoodie_commit_seqno	string              	                    
_hoodie_record_key  	string              	                    
_hoodie_partition_path	string              	                    
_hoodie_file_name   	string              	                    
id                  	int                 	                    
name                	string              	                    
price               	decimal(5,1)        	                    
ts                  	int                 	                    
segment             	string              	                    
# Partition Information	                    	                    
# col_name          	data_type           	comment             
ts                  	int                 	                    
segment             	string              	                    
                    	                    	                    
# Detailed Table Information	                    	                    
Catalog             	spark_catalog       	                    
Database            	default             	                    
Table               	h0                  	                    
Owner               	ethan               	                    
Created Time        	Fri Apr 12 13:58:05 PDT 2024	                    
Last Access         	UNKNOWN             	                    
Created By          	Spark 3.5.1         	                    
Type                	EXTERNAL            	                    
Provider            	hudi                	                    
Table Properties    	[hoodie.datasource.write.partitionpath.field=ts:timestamp,segment:simple, preCombineField=name, primaryKey=id, provider=hudi, type=cow]	                    
Location            	file:/private/var/folders/60/wk8qzx310fd32b2dp7mhzvdc0000gn/T/spark-4ac6fb47-e20b-4679-a668-e28238ec3e05/h0	                    
Serde Library       	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe	                    
InputFormat         	org.apache.hudi.hadoop.HoodieParquetInputFormat	                    
OutputFormat        	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat	                    
Time taken: 1.694 seconds, Fetched 30 row(s)
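
With this write config visible to the SQL writer, regular Spark SQL DML on the table then goes through CustomKeyGenerator. A minimal sketch (the values are illustrative only; the schema and the primaryKey=id / preCombineField=name settings come from the DESCRIBE output above, and ts holds an epoch-style value for the timestamp-based partition field):

-- Insert a row; the partition path is built from ts (timestamp) and segment (simple)
INSERT INTO h0 VALUES (1, 'a1', 10.0, 1712955485, 'seg0');
-- UPDATE and DELETE resolve the partition path through the same key generator config
UPDATE h0 SET price = 20.0 WHERE id = 1;
DELETE FROM h0 WHERE id = 1;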

hoodie.properties of the table:

#Properties saved on 2024-04-12T20:58:04.861Z
#Fri Apr 12 13:58:04 PDT 2024
hoodie.table.precombine.field=name
hoodie.table.version=6
hoodie.database.name=default
hoodie.datasource.write.hive_style_partitioning=false
hoodie.table.metadata.partitions.inflight=
hoodie.table.keygenerator.type=CUSTOM
hoodie.table.create.schema={"type"\:"record","name"\:"h0_record","namespace"\:"hoodie.h0","fields"\:[{"name"\:"id","type"\:["int","null"]},{"name"\:"name","type"\:["string","null"]},{"name"\:"price","type"\:[{"type"\:"fixed","name"\:"fixed","namespace"\:"hoodie.h0.h0_record.price","size"\:3,"logicalType"\:"decimal","precision"\:5,"scale"\:1},"null"]},{"name"\:"ts","type"\:["int","null"]},{"name"\:"segment","type"\:["string","null"]}]}
hoodie.partition.metafile.use.base.format=false
hoodie.archivelog.folder=archived
hoodie.table.cdc.enabled=false
hoodie.table.name=h0
hoodie.populate.meta.fields=true
hoodie.table.type=COPY_ON_WRITE
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.base.file.format=PARQUET
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.metadata.partitions=files
hoodie.timeline.layout.version=1
hoodie.table.partition.fields=ts,segment
hoodie.table.recordkey.fields=id
hoodie.table.checksum=1281977830

Conflict between the partition path field write config (ts:timestamp,segment:simple) and the field names (ts,segment) from the table config, when using the custom key generator:

Config conflict(key	current value	existing value):
PartitionPath:	ts:timestamp,segment:simple	ts,segment
org.apache.hudi.exception.HoodieException: Config conflict(key	current value	existing value):
PartitionPath:	ts:timestamp,segment:simple	ts,segment
	at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:216)
	at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:159)
	at org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.parseSchemaAndConfigs(HoodieCatalogTable.scala:254)
	at org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.initHoodieTable(HoodieCatalogTable.scala:192)
	at org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.run(CreateHoodieTableCommand.scala:71)

Exception from the CustomKeyGenerator failure:

Caused by: org.apache.hudi.exception.HoodieKeyException: Unable to find field names for partition path in proper format
	at org.apache.hudi.keygen.CustomAvroKeyGenerator.lambda$getPartitionKeyGenerators$1(CustomAvroKeyGenerator.java:81)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
	at org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionKeyGenerators(CustomAvroKeyGenerator.java:97)
	at org.apache.hudi.keygen.CustomAvroKeyGenerator.<init>(CustomAvroKeyGenerator.java:71)
	at org.apache.hudi.keygen.CustomKeyGenerator.<init>(CustomKeyGenerator.java:74)
	... 39 more

Impact

Fixes Spark SQL DML with CustomKeyGenerator. A new test class, TestSparkSqlWithCustomKeyGenerator, is added to test all DML operations with CustomKeyGenerator, as well as setting hoodie.datasource.write.partitionpath.field with ALTER TABLE .. SET TBLPROPERTIES (..) or CREATE TABLE .. TBLPROPERTIES (..) at the Spark catalog table level.

Risk level

Low

Documentation Update

N/A

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@yihua yihua force-pushed the HUDI-7378-fix-spark-sql-dml-with-custom-keygen branch from 7abdbd8 to afc107a on February 4, 2024 00:15
@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Feb 26, 2024
@yihua yihua force-pushed the HUDI-7378-fix-spark-sql-dml-with-custom-keygen branch 2 times, most recently from 23c4af9 to 185d0fc on March 28, 2024 06:15
Member

@codope codope left a comment


  1. Is there any change to the partition fields in hoodie.properties? Do we now write them as field1:type1,field2:type2 when using CustomKeyGenerator?
  2. Thanks for adding extensive tests. Can you please look into the failures? They seem related to the patch.

Member

@codope codope left a comment


Mistakenly approved earlier.

@yihua yihua force-pushed the HUDI-7378-fix-spark-sql-dml-with-custom-keygen branch from 185d0fc to c376900 on April 11, 2024 01:42
@yihua
Contributor Author

yihua commented Apr 11, 2024

  1. Is there any change to the partition fields in hoodie.properties? Do we now write them as field1:type1,field2:type2 when using CustomKeyGenerator?

There is no change to the table configs in hoodie.properties, i.e., hoodie.table.partition.fields still contains the comma-separated list of partition field names like "segment,ts" (no types for the custom key generator). This PR makes it possible to override hoodie.datasource.write.partitionpath.field with SET TBLPROPERTIES at the table level in the Spark catalog, so that SQL DML can derive the correct write config of the partition fields (e.g., "segment:simple,ts:timestamp" instead of "segment,ts").
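
To restate the two layers with the example fields above (this only rephrases the behavior described in this PR, not new functionality):

-- Table config in hoodie.properties, written by Hudi and unchanged by this PR:
--   hoodie.table.partition.fields=segment,ts
-- Catalog-level override in field:type format that SQL DML now picks up:
ALTER TABLE table_name
SET TBLPROPERTIES (
  hoodie.datasource.write.partitionpath.field = 'segment:simple,ts:timestamp'
)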

  2. Thanks for adding extensive tests. Can you please look into the failures? They seem related to the patch.

Failures for Spark 3.2 and above are fixed. I'm looking into failures for older Spark versions.

Contributor

@jonvex jonvex left a comment


I like that this has the benefit of not breaking tables with their existing hoodie.table.recordkey.fields, but I am curious about any other approaches you thought about. From your test code, it looks like we can't use partitioned by (dt:int,idk:string) when creating the table. I don't think that should block this PR from landing, but in the SQL documentation (https://hudi.apache.org/docs/sql_ddl#create-partitioned-table) I think we should add an example.

Also, I think this change will help us fix partition pruning, which currently does not work with the timestamp key generator: https://issues.apache.org/jira/browse/HUDI-6614

// in hoodie.properties stores "col,ts".
// The "params" here may only contain the write config of partition path field,
// so we need to pass in the validated key generator class name.
val validatedKeyGenClassName = if (tableConfigKeyGen != null) {
Contributor


So when hoodie.datasource.write.partitionpath.field is set, we don't set hoodie.table.partition.fields?

Contributor Author


Only the hoodie.datasource.write.partitionpath.field takes effect in the writer path. Before the fix, the write config is automatically set by the SQL writer based on the value of table config hoodie.table.partition.fields.

} else {
val writeConfigPartitionField = catalogTable.catalogProperties.get(PARTITIONPATH_FIELD.key())
val keyGenClass = ReflectionUtils.getClass(tableConfigKeyGeneratorClassName)
if (classOf[CustomKeyGenerator].equals(keyGenClass)
Contributor


Do we want to make this cover any classes that extend CustomKeyGenerator as well?

Contributor Author


The assumption is that these key generators should not be extended. We should keep it this way for now.

}
}

private def testInsertInto1(tableName: String,
Contributor


This should be named better

Contributor


Or can you at least add some more comments on what the helper functions are doing?

Contributor Author


I renamed the methods. Let me know if they look good.

if (StringUtils.isNullOrEmpty(tableConfigKeyGeneratorClassName)) {
partitionFieldNamesWithoutKeyGenType
} else {
val writeConfigPartitionField = catalogTable.catalogProperties.get(PARTITIONPATH_FIELD.key())
Contributor


Is HoodieCatalogTable persisted between sessions? When you add an existing Hudi table in Spark SQL, you usually only need the column names, right?

Contributor Author


Yes, the table properties associated with HoodieCatalogTable are persisted across Spark sessions. The persisted partition field write config hoodie.datasource.write.partitionpath.field is a custom config outside Spark, which is used by Hudi logic only.

Contributor Author


As an example, the table looks like this in the Spark catalog:

(Same DESCRIBE TABLE FORMATTED output as shown in the PR description above.)

@yihua
Contributor Author

yihua commented Apr 12, 2024

I like that this has the benefit of not breaking tables with their existing hoodie.table.recordkey.fields, but I am curious about any other approaches you thought about. From your test code, it looks like we can't use partitioned by (dt:int,idk:string) when creating the table. I don't think that should block this PR from landing, but in the SQL documentation (https://hudi.apache.org/docs/sql_ddl#create-partitioned-table) I think we should add an example.

Good point. I tried the PARTITIONED BY clause but it did not work either, due to the same issue with the write config of the partition fields. But you're right that adding a new table config indicating the partition field types should solve the problem fundamentally. We should update the SQL docs on any gaps here.
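
One possible shape for such a docs example (a sketch only: the location is a placeholder, and it assumes the table was already initialized with CustomKeyGenerator, for instance through the DataFrame writer) would be registering the table with the override supplied at creation time:

CREATE TABLE h0 USING HUDI
location '/path/to/h0'
TBLPROPERTIES (
  -- field:type write config that CustomKeyGenerator needs for SQL DML
  hoodie.datasource.write.partitionpath.field = 'ts:timestamp,segment:simple'
)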

Also, I think this change will help us fix partition pruning, which currently does not work with the timestamp key generator: https://issues.apache.org/jira/browse/HUDI-6614

Right.

@hudi-bot

CI report:

Bot commands (@hudi-bot supports the following commands):
  • @hudi-bot run azure: re-runs the last Azure build

Labels
release-0.15.0 size:L PR with lines of changes in (300, 1000]