[BEAM-5987] Cache and share materialized side inputs between Spark tasks #7091
Conversation
Force-pushed b997857 to 976678f

Run Spark ValidatesRunner
Collections.synchronizedMap(new WeakHashMap<>());

/**
 * Id that is consistent among executors. We can not use stepName because of possible collisions.
I do not get the meaning of 'consistent' here. Do you mean random (most likely distinct), even within one JVM?
It stays consistent after deserialization on the executor side.
ok, I see.
LGTM
This still needs some effort, as it does not handle the case when a side input is used in different DoFns.
Force-pushed b804a31 to 6a22322

Run Spark ValidatesRunner
Force-pushed ab6adc1 to a84bbc1

Run Spark ValidatesRunner
Force-pushed a84bbc1 to 80e147c
 * Side inputs are stored in {@link Cache} with weakValues so if there is no reference to a value,
 * sideInput is garbage collected.
 */
public class SideInputStorage {
Make this package-private, and the same for the constructor; keep access as tight as needed.
done
 * Keep references for the whole lifecycle of CachedSideInputReader, otherwise sideInput needs to
 * be de-serialized again.
 */
private Set<?> sideInputReferences = new HashSet<>();
Aren't values ever removed from here? Or am I misreading this one? It seems like it can overflow and even prevent SideInputStorage from being GCed.
References were removed, because a different solution was used.
It would be nice to test that this behaves as expected: it does not leak (i.e. it can be GCed) and does not rematerialize.
I left some comments. Given the potential issue of growing memory use as a side effect, it would be really nice to add some test(s).
Force-pushed 0b542a9 to cc404ac
After trying several approaches we decided to keep it simple and go with expireAfterAccess to drop values from the cache. The solution with weak values didn't bring the desired behavior. With expireAfterAccess, the side input is deserialized only once. I chose a 5 min eviction duration as the best compromise, but I am open to discussion on whether it should be configurable. A disadvantage of the expireAfterAccess solution could be potentially higher memory consumption.
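The expire-after-access behavior described above can be sketched with a minimal, JDK-only cache (hypothetical class and method names — the PR itself uses the vendored Guava CacheBuilder.expireAfterAccess; time is passed in explicitly here only to keep the sketch testable):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical JDK-only sketch of an expire-after-access cache. */
final class ExpireAfterAccessCache<K, V> {
  private static final class Entry<V> {
    V value;
    long lastAccessMillis;
  }

  private final long ttlMillis;
  private final Map<K, Entry<V>> entries = new HashMap<>();

  ExpireAfterAccessCache(long ttlMillis) {
    this.ttlMillis = ttlMillis;
  }

  /** Loads at most once per TTL window; every access refreshes the eviction clock. */
  synchronized V get(K key, long nowMillis, Supplier<V> loader) {
    Entry<V> e = entries.get(key);
    if (e == null || nowMillis - e.lastAccessMillis > ttlMillis) {
      e = new Entry<>();
      e.value = loader.get(); // stands in for side-input deserialization
      entries.put(key, e);
    }
    e.lastAccessMillis = nowMillis;
    return e.value;
  }
}
```

With a 5-minute TTL, repeated lookups within the window reuse the materialized value; only a lookup after the window has expired pays the deserialization cost again.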
Run Spark ValidatesRunner
Force-pushed 8c5c5b0 to 72aee8e

Run Java PreCommit
Force-pushed 72aee8e to 0e32d96

Run Spark ValidatesRunner
I left two comments on possible issues that I am not 100% sure apply to this PR. Pinging also @amitsela to see if he has something to say, in particular about the static state. Thanks!
class SideInputStorage {

  /** JVM deserialized side input cache. */
  private static final Cache<Key<?>, Optional<?>> materializedSideInputs =
I am a bit worried about the possible consequences of a collision of the Key<view, window> tuple, in particular if a bad implementation of equals is around. This is not specific to this PR, but since the state is now static, the likelihood of this happening is bigger.
How do other runners cache side inputs? By which key? This sounds like something the SDK could provide guidance on (@kennknowles).
The window should use windowCoder.structuralValue(window), which is required to behave identically to a full serialization for shuffle purposes. The view itself is just a tag, so it should have a good enough equals as-is. There is already caching in the now-donated Dataflow Java worker, if you look through uses and subclasses of SideInputReader.
I also think you have bigger troubles if you have a collision in view or window (the sparkRunner relies on that in more places), so I will leave it as it is. Is that OK?
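The Key<view, window> discussion above boils down to a composite value class whose equals/hashCode combine the view's tag with the window's structural value. A plain-Java sketch (hypothetical names and field types — the real key would hold the PCollectionView and the result of windowCoder.structuralValue(window)):

```java
import java.util.Objects;

/** Hypothetical composite cache key: (view tag, structural window value). */
final class SideInputKey {
  private final String viewTag;          // stands in for the PCollectionView's tag
  private final Object structuralWindow; // stands in for windowCoder.structuralValue(window)

  SideInputKey(String viewTag, Object structuralWindow) {
    this.viewTag = viewTag;
    this.structuralWindow = structuralWindow;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SideInputKey)) {
      return false;
    }
    SideInputKey other = (SideInputKey) o;
    return viewTag.equals(other.viewTag)
        && structuralWindow.equals(other.structuralWindow);
  }

  @Override
  public int hashCode() {
    return Objects.hash(viewTag, structuralWindow);
  }
}
```

Two keys collide only if both the view tag and the structural window value compare equal, which is exactly the identity the cache needs.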
@@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) {
@Override
public <T> T get(PCollectionView<T> view, BoundedWindow window) {
I am worried about the possible semantic consequences of CachedSideInputReader.get() returning a null value when it is not in the cache. Wouldn't it imply that a window could get an empty side input assigned? The documentation on this is not really clear (pinging @kennknowles to see if I am misreading it). I wonder if there is a test to validate that this cannot happen, or if we can create one somehow?
Ah, the meaning of that comment is that null is a value. You can have a PCollection<@Nullable Foo> that contains just one copy of null, use View.asSingleton(), and the side input returns the null.

In other words, get must return a value of type T, but the type T may itself be @Nullable Something. The annotation on SideInputReader should be removed; it is incorrect if we use a static analysis that understands this. FindBugs does not understand this, but we should aspire for our annotations to be correct so the documentation is clear.
@@ -85,6 +88,13 @@ private SideInputBroadcast createBroadcastHelper(
    PCollectionView<?> view, JavaSparkContext context) {
  Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>> tuple2 = pviews.get(view);
  SideInputBroadcast helper = SideInputBroadcast.create(tuple2._1, tuple2._2);
  String pCollectionName =
      view.getPCollection() != null ? view.getPCollection().getName() : "UNKNOWN";
  LOG.info(
Maybe LOG.debug?
Changed to debug.
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import com.google.common.cache.Cache;
Vendored Guava?
Fixed to use the vendored Guava.
@@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) {
@Override
public <T> T get(PCollectionView<T> view, BoundedWindow window) {
          SizeEstimator.estimate(result));
      return Optional.ofNullable(result);
    });
return optionalResult.orElse(null);
Ismaël is right. The delegate.get is not @Nullable at this place in the abstraction. You don't need to check for it (unless there's some other bug somewhere), and you shouldn't convert Optional.absent() to null.
I chose this solution because the Guava Cache doesn't allow null values, and I didn't realize I would break the semantic meaning. I will try to find a different solution.
Ah, thank you for clarifying. That is a good attempt. The problem is that these will incorrectly be turned into the same thing:

Optional.ofNullable(null).orElse(null) == null
Optional.ofNullable(Optional.absent()).orElse(null) == null

The fact that Optional.of(null) throws NPE is a mistake in the design (both Java and Guava). Maybe the point of the design is to convince people not to use null, which is a billion-dollar good idea. But it makes Optional<T> not correctly parametric in T.

I think that if you actually convert null into Optional.of(Optional.absent()) and other values v into Optional.of(Optional.of(v)), you can simulate the behavior it should have had in the first place. Or you could make your own little replacement for Optional.
Thanks for the suggestions. The Optional combo would not be very readable, so I made my own wrapper where I simply wrap the value, which lets me put null into the cache:
https://github.com/apache/beam/pull/7091/files#diff-b123f0f1ca9646966a641a458b74cfbcR92
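The wrapper idea can be sketched as a tiny value holder: the cache always stores a non-null wrapper object, while the wrapped payload may legitimately be null (hypothetical class name — see the PR diff linked above for the actual implementation):

```java
/** Hypothetical holder that lets a null-hostile cache store null as a real value. */
final class ValueOrNull<T> {
  private final T value; // may be null, and that is a legitimate cached result

  private ValueOrNull(T value) {
    this.value = value;
  }

  /** Wraps any value, including null; the returned holder itself is never null. */
  static <T> ValueOrNull<T> of(T value) {
    return new ValueOrNull<>(value);
  }

  T get() {
    return value;
  }
}
```

A cache miss is then distinguishable from a cached null: the loader stores ValueOrNull.of(delegate.get(view, window)), and get() hands the possibly-null payload back unchanged, preserving the SideInputReader contract discussed above.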
Force-pushed 0e32d96 to ddfe7dd
LGTM, will do some minor touches and rebase manually to merge. Thanks a lot @mareksimunek and @dmvk!
Merged!
We should try to reuse deserialized side inputs among Spark tasks.