[HUDI-5329] Spark reads table error when Flink creates table without record key and primary key#8933
SteNicholas wants to merge 2 commits into apache:master
Conversation
  */
public static void checkPreCombineField(Configuration conf, List<String> columnNames) {
  String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD);
  if (!columnNames.contains(preCombineField)) {
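The snippet above validates that a configured precombine field actually names a column of the table. A minimal, standalone sketch of that membership check (plain Java, without the Flink `Configuration`/`FlinkOptions` dependencies; class and method names here are illustrative, not Hudi's):

```java
import java.util.Arrays;
import java.util.List;

public class PreCombineFieldCheck {

    // Simplified stand-in for checkPreCombineField: the configured
    // precombine field, when present, must be one of the table's columns.
    static boolean isValidPreCombineField(String preCombineField, List<String> columnNames) {
        if (preCombineField == null) {
            return true; // nothing configured, nothing to validate
        }
        return columnNames.contains(preCombineField);
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("id", "name", "ts");
        System.out.println(isValidPreCombineField("ts", columns));  // true
        System.out.println(isValidPreCombineField("_ts", columns)); // false
    }
}
```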
Does it still throw after spark pkless support: #8107
@danny0405, it should still throw after spark pkless support #8107, because #8107 covers auto generation of record keys, not the precombine field.
After #8107, spark will not throw an exception if there is no primary key, so maybe we should just fix the primary key setup.
@SteNicholas This comment does not look like resolved. Can you please revisit?
@danny0405, I have addressed the above comments. PTAL.
if (!flinkConf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
  if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()) {
    final String pkColumns = String.join(",", catalogTable.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
    flinkConf.setString(FlinkOptions.RECORD_KEY_FIELD, pkColumns);
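The hunk above falls back to the catalog table's primary key when no record key is configured. A simplified, dependency-free sketch of that derivation (the class and method names are illustrative; `Optional` stands in for the Flink config and schema lookups):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class RecordKeyFallback {

    // Simplified sketch of the fallback in the diff: when no record key
    // is configured, derive it from the primary key columns, joined
    // with commas; when neither exists, leave it unset.
    static Optional<String> deriveRecordKey(Optional<String> configuredRecordKey,
                                            Optional<List<String>> primaryKeyColumns) {
        if (configuredRecordKey.isPresent()) {
            return configuredRecordKey;
        }
        return primaryKeyColumns.map(cols -> String.join(",", cols));
    }

    public static void main(String[] args) {
        System.out.println(deriveRecordKey(
            Optional.empty(), Optional.of(Arrays.asList("uuid", "region"))));
        System.out.println(deriveRecordKey(
            Optional.of("id"), Optional.of(Arrays.asList("uuid"))));
    }
}
```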
Why must we have a primary key definition then?
if (!resolvedSchema.getColumnNames().contains(preCombineField)) {
  if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
    throw new HoodieValidationException("Option '" + FlinkOptions.PRECOMBINE_FIELD.key()
        + "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName());
The preCombine field can be omitted only when it is append mode.
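The rule stated in the comment above can be sketched as a small decision: the precombine field may be absent in append mode, but is required when the default Hoodie record payload is used in upsert mode, since that payload compares ordering values. A simplified, dependency-free sketch (the boolean flags stand in for the real `OptionsResolver` checks; names are illustrative):

```java
public class PreCombineRequirement {

    // Simplified decision rule: a missing precombine field is fine in
    // append mode; in upsert mode it is required when the default
    // Hoodie record payload class is configured.
    static void validatePreCombine(boolean appendMode,
                                   boolean usesDefaultHoodiePayload,
                                   String preCombineField) {
        if (preCombineField == null && !appendMode && usesDefaultHoodiePayload) {
            throw new IllegalStateException(
                "precombine.field is required for the default record payload in upsert mode");
        }
    }

    public static void main(String[] args) {
        validatePreCombine(true, true, null);   // append mode: accepted
        validatePreCombine(false, true, "ts");  // upsert with field: accepted
        try {
            validatePreCombine(false, true, null); // upsert without field: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```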
…record key and primary key
@danny0405, could you please take a look?
@SteNicholas Some comments do not look resolved. Can you please respond to Danny's comments? Also, please rebase after addressing them; several test fixes went in over the last month.
@SteNicholas This comment does not look like resolved. Can you please revisit?
Change Logs
Spark fails to read a table when Flink created it without precombine.field. Spark should read the table successfully when Flink creates a table without precombine.field.

Impact

Flink HoodieCatalog and HoodieHiveCatalog check precombine.field for createTable.

Risk level (write none, low medium or high below)

none.
Documentation Update
none.
Contributor's checklist