New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-10212] Add caching state client wrapper #15170
Conversation
R: @amaliujia |
R: @lukecwik |
cc @tysonjh |
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/CachingBeamFnStateClient.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/CachingBeamFnStateClient.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/CachingBeamFnStateClient.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/CachingBeamFnStateClient.java
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/CachingBeamFnStateClient.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/CachingBeamFnStateClient.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/CachingBeamFnStateClient.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/CachingBeamFnStateClient.java
Outdated
Show resolved
Hide resolved
|
||
switch (request.getRequestCase()) { | ||
case GET: | ||
if (ByteString.EMPTY.equals(request.getGet().getContinuationToken())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like we are only ever caching the first page and subsequent ones aren't handled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So should we cache all pages or just the first page?
if (cachedFirstPage == null) { | ||
CompletableFuture<StateResponse> responseFuture = new CompletableFuture<>(); | ||
beamFnStateClient.handle( | ||
StateRequest.newBuilder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Until we get the rest working, let us treat APPEND as a cache eviction instead. Handling APPEND is more complicated since we want to merge existing data with the new data without needing to have a "get" call and there are a couple of scenarios (e.g., there is no existing data, the last page is still cached, ...).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left append as a regular call to the state client for now
Run Java PreCommit |
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/CachingBeamFnStateClient.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/CachingBeamFnStateClient.java
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/CachingBeamFnStateClient.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
@AutoValue | ||
public abstract static class StateCacheKey { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comments please. Note: every class, public method, and tricky bits of code should have comments. There are some exceptions of course, like getters/setters
or trivial self explanatory methods.
If you're curious about learning more, you can refer to Google's public style guide for some good practices: https://google.github.io/styleguide/javaguide.html
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/CachingBeamFnStateClient.java
Outdated
Show resolved
Hide resolved
@lukecwik @amaliujia @tysonjh made some fix-up changes, let me know if you have any other suggestions! |
// If data is not cached, add callback to add response to cache on completion. | ||
// Otherwise, complete the response with the cached data. | ||
if (cachedPage == null) { | ||
beamFnStateClient.handle(requestBuilder, response); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: how is the GET served when cache miss? When cache hits, you have response.complete
, do you need the same for cache miss after you put the state into the cache?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The response.complete is executed by the beamFnStateClient.handle call here, but if it is a cache hit we can complete the response immediately so we do not need to forward the request to the delegate state client
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then can you confirm that after complete, your thenAccept will still be executed? (I am not a async API expert so I am not sure).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/CachingBeamFnStateClient.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/CachingBeamFnStateClient.java
Show resolved
Hide resolved
public class CachingBeamFnStateClient implements BeamFnStateClient { | ||
|
||
private final BeamFnStateClient beamFnStateClient; | ||
private final LoadingCache<StateKey, Map<StateCacheKey, StateGetResponse>> stateCache; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't seem like we are using the Loading part of LoadingCache, did you mean to have the CachingBeamFnStateClient perform the loading?
This would generally be a good thing since requests that go for the same state key could resolve down to a single request to the runner instead of multiple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaving this for a future PR
retest this please |
@lukecwik addressed comments if you want to take another look |
Run Java PreCommit |
1 similar comment
Run Java PreCommit |
As Luke is OOO for weeks and comments have been addressed. I will merge this PR to unblock Anthony. If there are still other issues, we can address those in patches. Failed Java Tests seems have been fixed. I will give it another try. |
Run Java_Examples_Dataflow_Java11 PreCommit |
* [BEAM-10212] Add caching state client wrapper.
* [BEAM-10212] Add caching state client wrapper.
Introduces a caching state wrapper that stores delegate state client responses in the provided cache.
Unit tests test caching of user and side input state, and test that appends and clears are reflected in the cache.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
ValidatesRunner
compliance status (on master branch)Examples testing status on various runners
Post-Commit SDK/Transform Integration Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.