
[HUDI-7957] fix data skew when writing with bulk_insert + bucket_inde…#11578

Merged
danny0405 merged 1 commit into apache:master from KnightChess:fix-data-skew on Jul 10, 2024

Conversation

@KnightChess
Contributor

[HUDI-7957] fix data skew when writing with bulk_insert + bucket_index enabled

Change Logs

  • Improve the BucketIndexUtil partitionIndex algorithm so that the data is evenly distributed.
  • BucketPartitionUtils in Spark uses the BucketIndexUtil method, the same logic as Flink.

Impact

  • Flink partitionIndex will use the new algorithm

Risk level (write none, low medium or high below)

low

Documentation Update

None

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Jul 5, 2024
    public static Functions.Function2<String, Integer, Integer> getPartitionIndexFunc(int bucketNum, int parallelism) {
      return (partition, curBucket) -> {
        // Spread partitions with the new start-index formula, then offset by the bucket id.
        int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % parallelism * bucketNum;
        int globalIndex = partitionIndex + curBucket;
        return BucketIdentifier.mod(globalIndex, parallelism);
      };
    }
Contributor

Has the logic in Flink got a regression?

Contributor Author

Not sure; I couldn't reproduce it locally. I re-triggered the CI.

Contributor Author

After trying it many times, I can't reproduce this problem.

Contributor

Did you check the original commit PR description and comments, maybe we can sync up there first: #11346

Contributor Author

Yes, I have checked, and I added more complete unit tests.

Contributor
@xicm xicm Jul 9, 2024

These two algorithms divide the parallelism into bucket-sized blocks so that the tasks are evenly distributed.

(partition.hashCode() & Integer.MAX_VALUE) % parallelism * bucketNum means the start index of the buckets in a partition.

The difference is whether we use "% parallelism" or "/ (parallelism / bucketNum)".

Assume bucketNum = 2, parallelism = 10.

"(partition.hashCode() & Integer.MAX_VALUE) % parallelism" means the start index can be any value in [0, parallelism); the values of different partitions can be next to each other.

"(partition.hashCode() & Integer.MAX_VALUE) / (parallelism / bucketNum)" leaves an interval between the start indexes of different partitions.
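A tiny sketch of the difference (a hypothetical demo, not Hudi code; the hash values are stand-ins for partition.hashCode(), and the two expressions are taken as quoted in this comment):

```java
// Hypothetical demo: compare the start indexes produced by the two expressions
// discussed above, with bucketNum = 2 and parallelism = 10.
public class StartIndexDemo {
    // "% parallelism" style: start index can be any value in [0, parallelism)
    static int modStyle(int hash, int parallelism) {
        return (hash & Integer.MAX_VALUE) % parallelism;
    }

    // "/ (parallelism / bucketNum)" style: adjacent hashes collapse into
    // blocks of size parallelism / bucketNum
    static int divStyle(int hash, int parallelism, int bucketNum) {
        return (hash & Integer.MAX_VALUE) / (parallelism / bucketNum);
    }

    public static void main(String[] args) {
        int[] hashes = {3, 4, 17, 28};
        for (int h : hashes) {
            System.out.println("hash=" + h
                + " modStyle=" + modStyle(h, 10)
                + " divStyle=" + divStyle(h, 10, 2));
        }
    }
}
```

With the division form, hashes 3 and 4 fall into the same block (both yield 0), while the modulo form keeps them on adjacent start indexes (3 and 4).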

Contributor Author
@KnightChess KnightChess Jul 9, 2024

> Can you summarize the current drawback of the Flink partitioner algorithm for 3 scenarios, and what is the main reason to roll back the changes to the original?

@danny0405 no, the original logic in Flink is (partition.hashCode() & Integer.MAX_VALUE) % parallelism; the current one is (partition.hashCode() & Integer.MAX_VALUE) % parallelism * bucketNumber

Contributor Author

@xicm (partition.hashCode() & Integer.MAX_VALUE) % parallelism is right, but (partition.hashCode() & Integer.MAX_VALUE) / (parallelism / bucketNum + 1) * bucketNum has a little problem: because it uses / rather than %, each adjacent partition will be in the same slot. The range depends on parallelism / bucketNum; in your case it is 5.

Contributor
@xicm xicm Jul 9, 2024

> Each adjacent partition will be in the same slot

This is not what we expect.

You can run the test case with parallelism = 10, bucketNumber = 5 and partition = ["2021-01-01", "2021-01-03"]; the result is [2, 2, 2, 2, 2], so half of the TMs are idle.

Contributor Author

@xicm hi, see #11578 (comment); as that comment says, it cannot cover all cases, it is just based on practical considerations.

@KnightChess KnightChess closed this Jul 6, 2024
@KnightChess KnightChess reopened this Jul 6, 2024
Comment on lines +120 to +125

    // parallelism2TaskCount size need Infinitely close parallelism or (bucketNumber * 8)
    if (parallelism >= (bucketNumber * 8)) {
      assertTrue(parallelism2TaskCount.size() >= (bucketNumber * 8 * 0.9));
    } else {
      assertTrue(parallelism2TaskCount.size() >= parallelism * 0.9);
    }
Contributor Author

Added a new check: the number of worker threads should approach parallelism, or (bucketNumber * partitionNumber), if we have enough resources.

Contributor Author
@KnightChess KnightChess left a comment

@danny0405 @xicm From the discussion results and unit test situations, we can conclude that in the case of consecutive partitions, the new algorithm is more stable than the old algorithm. However, in the case of non-consecutive partitions, both algorithms exhibit some fluctuations. For example, with a parallelism of 10 and 5 buckets, the new algorithm can only achieve half of the parallelism for partitions 01 and 03, whereas the old algorithm can fully utilize the parallelism. With a parallelism of 20 and 5 buckets, and the same partitions, the new algorithm can achieve half of the parallelism, but the old algorithm can only achieve a quarter. The specific outcome depends on the initial position of the partitions.

@KnightChess
Contributor Author

Both of these algorithms are better than the original Spark bulk-insert bucket partitioner algorithm; I think they can both address the skew issue to some extent. If we want to keep the original Flink implementation, I will modify the logic and unit tests, because half of the current unit test scenarios cannot pass. I am inclined towards the current fix. What do you think?

@xicm
Contributor

xicm commented Jul 9, 2024

> @danny0405 @xicm From the discussion results and unit test situations, we can conclude that in the case of consecutive partitions, the new algorithm is more stable than the old algorithm. However, in the case of non-consecutive partitions, both algorithms exhibit some fluctuations. For example, with a parallelism of 10 and 5 buckets, the new algorithm can only achieve half of the parallelism for partitions 01 and 03, whereas the old algorithm can fully utilize the parallelism. With a parallelism of 20 and 5 buckets, and the same partitions, the new algorithm can achieve half of the parallelism, but the old algorithm can only achieve a quarter. The specific outcome depends on the initial position of the partitions.

Yes, the result of the two algorithms depends on the hash value of the partition, which is almost random. Is there a better way to get the partition number?


@KnightChess

The old algorithm has overflow problems; if we fix the overflow problem, the old algorithm is better.

          int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) / (parallelism / bucketNum) * bucketNum;
          int globalIndex = Math.abs(partitionIndex) + curBucket;
          return BucketIdentifier.mod(globalIndex, parallelism);

@danny0405
Contributor

> if we fix the overflow problem, the old algorithm is better.

Let's fire a fix for it, and @KnightChess let's keep the Flink hashing algorithm the same as it is; we can improve it in a separate PR, I think.

@danny0405 danny0405 self-assigned this Jul 10, 2024
@danny0405 danny0405 added the area:performance Performance optimizations label Jul 10, 2024
@KnightChess
Contributor Author

@xicm no, even after fixing the overflow problem, the old one will not be better; you can try the unit tests. I have tried this before.

@KnightChess
Contributor Author

KnightChess commented Jul 10, 2024

@danny0405 I have tried it before; the result is that the new algorithm is better. I will cherry-pick it in a separate PR.

@xicm
Contributor

xicm commented Jul 10, 2024

> @xicm no, even after fixing the overflow problem, the old one will not be better; you can try the unit tests. I have tried this before.

Oh, there was something wrong with my test case; the old algorithm also has drawbacks.

@danny0405
Contributor

@xicm @KnightChess So do we reach consensus that the algorithm raised by @KnightChess is better? If that's true, let's fire a fix in a separate PR.

@github-actions github-actions bot added size:S PR with lines of changes in (10, 100] and removed size:M PR with lines of changes in (100, 300] labels Jul 10, 2024
@xicm
Contributor

xicm commented Jul 10, 2024

> So do we reach consensus that the algorithm raised by @KnightChess is better? If that's true, let's fire a fix in a separate PR.

Both algorithms have drawbacks.
For example,
parallelism = 10, bucketNumber = 5 and partition = ["2021-01-01", "2021-01-03"]
old: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
new: [2, 2, 2, 2, 2]

parallelism = 20, bucketNumber = 5 and partition = ["2021-01-01", "2021-01-03"]
old: [2, 2, 2, 2, 2]
new: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

Each element in the array means how many data slices that TM processes.
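The comparison above can be reproduced with a small sketch (hypothetical demo, not Hudi code; the two start-index formulas are my reading of this thread, and `newStart`, `oldStart`, and `taskCounts` are illustrative names):

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: count how many buckets land on each task under the
// two start-index formulas discussed in this thread.
public class SkewDemo {
    // "new" style: start index from % parallelism, scaled by bucketNum
    static int newStart(String partition, int parallelism, int bucketNum) {
        return (partition.hashCode() & Integer.MAX_VALUE) % parallelism * bucketNum;
    }

    // "old" style (with the overflow guarded by Math.abs below): start index
    // from dividing the hash into blocks of size parallelism / bucketNum
    static int oldStart(String partition, int parallelism, int bucketNum) {
        return (partition.hashCode() & Integer.MAX_VALUE) / (parallelism / bucketNum) * bucketNum;
    }

    // Map task id -> number of data slices assigned to it
    static Map<Integer, Integer> taskCounts(String[] partitions, int parallelism,
                                            int bucketNum, boolean useNew) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String p : partitions) {
            int start = useNew ? newStart(p, parallelism, bucketNum)
                               : oldStart(p, parallelism, bucketNum);
            for (int bucket = 0; bucket < bucketNum; bucket++) {
                int task = Math.abs(start + bucket) % parallelism;
                counts.merge(task, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] partitions = {"2021-01-01", "2021-01-03"};
        System.out.println("old: " + taskCounts(partitions, 10, 5, false));
        System.out.println("new: " + taskCounts(partitions, 10, 5, true));
        System.out.println("old: " + taskCounts(partitions, 20, 5, false));
        System.out.println("new: " + taskCounts(partitions, 20, 5, true));
    }
}
```

Either way, the total number of assigned slices is partitions x buckets; only how they spread over the tasks differs between the two formulas.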

@hudi-bot
Collaborator

CI report:


@KnightChess
Contributor Author

I don't know why this check contains the docker module; other successful checks don't seem to contain it. Retriggering again.

@KnightChess KnightChess reopened this Jul 10, 2024
@danny0405
Contributor

> Both algorithms have drawbacks.

@xicm That's fine, the new algorithm looks simpler, there is no need to distinguish between different parallelisms.

@KnightChess KnightChess reopened this Jul 10, 2024
@danny0405 danny0405 merged commit bb0621e into apache:master Jul 10, 2024
@xicm
Contributor

xicm commented Jul 10, 2024

@danny0405 @KnightChess Shall we use the new algorithm? The new algorithm looks simpler. The previous implementation had an overflow problem; does it need a fix?

@KnightChess
Contributor Author

@xicm @danny0405 yes, I will submit a new PR to fix it soon.

