Conversation

XiaoHongbo-Hope (Contributor) commented Nov 6, 2025

This PR

Supports combined mode processing when cleaning orphan files:

  • Processes multiple tables within a single DataStream during job graph construction, instead of creating one DataStream per table. This significantly reduces JobGraph construction time and complexity, avoiding timeouts, stack overflows, and resource allocation failures
  • Only applies when --mode combined is specified

Adds configuration:

  • [--mode <divided|combined>]: Processing mode (default: combined)
    • divided: Create one DataStream per table (original behavior)
    • combined: Process all tables in a single DataStream
  • [--tables <table1>] [--tables <table2>]: repeated flag, one table name per occurrence
  • --table and --tables cannot be used together
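The mutual exclusion between --table and --tables can be sketched with a small argument check (ArgCheck is a hypothetical helper for illustration; the actual validation lives in RemoveOrphanFilesActionFactory):

```java
import java.util.ArrayList;
import java.util.List;

public class ArgCheck {
    // Collect table names from repeated --table / --tables flags,
    // rejecting a mix of the two, as described above.
    static List<String> parseTables(String[] args) {
        List<String> single = new ArrayList<>();
        List<String> multi = new ArrayList<>();
        for (int i = 0; i < args.length - 1; i++) {
            if ("--table".equals(args[i])) {
                single.add(args[++i]);
            } else if ("--tables".equals(args[i])) {
                multi.add(args[++i]);
            }
        }
        if (!single.isEmpty() && !multi.isEmpty()) {
            throw new IllegalArgumentException("--table and --tables cannot be used together");
        }
        return single.isEmpty() ? multi : single;
    }

    public static void main(String[] args) {
        System.out.println(parseTables(new String[] {"--tables", "t1", "--tables", "t2"}));
    }
}
```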

Tests:

  • testCombinedMode: Combined mode with multiple tables
  • testCombinedModeWithBranch: Combined mode with multiple branches

XiaoHongbo-Hope marked this pull request as ready for review November 9, 2025 17:56
XiaoHongbo-Hope marked this pull request as draft November 9, 2025 17:57
XiaoHongbo-Hope changed the title to "[flink] support combined mode for orphan files clean to process multiple tables in a single DataStream" Nov 13, 2025
XiaoHongbo-Hope changed the title to "[flink] support combined mode for orphan files clean to process multiple tables" Nov 13, 2025
XiaoHongbo-Hope changed the title to "[flink] support combined mode for orphan files clean" Nov 13, 2025
+ "--database <database_name> \\\n"
+ "--table <table_name> \\\n"
+ "[--table <table_name>] \\\n"
+ "[--tables <table1,table2,...>] \\\n"
Contributor commented:

The usage of --tables is confusing. Is this a single parameter or a multi-parameter?

XiaoHongbo-Hope (Contributor, Author) replied:

> The usage of --tables is confusing. Is this a single parameter or a multi-parameter?

Updated.

for (T cleaner : cleaners) {
FileStoreTable table = cleaner.getTable();
Identifier identifier = table.catalogEnvironment().identifier();
if (identifier == null) {

Contributor commented:

Why not pass identifier from ActionFactory?


yuzelin (Contributor) left a comment:

+1

JingsongLi (Contributor) commented Nov 14, 2025

Does combined mode have any bad cases? Why not just enable it by default?

XiaoHongbo-Hope (Contributor, Author) replied:

> Does combined mode have any bad cases? Why not just enable it by default?

Sure, we can enable it by default.

XiaoHongbo-Hope (Contributor, Author) commented:

> Does combined mode have any bad cases? Why not just enable it by default?

Updated and tested in our application.

Copilot AI left a comment:

Pull Request Overview

This PR adds support for combined mode when cleaning orphan files in Flink, allowing multiple tables to be processed within a single DataStream during job graph construction instead of creating one DataStream per table. This significantly reduces JobGraph construction time and complexity when processing thousands of tables.

Key changes:

  • Introduces CombinedFlinkOrphanFilesClean class to process multiple tables in a single DataStream
  • Adds --mode parameter with divided (original behavior) and combined (new behavior) options, defaulting to combined
  • Adds --tables parameter to specify multiple tables explicitly
  • Refactors existing code to extract common utilities into OrphanFilesCleanUtil

Reviewed Changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.

Summary per file:

  • RemoveOrphanFilesActionITCaseBase.java: Adds comprehensive tests for combined mode including multi-table, branch, and external path scenarios
  • OrphanFilesCleanUtil.java: New utility class extracting common Flink environment configuration and input handling logic
  • FlinkOrphanFilesClean.java: Refactored to extract common logic and expose methods for combined mode usage
  • CombinedFlinkOrphanFilesClean.java: New implementation for combined mode orphan file cleaning across multiple tables
  • RemoveOrphanFilesActionFactory.java: Updated to support --tables and --mode parameters with validation
  • RemoveOrphanFilesAction.java: Modified to support both single and multi-table modes with configurable processing mode
  • OrphanFilesClean.java: Changed visibility of methods from protected to public for combined mode access



private String olderThan = null;
private boolean dryRun = false;
private MultiTablesSinkMode mode = COMBINED;
Copilot AI commented Nov 18, 2025:

The default mode is set to COMBINED in the code, but the PR description and the help text in RemoveOrphanFilesActionFactory state the default is combined. However, MultiTablesSinkMode.fromString(null) returns DIVIDED, not COMBINED. This creates an inconsistency: when the --mode parameter is not provided, fromString(null) will return DIVIDED, but the field default is COMBINED. This means the actual default depends on whether the parameter is provided. Consider either: (1) removing the field initializer and letting mode be null by default, then using mode == null || mode == COMBINED in the condition, or (2) changing the help text and PR description to accurately reflect that when --mode is not specified, the behavior defaults to DIVIDED per the fromString method.

Suggested change
private MultiTablesSinkMode mode = COMBINED;
@Nullable private MultiTablesSinkMode mode;
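
A null-safe way to resolve the default, as the review suggests, could look like the sketch below (MultiTablesSinkMode here is a stand-in enum for illustration, not the actual Paimon class):

```java
public class ModeDefault {
    // Stand-in for Paimon's MultiTablesSinkMode (assumption for illustration).
    enum MultiTablesSinkMode { DIVIDED, COMBINED }

    // Resolve the --mode argument: an absent flag means COMBINED,
    // avoiding the fromString(null) -> DIVIDED inconsistency noted above.
    static MultiTablesSinkMode resolve(String arg) {
        return arg == null
                ? MultiTablesSinkMode.COMBINED
                : MultiTablesSinkMode.valueOf(arg.toUpperCase(java.util.Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(resolve(null)); // prints COMBINED
    }
}
```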

Comment on lines 141 to 144
+ "during job graph construction, instead of creating one dataStream per table. "
+ "This significantly reduces job graph construction time, when processing "
+ "thousands of tables (jobs may fail to start within timeout limits). "
+ "It also reduces JobGraph complexity and avoids stack over flow issue and resource allocation failures during job running. "
Copilot AI commented Nov 18, 2025:

Corrected spelling of 'dataStream' to 'DataStream' for consistency with Flink terminology and corrected 'stack over flow' to 'stack overflow'.

Suggested change
+ "during job graph construction, instead of creating one dataStream per table. "
+ "This significantly reduces job graph construction time, when processing "
+ "thousands of tables (jobs may fail to start within timeout limits). "
+ "It also reduces JobGraph complexity and avoids stack over flow issue and resource allocation failures during job running. "
+ "during job graph construction, instead of creating one DataStream per table. "
+ "This significantly reduces job graph construction time, when processing "
+ "thousands of tables (jobs may fail to start within timeout limits). "
+ "It also reduces JobGraph complexity and avoids stack overflow issue and resource allocation failures during job running. "

+ "It also reduces JobGraph complexity and avoids stack over flow issue and resource allocation failures during job running. "
Copilot AI commented Nov 18, 2025:

Corrected spelling of 'stack over flow' to 'stack overflow'.

Suggested change
+ "It also reduces JobGraph complexity and avoids stack over flow issue and resource allocation failures during job running. "
+ "It also reduces JobGraph complexity and avoids stack overflow issue and resource allocation failures during job running. "

Comment on lines 135 to 141
/**
* Stringify the given {@link InternalRow}. This is a simplified version that handles basic
* types. For complex types (Array, Map, Row), it falls back to toString().
*
* <p>This method is implemented locally to avoid dependency on paimon-common's test-jar, which
* may not be available in CI environments.
*/
Copilot AI commented Nov 18, 2025:

The documentation states the method handles complex types by falling back to toString(), but the actual implementation doesn't handle complex types at all—it only handles basic types using FieldGetter. The comment is misleading and should be updated to accurately reflect what the method actually does: 'This is a simplified version that handles basic types only. Complex types (Array, Map, Row) are not explicitly handled and rely on the default object representation from FieldGetter.'

DataStream<String> usedFiles =
usedManifestFiles
.getSideOutput(manifestOutputTag)
.keyBy(tuple2 -> tuple2.f0) // Use Identifier object directly as key
Copilot AI commented Nov 18, 2025:

[nitpick] The comment 'Use Identifier object directly as key' is misleading. The code uses tuple2.f0 which is indeed an Identifier, but the comment should clarify that Identifier objects are used as keys for grouping by table. Consider: 'Group by table identifier to process manifests per table'.

Suggested change
.keyBy(tuple2 -> tuple2.f0) // Use Identifier object directly as key
.keyBy(tuple2 -> tuple2.f0) // Group by table identifier to process manifests per table
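
Outside Flink, the per-table grouping that keyBy performs can be illustrated with a plain Collectors.groupingBy (illustrative sketch only; the real code keys a Flink DataStream of Tuple2<Identifier, String> pairs):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByTable {
    // Group (tableIdentifier, manifestFile) pairs by table, mirroring
    // keyBy(tuple2 -> tuple2.f0) in the combined-mode stream.
    static Map<String, List<String>> byTable(List<SimpleEntry<String, String>> entries) {
        return entries.stream()
                .collect(Collectors.groupingBy(
                        SimpleEntry::getKey,
                        Collectors.mapping(SimpleEntry::getValue, Collectors.toList())));
    }

    public static void main(String[] args) {
        System.out.println(byTable(List.of(
                new SimpleEntry<>("db.t1", "m1"),
                new SimpleEntry<>("db.t1", "m2"),
                new SimpleEntry<>("db.t2", "m3"))));
    }
}
```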

writeToBranch(branchTable2, GenericRow.of(3L, BinaryString.fromString("World"), 30));

// Create orphan files in both branch snapshot directories
// This is key: same table, multiple branches - will trigger bug in
Copilot AI commented Nov 18, 2025:

The comment 'will trigger bug in' is incomplete and unclear. It appears to be referring to a bug that was fixed, but the comment doesn't explain what bug or what the expected behavior is. This should either be completed or removed, e.g., 'This tests that combined mode correctly handles multiple branches within the same table'.

Suggested change
// This is key: same table, multiple branches - will trigger bug in
// This tests that combined mode correctly handles multiple branches within the same table and removes orphan files from each branch.

// Create orphan files in both branch snapshot directories
// This is key: same table, multiple branches - will trigger bug in
Contributor commented:
Is this comment fixed?

XiaoHongbo-Hope (Contributor, Author) replied:
> Is this comment fixed?

Fixed and removed the outdated comment.

/** Utility class for orphan files clean operations in Flink. */
public class OrphanFilesCleanUtil {

protected static final Logger LOG = LoggerFactory.getLogger(FlinkOrphanFilesClean.class);
Contributor commented:
The logger should reference OrphanFilesCleanUtil.class, not FlinkOrphanFilesClean.class.
