Skip to content
Permalink
Browse files
fix #1332 : CamelHeader value is wrongly interpreted as BigDecimal and…
… causes ClassCastException.
  • Loading branch information
valdar committed Jan 31, 2022
1 parent 532f279 commit 9323f1ece25dd23d557c6b3b12b9c193a6e46f07
Showing 4 changed files with 19 additions and 7 deletions.
@@ -240,7 +240,9 @@ private static void mapHeader(Header header, String prefix, Map<String, Object>
final String key = StringHelper.after(header.key(), prefix, header.key());
final Schema schema = header.schema();

if (schema.type().equals(Schema.BYTES_SCHEMA.type()) && Objects.equals(schema.name(), Decimal.LOGICAL_NAME)) {
if (schema.type().equals(Schema.BYTES_SCHEMA.type())
&& Objects.equals(schema.name(), Decimal.LOGICAL_NAME)
&& header.value() instanceof byte[]) {
destination.put(key, Decimal.toLogical(schema, (byte[]) header.value()));
} else {
destination.put(key, header.value());
@@ -36,7 +36,6 @@
import org.apache.camel.kafkaconnector.utils.TaskHelper;
import org.apache.camel.support.UnitOfWorkHelper;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
@@ -357,8 +356,9 @@ private void setAdditionalHeaders(SourceRecord record, Map<String, Object> map,
} else if (value instanceof Date) {
record.headers().addTimestamp(keyCamelHeader, (Date)value);
} else if (value instanceof BigDecimal) {
Schema schema = Decimal.schema(((BigDecimal)value).scale());
record.headers().add(keyCamelHeader, Decimal.fromLogical(schema, (BigDecimal)value), schema);
//XXX: kafka connect configured header converter takes care of the encoding,
//default: org.apache.kafka.connect.storage.SimpleHeaderConverter
record.headers().addDecimal(keyCamelHeader, (BigDecimal)value);
} else if (value instanceof Double) {
record.headers().addDouble(keyCamelHeader, (double)value);
} else if (value instanceof Float) {
@@ -33,6 +33,7 @@
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
@@ -118,7 +119,13 @@ public void testBodyAndHeaders() {
int myInteger = 100;
Long myLong = new Long("100");
BigDecimal myBigDecimal = new BigDecimal(1234567890);
Schema schema = Decimal.schema(myBigDecimal.scale());
Schema myBigDecimalSchema = Decimal.schema(myBigDecimal.scale());
//reproducing bigDecimal encoding by kafka connect
BigDecimal kafkaBigDecimal = new BigDecimal("6.9203120E+787");
Schema kafkaBigDecimalSchema = Decimal.schema(kafkaBigDecimal.scale());
SimpleHeaderConverter shc = new SimpleHeaderConverter();
byte[] persistedBytes = shc.fromConnectHeader("", "MyBigDecimal", kafkaBigDecimalSchema, kafkaBigDecimal);
SchemaAndValue sav = shc.toConnectHeader("", "MyBigDecimal", persistedBytes);

List<SinkRecord> records = new ArrayList<SinkRecord>();
SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
@@ -129,7 +136,9 @@ public void testBodyAndHeaders() {
record.headers().addDouble(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDouble", myDouble);
record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", myInteger);
record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", myLong);
record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBigDecimal", Decimal.fromLogical(schema, myBigDecimal), schema);
record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBigDecimal", Decimal.fromLogical(myBigDecimalSchema, myBigDecimal), myBigDecimalSchema);
record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "KafkaBigDecimal", sav.value(), sav.schema());

records.add(record);
sinkTask.put(records);

@@ -145,6 +154,7 @@ public void testBodyAndHeaders() {
assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
assertEquals(kafkaBigDecimal, exchange.getIn().getHeader("KafkaBigDecimal", BigDecimal.class));

sinkTask.stop();
}
@@ -312,7 +312,7 @@ public void testSourceBigDecimalHeader() {
List<SourceRecord> results = sourceTask.poll();
assertEquals(1, results.size());
Header bigDecimalHeader = results.get(0).headers().allWithName(CamelSourceTask.HEADER_CAMEL_PREFIX + "bigdecimal").next();
assertEquals("[B", bigDecimalHeader.value().getClass().getName());
assertTrue(bigDecimalHeader.value() instanceof BigDecimal);
assertEquals(Decimal.class.getName(), bigDecimalHeader.schema().name());
assertEquals(Schema.Type.BYTES, bigDecimalHeader.schema().type());

0 comments on commit 9323f1e

Please sign in to comment.