[BEAM-1483] Support SetState in Flink runner #3309
Run Flink ValidatesRunner
Excellent set of changes! I had some inline comments, and I think it would also be good if the changes related to SetState were in a separate commit from the general changes to the state internals test.
I really like that you also ported the newest changes from the state internals test. 👍 I think it might make sense to have a common base class for the state internals test that has an abstract method getStateInternals() and all the test methods. An actual implementation would then derive from it and implement only the method for creating the state internals. This way, we would keep the tests in sync for the different state internals/Runners. What do you think?
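A rough sketch of the base-class idea, not the actual Beam test code: SimpleSetState, SetStateContractTest, and createState() are hypothetical stand-ins for the real state internals and for getStateInternals(), and plain checks are used in place of JUnit annotations so the snippet stays self-contained.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, heavily simplified stand-in for the state under test.
interface SimpleSetState<T> {
    void add(T t);
    boolean contains(T t);
}

// All shared checks live in the base class; a runner-specific test only
// supplies the factory method (the role getStateInternals() would play).
abstract class SetStateContractTest {
    protected abstract SimpleSetState<String> createState();

    // In a real JUnit test this method would carry @Test.
    void testAddThenContains() {
        SimpleSetState<String> state = createState();
        state.add("A");
        if (!state.contains("A")) {
            throw new AssertionError("expected A to be present");
        }
        if (state.contains("B")) {
            throw new AssertionError("did not expect B to be present");
        }
    }

    void runAll() {
        testAddThenContains();
    }
}

// One concrete subclass per implementation; only the factory differs.
class InMemorySetStateContractTest extends SetStateContractTest {
    @Override
    protected SimpleSetState<String> createState() {
        final Set<String> backing = new HashSet<>();
        return new SimpleSetState<String>() {
            @Override
            public void add(String t) {
                backing.add(t);
            }

            @Override
            public boolean contains(String t) {
                return backing.contains(t);
            }
        };
    }
}
```

A Flink-backed subclass would override createState() in the same way, so any test added to the base class automatically runs against every implementation.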
namespace.stringKey(),
StringSerializer.INSTANCE,
flinkStateDescriptor).keys();
return result == null;
Maybe, as a precaution, add if (result != null) return IsEmpty(result)
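One way to read this suggestion, as a sketch only: treat both a null result and a non-null result with no elements as empty. KeyChecks is a hypothetical helper, and whether the Flink backend actually returns null or an empty iterable when there are no keys is an assumption this code deliberately does not depend on.

```java
// Hypothetical helper, not part of the PR.
final class KeyChecks {
    private KeyChecks() {}

    // Empty means: no iterable at all, or an iterable that yields no element.
    static boolean isEmpty(Iterable<?> keys) {
        return keys == null || !keys.iterator().hasNext();
    }
}
```

With such a helper, the last line of the snippet above could become return KeyChecks.isEmpty(result); and would behave the same whichever convention the backend follows.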
value.add("A");
value.add("B");
value.add("A");
assertFalse(value.addIfAbsent("B").read());
I'm a bit worried about the semantics of addIfAbsent() (and putIfAbsent() in MapState). If these lines were changed to
value.addIfAbsent("B");
assertTrue(value.contains("B").read());
I think this test would fail for the Flink StateInternals. See my other comment on the actual addIfAbsent().
Yes, good point. Another similar situation is:
// At first, there is no "B"
ReadableState<Boolean> readable = value.contains("B");
value.add("B");
Boolean result = readable.read();
The in-memory state will return false, while the Flink state will return true.
@kennknowles what do you think about these two cases?
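To make the difference concrete, here is a small sketch with java.util.function.Supplier standing in for ReadableState. ContainsTiming and both method names are hypothetical; the mapping of "eager" to the in-memory behaviour and "lazy" to the Flink behaviour follows the description above rather than the actual implementations.

```java
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical illustration of the two evaluation strategies.
final class ContainsTiming {
    private ContainsTiming() {}

    // Membership is decided when contains() is called; later writes are not seen.
    static Supplier<Boolean> eagerContains(Set<String> set, String t) {
        final boolean present = set.contains(t);
        return () -> present;
    }

    // Membership is decided when the result is read; later writes are seen.
    static Supplier<Boolean> lazyContains(Set<String> set, String t) {
        return () -> set.contains(t);
    }
}
```

If both readers are created on an empty set and "B" is added before they are read, the eager reader yields false and the lazy reader yields true, which is exactly the divergence in question.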
}

@Override
public ReadableState<Boolean> addIfAbsent(final T t) {
I think this satisfies the test, but I think the test might not be thorough enough. (See my comment on the test, which is a copy of the original InMemoryStateInternalsTest.)
If the user doesn't call read() after calling this, then the entry is never actually added to the set. I think that's not the correct behaviour (from my intuitive understanding of how this should work).
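The concern can be reproduced with a simplified model. LazyAddIfAbsent is a hypothetical class backed by a plain HashSet, and Supplier again stands in for ReadableState; it only illustrates the two possible shapes and is not the Flink state code.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical model of the two ways addIfAbsent() could be written.
final class LazyAddIfAbsent {
    private final Set<String> backing = new HashSet<>();

    // Problematic shape: the write happens inside the returned reader,
    // so nothing is added unless the caller reads the result.
    // (Reading twice would also report false the second time.)
    Supplier<Boolean> addIfAbsentLazy(String t) {
        return () -> backing.add(t);
    }

    // Alternative shape: write immediately, return a reader for the outcome.
    Supplier<Boolean> addIfAbsentEager(String t) {
        final boolean added = backing.add(t);
        return () -> added;
    }

    boolean contains(String t) {
        return backing.contains(t);
    }
}
```

Calling addIfAbsentLazy("B") and discarding the result leaves contains("B") false, whereas addIfAbsentEager("B") makes it true straight away.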
StringSerializer.INSTANCE,
flinkStateDescriptor);
boolean alreadyContained = state.contains(t);
state.put(t, true);
Maybe only put if not already there.
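A sketch of the conditional write, assuming a plain java.util.Map in place of the Flink map state. MapBackedSet and its writes counter are hypothetical and exist only to show that the second call performs no put.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical set built on a map, mirroring the contains-then-put pattern above.
final class MapBackedSet<T> {
    private final Map<T, Boolean> state = new HashMap<>();
    int writes = 0; // counts actual puts, for illustration only

    // Returns true if the element was newly added, false if it was already there.
    boolean addIfAbsent(T t) {
        final boolean alreadyContained = state.containsKey(t);
        if (!alreadyContained) {
            state.put(t, true); // write only when the key is missing
            writes++;
        }
        return !alreadyContained;
    }
}
```

Skipping the redundant put does not change the result of addIfAbsent(), but it avoids a state write for elements that are already present.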
Rebase and fix.
Run Flink ValidatesRunner
Perfect set of changes! I have nothing to add. 👍 I'll merge.
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
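- Make sure the PR title is formatted like: [BEAM-<Jira issue #>] Description of pull request
- Make sure tests pass via mvn clean verify.
- Replace <Jira issue #> in the title with the actual Jira issue number, if there is one.
- If this contribution is large, please file an Apache Individual Contributor License Agreement.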