[FLINK-30979] Support shuffling data by partition #522

Merged: JingsongLi merged 4 commits into apache:master from FangYongs:FLINK_30979_bucket_partition_partition on Feb 14, 2023

@FangYongs
Contributor

Currently the sink operator in Flink shuffles data by bucket id only, which causes data skew when a table has a single bucket but multiple partitions. This PR adds support for shuffling data by both bucket id and partition when sink.shuffle-by-partition.enable is set.

The main changes are

  1. Added config sink.shuffle-by-partition.enable to support shuffling data by partition
  2. Added PartitionComputer to get partition from row data
  3. Added shuffling data by partition in BucketStreamPartitioner
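
The skew described above, and the effect of including the partition in the routing decision, can be illustrated with a minimal sketch. This is not the PR's actual `BucketStreamPartitioner` code: the class `ShuffleSketch`, both method names, the use of a `String` as the partition value, and the "partition hash plus bucket id" scheme are all assumptions made for illustration.

```java
// Hypothetical sketch; names and routing scheme are illustrative assumptions,
// not the classes or logic added by this PR.
class ShuffleSketch {

    // Bucket-only routing: all rows of a bucket land on one channel,
    // regardless of which partition they belong to.
    static int channelByBucket(int bucket, int numChannels) {
        return Math.floorMod(bucket, numChannels);
    }

    // Partition-aware routing (assumed scheme): the partition hash picks a
    // starting channel and the bucket id offsets from it.
    static int channelByPartitionAndBucket(String partition, int bucket, int numChannels) {
        int start = Math.floorMod(partition.hashCode(), numChannels);
        return Math.floorMod(start + bucket, numChannels);
    }

    public static void main(String[] args) {
        int numChannels = 4;
        int bucket = 0; // a table with a single bucket
        String[] partitions = {"a", "b", "c", "d"};
        for (String p : partitions) {
            System.out.println(p
                + " bucket-only=" + channelByBucket(bucket, numChannels)
                + " partition-aware=" + channelByPartitionAndBucket(p, bucket, numChannels));
        }
    }
}
```

With a single bucket, the bucket-only method sends every row to the same channel, while the partition-aware method can spread different partitions over different channels, depending on how their hashes fall.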

The main tests are

  1. Added FileStoreShuffleBucketTest to verify shuffling data by bucket and partition

public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;

public static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION =
ConfigOptions.key("sink.shuffle-by-partition.enable")
Contributor

If the option name is the same as Flink's, will it lead to another, redundant partition shuffle? See Flink's StreamPhysicalSinkRule.

Contributor Author

Thanks @JingsongLi, I have rebased onto master and renamed the config option.

@FangYongs force-pushed the FLINK_30979_bucket_partition_partition branch from cbdaa03 to 7aa53c2 on February 13, 2023 01:41
<td>Define a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.</td>
</tr>
<tr>
<td><h5>sink-store.shuffle-by-partition.enable</h5></td>
Contributor

Maybe we can document this on the Writing Tables page?

<td>Define a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.</td>
</tr>
<tr>
<td><h5>sink-store.shuffle-by-partition.enable</h5></td>
Contributor

Just sink.partition-shuffle?

@FangYongs
Contributor Author

@JingsongLi DONE

@FangYongs
Contributor Author

The failing test case is a known problem, and I have created an issue for it: https://issues.apache.org/jira/browse/FLINK-31039

Contributor

@JingsongLi left a comment

+1

@JingsongLi merged commit 219c4de into apache:master on Feb 14, 2023