From 617a080a03accbed49ab2b835c5e2e2ff01529ce Mon Sep 17 00:00:00 2001 From: Torben Meyer Date: Mon, 24 Jun 2024 07:19:00 +0200 Subject: [PATCH] Adjust ErrorHeaderProcessor --- .../src/main/avro/DeadLetter.avsc | 22 ++++++------- .../bakdata/kafka/ErrorHeaderProcessor.java | 7 +++- .../ErrorHeaderProcessorTopologyTest.java | 33 +++++++++++++++++++ 3 files changed, 50 insertions(+), 12 deletions(-) diff --git a/error-handling-avro/src/main/avro/DeadLetter.avsc b/error-handling-avro/src/main/avro/DeadLetter.avsc index 7304756..d00f6c8 100644 --- a/error-handling-avro/src/main/avro/DeadLetter.avsc +++ b/error-handling-avro/src/main/avro/DeadLetter.avsc @@ -38,17 +38,6 @@ "name": "description", "type": "string" }, - { - "name": "input_timestamp", - "type": [ - "null", - { - "type": "long", - "logicalType": "timestamp-millis" - } - ], - "default": null - }, { "name": "cause", "type": { @@ -79,6 +68,17 @@ } ] } + }, + { + "name": "input_timestamp", + "type": [ + "null", + { + "type": "long", + "logicalType": "timestamp-millis" + } + ], + "default": null } ] } diff --git a/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java b/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java index 34403b1..8adc0bb 100644 --- a/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java +++ b/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java @@ -77,6 +77,10 @@ public class ErrorHeaderProcessor implements FixedKeyProcessor> inputRecord) { addHeader(EXCEPTION_MESSAGE, value.getThrowable().getMessage(), headers); addHeader(EXCEPTION_STACK_TRACE, ExceptionUtils.getStackTrace(value.getThrowable()), headers); addHeader(DESCRIPTION, this.description, headers); - this.context.forward(inputRecord.withValue(value.getValue())); + addHeader(INPUT_TIMESTAMP, Long.toString(inputRecord.timestamp()), headers); + this.context.forward(inputRecord.withValue(value.getValue()).withTimestamp(this.context.currentSystemTimeMs())); } @Override diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java index e1da34c..68d38f2 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java @@ -182,4 +182,37 @@ void shouldHandleExceptionWithoutMessage(final SoftAssertions softly) { .isNull()); } + @Test + void shouldSetTimestamp(final SoftAssertions softly) { + final long timestamp = System.currentTimeMillis(); + when(this.mapper.apply(1, "foo")).thenThrow(new RuntimeException()); + this.createTopology(); + this.topology.input() + .withValueSerde(STRING_SERDE) + .add(1, "foo", 100); + + final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) + .withKeySerde(DOUBLE_SERDE) + .withValueSerde(LONG_SERDE)) + .toList(); + softly.assertThat(records).isEmpty(); + final List> errors = Seq.seq(this.topology.streamOutput(ERROR_TOPIC) + .withValueSerde(Serdes.String())) + .toList(); + + softly.assertThat(errors) + .hasSize(1) + .first() + .isNotNull() + .satisfies(record -> { + softly.assertThat(record.key()).isEqualTo(1); + softly.assertThat(record.value()).isEqualTo("foo"); + softly.assertThat(record.timestamp()).isGreaterThan(timestamp); + }) + .extracting(ProducerRecord::headers) + .satisfies(headers -> softly.assertThat(getHeader(headers, ErrorHeaderProcessor.INPUT_TIMESTAMP)) + .isEqualTo("100")); + } + + }