/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.configuration;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Hoodie Flink config options.
*
* <p>It has the options for Hoodie table read and write. It also defines some utilities.
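*
* <p>A minimal Flink SQL sketch of how these options are passed through the WITH
* clause (the table name, schema and path below are illustrative, not defined by this class):
* <pre>{@code
* CREATE TABLE hoodie_sink (
*   uuid VARCHAR(20),
*   ts BIGINT,
*   `partition` VARCHAR(20)
* ) PARTITIONED BY (`partition`) WITH (
*   'connector' = 'hudi',
*   'path' = 'file:///tmp/hoodie_table',
*   'table.type' = 'MERGE_ON_READ'
* );
* }</pre>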
*/
@ConfigClassProperty(name = "Flink Options",
groupName = ConfigGroups.Names.FLINK_SQL,
description = "Flink jobs using the SQL can be configured through the options in WITH clause."
+ " The actual datasource level configs are listed below.")
public class FlinkOptions extends HoodieConfig {
private FlinkOptions() {
}
// ------------------------------------------------------------------------
// Base Options
// ------------------------------------------------------------------------
public static final ConfigOption<String> PATH = ConfigOptions
.key("path")
.stringType()
.noDefaultValue()
.withDescription("Base path for the target hoodie table.\n"
+ "The path would be created if it does not exist,\n"
+ "otherwise a Hoodie table expects to be initialized successfully");
// ------------------------------------------------------------------------
// Common Options
// ------------------------------------------------------------------------
public static final ConfigOption<String> PARTITION_DEFAULT_NAME = ConfigOptions
.key("partition.default_name")
.stringType()
.defaultValue("default") // keep sync with hoodie style
.withDescription("The default partition name in case the dynamic partition"
+ " column value is null/empty string");
public static final ConfigOption<Boolean> CHANGELOG_ENABLED = ConfigOptions
.key("changelog.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Whether to keep all the intermediate changes, "
+ "we try to keep all the changes of a record when enabled:\n"
+ "1). The sink accept the UPDATE_BEFORE message;\n"
+ "2). The source try to emit every changes of a record.\n"
+ "The semantics is best effort because the compaction job would finally merge all changes of a record into one.\n"
+ " default false to have UPSERT semantics");
// ------------------------------------------------------------------------
// Metadata table Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> METADATA_ENABLED = ConfigOptions
.key("metadata.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Enable the internal metadata table which serves table metadata like level file listings, default false");
public static final ConfigOption<Integer> METADATA_COMPACTION_DELTA_COMMITS = ConfigOptions
.key("metadata.compaction.delta_commits")
.intType()
.defaultValue(10)
.withDescription("Max delta commits for metadata table to trigger compaction, default 24");
// ------------------------------------------------------------------------
// Index Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> INDEX_BOOTSTRAP_ENABLED = ConfigOptions
.key("index.bootstrap.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Whether to bootstrap the index state from existing hoodie table, default false");
public static final ConfigOption<Double> INDEX_STATE_TTL = ConfigOptions
.key("index.state.ttl")
.doubleType()
.defaultValue(0D)
.withDescription("Index state ttl in days, default stores the index permanently");
public static final ConfigOption<Boolean> INDEX_GLOBAL_ENABLED = ConfigOptions
.key("index.global.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Whether to update index for the old partition path\n"
+ "if same key record with different partition path came in, default true");
public static final ConfigOption<String> INDEX_PARTITION_REGEX = ConfigOptions
.key("index.partition.regex")
.stringType()
.defaultValue(".*")
.withDescription("Whether to load partitions in state if partition path matching, default *");
// ------------------------------------------------------------------------
// Read Options
// ------------------------------------------------------------------------
public static final ConfigOption<Integer> READ_TASKS = ConfigOptions
.key("read.tasks")
.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual read, default is 4");
public static final ConfigOption<String> SOURCE_AVRO_SCHEMA_PATH = ConfigOptions
.key("source.avro-schema.path")
.stringType()
.noDefaultValue()
.withDescription("Source avro schema file path, the parsed schema is used for deserialization");
public static final ConfigOption<String> SOURCE_AVRO_SCHEMA = ConfigOptions
.key("source.avro-schema")
.stringType()
.noDefaultValue()
.withDescription("Source avro schema string, the parsed schema is used for deserialization");
public static final String QUERY_TYPE_SNAPSHOT = "snapshot";
public static final String QUERY_TYPE_READ_OPTIMIZED = "read_optimized";
public static final String QUERY_TYPE_INCREMENTAL = "incremental";
public static final ConfigOption<String> QUERY_TYPE = ConfigOptions
.key("hoodie.datasource.query.type")
.stringType()
.defaultValue(QUERY_TYPE_SNAPSHOT)
.withDescription("Decides how data files need to be read, in\n"
+ "1) Snapshot mode (obtain latest view, based on row & columnar data);\n"
+ "2) incremental mode (new data since an instantTime);\n"
+ "3) Read Optimized mode (obtain latest view, based on columnar data)\n."
+ "Default: snapshot");
public static final String REALTIME_SKIP_MERGE = "skip_merge";
public static final String REALTIME_PAYLOAD_COMBINE = "payload_combine";
public static final ConfigOption<String> MERGE_TYPE = ConfigOptions
.key("hoodie.datasource.merge.type")
.stringType()
.defaultValue(REALTIME_PAYLOAD_COMBINE)
.withDescription("For Snapshot query on merge on read table. Use this key to define how the payloads are merged, in\n"
+ "1) skip_merge: read the base file records plus the log file records;\n"
+ "2) payload_combine: read the base file records first, for each record in base file, checks whether the key is in the\n"
+ " log file records(combines the two records with same key for base and log file records), then read the left log file records");
public static final ConfigOption<Boolean> UTC_TIMEZONE = ConfigOptions
.key("read.utc-timezone")
.booleanType()
.defaultValue(true)
.withDescription("Use UTC timezone or local timezone to the conversion between epoch"
+ " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
+ " use UTC timezone, by default true");
public static final ConfigOption<Boolean> READ_AS_STREAMING = ConfigOptions
.key("read.streaming.enabled")
.booleanType()
.defaultValue(false)// default read as batch
.withDescription("Whether to read as streaming source, default false");
public static final ConfigOption<Integer> READ_STREAMING_CHECK_INTERVAL = ConfigOptions
.key("read.streaming.check-interval")
.intType()
.defaultValue(60) // default 1 minute
.withDescription("Check interval in seconds for streaming read, default 60 (1 minute)");
// this option is experimental
public static final ConfigOption<Boolean> READ_STREAMING_SKIP_COMPACT = ConfigOptions
.key("read.streaming.skip_compaction")
.booleanType()
.defaultValue(false)
.withDescription("Whether to skip compaction instants for streaming read.\n"
+ "There are two cases where this option can be used to avoid reading duplicates:\n"
+ "1) you are definitely sure that the consumer reads faster than any compaction instant, "
+ "usually with a delta-time compaction strategy that is long enough, e.g. one week;\n"
+ "2) changelog mode is enabled; this option is then a solution to keep data integrity");
public static final String START_COMMIT_EARLIEST = "earliest";
public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
.key("read.start-commit")
.stringType()
.noDefaultValue()
.withDescription("Start commit instant for reading, the commit time format should be 'yyyyMMddHHmmss', "
+ "by default reading from the latest instant for streaming read");
public static final ConfigOption<String> READ_END_COMMIT = ConfigOptions
.key("read.end-commit")
.stringType()
.noDefaultValue()
.withDescription("End commit instant for reading, the commit time format should be 'yyyyMMddHHmmss'");
// ------------------------------------------------------------------------
// Write Options
// ------------------------------------------------------------------------
public static final ConfigOption<String> TABLE_NAME = ConfigOptions
.key(HoodieWriteConfig.TBL_NAME.key())
.stringType()
.noDefaultValue()
.withDescription("Table name to register to Hive metastore");
public static final String TABLE_TYPE_COPY_ON_WRITE = HoodieTableType.COPY_ON_WRITE.name();
public static final String TABLE_TYPE_MERGE_ON_READ = HoodieTableType.MERGE_ON_READ.name();
public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
.key("table.type")
.stringType()
.defaultValue(TABLE_TYPE_COPY_ON_WRITE)
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
public static final ConfigOption<Boolean> INSERT_CLUSTER = ConfigOptions
.key("write.insert.cluster")
.booleanType()
.defaultValue(false)
.withDescription("Whether to merge small files for insert mode, "
+ "if true, the write throughput will decrease because the read/write of existing small file, "
+ "only valid for COW table, default false");
public static final ConfigOption<String> OPERATION = ConfigOptions
.key("write.operation")
.stringType()
.defaultValue("upsert")
.withDescription("The write operation, that this write should do");
public static final String NO_PRE_COMBINE = "no_precombine";
public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
.key("write.precombine.field")
.stringType()
.defaultValue("ts")
.withDescription("Field used in preCombining before actual write. When two records have the same\n"
+ "key value, we will pick the one with the largest value for the precombine field,\n"
+ "determined by Object.compareTo(..)");
public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions
.key("write.payload.class")
.stringType()
.defaultValue(OverwriteWithLatestAvroPayload.class.getName())
.withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
+ "This will render any value set for the option in-effective");
/**
* Flag to indicate whether to drop duplicates before insert/upsert.
* By default false to gain extra performance.
*/
public static final ConfigOption<Boolean> PRE_COMBINE = ConfigOptions
.key("write.precombine")
.booleanType()
.defaultValue(false)
.withDescription("Flag to indicate whether to drop duplicates before insert/upsert.\n"
+ "By default these cases will accept duplicates, to gain extra performance:\n"
+ "1) insert operation;\n"
+ "2) upsert for MOR table, the MOR table deduplicate on reading");
public static final ConfigOption<Integer> RETRY_TIMES = ConfigOptions
.key("write.retry.times")
.intType()
.defaultValue(3)
.withDescription("Flag to indicate how many times streaming job should retry for a failed checkpoint batch.\n"
+ "By default 3");
public static final ConfigOption<Long> RETRY_INTERVAL_MS = ConfigOptions
.key("write.retry.interval.ms")
.longType()
.defaultValue(2000L)
.withDescription("Flag to indicate how long (by millisecond) before a retry should issued for failed checkpoint batch.\n"
+ "By default 2000 and it will be doubled by every retry");
public static final ConfigOption<Boolean> IGNORE_FAILED = ConfigOptions
.key("write.ignore.failed")
.booleanType()
.defaultValue(true)
.withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch.\n"
+ "By default true (in favor of streaming progressing over data integrity)");
public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
.key(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
.stringType()
.defaultValue("uuid")
.withDescription("Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\n"
+ "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+ "the dot notation eg: `a.b.c`");
public static final ConfigOption<String> PARTITION_PATH_FIELD = ConfigOptions
.key(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
.stringType()
.defaultValue("")
.withDescription("Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`.\n"
+ "Actual value obtained by invoking .toString(), default ''");
public static final ConfigOption<Boolean> URL_ENCODE_PARTITIONING = ConfigOptions
.key(KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key())
.booleanType()
.defaultValue(false)
.withDescription("Whether to encode the partition path url, default false");
public static final ConfigOption<Boolean> HIVE_STYLE_PARTITIONING = ConfigOptions
.key(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key())
.booleanType()
.defaultValue(false)
.withDescription("Whether to use Hive style partitioning.\n"
+ "If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.\n"
+ "By default false (the names of partition folders are only partition values)");
public static final ConfigOption<String> KEYGEN_CLASS_NAME = ConfigOptions
.key(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key())
.stringType()
.defaultValue("")
.withDescription("Key generator class, that implements will extract the key out of incoming record");
public static final ConfigOption<String> KEYGEN_TYPE = ConfigOptions
.key(HoodieWriteConfig.KEYGENERATOR_TYPE.key())
.stringType()
.defaultValue(KeyGeneratorType.SIMPLE.name())
.withDescription("Key generator type, that implements will extract the key out of incoming record");
public static final String PARTITION_FORMAT_HOUR = "yyyyMMddHH";
public static final String PARTITION_FORMAT_DAY = "yyyyMMdd";
public static final ConfigOption<String> PARTITION_FORMAT = ConfigOptions
.key("write.partition.format")
.stringType()
.noDefaultValue()
.withDescription("Partition path format, only valid when 'write.datetime.partitioning' is true, default is:\n"
+ "1) 'yyyyMMddHH' for timestamp(3) WITHOUT TIME ZONE, LONG, FLOAT, DOUBLE, DECIMAL;\n"
+ "2) 'yyyyMMdd' for DAY and INT.");
public static final ConfigOption<Integer> INDEX_BOOTSTRAP_TASKS = ConfigOptions
.key("write.index_bootstrap.tasks")
.intType()
.noDefaultValue()
.withDescription("Parallelism of tasks that do index bootstrap, default is the parallelism of the execution environment");
public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
.key("write.bucket_assign.tasks")
.intType()
.noDefaultValue()
.withDescription("Parallelism of tasks that do bucket assign, default is the parallelism of the execution environment");
public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
.key("write.tasks")
.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual write, default is 4");
public static final ConfigOption<Double> WRITE_TASK_MAX_SIZE = ConfigOptions
.key("write.task.max.size")
.doubleType()
.defaultValue(1024D) // 1GB
.withDescription("Maximum memory in MB for a write task, when the threshold hits,\n"
+ "it flushes the max size data bucket to avoid OOM, default 1GB");
public static final ConfigOption<Long> WRITE_RATE_LIMIT = ConfigOptions
.key("write.rate.limit")
.longType()
.defaultValue(0L) // default no limit
.withDescription("Write record rate limit per second to prevent traffic jitter and improve stability, default 0 (no limit)");
public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
.key("write.batch.size")
.doubleType()
.defaultValue(256D) // 256MB
.withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 256MB");
public static final ConfigOption<Integer> WRITE_LOG_BLOCK_SIZE = ConfigOptions
.key("write.log_block.size")
.intType()
.defaultValue(128)
.withDescription("Max log block size in MB for log file, default 128MB");
public static final ConfigOption<Long> WRITE_LOG_MAX_SIZE = ConfigOptions
.key("write.log.max.size")
.longType()
.defaultValue(1024L)
.withDescription("Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB");
public static final ConfigOption<Integer> WRITE_PARQUET_BLOCK_SIZE = ConfigOptions
.key("write.parquet.block.size")
.intType()
.defaultValue(120)
.withDescription("Parquet RowGroup size. It's recommended to make this large enough that scan costs can be"
+ " amortized by packing enough column values into a single row group.");
public static final ConfigOption<Integer> WRITE_PARQUET_MAX_FILE_SIZE = ConfigOptions
.key("write.parquet.max.file.size")
.intType()
.defaultValue(120)
.withDescription("Target size for parquet files produced by Hudi write phases. "
+ "For DFS, this needs to be aligned with the underlying filesystem block size for optimal performance.");
public static final ConfigOption<Integer> WRITE_PARQUET_PAGE_SIZE = ConfigOptions
.key("hoodie.parquet.page.size")
.intType()
.defaultValue(1)
.withDescription("Parquet page size. Page is the unit of read within a parquet file. "
+ "Within a block, pages are compressed separately.");
public static final ConfigOption<Integer> WRITE_MERGE_MAX_MEMORY = ConfigOptions
.key("write.merge.max_memory")
.intType()
.defaultValue(100) // default 100 MB
.withDescription("Max memory in MB for merge, default 100MB");
// this is only for internal use
public static final ConfigOption<Long> WRITE_COMMIT_ACK_TIMEOUT = ConfigOptions
.key("write.commit.ack.timeout")
.longType()
.defaultValue(-1L) // default at least once
.withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n"
+ "waits for the instant commit success, only for internal use");
public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION = ConfigOptions
.key("write.bulk_insert.shuffle_by_partition")
.booleanType()
.defaultValue(true)
.withDescription("Whether to shuffle the inputs by partition path for bulk insert tasks, default true");
public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_BY_PARTITION = ConfigOptions
.key("write.bulk_insert.sort_by_partition")
.booleanType()
.defaultValue(true)
.withDescription("Whether to sort the inputs by partition path for bulk insert tasks, default true");
public static final ConfigOption<Integer> WRITE_SORT_MEMORY = ConfigOptions
.key("write.sort.memory")
.intType()
.defaultValue(128)
.withDescription("Sort memory in MB, default 128MB");
// ------------------------------------------------------------------------
// Compaction Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> COMPACTION_SCHEDULE_ENABLED = ConfigOptions
.key("compaction.schedule.enabled")
.booleanType()
.defaultValue(true) // default true for MOR write
.withDescription("Schedule the compaction plan, enabled by default for MOR");
public static final ConfigOption<Boolean> COMPACTION_ASYNC_ENABLED = ConfigOptions
.key("compaction.async.enabled")
.booleanType()
.defaultValue(true) // default true for MOR write
.withDescription("Async Compaction, enabled by default for MOR");
public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
.key("compaction.tasks")
.intType()
.defaultValue(4) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.2 (assumes 5 commits generate one bucket)
.withDescription("Parallelism of tasks that do actual compaction, default is 4");
public static final String NUM_COMMITS = "num_commits";
public static final String TIME_ELAPSED = "time_elapsed";
public static final String NUM_AND_TIME = "num_and_time";
public static final String NUM_OR_TIME = "num_or_time";
public static final ConfigOption<String> COMPACTION_TRIGGER_STRATEGY = ConfigOptions
.key("compaction.trigger.strategy")
.stringType()
.defaultValue(NUM_COMMITS)
.withDescription("Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n"
+ "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n"
+ "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n"
+ "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n"
+ "Default is 'num_commits'");
public static final ConfigOption<Integer> COMPACTION_DELTA_COMMITS = ConfigOptions
.key("compaction.delta_commits")
.intType()
.defaultValue(5)
.withDescription("Max delta commits needed to trigger compaction, default 5 commits");
public static final ConfigOption<Integer> COMPACTION_DELTA_SECONDS = ConfigOptions
.key("compaction.delta_seconds")
.intType()
.defaultValue(3600) // default 1 hour
.withDescription("Max delta seconds time needed to trigger compaction, default 1 hour");
public static final ConfigOption<Integer> COMPACTION_TIMEOUT_SECONDS = ConfigOptions
.key("compaction.timeout.seconds")
.intType()
.defaultValue(1200) // default 20 minutes
.withDescription("Max timeout time in seconds for online compaction to rollback, default 20 minutes");
public static final ConfigOption<Integer> COMPACTION_MAX_MEMORY = ConfigOptions
.key("compaction.max_memory")
.intType()
.defaultValue(100) // default 100 MB
.withDescription("Max memory in MB for compaction spillable map, default 100MB");
public static final ConfigOption<Long> COMPACTION_TARGET_IO = ConfigOptions
.key("compaction.target_io")
.longType()
.defaultValue(500 * 1024L) // default 500 GB
.withDescription("Target IO per compaction (both read and write), default 500 GB");
public static final ConfigOption<Boolean> CLEAN_ASYNC_ENABLED = ConfigOptions
.key("clean.async.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Whether to cleanup the old commits immediately on new commits, enabled by default");
public static final ConfigOption<Integer> CLEAN_RETAIN_COMMITS = ConfigOptions
.key("clean.retain_commits")
.intType()
.defaultValue(10)// default 10 commits
.withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally pull on this table, default 10");
public static final ConfigOption<Integer> ARCHIVE_MAX_COMMITS = ConfigOptions
.key("archive.max_commits")
.intType()
.defaultValue(30)// default max 30 commits
.withDescription("Max number of commits to keep before archiving older commits into a sequential log, default 30");
public static final ConfigOption<Integer> ARCHIVE_MIN_COMMITS = ConfigOptions
.key("archive.min_commits")
.intType()
.defaultValue(20)// default min 20 commits
.withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 20");
// ------------------------------------------------------------------------
// Hive Sync Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED = ConfigOptions
.key("hive_sync.enable")
.booleanType()
.defaultValue(false)
.withDescription("Asynchronously sync Hive meta to HMS, default false");
public static final ConfigOption<String> HIVE_SYNC_DB = ConfigOptions
.key("hive_sync.db")
.stringType()
.defaultValue("default")
.withDescription("Database name for hive sync, default 'default'");
public static final ConfigOption<String> HIVE_SYNC_TABLE = ConfigOptions
.key("hive_sync.table")
.stringType()
.defaultValue("unknown")
.withDescription("Table name for hive sync, default 'unknown'");
public static final ConfigOption<String> HIVE_SYNC_FILE_FORMAT = ConfigOptions
.key("hive_sync.file_format")
.stringType()
.defaultValue("PARQUET")
.withDescription("File format for hive sync, default 'PARQUET'");
public static final ConfigOption<String> HIVE_SYNC_MODE = ConfigOptions
.key("hive_sync.mode")
.stringType()
.defaultValue("jdbc")
.withDescription("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'jdbc'");
public static final ConfigOption<String> HIVE_SYNC_USERNAME = ConfigOptions
.key("hive_sync.username")
.stringType()
.defaultValue("hive")
.withDescription("Username for hive sync, default 'hive'");
public static final ConfigOption<String> HIVE_SYNC_PASSWORD = ConfigOptions
.key("hive_sync.password")
.stringType()
.defaultValue("hive")
.withDescription("Password for hive sync, default 'hive'");
public static final ConfigOption<String> HIVE_SYNC_JDBC_URL = ConfigOptions
.key("hive_sync.jdbc_url")
.stringType()
.defaultValue("jdbc:hive2://localhost:10000")
.withDescription("Jdbc URL for hive sync, default 'jdbc:hive2://localhost:10000'");
public static final ConfigOption<String> HIVE_SYNC_METASTORE_URIS = ConfigOptions
.key("hive_sync.metastore.uris")
.stringType()
.defaultValue("")
.withDescription("Metastore uris for hive sync, default ''");
public static final ConfigOption<String> HIVE_SYNC_PARTITION_FIELDS = ConfigOptions
.key("hive_sync.partition_fields")
.stringType()
.defaultValue("")
.withDescription("Partition fields for hive sync, default ''");
public static final ConfigOption<String> HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME = ConfigOptions
.key("hive_sync.partition_extractor_class")
.stringType()
.defaultValue(SlashEncodedDayPartitionValueExtractor.class.getCanonicalName())
.withDescription("Tool to extract the partition value from HDFS path, "
+ "default 'SlashEncodedDayPartitionValueExtractor'");
public static final ConfigOption<Boolean> HIVE_SYNC_ASSUME_DATE_PARTITION = ConfigOptions
.key("hive_sync.assume_date_partitioning")
.booleanType()
.defaultValue(false)
.withDescription("Assume partitioning is yyyy/mm/dd, default false");
public static final ConfigOption<Boolean> HIVE_SYNC_USE_JDBC = ConfigOptions
.key("hive_sync.use_jdbc")
.booleanType()
.defaultValue(true)
.withDescription("Use JDBC when hive synchronization is enabled, default true");
public static final ConfigOption<Boolean> HIVE_SYNC_AUTO_CREATE_DB = ConfigOptions
.key("hive_sync.auto_create_db")
.booleanType()
.defaultValue(true)
.withDescription("Auto create hive database if it does not exists, default true");
public static final ConfigOption<Boolean> HIVE_SYNC_IGNORE_EXCEPTIONS = ConfigOptions
.key("hive_sync.ignore_exceptions")
.booleanType()
.defaultValue(false)
.withDescription("Ignore exceptions during hive synchronization, default false");
public static final ConfigOption<Boolean> HIVE_SYNC_SKIP_RO_SUFFIX = ConfigOptions
.key("hive_sync.skip_ro_suffix")
.booleanType()
.defaultValue(false)
.withDescription("Skip the _ro suffix for Read optimized table when registering, default false");
public static final ConfigOption<Boolean> HIVE_SYNC_SUPPORT_TIMESTAMP = ConfigOptions
.key("hive_sync.support_timestamp")
.booleanType()
.defaultValue(false)
.withDescription("INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n"
+ "Disabled by default for backward compatibility.");
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
// Prefix for Hoodie specific properties.
private static final String PROPERTIES_PREFIX = "properties.";
/**
* Collects the config options that start with 'properties.' into a 'key'='value' map.
*/
public static Map<String, String> getHoodieProperties(Map<String, String> options) {
return getHoodiePropertiesWithPrefix(options, PROPERTIES_PREFIX);
}
/**
* Collects the config options that start with the specified prefix {@code prefix} into a 'key'='value' map.
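*
* <p>A minimal illustration (keys and values are hypothetical):
* <pre>{@code
* Map<String, String> options = new HashMap<>();
* options.put("properties.hoodie.datasource.write.recordkey.field", "id");
* options.put("path", "/tmp/hoodie_table"); // un-prefixed keys are filtered out
* // returns {"hoodie.datasource.write.recordkey.field" -> "id"}
* Map<String, String> props = getHoodiePropertiesWithPrefix(options, "properties.");
* }</pre>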
*/
public static Map<String, String> getHoodiePropertiesWithPrefix(Map<String, String> options, String prefix) {
final Map<String, String> hoodieProperties = new HashMap<>();
if (hasPropertyOptions(options, prefix)) {
options.keySet().stream()
.filter(key -> key.startsWith(prefix))
.forEach(key -> {
final String value = options.get(key);
final String subKey = key.substring((prefix).length());
hoodieProperties.put(subKey, value);
});
}
return hoodieProperties;
}
/**
* Collects all the config options; the 'properties.' prefix is removed when an option key starts with it.
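*
* <p>E.g. (an illustrative sketch): a conf of {"properties.hoodie.abc" -> "v1", "path" -> "/tmp/t"}
* flattens to {"hoodie.abc" -> "v1", "path" -> "/tmp/t"}.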
*/
public static Configuration flatOptions(Configuration conf) {
final Map<String, String> propsMap = new HashMap<>();
conf.toMap().forEach((key, value) -> {
final String subKey = key.startsWith(PROPERTIES_PREFIX)
? key.substring((PROPERTIES_PREFIX).length())
: key;
propsMap.put(subKey, value);
});
return fromMap(propsMap);
}
private static boolean hasPropertyOptions(Map<String, String> options, String prefix) {
return options.keySet().stream().anyMatch(k -> k.startsWith(prefix));
}
/**
* Creates a new configuration that is initialized with the options of the given map.
*/
public static Configuration fromMap(Map<String, String> map) {
final Configuration configuration = new Configuration();
map.forEach(configuration::setString);
return configuration;
}
/**
* Returns whether the option {@code option} takes its default value in the given conf (either unset or explicitly set to its default).
*/
public static <T> boolean isDefaultValueDefined(Configuration conf, ConfigOption<T> option) {
return !conf.getOptional(option).isPresent()
|| conf.get(option).equals(option.defaultValue());
}
/**
* Returns all the optional config options.
*/
public static Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>(allOptions());
options.remove(PATH);
return options;
}
/**
* Returns all the config options.
*/
public static List<ConfigOption<?>> allOptions() {
Field[] declaredFields = FlinkOptions.class.getDeclaredFields();
List<ConfigOption<?>> options = new ArrayList<>();
for (Field field : declaredFields) {
if (java.lang.reflect.Modifier.isStatic(field.getModifiers())
&& field.getType().equals(ConfigOption.class)) {
try {
options.add((ConfigOption<?>) field.get(null));
} catch (IllegalAccessException e) {
throw new HoodieException("Error while fetching static config option", e);
}
}
}
return options;
}
}