Description
Tips before filing an issue
- Have you gone through our FAQs?
- Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.
- If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced
Almost every commit generates a compaction plan. The offline compaction task cannot finish that many compactions.
To Reproduce
Steps to reproduce the behavior:
Here are my configs:

```java
map.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), "num_commits");
map.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "true");
map.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "24");
```
My Flink checkpoint interval is 150 seconds, so 24 delta commits should take about one hour (24 × 150 s = 3600 s). It should therefore generate one compaction plan per hour, but almost every commit generates a compaction plan.
Is there some problem with my config?
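For reference, here is a minimal sketch of how I assume the map entries above end up on the Flink `Configuration` that the sink code below reads from (the `map` and `buildConf` names are illustrative, not from my actual code):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class CompactionConfigSketch {
  public static Configuration buildConf() {
    // Copy the string-keyed compaction options onto the Flink Configuration
    // that the Hudi sink reads from.
    Map<String, String> map = new HashMap<>();
    map.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), "num_commits");
    map.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "true");
    map.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "24");

    Configuration configuration = new Configuration();
    map.forEach(configuration::setString);
    return configuration;
  }
}
```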
Environment Description
- Hudi version: 0.11.0
Below is the relevant part of my code:
```java
// compaction
configuration.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, configuration.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED));
configuration.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, configuration.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED));
configuration.set(FlinkOptions.COMPACTION_TASKS, configuration.getInteger(FlinkOptions.COMPACTION_TASKS));
configuration.set(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, configuration.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY));
configuration.set(FlinkOptions.COMPACTION_DELTA_COMMITS, configuration.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS));
configuration.set(FlinkOptions.COMPACTION_DELTA_SECONDS, configuration.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS));
configuration.set(FlinkOptions.COMPACTION_MAX_MEMORY, configuration.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY));

// hive sync
configuration.set(FlinkOptions.HIVE_SYNC_ENABLED, configuration.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED));
configuration.set(FlinkOptions.HIVE_SYNC_MODE, configuration.getString(FlinkOptions.HIVE_SYNC_MODE));
configuration.set(FlinkOptions.HIVE_SYNC_METASTORE_URIS, configuration.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS));
configuration.set(FlinkOptions.HIVE_SYNC_JDBC_URL, configuration.getString(FlinkOptions.HIVE_SYNC_JDBC_URL));
configuration.set(FlinkOptions.HIVE_SYNC_TABLE, tableName);
configuration.set(FlinkOptions.HIVE_SYNC_DB, configuration.getString(FlinkOptions.HIVE_SYNC_DB));
configuration.set(FlinkOptions.HIVE_SYNC_USERNAME, configuration.getString(FlinkOptions.HIVE_SYNC_USERNAME));
configuration.set(FlinkOptions.HIVE_SYNC_PASSWORD, configuration.getString(FlinkOptions.HIVE_SYNC_PASSWORD));
configuration.set(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP, configuration.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP));

// timeline server
configuration.setString("hoodie.embed.timeline.server", "false");

// index
configuration.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BLOOM.name());

// fail the job on write errors
configuration.set(FlinkOptions.IGNORE_FAILED, false);

// align the commit ack timeout with the checkpoint timeout
long ckpTimeout = rowDataDataStream.getExecutionEnvironment()
    .getCheckpointConfig().getCheckpointTimeout();
configuration.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);

RowType rowType = RowType.of(false, columnTypes, columnNames);
configuration.setString(FlinkOptions.SOURCE_AVRO_SCHEMA,
    AvroSchemaConverter.convertToSchema(rowType).toString());

// partitioning
configuration.setString(FlinkOptions.KEYGEN_CLASS_NAME, ComplexAvroKeyGenerator.class.getName());
configuration.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME, MultiPartKeysValueExtractor.class.getName());
configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, "createYear,createMonth");
configuration.setString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS, "createYear,createMonth");

// bulk_insert mode
final String writeOperation = configuration.get(FlinkOptions.OPERATION);
if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
  Pipelines.bulkInsert(configuration, rowType, rowDataDataStream);
} else if (OptionsResolver.isAppendMode(configuration)) {
  // append mode
  Pipelines.append(configuration, rowType, rowDataDataStream, false);
} else {
  DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(configuration, rowType, parallelism,
      rowDataDataStream);
  DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(configuration, parallelism,
      hoodieRecordDataStream);
  // compaction
  if (StreamerUtil.needsAsyncCompaction(configuration)) {
    // MOR table with online compaction
    Pipelines.compact(configuration, pipeline);
  } else if (configuration.getString(FlinkOptions.TABLE_TYPE)
      .toUpperCase(Locale.ROOT)
      .equals(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE)) {
    // COW table: run cleaning only
    Pipelines.clean(configuration, pipeline);
  }
}
```
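Since compaction runs offline in my setup, my understanding is that the writer job should only schedule compaction plans while a separate job executes them. Below is a sketch of the settings I believe control this (my assumption, not verified against the Hudi source):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class OfflineCompactionConfSketch {
  public static Configuration buildConf() {
    Configuration conf = new Configuration();
    // Assumption: with async compaction disabled, StreamerUtil.needsAsyncCompaction
    // returns false and the writer does not wire in Pipelines.compact.
    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
    // The writer should still schedule compaction plans for the offline job.
    conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true);
    conf.set(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, "num_commits");
    // Expectation: one plan per 24 delta commits, i.e. roughly hourly
    // with a 150-second checkpoint interval.
    conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 24);
    return conf;
  }
}
```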