Skip to content

Commit

Permalink
Adjust ErrorHeaderProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
torbsto committed Jun 24, 2024
1 parent 8b0d7fc commit 617a080
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 12 deletions.
22 changes: 11 additions & 11 deletions error-handling-avro/src/main/avro/DeadLetter.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,6 @@
"name": "description",
"type": "string"
},
{
"name": "input_timestamp",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
],
"default": null
},
{
"name": "cause",
"type": {
Expand Down Expand Up @@ -79,6 +68,17 @@
}
]
}
},
{
"name": "input_timestamp",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
],
"default": null
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public class ErrorHeaderProcessor<K, V> implements FixedKeyProcessor<K, Processi
* Header indicating the description of the context in which an exception has been thrown
*/
public static final String DESCRIPTION = HEADER_PREFIX + "description";
/**
* Header indicating the timestamp of the erroneous record.
*/
public static final String INPUT_TIMESTAMP = HEADER_PREFIX + "input_timestamp";
/**
* Prefix of all headers detailing the error message added by this FixedKeyProcessor
*/
Expand Down Expand Up @@ -136,7 +140,8 @@ public void process(final FixedKeyRecord<K, ProcessingError<V>> inputRecord) {
addHeader(EXCEPTION_MESSAGE, value.getThrowable().getMessage(), headers);
addHeader(EXCEPTION_STACK_TRACE, ExceptionUtils.getStackTrace(value.getThrowable()), headers);
addHeader(DESCRIPTION, this.description, headers);
this.context.forward(inputRecord.withValue(value.getValue()));
addHeader(INPUT_TIMESTAMP, Long.toString(inputRecord.timestamp()), headers);
this.context.forward(inputRecord.withValue(value.getValue()).withTimestamp(this.context.currentSystemTimeMs()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,37 @@ void shouldHandleExceptionWithoutMessage(final SoftAssertions softly) {
.isNull());
}

@Test
void shouldSetTimestamp(final SoftAssertions softly) {
final long timestamp = System.currentTimeMillis();
when(this.mapper.apply(1, "foo")).thenThrow(new RuntimeException());
this.createTopology();
this.topology.input()
.withValueSerde(STRING_SERDE)
.add(1, "foo", 100);

final List<ProducerRecord<Double, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withKeySerde(DOUBLE_SERDE)
.withValueSerde(LONG_SERDE))
.toList();
softly.assertThat(records).isEmpty();
final List<ProducerRecord<Integer, String>> errors = Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
.withValueSerde(Serdes.String()))
.toList();

softly.assertThat(errors)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
softly.assertThat(record.key()).isEqualTo(1);
softly.assertThat(record.value()).isEqualTo("foo");
softly.assertThat(record.timestamp()).isGreaterThan(timestamp);
})
.extracting(ProducerRecord::headers)
.satisfies(headers -> softly.assertThat(getHeader(headers, ErrorHeaderProcessor.INPUT_TIMESTAMP))
.isEqualTo("100"));
}


}

0 comments on commit 617a080

Please sign in to comment.