-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-2207] Support independent flink hudi clustering function #3599
Conversation
c795bbe
to
9443126
Compare
2828a67
to
1bb7f93
Compare
public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions | ||
.key("clustering.plan.strategy.target.file.max.bytes") | ||
.intType() | ||
.defaultValue(1024) // default 1 GB |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the unit is MB ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already fix it.
*/ | ||
public class FlinkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload<T>> | ||
extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> { | ||
private static final Logger LOG = LogManager.getLogger(FlinkRecentDaysClusteringPlanStrategy.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we extend from FlinkSizeBasedClusteringPlanStrategy
instead ?
if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) { | ||
throw new HoodieClusteringException("Clustering failed to write to files:" | ||
+ writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(","))); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
writeTableMetadata
is missing.
|
||
private static final Logger LOG = LogManager.getLogger(FlinkClusteringPlanActionExecutor.class); | ||
|
||
public FlinkClusteringPlanActionExecutor(HoodieEngineContext context, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use the HoodieData
to merge the code with SparkClusteringPlanActionExecutor
, this can be done with separate following PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
.key("clustering.schedule.enabled") | ||
.booleanType() | ||
.defaultValue(false) // default false for pipeline | ||
.withDescription("Async clustering, default false for pipeline"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Schedule the compaction plan, default false
.key("clustering.tasks") | ||
.intType() | ||
.defaultValue(10) | ||
.withDescription("Parallelism of tasks that do actual clustering, default is 10"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change the default value same with compaction.tasks
, which is 4
.
* The clustering task identifier. | ||
*/ | ||
private int taskID; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The event should include a fileId
to deduplicate for tasks failover/retry. Take CompactionCommitEvent
as a reference. Because there are multiple input file ids for a HoodieClusteringGroup
thus the CompactionCommitEvent
, we can use the first file group id to distinguish.
this.schema = new Schema.Parser().parse(writeConfig.getSchema()); | ||
this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema); | ||
this.requiredPos = getRequiredPositions(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is the requiredPos
used for ?
for (ClusteringOperation clusteringOp : clusteringOps) { | ||
try { | ||
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema())); | ||
HoodieFileReader<? extends IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the readerSchema
?
private void doClustering(String instantTime, HoodieClusteringGroup clusteringGroup, Collector<ClusteringCommitEvent> collector) throws IOException { | ||
List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList()); | ||
boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The HoodieClusteringGroup
has num of output file groups, the current code has only one file group (or more if the parquet size hits the threshold), can we find a way to set up the parallelism of bulk_insert writer as that ?
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, config.skipFromLatestPartitions); | ||
if (config.sortColumns != null) { | ||
conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, config.sortColumns); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is the CLUSTERING_SORT_COLUMNS
used for ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @yihua wondering if we can reuse a lot more code here?
Yes, the core clustering action should be extracted out, independent of engines, using |
f86df2e
to
d3eb4e3
Compare
@hudi-bot run azure |
Hello, there seems come conflicts and the compile failure, can you fix that @yuzhaojing ? |
Sure, I will fix this. |
e1689c4
to
cf4355d
Compare
This comment was marked as resolved.
This comment was marked as resolved.
eb878d4
to
9f98d28
Compare
@hudi-bot run azure |
@hudi-bot run azure |
packaging/hudi-flink-bundle/pom.xml
Outdated
<artifactId>flink-avro</artifactId> | ||
<version>${flink.version}</version> | ||
<scope>compile</scope> | ||
</dependency> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this change ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HoodieClusteringGroup is a avro model in ClusteringPlanEvent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But you should not depend on flink-avro i guess, there are already many model clazzes in hudi-flink code paths.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed. use ClusteringGroupInfo instead of HoodieClusteringGroup.
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-avro</artifactId> | ||
<version>${flink.version}</version> | ||
<scope>provided</scope> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this change ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
We may need to resolve the conflict. |
fixed. |
@@ -399,6 +404,59 @@ public HoodieWriteMetadata<List<WriteStatus>> cluster(final String clusteringIns | |||
throw new HoodieNotSupportedException("Clustering is not supported yet"); | |||
} | |||
|
|||
private void updateTableMetadata(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, | |||
HoodieCommitMetadata commitMetadata, | |||
HoodieInstant hoodieInstant) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updateTableMetadat
seems useless now ~
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, thanks for contribution @yuzhaojing , we may need to fix the clustering plan scheduling issue in the following PR.
Sure, I will fix the issue in following PR to support clustering plan scheduling in coordinator. |
Thanks for suggest, Danny! |
Tips
What is the purpose of the pull request
(For example: This pull request adds quick-start document.)
Brief change log
(for example:)
Verify this pull request
(Please pick either of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.