Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
Expand Down Expand Up @@ -64,7 +63,6 @@
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
import static org.apache.kafka.streams.state.internals.Utils.keyBytes;

/**
* A Metered {@link KeyValueStore} wrapper that is used for recording operation metrics, and hence its
Expand All @@ -75,8 +73,6 @@
* @param <K>
* @param <V>
*/
// TODO: replace with new method in follow-up PR of KIP-1271
@SuppressWarnings("deprecation")
public class MeteredKeyValueStore<K, V>
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V>
implements KeyValueStore<K, V>, MeteredStateStore {
Expand Down Expand Up @@ -120,11 +116,13 @@ public class MeteredKeyValueStore<K, V>
)
);

MeteredKeyValueStore(final KeyValueStore<Bytes, byte[]> inner,
final String metricsScope,
final Time time,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
MeteredKeyValueStore(
final KeyValueStore<Bytes, byte[]> inner,
final String metricsScope,
final Time time,
final Serde<K> keySerde,
final Serde<V> valueSerde
) {
Comment on lines +119 to +125
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Offtopic

Question about formatting fix: do we have consistent code style across code base? I mean literally cli formatted or ide setting
I was able to find this: https://kafka.apache.org/community/developer/#streams-api , but I don't think that's enough to keep code style consistent

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No we don't have anything that would do strict enforcement...

super(inner);
this.metricsScope = metricsScope;
this.time = time != null ? time : Time.SYSTEM;
Expand All @@ -133,9 +131,8 @@ public class MeteredKeyValueStore<K, V>
}

@Override
public void init(final StateStoreContext stateStoreContext,
final StateStore root) {
internalContext = stateStoreContext instanceof InternalProcessorContext ? (InternalProcessorContext<?, ?>) stateStoreContext : null;
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
internalContext = (InternalProcessorContext<?, ?>) stateStoreContext;
taskId = stateStoreContext.taskId();
initStoreSerde(stateStoreContext);
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
Expand All @@ -159,17 +156,27 @@ private void registerMetrics() {
deleteSensor = StateStoreMetrics.deleteSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
StateStoreMetrics.addNumOpenIteratorsGauge(
taskId.toString(),
metricsScope,
name(),
streamsMetrics,
(config, now) -> numOpenIterators.sum()
);
StateStoreMetrics.addOldestOpenIteratorGauge(
taskId.toString(),
metricsScope,
name(),
streamsMetrics,
(config, now) -> {
try {
final Iterator<MeteredIterator> iter = openIterators.iterator();
return iter.hasNext() ? iter.next().startTimestamp() : 0L;
} catch (final NoSuchElementException e) {
return 0L;
}
});
}
);
}

@Override
Expand All @@ -185,24 +192,33 @@ protected void initStoreSerde(final StateStoreContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
serdes = StoreSerdeInitializer.prepareStoreSerde(
context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerdeForStore);
context,
storeName,
changelogTopic,
keySerde,
valueSerde,
this::prepareValueSerdeForStore
);
}

