Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-10157] [State TTL] Allow `null` user values in map state with TTL #6707

Closed
wants to merge 8 commits into from

Conversation

@azagrebin
Copy link
Contributor

azagrebin commented Sep 17, 2018

What is the purpose of the change

This change allows to store null values with TTL and aligns the semantics of map state TTL get/contains methods with the map state w/o TTL.

Brief change log

  • allow null user value in TtlValue
  • add tests for null user values in Map state
  • add NullableSerializer to wrap serializers w/o null support but where null values are needed in map state
  • doc notes

Verifying this change

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Added test that validates that TaskInfo is transferred only once across recoveries
  • Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
@azagrebin azagrebin force-pushed the azagrebin:FLINK-10157 branch from e3309a6 to 7e61b39 Sep 17, 2018
@azagrebin

This comment has been minimized.

Copy link
Contributor Author

azagrebin commented Sep 17, 2018

Copy link
Contributor

Clarkkkkk left a comment

Looks good to me. Just one little suggestion.


private TtlValue<UV> getWrapped(UK key) throws Exception {
return getWrappedWithTtlCheckAndUpdate(
() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));

This comment has been minimized.

Copy link
@Clarkkkkk

Clarkkkkk Sep 19, 2018

Contributor

Is it necessary to add a private method here?
TtlValue ttlValue = getWrappedWithTtlCheckAndUpdate(
() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
return ttlValue == null ? null : ttlValue.getUserValue();
looks better to me.

This comment has been minimized.

Copy link
@azagrebin

azagrebin Sep 19, 2018

Author Contributor

I added it for code reuse because get and contains needed the same code.

Copy link
Contributor

StefanRRichter left a comment

Besides the inline comments, the PR looks good to me as a workaround to the problem. In the long run, we should get rid of the compulsory additional byte in map state, as should have happened in the original PR. You could create a ticket to change this for a breaking release so that it will not be forgotten.


@Override
public TypeSerializer<T> duplicate() {
return new NullableSerializer<>(originalSerializer.duplicate());

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 19, 2018

Contributor

I would only return a new serializer if originalSerializer.duplicate() != originalSerializer, otherwise this.

try {
originalSerializer.serialize(null, new DataOutputSerializer(1));
Preconditions.checkArgument(originalSerializer.copy(null) == null);
} catch (NullPointerException | IOException e) {

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 19, 2018

Contributor

I wonder if we should be broader and catch IOException and RuntimeException, e.g. some people might wrongly throw IllegalArgumentException in their serializer.

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 19, 2018

Contributor

This triggered another idea, maybe we should also test for this in the SerializerTestBase, for example that null either works or throws an "acceptable" exception. In case that it works, we could also check other properties, e.g. that if the serializer declares a fixed size also the serialized bytes for null have the same size.

private final TypeSerializer<T> originalSerializer;

private NullableSerializer(TypeSerializer<T> originalSerializer) {
Preconditions.checkNotNull(originalSerializer, "The original serializer cannot be null");

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 19, 2018

Contributor

For my personal taste, I would prefer @Nonnull on the field.


@Override
public boolean canEqual(Object obj) {
return (obj != null && obj.getClass() == getClass());

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 19, 2018

Contributor

...and also check if the originalSerializer canEqual?

@@ -350,6 +350,9 @@ will lead to compatibility failure and `StateMigrationException`.

- The TTL configuration is not part of check- or savepoints but rather a way of how Flink treats it in the currently running job.

- The map state with TTL currently supports null user values only if the user value serializer can handle null values.

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 19, 2018

Contributor

Could highlight plaintext null as null (multiple instances).

@@ -32,6 +32,12 @@

/**
* Configuration of state TTL logic.
*
* <p>Note: The map state with TTL currently supports null user values

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 19, 2018

Contributor

Do you think this is the best position for this comment or rather on MapStateDescriptor (where the serializer is supplied) or maybe even both?


private static <T> boolean checkIfNullSupported(TypeSerializer<T> originalSerializer) {
try {
originalSerializer.serialize(null, new DataOutputSerializer(1));

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 19, 2018

Contributor

Would it make sense to check that a full cycle of serialize(null) -> deserialize(bytes) returns as null?

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 19, 2018

Contributor

Could initialize new DataOutputSerializer(serializer.getLength()) for positive length.

import static org.junit.Assert.assertTrue;

/** Unit tests for {@link NullableSerializer}. */
public class NullableSerializerTest {

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 19, 2018

Contributor

I think you might also extend TestLogger and probably TypeSerializerTestBase as well. The second could also be a separate test class.

@azagrebin azagrebin force-pushed the azagrebin:FLINK-10157 branch from 9df896d to ea5db0c Sep 20, 2018
@azagrebin

This comment has been minimized.

Copy link
Contributor Author

azagrebin commented Sep 20, 2018

Thanks for the review @StefanRRichter, I updated the PR

private final TypeSerializer<T> originalSerializer;

private NullableSerializer(TypeSerializer<T> originalSerializer) {
private NullableSerializer(@Nonnull TypeSerializer<T> originalSerializer) {
Preconditions.checkNotNull(originalSerializer, "The original serializer cannot be null");

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 20, 2018

Contributor

You could also remove the Preconditions check now that the annotation is in place.

DataOutputSerializer dos = new DataOutputSerializer(length);
serializer.serialize(null, dos);
DataInputDeserializer dis = new DataInputDeserializer(dos.getSharedBuffer());
Preconditions.checkArgument(serializer.deserialize(dis) == null);

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 20, 2018

Contributor

Both precondition checks will just be caught by the following catch block. Depending on what you intend to do, the check should go outside the block, or rather result in false value directly (I assume the second case is what you want).

This comment has been minimized.

Copy link
@azagrebin

azagrebin Sep 20, 2018

Author Contributor

Catching exceptions will also produce false which will also result in returning false. But I think they should be rather moved out because if serializer can serialize null then it must also properly deserialize and copy it. This can be a sanity check after try and before returning true. What do you think?

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 20, 2018

Contributor

My main point is, you use precondition that could produce an exception and then immediately catch the exception and translate it to false. So my question is, why use precondition and not just an if that affects the result. Or in other words, why do we need to go through exceptions for this (nobody takes note that the false came from an exceptional case as well)?

This comment has been minimized.

Copy link
@azagrebin

azagrebin Sep 20, 2018

Author Contributor

ok, let's move preconditions out and leave them as a sanity check

@@ -116,7 +139,7 @@ public T copy(T from, T reuse) {
@Override
public int getLength() {
int len = originalSerializer.getLength();
return len < 0 ? len : len + 1;
return len == 0 ? 1 : -1;

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 20, 2018

Contributor

This change seems to be a bug.

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 20, 2018

Contributor

Followup question: how did it pass the tests?

This comment has been minimized.

Copy link
@azagrebin

azagrebin Sep 20, 2018

Author Contributor

The serializer does not have fixed length in common case now (1 or 1 + original length). It is fixed to 1 only of original length is zero. Let's add an option to pad the null value and preserve the fixed length if original serializer has the fixed length.

Copy link
Contributor

StefanRRichter left a comment

If have doubts about the changes in getLength(), please take a look at my comment. Otherwise looks good.

…original serializer has the fixed length
@azagrebin azagrebin force-pushed the azagrebin:FLINK-10157 branch from c040bfe to 8fcae00 Sep 20, 2018
private NullableSerializer(@Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) {
this.originalSerializer = originalSerializer;
this.padNullValue = originalSerializer.getLength() > 0 && padNullValueIfFixedLen;
padding = padNullValue ? new byte[originalSerializer.getLength()] : null;

This comment has been minimized.

Copy link
@StefanRRichter

StefanRRichter Sep 20, 2018

Contributor

You could use byte[0] for no padding and get rid of null/flag checks as well as the padNullValue flag via a method isPadding { return padding.length > 0;}. Then also annotating the field as @Nonnull and just always call the method that writes padding to remove a couple branches.

azagrebin added 2 commits Sep 21, 2018
@azagrebin azagrebin force-pushed the azagrebin:FLINK-10157 branch from 2319373 to b073070 Sep 21, 2018
@StefanRRichter

This comment has been minimized.

Copy link
Contributor

StefanRRichter commented Sep 21, 2018

Thanks @azagrebin ! Changes look good to me, merging. 👍

@asfgit asfgit closed this in f343204 Sep 21, 2018
asfgit pushed a commit that referenced this pull request Sep 21, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants
You can’t perform that action at this time.