Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public void evictWhile(final Supplier<Boolean> predicate, final Consumer<Evictio

final BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value));
final K key = keySerde.deserializer().deserialize(topic,
iternalContext.headers(),
bufferValue.context().headers(),
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get()));

if (bufferValue.context().timestamp() < minTimestamp && minValid) {
Expand All @@ -243,7 +243,7 @@ public void evictWhile(final Supplier<Boolean> predicate, final Consumer<Evictio
minTimestamp = bufferValue.context().timestamp();
minValid = true;

final V value = valueSerde.deserializer().deserialize(topic, iternalContext.headers(), bufferValue.newValue());
final V value = valueSerde.deserializer().deserialize(topic, bufferValue.context().headers(), bufferValue.newValue());

callback.accept(new Eviction<>(key, value, bufferValue.context()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
Expand All @@ -40,11 +42,15 @@
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -205,6 +211,49 @@ public void shouldHandleCollidingKeys() {
assertNumSizeAndTimestamp(buffer, 1, 7, 42);
}

@Test
public void shouldPropagateHeadersThroughEviction() {
createBuffer(Duration.ZERO, Serdes.String());
final RecordHeaders headers = new RecordHeaders(new Header[]{
new RecordHeader("h1", "v1".getBytes(StandardCharsets.UTF_8))
});
context.setRecordContext(new ProcessorRecordContext(0L, offset++, 0, "testing", headers));
buffer.put(0L, new Record<>("k", "v", 0L, headers), context.recordContext());

final List<TimeOrderedKeyValueBuffer.Eviction<String, String>> evicted = new ArrayList<>();
buffer.evictWhile(() -> buffer.numRecords() > 0, evicted::add);

assertThat(evicted.size(), is(1));
assertThat(evicted.get(0).recordContext().headers(), is(headers));
}

@Test
public void shouldNotBeAffectedByProcessorContextHeaderMutationBetweenPutAndEvict() {
createBuffer(Duration.ofMillis(1), Serdes.String());
final RecordHeaders putHeaders = new RecordHeaders(new Header[]{
new RecordHeader("at-put", "first".getBytes(StandardCharsets.UTF_8))
});
context.setRecordContext(new ProcessorRecordContext(0L, offset++, 0, "testing", putHeaders));
buffer.put(0L, new Record<>("k", "v", 0L, putHeaders), context.recordContext());

// Simulate the processor moving on to handle a different record with different headers
// before the grace period expires and eviction runs.
final RecordHeaders laterHeaders = new RecordHeaders(new Header[]{
new RecordHeader("at-evict", "second".getBytes(StandardCharsets.UTF_8))
});
context.setRecordContext(new ProcessorRecordContext(10L, offset++, 0, "testing", laterHeaders));
// Advance stream time past the grace period for the original record.
buffer.put(10L, new Record<>("trigger", "v", 10L, laterHeaders), context.recordContext());

final List<TimeOrderedKeyValueBuffer.Eviction<String, String>> evicted = new ArrayList<>();
buffer.evictWhile(() -> true, evicted::add);

// Only the original "k" record at t=0 falls outside the grace window of t=10.
assertThat(evicted.size(), is(1));
assertThat(evicted.get(0).key(), is("k"));
assertThat(evicted.get(0).recordContext().headers(), is(putHeaders));
}

private void assertNumSizeAndTimestamp(final TimeOrderedKeyValueBuffer<String, String, String> buffer,
final int num,
final long time,
Expand Down