@SuppressWarnings("unchecked")
@Override
public boolean setFlushListener(final CacheFlushListener<K, V> listener,
final boolean sendOldValues) {
public boolean setFlushListener(final CacheFlushListener<K, V> listener, final boolean sendOldValues) {
final KeyValueStore<Bytes, byte[]> wrapped = wrapped();
if (wrapped instanceof CachedStateStore) {
return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
record -> listener.apply(
record.withKey(serdes.keyFrom(record.key()))
.withValue(new Change<>(
record.value().newValue != null ? serdes.valueFrom(record.value().newValue) : null,
record.value().oldValue != null ? serdes.valueFrom(record.value().oldValue) : null,
record.value().isLatest
))
),
record -> {
final Change<byte[]> change = record.value();
listener.apply(
record
.withKey(serdes.keyFrom(record.key(), record.headers()))
.withValue(new Change<>(
change.newValue != null ? serdes.valueFrom(change.newValue, record.headers()) : null,
change.oldValue != null ? serdes.valueFrom(change.oldValue, record.headers()) : null,
change.isLatest
))
);
},
sendOldValues);
}
return false;
Expand Down Expand Up @@ -255,8 +271,8 @@ private <R> QueryResult<R> runRangeQuery(final Query<R> query,
RangeQuery<Bytes, byte[]> rawRangeQuery;
final ResultOrder order = typedQuery.resultOrder();
rawRangeQuery = RangeQuery.withRange(
keyBytes(typedQuery.getLowerBound().orElse(null), serdes),
keyBytes(typedQuery.getUpperBound().orElse(null), serdes)
serializeKey(typedQuery.getLowerBound().orElse(null)),
serializeKey(typedQuery.getUpperBound().orElse(null))
);
if (order.equals(ResultOrder.DESCENDING)) {
rawRangeQuery = rawRangeQuery.withDescendingKeys();
Expand Down Expand Up @@ -293,7 +309,7 @@ private <R> QueryResult<R> runKeyQuery(final Query<R> query,
final QueryResult<R> result;
final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
final KeyQuery<Bytes, byte[]> rawKeyQuery =
KeyQuery.withKey(keyBytes(typedKeyQuery.getKey(), serdes));
KeyQuery.withKey(serializeKey(typedKeyQuery.getKey()));
final QueryResult<byte[]> rawResult =
wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) {
Expand All @@ -313,7 +329,7 @@ private <R> QueryResult<R> runKeyQuery(final Query<R> query,
public V get(final K key) {
Objects.requireNonNull(key, "key cannot be null");
try {
return maybeMeasureLatency(() -> outerValue(wrapped().get(keyBytes(key, serdes))), time, getSensor);
return maybeMeasureLatency(() -> deserializeValue(wrapped().get(serializeKey(key))), time, getSensor);
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key);
throw new ProcessorStateException(message, e);
Expand All @@ -325,7 +341,7 @@ public void put(final K key,
final V value) {
Objects.requireNonNull(key, "key cannot be null");
try {
maybeMeasureLatency(() -> wrapped().put(keyBytes(key, serdes), serdes.rawValue(value, new RecordHeaders())), time, putSensor);
maybeMeasureLatency(() -> wrapped().put(serializeKey(key), serializeValue(value)), time, putSensor);
maybeRecordE2ELatency();
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key, value);
Expand All @@ -338,7 +354,7 @@ public V putIfAbsent(final K key,
final V value) {
Objects.requireNonNull(key, "key cannot be null");
final V currentValue = maybeMeasureLatency(
() -> outerValue(wrapped().putIfAbsent(keyBytes(key, serdes), serdes.rawValue(value))),
() -> deserializeValue(wrapped().putIfAbsent(serializeKey(key), serializeValue(value))),
time,
putIfAbsentSensor
);
Expand All @@ -356,7 +372,7 @@ public void putAll(final List<KeyValue<K, V>> entries) {
public V delete(final K key) {
Objects.requireNonNull(key, "key cannot be null");
try {
return maybeMeasureLatency(() -> outerValue(wrapped().delete(keyBytes(key, serdes))), time, deleteSensor);
return maybeMeasureLatency(() -> deserializeValue(wrapped().delete(serializeKey(key))), time, deleteSensor);
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key);
throw new ProcessorStateException(message, e);
Expand All @@ -373,21 +389,17 @@ public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P p
@Override
public KeyValueIterator<K, V> range(final K from,
final K to) {
final byte[] serFrom = from == null ? null : serdes.rawKey(from);
final byte[] serTo = to == null ? null : serdes.rawKey(to);
return new MeteredKeyValueIterator(
wrapped().range(Bytes.wrap(serFrom), Bytes.wrap(serTo)),
wrapped().range(serializeKey(from), serializeKey(to)),
rangeSensor
);
}

@Override
public KeyValueIterator<K, V> reverseRange(final K from,
final K to) {
final byte[] serFrom = from == null ? null : serdes.rawKey(from);
final byte[] serTo = to == null ? null : serdes.rawKey(to);
return new MeteredKeyValueIterator(
wrapped().reverseRange(Bytes.wrap(serFrom), Bytes.wrap(serTo)),
wrapped().reverseRange(serializeKey(from), serializeKey(to)),
rangeSensor
);
}
Expand Down Expand Up @@ -421,21 +433,31 @@ public void close() {
}
}

protected V outerValue(final byte[] value) {
return value != null ? serdes.valueFrom(value, new RecordHeaders()) : null;
protected byte[] serializeValue(final V value) {
return value != null ? serdes.rawValue(value, internalContext.headers()) : null;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key question -- should we pass context.headers() or new RecordHeaders for non-header stores. Both solutions have advantages and disadvantages.

Using new RecordHeaders is strictly more backward compatible; the idea to pass in context.headers() feels like a "bug fix" though -- we should have always done this IMHO. That's why I opted for this solution.

Of course, we can also "fix" this "bug" by arguing: we stay 100% backward compatible and pass in new RecordHeaders and user get the fix by enabling the new headers stores...

Let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO. I think new RecordHeaders looks more correct (from backward compatibility POV), but essentially that's okay to do context.headers() because:

  • If user doesn't use headers aware serializer, headers will be ignored (ether empty or not)
  • The idea of this ticket is to actually fix code base to propagate headers, and if we can propagate original headers instead of "mocked" one we go for it
  • It doesn't make any difference if users aren't using headers state stores / serializers, but helps us to consistently apply simple paradigm: if there is a access to original headers -> go for it, if not fallback to new RecordHeaders

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax
Regarding backward compability, I think this risk is very low because, most serdes ignore headers - the default interface methods delegate to the non-headers versions. Using internalContext.headers() is semantically correct behavior - serdes should have access to record context.
1Q: should we check if internalContext != null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1Q: should we check if internalContext != null?

I would say no? If internalContext would be null, it would be a bug (I believe), so we should rather expose such an issue directly, and fail fast by crashing, and not "mask" the bug by not failing?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @aliehsaeedii on using internalContext.headers(), it's semantically correct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to internalContext.headers()

}

protected V deserializeValue(final byte[] rawValue) {
return rawValue != null ? serdes.valueFrom(rawValue, internalContext.headers()) : null;
}

protected Bytes serializeKey(final K key) {
return Bytes.wrap(serdes.rawKey(key, internalContext.headers()));
}

protected K deserializeKey(final byte[] rawKey) {
return rawKey != null ? serdes.keyFrom(rawKey, internalContext.headers()) : null;
}

private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, V>> from) {
final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
for (final KeyValue<K, V> entry : from) {
byteEntries.add(KeyValue.pair(Bytes.wrap(serdes.rawKey(entry.key)), serdes.rawValue(entry.value)));
byteEntries.add(KeyValue.pair(serializeKey(entry.key), serializeValue(entry.value)));
}
return byteEntries;
}

protected void maybeRecordE2ELatency() {
// Context is null if the provided context isn't an implementation of InternalProcessorContext.
// In that case, we _can't_ get the current timestamp, so we don't record anything.
if (e2eLatencySensor.shouldRecord() && internalContext != null) {
final long currentTime = time.milliseconds();
final long e2eLatency = currentTime - internalContext.recordContext().timestamp();
Expand Down Expand Up @@ -474,8 +496,8 @@ public boolean hasNext() {
public KeyValue<K, V> next() {
final KeyValue<Bytes, byte[]> keyValue = iter.next();
return KeyValue.pair(
serdes.keyFrom(keyValue.key.get()),
outerValue(keyValue.value));
deserializeKey(keyValue.key.get()),
deserializeValue(keyValue.value));
}

@Override
Expand All @@ -493,7 +515,7 @@ public void close() {

@Override
public K peekNextKey() {
return serdes.keyFrom(iter.peekNextKey().get());
return deserializeKey(iter.peekNextKey().get());
}
}

Expand Down Expand Up @@ -533,8 +555,9 @@ public boolean hasNext() {
public KeyValue<K, V> next() {
final KeyValue<Bytes, byte[]> keyValue = iter.next();
return KeyValue.pair(
serdes.keyFrom(keyValue.key.get()),
valueDeserializer.apply(keyValue.value));
deserializeKey(keyValue.key.get()),
valueDeserializer.apply(keyValue.value)
);
}

@Override
Expand All @@ -552,7 +575,7 @@ public void close() {

@Override
public K peekNextKey() {
return serdes.keyFrom(iter.peekNextKey().get());
return deserializeKey(iter.peekNextKey().get());
}
}
}
Loading
Loading