
KAFKA-6521: Use timestamped stores for KTables #6667

Merged: 2 commits, May 12, 2019

Conversation

@mjsax (Member) commented May 2, 2019

The tests are failing because #6661 is not merged yet.

The goal of this PR is to swap out KeyValueStore and WindowStore for the corresponding new TimestampedXxxStore in the DSL where appropriate. We don't leverage the timestamps in the Processors yet, to keep the PR contained; thus, there is no semantic change yet.

@mjsax mjsax added the streams label May 2, 2019
@mjsax (Member, Author) commented May 2, 2019

Call for early review @guozhangwang @bbejeck @vvcephei @ableegoldman @cadonna @abbccdda

@@ -51,6 +51,6 @@ void initStoreSerde(final ProcessorContext context) {
     serdes = new StateSerdes<>(
         ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-        valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde);
+        valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
Member Author:

Fix bug that was exposed with this PR.

Contributor:

Oof. Good catch!

@vvcephei (Contributor) left a comment:

Thanks for the PR, @mjsax . I guess the idea here is to just mix in the new (timestamped) stores without changing any semantics? And then we'll have other PR(s) later to update the semantics and tests? This seems like a good way to break it down.

Just a few comments.

     }

     @Override
     public V get(final K key) {
-        return store.get(key);
+        final ValueAndTimestamp<V> valueAndTimestamp = store.get(key);
+        return valueAndTimestamp == null ? null : valueAndTimestamp.value();
Contributor:

This logic has come up once or twice ;)

Just a random idea... What do you think about a static method on ValueAndTimestamp:

static <V> V getValueOrNull(final ValueAndTimestamp<V> valueAndTimestamp) {
  return valueAndTimestamp == null ? null : valueAndTimestamp.value();
}

Then all these other methods could be:

Suggested change:
-    return valueAndTimestamp == null ? null : valueAndTimestamp.value();
+    return getValueOrNull(valueAndTimestamp);

?

Member Author:

Ack. I'll just update the KIP accordingly. Turns out we already have ValueAndTimestamp#make and no public constructor (also not documented on the KIP).
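For illustration, a minimal sketch of the shape discussed here: a private constructor, the existing `make` factory, and the proposed static `getValueOrNull` helper. Names follow the conversation, but the real class in Kafka Streams may differ in detail:

```java
// Hedged sketch of the ValueAndTimestamp shape under discussion; not the
// actual Kafka Streams class, just an illustration of the proposed API.
final class ValueAndTimestamp<V> {
    private final V value;
    private final long timestamp;

    // private constructor: instances are created only via make()
    private ValueAndTimestamp(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    static <V> ValueAndTimestamp<V> make(final V value, final long timestamp) {
        return new ValueAndTimestamp<>(value, timestamp);
    }

    // the null-safe accessor suggested in the review: callers can statically
    // import this instead of repeating the null check everywhere
    static <V> V getValueOrNull(final ValueAndTimestamp<V> valueAndTimestamp) {
        return valueAndTimestamp == null ? null : valueAndTimestamp.value();
    }

    V value() {
        return value;
    }

    long timestamp() {
        return timestamp;
    }
}
```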

@@ -36,7 +36,7 @@ public TimestampedWindowStoreBuilder(final WindowBytesStoreSupplier storeSupplie
                                          final Serde<K> keySerde,
                                          final Serde<V> valueSerde,
                                          final Time time) {
-        super(storeSupplier.name(), keySerde, new ValueAndTimestampSerde<>(valueSerde), time);
+        super(storeSupplier.name(), keySerde, valueSerde == null ? null : new ValueAndTimestampSerde<>(valueSerde), time);
Contributor:

I found it useful with the FullChangeSerde to make the constructor private and force usage via a null-safe factory method. Then, you never have to worry about this check.

Member Author:

Seems to be a single occurrence. Maybe not worth a factory. Also, ValueAndTimestampSerde checks for non-null and thus we would surface any issue during testing anyway.
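As a side note, the null-safe-factory pattern the reviewer mentions (from FullChangeSerde) can be sketched generically. This is only an illustration of the idea under discussion, not Kafka's actual API; the helper name `wrapOrNull` is hypothetical:

```java
import java.util.function.Function;

final class NullSafeFactory {
    // The wrapping only happens when the inner object is non-null, so callers
    // never need to repeat the explicit null check at each call site -- the
    // property the reviewer points out for a null-safe factory method.
    static <A, B> B wrapOrNull(final A inner, final Function<A, B> wrapper) {
        return inner == null ? null : wrapper.apply(inner);
    }
}
```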

@mjsax (Member, Author) commented May 6, 2019

Updated this.

@vvcephei (Contributor) left a comment:

Thanks for the update!

Just another minor comment on ValueAndTimestamp

@@ -29,8 +29,8 @@
     private final V value;
     private final long timestamp;

-    private ValueAndTimestamp(final V value,
+    protected ValueAndTimestamp(final V value,
                               final long timestamp) {
Contributor:

Since the only usage of the constructor is in this class, it should be ok to be private.

Also, since it's just a data struct, we could also make the class final.

@mjsax (Member, Author) commented May 7, 2019

Updated this.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
Contributor:

We already import the class on L26, so let's remove this import.

Contributor:

This is so we can use this single util function statically (otherwise we would have to write ValueAndTimestamp.getValueOrNull at every call site).

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
Contributor:

Same here.

}

@Override
public void close() {
Contributor:

Is having curly braces on one line a convention?

@@ -117,6 +120,7 @@ public void process(final K key, final Change<V> change) {
}
}


Contributor:

Seems unnecessary (L123).

Member Author:

I like to have an extra line to separate nested classes from methods.

* Forwarding by this class only occurs when caching is not enabled. If caching is enabled,
* forwarding occurs in the flush listener when the cached store flushes.
*
* @param <K>
Contributor:

We could omit L28-29 since we don't want to comment on it.

Contributor:

Good eye. It would be even better to document the parameters!

public void maybeForward(final K key,
final V newValue,
final V oldValue) {
if (cachingEnabled) {
Contributor:

How about

if (!cachingEnabled) {
    context.forward(key, new Change<>(newValue, oldValue));
}

*
* @param valueAndTimestamp a {@link ValueAndTimestamp} instance; can be {@code null}
* @param <V> the type of the value
* @return the wrapped {@code value} of {@code valueAndTimestamp} is not {@code null}; otherwise {@code null}
Contributor:

s/is not/if not

@bbejeck (Contributor) commented May 7, 2019

#6661 has been merged now.

retest this please

@bbejeck (Contributor) left a comment:

Thanks @mjsax LGTM

@guozhangwang (Contributor):

retest this please

@guozhangwang (Contributor) left a comment:

Some minor comments.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
Contributor:

This is so we can use this single util function statically (otherwise we would have to write ValueAndTimestamp.getValueOrNull at every call site).

@@ -88,14 +95,13 @@ public void process(final K key, final V value) {
     }

     // update the store with the new value
-    store.put(key, newAgg);
+    store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
Contributor:

Today we store the timestamp in cache entry and upon flushing we call entry.entry().context().timestamp() to set the timestamp. Would that be replaced with the underlying store's timestamp then?

Member Author:

Maybe. After the store refactoring, I realized that we might actually be able to revert #6147 -- however, I would prefer to do this as cleanup only after everything else is merged (to make sure it's really safe).

Member Author:

While working on a follow-up PR, I figured out the details. The TimestampedForwardingCacheFlushListener needs to extract the timestamp from the value and set it as the record timestamp. Similar to SessionCacheFlushListener in another PR: https://github.com/apache/kafka/pull/6645/files#diff-4039efe54e3b87d65c20bd7d14c1b149R43

@@ -105,8 +109,13 @@ public void process(final K key, final Change<V> change) {
         final V1 oldValue = sendOldValues ? valueTransformer.transform(key, change.oldValue) : null;
         context().forward(key, new Change<>(newValue, oldValue));
     } else {
-        final V1 oldValue = sendOldValues ? store.get(key) : null;
-        store.put(key, newValue);
+        final V1 oldValue;
Contributor:

nit: we can still use the ternary operator here.


import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

class TimestampedForwardingCacheFlushListener<K, V> implements CacheFlushListener<K, ValueAndTimestamp<V>> {
Contributor:

Can we just remove the old CacheFlushListener / TupleForwarder since they are internal classes only used in the DSL?

Member Author:

We still need the old one, because we don't use TimestampedSessionWindowStore in the DSL but keep SessionWindowStore.

import org.apache.kafka.test.GenericInMemoryKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.GenericInMemoryTimestampedKeyValueStore;
Contributor:

Can we replace other uses of GenericInMemoryKeyValueStore so we can remove the old class as well?

Member Author:

Not sure. We don't remove "plain" stores, and thus it seems to make sense to keep existing tests and only rewrite DSL-related tests?

@mjsax mjsax force-pushed the kafka-6521-add-timestamps-to-ktables branch from 7048ea9 to 161d6fc Compare May 8, 2019 14:48
@mjsax (Member, Author) commented May 8, 2019

Updated and rebased this.

store = new KeyValueToTimestampedKeyValueByteStoreAdapter(store);
} else {
store = new InMemoryTimestampedKeyValueStoreMarker(store);
}
Member Author:

This fixes a bug that was exposed after rebasing. We need to always treat in-memory stores as TimestampedByteStores to guarantee correct restore. (Similar below for window stores.)

Member Author:

Added a new test to #6179 that also exposed this bug (cf. StoreUpgradeIntegrationTest)

@bbejeck (Contributor) May 9, 2019:

I agree with the fix, but it raises another question for me.

I admit I could be either missing something or forgetting some details already spelled out. Why don't we convert the values for in-memory stores to also have timestamps in the value? I'm basing this on the fact that the InMemoryTimestampedKeyValueStoreMarker does simple puts/gets.

Member Author:

The in-memory store will have no content on startup, and the upper layer will put/get byte[] -- the store does not care what is in it. The upper layer already serializes value+timestamp and puts those bytes, and expects value+timestamp byte[] back -- hence, the in-memory store does not need to do any translation.

For a persistent store, the store does have content on local disk, namely plain values. Because this store cannot be upgraded, we put the adapter in between, to remove the timestamp part on put() and add a dummy timestamp on get(), to meet the contract with the upper layer that expects byte[] to be value+timestamp.

If we restore a persistent store, we just copy plain key/value pairs from the changelog topic and put them into the store. Everything still works as expected.

If we restore an in-memory store, we need to add the timestamp to the value, and this is triggered by the TimestampedByteStore interface. Otherwise, the in-memory store would contain plain values and violate the contract with the upper layer, because the assumption is that everything stored in it has value+timestamp format.

I.e., for persistent stores, we play the backward-compatibility game using surrogate timestamps. However, for in-memory stores, we don't need to do this, because there is no local on-disk content and we can switch to timestamped-store behavior without any drawbacks and get the full feature set.

Does this make sense?
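The adapter idea described above can be sketched roughly as follows. This is a hedged illustration, not the real KeyValueToTimestampedKeyValueByteStoreAdapter: the 8-byte-timestamp-prefix layout and the -1 surrogate timestamp are assumptions made for the example.

```java
import java.nio.ByteBuffer;

// Sketch of the persistent-store adapter: the upper layer speaks
// "value+timestamp" byte[] while the legacy on-disk store holds plain values.
// Layout assumption for illustration only: 8-byte timestamp prefix + value.
final class TimestampAdapter {
    static final long NO_TIMESTAMP = -1L; // assumed surrogate timestamp

    // on put(): strip the timestamp so the plain store keeps its old format
    static byte[] toPlain(final byte[] valueWithTimestamp) {
        if (valueWithTimestamp == null) {
            return null;
        }
        final byte[] plain = new byte[valueWithTimestamp.length - 8];
        System.arraycopy(valueWithTimestamp, 8, plain, 0, plain.length);
        return plain;
    }

    // on get(): prepend a surrogate timestamp to meet the upper layer's
    // value+timestamp contract
    static byte[] toTimestamped(final byte[] plainValue) {
        if (plainValue == null) {
            return null;
        }
        return ByteBuffer.allocate(8 + plainValue.length)
            .putLong(NO_TIMESTAMP)
            .put(plainValue)
            .array();
    }
}
```

An in-memory store needs neither conversion at runtime, which is why the marker class can do simple puts/gets; the timestamp only has to be added when restoring plain values from the changelog.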

Contributor:

Yep, thanks for clarifying.

@@ -319,7 +321,7 @@ public void shouldRestoreToWindowedStores() throws IOException {
         changelogName,
         1,
         offset,
-        start,
+        end,
Member Author:

"Exposed" after rebase -- for time windows, we want to use the end timestamp (even though this change is not implemented yet, I added it here to avoid touching this class again later).

@@ -46,14 +44,16 @@
     @Mock(type = MockType.NICE)
     private KeyValueBytesStoreSupplier supplier;
     @Mock(type = MockType.NICE)
-    private KeyValueStore<Bytes, byte[]> inner;
+    private RocksDBTimestampedStore inner;
@mjsax (Member, Author) May 8, 2019:

The new wrapping would break the test. Using RocksDBTimestampedStore avoids the wrapping, as the TimestampedByteStore interface is implemented. (Same below.)

@mjsax mjsax force-pushed the kafka-6521-add-timestamps-to-ktables branch from 161d6fc to 73f05da Compare May 8, 2019 22:42
@mjsax (Member, Author) commented May 8, 2019

Rebased to resolve merge conflicts.

@mjsax mjsax force-pushed the kafka-6521-add-timestamps-to-ktables branch from 73f05da to 92c0f6f Compare May 11, 2019 12:03
@@ -193,7 +194,7 @@
     }
     supplier = Stores.persistentSessionStore(
         materialized.storeName(),
-        retentionPeriod
+        Duration.ofMillis(retentionPeriod)
Member Author:

Minor fix on the side to stop using a deprecated method.

context.forward(
key,
new Change<>(getValueOrNull(newValue), getValueOrNull(oldValue)),
To.all().withTimestamp(newValue != null ? newValue.timestamp() : timestamp));
Member Author:

New fix: On flush, we take the timestamp from the newValue if it exists.

this.context = context;
this.sendOldValues = sendOldValues;
Member Author:

New fix: TupleForwarder should obey sendOldValues, too.


-class CachingKeyValueStore
+public class CachingKeyValueStore
Member Author:

For testing...

     KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier();
     if (supplier == null) {
         final String name = materialized.storeName();
-        supplier = Stores.persistentKeyValueStore(name);
+        supplier = Stores.persistentTimestampedKeyValueStore(name);
Member Author:

New: this slipped into the first commit of this PR... discovered while working on a follow-up PR.

@mjsax (Member, Author) commented May 11, 2019

Rebased to resolve merge conflict. Also added some more fixes and tests.

@mjsax mjsax force-pushed the kafka-6521-add-timestamps-to-ktables branch from b79b478 to 21e5dee Compare May 11, 2019 12:26
@guozhangwang (Contributor):

LGTM. Please feel free to merge after green builds.

@guozhangwang (Contributor):

retest this please

@mjsax mjsax merged commit 8649717 into apache:trunk May 12, 2019
@mjsax mjsax deleted the kafka-6521-add-timestamps-to-ktables branch May 12, 2019 09:51
omkreddy added a commit to confluentinc/kafka that referenced this pull request May 13, 2019
…es-14-May

* AK_REPO/trunk: (24 commits)
  KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354) (apache#6009)
  KAFKA-8335; Clean empty batches when sequence numbers are reused (apache#6715)
  KAFKA-6455: Session Aggregation should use window-end-time as record timestamp (apache#6645)
  KAFKA-6521: Use timestamped stores for KTables (apache#6667)
  [MINOR] Consolidate in-memory/rocksdb unit tests for window & session store (apache#6677)
  MINOR: Include StickyAssignor in system tests (apache#5223)
  KAFKA-7633: Allow Kafka Connect to access internal topics without cluster ACLs (apache#5918)
  MINOR: Align KTableAgg and KTableReduce (apache#6712)
  MINOR: Fix code section formatting in TROGDOR.md (apache#6720)
  MINOR: Remove unnecessary OptionParser#accepts method call from PreferredReplicaLeaderElectionCommand (apache#6710)
  KAFKA-8352 : Fix Connect System test failure 404 Not Found (apache#6713)
  KAFKA-8348: Fix KafkaStreams JavaDocs (apache#6707)
  MINOR: Add missing option for running vagrant-up.sh with AWS to vagrant/README.md
  KAFKA-8344; Fix vagrant-up.sh to work with AWS properly
  MINOR: docs typo in '--zookeeper myhost:2181--execute'
  MINOR: Remove header and key/value converter config value logging (apache#6660)
  KAFKA-8231: Expansion of ConnectClusterState interface (apache#6584)
  KAFKA-8324: Add close() method to RocksDBConfigSetter (apache#6697)
  KAFKA-6789; Handle retriable group errors in AdminClient API (apache#5578)
  KAFKA-8332: Refactor ImplicitLinkedHashSet to avoid losing ordering when converting to Scala
  ...
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
Reviewers: John Roesler <john@confluent.io>, Boyang Chen <boyang@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020