Skip to content

Commit

Permalink
[HUDI-4638] Rename payload clazz and preCombine field options for flink sql (#6434)
Browse files Browse the repository at this point in the history
  • Loading branch information
danny0405 committed Aug 19, 2022
1 parent a6740d0 commit 39b16c5
Showing 1 changed file with 43 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ private FlinkOptions() {
// ------------------------------------------------------------------------
// Base Options
// ------------------------------------------------------------------------

public static final ConfigOption<String> PATH = ConfigOptions
.key("path")
.stringType()
Expand All @@ -79,6 +80,38 @@ private FlinkOptions() {
// Common Options
// ------------------------------------------------------------------------

public static final ConfigOption<String> TABLE_NAME = ConfigOptions
.key(HoodieWriteConfig.TBL_NAME.key())
.stringType()
.noDefaultValue()
.withDescription("Table name to register to Hive metastore");

public static final String TABLE_TYPE_COPY_ON_WRITE = HoodieTableType.COPY_ON_WRITE.name();
public static final String TABLE_TYPE_MERGE_ON_READ = HoodieTableType.MERGE_ON_READ.name();
public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
.key("table.type")
.stringType()
.defaultValue(TABLE_TYPE_COPY_ON_WRITE)
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");

public static final String NO_PRE_COMBINE = "no_precombine";
public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
.key("payload.ordering.field")
.stringType()
.defaultValue("ts")
.withFallbackKeys("write.precombine.field")
.withDescription("Field used in preCombining before actual write. When two records have the same\n"
+ "key value, we will pick the one with the largest value for the precombine field,\n"
+ "determined by Object.compareTo(..)");

public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions
.key("payload.class")
.stringType()
.defaultValue(EventTimeAvroPayload.class.getName())
.withFallbackKeys("write.payload.class")
.withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
+ "This will render any value set for the option in-effective");

public static final ConfigOption<String> PARTITION_DEFAULT_NAME = ConfigOptions
.key("partition.default_name")
.stringType()
Expand Down Expand Up @@ -116,6 +149,7 @@ private FlinkOptions() {
// ------------------------------------------------------------------------
// Index Options
// ------------------------------------------------------------------------

public static final ConfigOption<String> INDEX_TYPE = ConfigOptions
.key("index.type")
.stringType()
Expand Down Expand Up @@ -150,6 +184,7 @@ private FlinkOptions() {
// ------------------------------------------------------------------------
// Read Options
// ------------------------------------------------------------------------

public static final ConfigOption<Integer> READ_TASKS = ConfigOptions
.key("read.tasks")
.intType()
Expand Down Expand Up @@ -247,19 +282,6 @@ private FlinkOptions() {
// ------------------------------------------------------------------------
// Write Options
// ------------------------------------------------------------------------
public static final ConfigOption<String> TABLE_NAME = ConfigOptions
.key(HoodieWriteConfig.TBL_NAME.key())
.stringType()
.noDefaultValue()
.withDescription("Table name to register to Hive metastore");

public static final String TABLE_TYPE_COPY_ON_WRITE = HoodieTableType.COPY_ON_WRITE.name();
public static final String TABLE_TYPE_MERGE_ON_READ = HoodieTableType.MERGE_ON_READ.name();
public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
.key("table.type")
.stringType()
.defaultValue(TABLE_TYPE_COPY_ON_WRITE)
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");

public static final ConfigOption<Boolean> INSERT_CLUSTER = ConfigOptions
.key("write.insert.cluster")
Expand All @@ -275,22 +297,6 @@ private FlinkOptions() {
.defaultValue("upsert")
.withDescription("The write operation, that this write should do");

public static final String NO_PRE_COMBINE = "no_precombine";
public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
.key("write.precombine.field")
.stringType()
.defaultValue("ts")
.withDescription("Field used in preCombining before actual write. When two records have the same\n"
+ "key value, we will pick the one with the largest value for the precombine field,\n"
+ "determined by Object.compareTo(..)");

public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions
.key("write.payload.class")
.stringType()
.defaultValue(EventTimeAvroPayload.class.getName())
.withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
+ "This will render any value set for the option in-effective");

/**
* Flag to indicate whether to drop duplicates before insert/upsert.
* By default false to gain extra performance.
Expand Down Expand Up @@ -395,7 +401,7 @@ private FlinkOptions() {
.key("write.index_bootstrap.tasks")
.intType()
.noDefaultValue()
.withDescription("Parallelism of tasks that do index bootstrap, default same as the sink parallelism");
.withDescription("Parallelism of tasks that do index bootstrap, default same as the write task parallelism");

public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
.key("write.bucket_assign.tasks")
Expand Down Expand Up @@ -580,12 +586,12 @@ private FlinkOptions() {
+ "This also directly translates into how much you can incrementally pull on this table, default 30");

public static final ConfigOption<Integer> CLEAN_RETAIN_HOURS = ConfigOptions
.key("clean.retain_hours")
.intType()
.defaultValue(24)// default 24 hours
.withDescription("Number of hours for which commits need to be retained. This config provides a more flexible option as"
+ "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
+ " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");
.key("clean.retain_hours")
.intType()
.defaultValue(24)// default 24 hours
.withDescription("Number of hours for which commits need to be retained. This config provides a more flexible option as"
+ "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
+ " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");

public static final ConfigOption<Integer> CLEAN_RETAIN_FILE_VERSIONS = ConfigOptions
.key("clean.retain_file_versions")
Expand Down Expand Up @@ -691,6 +697,7 @@ private FlinkOptions() {
// ------------------------------------------------------------------------
// Hive Sync Options
// ------------------------------------------------------------------------

public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED = ConfigOptions
.key("hive_sync.enable")
.booleanType()
Expand Down

0 comments on commit 39b16c5

Please sign in to comment.