[BEAM-13541] More intelligent caching of CoGBK values. #16354
Run Java PreCommit
FYI, reference to the previously failing precommit: https://ci-beam.apache.org/job/beam_PreCommit_Java_Phrase/4426/
LGTM. Thank you.
}

/**
 * Assigns a monotonically increasing index to each item in teh underling Reiterator.
nit: s/teh/the
Done.
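For readers without the full diff, the Javadoc under review describes tagging each item of an iterator with a running index. The following is only a minimal sketch of that idea over a plain `java.util.Iterator`, not Beam's `Reiterator`; the class names `Indexed` and `IndexingIterator` are hypothetical and do not come from the PR.

```java
import java.util.Iterator;

/** Hypothetical pair of an element and its position in the stream. */
final class Indexed<T> {
  final int index;
  final T value;

  Indexed(int index, T value) {
    this.index = index;
    this.value = value;
  }
}

/** Hypothetical wrapper: assigns 0, 1, 2, ... to items in iteration order. */
final class IndexingIterator<T> implements Iterator<Indexed<T>> {
  private final Iterator<T> underlying;
  private int nextIndex = 0;

  IndexingIterator(Iterator<T> underlying) {
    this.underlying = underlying;
  }

  @Override
  public boolean hasNext() {
    return underlying.hasNext();
  }

  @Override
  public Indexed<T> next() {
    // Delegates to the underlying iterator, which throws
    // NoSuchElementException when it is exhausted.
    T value = underlying.next();
    return new Indexed<>(nextIndex++, value);
  }
}
```

A version that supports copying would also need to decide whether a copy continues from the current index; this sketch leaves that out.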
this.unions = Iterators.peekingIterator(unions);
this.containsTag = containsTag;
// Used to keep track of what has been observed so far.
private final int[] lastObserved;
Why are these 3 variables arrays? Only index 0 is used.
They are single-element arrays because we want to share these values among all copies of the reiterator. Basically, they act like pointers. Added a comment to clarify.
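To illustrate the "array as pointer" idiom described in this reply, here is a minimal sketch assuming a one-element `int[]` used as a shared mutable cell. The class `SharedCursor` and its methods are hypothetical and are not the PR's code; only the field name `lastObserved` is borrowed from the snippet under review.

```java
/** Hypothetical cursor whose copies all share one "furthest position seen" cell. */
final class SharedCursor {
  // One-element array used as a mutable cell; every copy holds the same array.
  private final int[] lastObserved;
  // Per-copy state: each copy advances independently.
  private int position;

  SharedCursor() {
    this(new int[] {-1}, -1);
  }

  private SharedCursor(int[] lastObserved, int position) {
    this.lastObserved = lastObserved;
    this.position = position;
  }

  /** Advances this copy and records the furthest position any copy has reached. */
  void advance() {
    position++;
    lastObserved[0] = Math.max(lastObserved[0], position);
  }

  /** The copy starts at the same position and shares the cell rather than a snapshot. */
  SharedCursor copy() {
    return new SharedCursor(lastObserved, position);
  }

  int position() {
    return position;
  }

  int lastObserved() {
    return lastObserved[0];
  }
}
```

A plain `int` field would be copied by value, so each copy would see its own counter. An `AtomicInteger` or a small holder class would serve the same purpose as the array.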
just noticed a few typos
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@Test
@SuppressWarnings("BoxedPrimitiveEquality")
public void testCachedResults() {
// Ensure we don't fail below due to odd VM settings.
Out of curiosity, what odd VM settings?
java.lang.Integer.IntegerCache.high. Clarified to be more explicit.
It doesn't look like we will respect DEFAULT_IN_MEMORY_ELEMENT_COUNT: we will have at most DEFAULT_IN_MEMORY_ELEMENT_COUNT + (NUM_TAGS - 1) * DEFAULT_MIN_ELEMENTS_PER_TAG elements in memory. This is fine, but I wanted to confirm that this was your intent.
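As a worked instance of the bound in this comment, the sketch below plugs in the two constants from the PR (10_000 and 100). The class `CacheBound`, the method name, and the three-tag join are assumptions made purely for illustration.

```java
/** Hypothetical helper that evaluates the reviewer's worst-case formula. */
final class CacheBound {
  /**
   * Upper bound on in-memory elements, assuming one tag may use up the whole
   * global budget while every other tag still gets its per-tag minimum.
   */
  static long maxCachedElements(int globalLimit, int minPerTag, int numTags) {
    return (long) globalLimit + (long) (numTags - 1) * minPerTag;
  }
}
```

For a three-way join, `CacheBound.maxCachedElements(10_000, 100, 3)` evaluates to 10_200, which is 200 elements over the nominal global limit.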
@@ -98,8 +104,7 @@ public CoGbkResult(
     throw new IllegalStateException(
         "union tag " + unionTag + " has no corresponding tuple tag in the result schema");
   }
-  List<Object> valueList = (List<Object>) valueMap.get(unionTag);
-  valueList.add(value.getValue());
+  valuesByTag.get(unionTag).add(value.getValue());
 }

 if (taggedIter.hasNext()) {
nit: use guard style
if (!taggedIter.hasNext()) {
  valueMap = (List) valuesByTag;
  return;
}
// If we get here, there were more elements than we can afford to
// keep in memory, so we copy the re-iterable of remaining items
// and append filtered views to each of the sorted lists computed earlier.
...
There are pros and cons to this, but I agree it's a slight improvement. Done.
private static class TagIterable<T> implements Iterable<T> {
int tag;
int cacheSize;
Supplier<Boolean> forceCache;
?
Leftover from a refactor, removing.
@@ -59,6 +60,8 @@

 private static final int DEFAULT_IN_MEMORY_ELEMENT_COUNT = 10_000;
+
+private static final int DEFAULT_MIN_ELEMENTS_PER_TAG = 100;
Based upon the code it looks like this is used as a max per tag.
The idea is that we will always cache at least this many values per tag, regardless of whether DEFAULT_IN_MEMORY_ELEMENT_COUNT was "used up" by other tags. I'll clarify.
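One way to read the policy described in this reply is sketched below, assuming an element is cached if either the global budget still has room or its tag has not yet reached its minimum. This is an illustrative model, not the PR's implementation; the class `PerTagCachePolicy` and its methods are hypothetical.

```java
/** Hypothetical model of "global budget plus a guaranteed per-tag minimum". */
final class PerTagCachePolicy {
  private final int globalLimit;
  private final int minPerTag;
  private final int[] cachedPerTag;
  private int cachedTotal = 0;

  PerTagCachePolicy(int globalLimit, int minPerTag, int numTags) {
    this.globalLimit = globalLimit;
    this.minPerTag = minPerTag;
    this.cachedPerTag = new int[numTags];
  }

  /** Returns true, and records the element, if the next element for this tag is cached. */
  boolean offer(int tag) {
    boolean cache = cachedTotal < globalLimit || cachedPerTag[tag] < minPerTag;
    if (cache) {
      cachedTotal++;
      cachedPerTag[tag]++;
    }
    return cache;
  }

  int cachedFor(int tag) {
    return cachedPerTag[tag];
  }

  int cachedTotal() {
    return cachedTotal;
  }
}
```

Under this model, a tag that arrives after the global budget is exhausted still keeps its first `minPerTag` values in memory, so the constant acts as a per-tag floor rather than a cap.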
@Test
@SuppressWarnings("BoxedPrimitiveEquality")
public void testCachedResults() {
// Ensure we don't fail below due to a non-default java.lang.Integer.IntegerCache.high setting.
?
The assertion of which values are cached vs. re-created relies on not-so-small integers not being cached.
I understand that. I was trying to highlight the spacing issue in the comment. Also consider using a different type which isn't interned by the JVM.
Yeah. Strings have their own (more difficult to reason about) interning issues, and using a non-primitive gets really verbose here, which is why I stayed with integers.
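For context on why the test depends on `java.lang.Integer.IntegerCache.high`, here is a small sketch of boxed-integer identity. `Integer.valueOf` is guaranteed to return cached instances for values from -128 to 127. Above that, whether two calls return the same object depends on the JVM's cache upper bound, which can be raised, for example with `-XX:AutoBoxCacheMax`. The class name `IntegerIdentityDemo` is hypothetical.

```java
/** Hypothetical demo of reference equality on boxed integers. */
final class IntegerIdentityDemo {
  /** Returns true if boxing the same value twice yields the same object. */
  static boolean sameBoxedInstance(int value) {
    Integer a = Integer.valueOf(value);
    Integer b = Integer.valueOf(value);
    // Identity comparison on purpose; equals() would always return true here.
    return a == b;
  }

  public static void main(String[] args) {
    // Always true: 100 is inside the guaranteed cache range.
    System.out.println(sameBoxedInstance(100));
    // Depends on the JVM's cache upper bound; false under default settings.
    System.out.println(sameBoxedInstance(100_000));
  }
}
```

A test that uses `==` to tell cached values from re-created ones therefore needs values above the cache bound, which is why the setting matters.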
Run Java PreCommit
Running into BEAM-13575.
Run Java PreCommit
Same failure with unrelated org.apache.beam.runners.flink.FlinkRequiresStableInputTest.testParDoRequiresStableInput. I'm going to go ahead and merge so we can get a cherry pick going.
This is a follow-up to apache#16354
SpotBugs consistently fails with:
Fixed in #16407
A minimum number of elements is cached for each tag, possibly in addition to a global number of cached elements.