Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value #13249

Merged
merged 2 commits into from
Feb 16, 2023

Conversation

vcrfxia
Copy link
Contributor

@vcrfxia vcrfxia commented Feb 15, 2023

As part of introducing versioned key-value stores in KIP-889, we'd like a way to represent a versioned key-value store (VersionedKeyValueStore<Bytes, byte[]>) as a regular key-value store (KeyValueStore<Bytes, byte[]>) in order to be compatible with existing DSL methods for passing key-value stores, e.g., StreamsBuilder#table() and KTable methods, which are explicitly typed to accept Materialized<K, V, KeyValueStore<Bytes, byte[]>. This way, we do not need to introduce new versions of all relevant StreamsBuilder and KTable methods to relax the Materialized type to accept versioned stores.

As part of representing a versioned key-value store as a regular key-value bytes store, we need a way to serialize a value and timestamp as a single byte array, where the value may be null (in order to represent putting a tombstone with timestamp into the versioned store). This PR introduces the serdes for doing so. See #13250 for its usage.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@Override
public byte[] serialize(final String topic, final Boolean data) {
if (data == null) {
// actually want to return null here but spotbugs won't allow deserialization so
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I can follow? We should return null and just make sure spotbug does not mess with us. What does it complain about?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved below.

@Override
public Boolean deserialize(final String topic, final byte[] data) {
if (data == null) {
// actually want to return null here but spotbugs won't allow it (NP_BOOLEAN_RETURN_NULL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. Here is the error. We should just add an exception for this error (cf gradle/spotbugs-exclude.xml)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, added an exception for this particular class.


final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
return ByteBuffer
.allocate(rawTimestamp.length + rawIsTombstone.length + nonNullRawValue.length)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both rawTimestamp.length and rawIsTombstone.length are constants, right? Should we do allocate(8L + 1L + nonNullRawValueLength)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's correct. I extracted the expected lengths into static variables and added the relevant checks in order to be defensive against serialization surprises.

return ValueAndTimestamp.makeAllowNullable(null, timestamp);
} else {
final V value = valueDeserializer.deserialize(topic, rawValue(rawValueAndTimestamp));
return ValueAndTimestamp.makeAllowNullable(value, timestamp);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call make() here? value should never be null for this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're correct that value should never be null here, assuming valid deserializer implementations which never deserialize non-null into null.

Regarding updating this call to make(), make() does not throw if value == null, instead it returns null. In the event of a buggy deserializer implementation which returns null when it shouldn't, calling make() here and returning null will cause cascading failures elsewhere -- for example, the changelogging layer uses this deserializer to obtain the value to write to the changelog (see #13251) and will throw an exception if it gets a null ValueAndTimestamp from the deserializer.

I suppose I can update this to make() and throw an explicit exception if value == null. I'll do that.

}

@Test
public void shouldSerializeNonNullWithEmptyBytes() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test does not really verify if we get "empty bytes"? I am also not sure if "" is actually empty, as it would encode "string length as zero" ?

I believe you want to test the boolean isTombstone flag here? Mabye we need a custom Serde for the value to force an empty byte array as serialization output?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StringSerializer serializes an empty string as byte[0], which is exactly what this test case is for (isTombstone will be set to false, which is how byte[0] is distinguished from a null value).

Did I misunderstand your question?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that was the question. I was just not sure if it would do this. SG.

Copy link
Contributor Author

@vcrfxia vcrfxia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks as always for your review, @mjsax ! Pushed a new commit just now with the requested changes.

return ValueAndTimestamp.makeAllowNullable(null, timestamp);
} else {
final V value = valueDeserializer.deserialize(topic, rawValue(rawValueAndTimestamp));
return ValueAndTimestamp.makeAllowNullable(value, timestamp);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're correct that value should never be null here, assuming valid deserializer implementations which never deserialize non-null into null.

Regarding updating this call to make(), make() does not throw if value == null, instead it returns null. In the event of a buggy deserializer implementation which returns null when it shouldn't, calling make() here and returning null will cause cascading failures elsewhere -- for example, the changelogging layer uses this deserializer to obtain the value to write to the changelog (see #13251) and will throw an exception if it gets a null ValueAndTimestamp from the deserializer.

I suppose I can update this to make() and throw an explicit exception if value == null. I'll do that.

@Override
public Boolean deserialize(final String topic, final byte[] data) {
if (data == null) {
// actually want to return null here but spotbugs won't allow it (NP_BOOLEAN_RETURN_NULL)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, added an exception for this particular class.

@Override
public byte[] serialize(final String topic, final Boolean data) {
if (data == null) {
// actually want to return null here but spotbugs won't allow deserialization so
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved below.


final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
return ByteBuffer
.allocate(rawTimestamp.length + rawIsTombstone.length + nonNullRawValue.length)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's correct. I extracted the expected lengths into static variables and added the relevant checks in order to be defensive against serialization surprises.

}

@Test
public void shouldSerializeNonNullWithEmptyBytes() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StringSerializer serializes an empty string as byte[0], which is exactly what this test case is for (isTombstone will be set to false, which is how byte[0] is distinguished from a null value).

Did I misunderstand your question?

@mjsax mjsax merged commit dcaf95a into apache:trunk Feb 16, 2023
@vcrfxia vcrfxia deleted the kip-889-nullable-serde branch February 16, 2023 16:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kip Requires or implements a KIP streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants