-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-3516] Spanner BatchFn does not respect mutation limits #4860
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, Nathan! This looks good to me, I've left some minor comments. @chamikaramj could you please also take a look?
* Estimate the number of cells modified in a {@link MutationGroup}. | ||
*/ | ||
final class MutationCellEstimator implements ToLongFunction<MutationGroup> { | ||
private final LoadingCache<String, ImmutableMap<String, Long>> tables; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's move this to SpannerSchema
.
// ranges should already be broken up into individual batches | ||
// but just in case, make a worst-case estimate about the size | ||
// of the key range so they will get their own transaction | ||
final long ranges = Iterables.size(keySet.getRanges()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only batch single key deletes. I think we can return zero or -1 here and avoid passing maxNumMutations
+ " FROM information_schema.columns as c" | ||
+ " WHERE c.table_catalog = '' AND c.table_schema = '') AS c" | ||
+ " LEFT OUTER JOIN (" | ||
+ " SELECT t.table_name, t.column_name, COUNT(1) AS indices" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use COUNT(*) here? I think it is more common.
Estimate the number of mutations in in a group by counting the columns and associated indexes
Given current job responsiibilities, will not be able to review in a reasonable amount of time -- sorry! Reviewed 6 of 6 files at r1. Comments from Reviewable |
@mairbek I've made the requested changes, can you take another look? (an unrelated test is failing atm) |
lgtm, @chamikaramj can you take a look? |
retest this please |
I'm looking to make it part of Beam 2.5.0 |
Excellent. We're no longer using Spanner (AWS won the contract) so I can't do much testing... but I can probably get it rebased against master later this week. |
This PR needs to be rebased. Please also take a look at precommit failures and fix them. |
@NathanHowell sad to hear that AWS won 🥇 I can fork and rebase the PR, 2.5.0 cut is planned for tomorrow. |
@mairbek I know... we're still using Beam though 👍 😃 thanks for picking it up! |
Fixed in #5297, thanks for the help! 👍 |
Estimate the number of mutations in in a group by counting the affected columns and associated indexes.
The code currently assumes each row contains a single mutation and has flushes batches after a (hardcoded) 10k row threshold has been exeeded. Spanner rejects commits that exceed 20k mutations, including indexes. This disconnect between the estimated mutations and the actual count causes commit failures.
This change estimates the actual mutations by counting the number of indexes covering each column, and summing up the counts of columns and indexes contained within a MutationGroup. The group is flushed prior to the limit being exceeded.