Skip to content

Commit

Permalink
Use wall-clock time for dead letter record (#28)
Browse files Browse the repository at this point in the history
The written dead-letter now has the wall-clock time as its record's
timestamp. Because the original timestamp would be lost otherwise, there
is a new field in the respective dead-letter models called timestamp. To
maintain backwards compatibility, it is nullable, but always set
starting from this version.
  • Loading branch information
torbsto committed Jun 24, 2024
1 parent 4fba751 commit 11991ff
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 7 deletions.
11 changes: 11 additions & 0 deletions error-handling-avro/src/main/avro/DeadLetter.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@
}
]
}
},
{
"name": "input_timestamp",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
],
"default": null
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public DeadLetter convert(final DeadLetterDescription deadLetterDescription) {
.setTopic(deadLetterDescription.getTopic())
.setPartition(deadLetterDescription.getPartition())
.setOffset(deadLetterDescription.getOffset())
.setInputTimestamp(deadLetterDescription.getInputTimestamp())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.mockito.Mockito.when;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
Expand Down Expand Up @@ -88,11 +89,12 @@ protected Properties getKafkaProperties() {

@Test
void shouldConvertAndSerializeAvroDeadLetter() {
final long startTimestamp = System.currentTimeMillis();
when(this.mapper.apply(any(), any())).thenThrow(new RuntimeException(ERROR_MESSAGE));
this.createTopology();
this.topology.input(INPUT_TOPIC).withValueSerde(STRING_SERDE)
.add(1, "foo")
.add(2, "bar");
.add(1, "foo", 100)
.add(2, "bar", 200);

final List<ProducerRecord<Integer, String>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(STRING_SERDE))
Expand All @@ -106,6 +108,7 @@ void shouldConvertAndSerializeAvroDeadLetter() {

this.softly.assertThat(errors)
.hasSize(2)
.allSatisfy(record -> this.softly.assertThat(record.timestamp()).isGreaterThan(startTimestamp))
.extracting(ProducerRecord::value).allSatisfy(
deadLetter -> {
this.softly.assertThat(deadLetter.getDescription()).isEqualTo(DEAD_LETTER_DESCRIPTION);
Expand All @@ -123,12 +126,14 @@ void shouldConvertAndSerializeAvroDeadLetter() {
deadLetter -> {
this.softly.assertThat(deadLetter.getInputValue()).hasValue("foo");
this.softly.assertThat(deadLetter.getOffset()).hasValue(0L);
this.softly.assertThat(deadLetter.getInputTimestamp()).hasValue(Instant.ofEpochMilli(100));
}
);
this.softly.assertThat(errors).extracting(ProducerRecord::value).element(1).satisfies(
deadLetter -> {
this.softly.assertThat(deadLetter.getInputValue()).hasValue("bar");
this.softly.assertThat(deadLetter.getOffset()).hasValue(1L);
this.softly.assertThat(deadLetter.getInputTimestamp()).hasValue(Instant.ofEpochMilli(200));
}
);

Expand Down
1 change: 1 addition & 0 deletions error-handling-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ dependencies {
val jacksonVersion: String by project
testFixturesImplementation(group = "com.fasterxml.jackson.core", name = "jackson-core", version = jacksonVersion)
testFixturesImplementation(group = "com.fasterxml.jackson.core", name = "jackson-databind", version = jacksonVersion)
testFixturesImplementation(group = "com.fasterxml.jackson.datatype", name = "jackson-datatype-jsr310", version = jacksonVersion)
}

avro {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package com.bakdata.kafka;

import java.time.Instant;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
Expand Down Expand Up @@ -55,4 +56,5 @@ public static class Cause {
String topic;
Integer partition;
Long offset;
Instant inputTimestamp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package com.bakdata.kafka;

import java.time.Instant;
import java.util.Optional;
import lombok.Getter;
import lombok.NonNull;
Expand Down Expand Up @@ -99,8 +100,14 @@ public void process(final FixedKeyRecord<K, ProcessingError<V>> inputRecord) {
.topic(metadata.map(RecordMetadata::topic).orElse(null))
.partition(metadata.map(RecordMetadata::partition).orElse(null))
.offset(metadata.map(RecordMetadata::offset).orElse(null))
.inputTimestamp(Instant.ofEpochMilli(inputRecord.timestamp()))
.build();
this.context.forward(inputRecord.withValue(this.deadLetterConverter.convert(deadLetterDescription)));

final FixedKeyRecord<K, T> record = inputRecord
.withValue(this.deadLetterConverter.convert(deadLetterDescription))
.withTimestamp(this.context.currentSystemTimeMs());

this.context.forward(record);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@
package com.bakdata.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class TestDeadLetterSerde implements Serde<DeadLetterDescription> {
static final ObjectMapper objectMapper = new ObjectMapper();
static final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
private static final Serializer<DeadLetterDescription> serializer = (topic, data) -> {
try {
return objectMapper.writeValueAsBytes(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
import com.google.protobuf.StringValue;
import com.google.protobuf.Timestamp;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;

Expand Down Expand Up @@ -65,6 +66,15 @@ public ProtoDeadLetter convert(final DeadLetterDescription deadLetterDescription
if (deadLetterDescription.getOffset() != null) {
builder.setOffset(Int64Value.of(deadLetterDescription.getOffset()));
}

if (deadLetterDescription.getInputTimestamp() != null) {
final Timestamp timestamp = Timestamp.newBuilder()
.setSeconds(deadLetterDescription.getInputTimestamp().getEpochSecond())
.setNanos(deadLetterDescription.getInputTimestamp().getNano())
.build();
builder.setInputTimestamp(timestamp);
}

return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ syntax = "proto3";
package bakdata.kafka.proto.v1;

import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";

option java_package = "com.bakdata.kafka.proto.v1";
option java_multiple_files = true;
Expand All @@ -19,4 +20,5 @@ message ProtoDeadLetter {
google.protobuf.StringValue topic = 4;
google.protobuf.Int32Value partition = 5;
google.protobuf.Int64Value offset = 6;
google.protobuf.Timestamp input_timestamp = 7;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
import com.google.protobuf.StringValue;
import com.google.protobuf.Timestamp;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -118,11 +120,12 @@ protected void createTopology() {

@Test
void shouldConvertAndSerializeProtoDeadLetter() {
final long startTimestamp = System.currentTimeMillis();
when(this.mapper.apply(any(), any())).thenThrow(new RuntimeException(ERROR_MESSAGE));
this.createTopology();
this.topology.input(INPUT_TOPIC).withValueSerde(STRING_SERDE)
.add(1, "foo")
.add(2, "bar");
.add(1, "foo", 100)
.add(2, "bar", 200);

final List<ProducerRecord<Integer, String>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(STRING_SERDE))
Expand All @@ -136,6 +139,7 @@ void shouldConvertAndSerializeProtoDeadLetter() {

this.softly.assertThat(errors)
.hasSize(2)
.allSatisfy(record -> this.softly.assertThat(record.timestamp()).isGreaterThan(startTimestamp))
.extracting(ProducerRecord::value).allSatisfy(
deadLetter -> {
this.softly.assertThat(deadLetter.getDescription()).isEqualTo(DEAD_LETTER_DESCRIPTION);
Expand All @@ -156,15 +160,22 @@ void shouldConvertAndSerializeProtoDeadLetter() {
this.softly.assertThat(deadLetter.getInputValue()).extracting(StringValue::getValue)
.isEqualTo("foo");
this.softly.assertThat(deadLetter.getOffset()).extracting(Int64Value::getValue).isEqualTo(0L);
this.softly.assertThat(timestampToInstant(deadLetter.getInputTimestamp()))
.isEqualTo(Instant.ofEpochMilli(100));
}
);
this.softly.assertThat(errors).map(ProducerRecord::value).element(1).satisfies(
deadLetter -> {
this.softly.assertThat(deadLetter.getInputValue()).extracting(StringValue::getValue)
.isEqualTo("bar");
this.softly.assertThat(deadLetter.getOffset()).extracting(Int64Value::getValue).isEqualTo(1L);
this.softly.assertThat(timestampToInstant(deadLetter.getInputTimestamp()))
.isEqualTo(Instant.ofEpochMilli(200));
}
);
}

private static Instant timestampToInstant(final Timestamp timestamp) {
return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos());
}
}

0 comments on commit 11991ff

Please sign in to comment.