Skip to content

[FLINK-39777] Support configurable HashFunction strategies in PrePartitionOperator#4423

Open
haruki-830 wants to merge 2 commits into
apache:masterfrom
haruki-830:FLINK-39777
Open

[FLINK-39777] Support configurable HashFunction strategies in PrePartitionOperator#4423
haruki-830 wants to merge 2 commits into
apache:masterfrom
haruki-830:FLINK-39777

Conversation

@haruki-830
Copy link
Copy Markdown
Contributor

@haruki-830 haruki-830 commented Jun 3, 2026

Summary

This commit adds configurable partitioning strategy support to Flink CDC pipelines, enabling users to switch to table-id hashing for small tables via YAML configuration, reducing unnecessary primary-key hashing overhead.

Key Changes

  1. New HashFunctionStrategy Enum
  • Introduced HashFunctionStrategy enum with two options: PRIMARY_KEY (hash by TableId + primary keys) and TABLE_ID (hash by TableId only).
  • Designed with @PublicEvolving annotation, allowing future strategies like ROUND_ROBIN or COLUMNS.
  1. New TableIdHashFunctionProvider
  • Added TableIdHashFunctionProvider that computes hash based solely on TableId.
  • Uses singleton pattern for HashFunction since it is stateless.
  • Suitable for small tables, or tables with no/changing primary keys.
  1. Pipeline Configuration Option
  • Added pipeline option "partitioning.strategy" to allow switching strategies via YAML.
  • When unset, behavior is identical to current, ensuring full backward compatibility.
  1. Comprehensive Testing
  • Added TableIdHashFunctionProviderTest with 7 test.
  • Added PrePartitionOperatorTest case verifying TABLE_ID strategy.
  • Added FlinkPipelineComposerTest configurations for PRIMARY_KEY and TABLE_ID strategy validation.

Configuration Example

Route same-table events to a single subtask
pipeline:
  name: my-cdc-job
  partitioning.strategy: TABLE_ID
Force load-balanced distribution by primary keys
pipeline:
  name: my-cdc-job
  partitioning.strategy: PRIMARY_KEY

JIRA Reference

https://issues.apache.org/jira/browse/FLINK-39777

@haruki-830 haruki-830 marked this pull request as ready for review June 3, 2026 02:17
@haruki-830
Copy link
Copy Markdown
Contributor Author

@lvyanquan could you please help review this PR?

Copy link
Copy Markdown
Contributor

@lvyanquan lvyanquan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @haruki-830 for the feature, left some comments.

"The timeout time for SchemaOperator to wait downstream SchemaChangeEvent applying finished, the default value is 3 minutes.");

public static final ConfigOption<HashFunctionStrategy> PIPELINE_PARTITIONING_STRATEGY =
ConfigOptions.key("partitioning.strategy")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the document to guide users on how to use it.

@lvyanquan
Copy link
Copy Markdown
Contributor

lvyanquan commented Jun 3, 2026

@github-actions github-actions Bot added the cli label Jun 3, 2026
Optional.ofNullable(tableId.getNamespace()).ifPresent(objectsToHash::add);
Optional.ofNullable(tableId.getSchemaName()).ifPresent(objectsToHash::add);
objectsToHash.add(tableId.getTableName());
this.cachedHash = (Objects.hash(objectsToHash.toArray()) * 31) & 0x7FFFFFFF;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using TableId#hashCode directly instead of Objects.hash(objectsToHash.toArray()) would be simpler.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants