fix: ksqlDB should not truncate decimals #5763

Merged
@@ -20,6 +20,7 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.JsonSerializer;
@@ -30,6 +31,7 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode;
+import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.test.tools.Record;
import io.confluent.ksql.test.tools.exceptions.InvalidFieldException;
import io.confluent.ksql.test.tools.exceptions.MissingFieldException;
@@ -42,6 +44,8 @@
public final class RecordNode {

private static final ObjectMapper objectMapper = new ObjectMapper()
+      .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
+      .enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN)
.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));

private final String topicName;
@@ -50,7 +54,8 @@ public final class RecordNode {
private final Optional<Long> timestamp;
private final Optional<WindowData> window;

-  private RecordNode(
+  @VisibleForTesting
+  RecordNode(
final String topicName,
final Optional<Object> key,
final JsonNode value,
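Note on the mapper settings above: by default Jackson binds JSON floating-point numbers to `Double`, which silently drops trailing zeros and can lose precision; `USE_BIG_DECIMAL_FOR_FLOATS` is what keeps the literal intact, and `WRITE_BIGDECIMAL_AS_PLAIN` keeps serialization from re-introducing scientific notation. A standalone sketch of the difference (not part of this PR):

```java
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class DecimalParsingSketch {
  public static void main(String[] args) throws Exception {
    final String json = "{\"DEC\": 10.000}";

    // Default mapper: JSON floats become Double, so 10.000 collapses to 10.0.
    final Map<?, ?> lossy = new ObjectMapper().readValue(json, Map.class);
    System.out.println(lossy.get("DEC").getClass()); // class java.lang.Double

    // With USE_BIG_DECIMAL_FOR_FLOATS the exact literal, scale included, survives.
    final ObjectMapper exact = new ObjectMapper()
        .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
    final Map<?, ?> preserved = exact.readValue(json, Map.class);
    System.out.println(preserved.get("DEC")); // 10.000 (a BigDecimal)
  }
}
```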
@@ -19,6 +19,7 @@
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.ksql.test.TestFrameworkException;
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
@@ -165,10 +166,19 @@ private Object specToConnect(final Object spec, final Schema schema) {
return struct;
case BYTES:
if (DecimalUtil.isDecimal(schema)) {
-          return DecimalUtil.cast(
-              (String) spec,
-              DecimalUtil.precision(schema),
-              DecimalUtil.scale(schema));
+          if (spec instanceof BigDecimal) {
+            return DecimalUtil.ensureFit((BigDecimal) spec, schema);
+          }
+
+          if (spec instanceof String) {
+            // Supported for legacy reasons...
+            return DecimalUtil.cast(
+                (String) spec,
+                DecimalUtil.precision(schema),
+                DecimalUtil.scale(schema));
+          }
+
+          throw new TestFrameworkException("DECIMAL type requires JSON number in test data");
}
throw new RuntimeException("Unexpected BYTES type " + schema.name());
default:
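`DecimalUtil.ensureFit` is ksqlDB's internal guard that a `BigDecimal` actually fits the column's declared precision and scale. For reviewers unfamiliar with it, a rough standalone sketch of that kind of check follows; the helper below is illustrative only, not the real `DecimalUtil` code:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public final class FitCheckSketch {
  // Hypothetical stand-in for DecimalUtil.ensureFit: rescale without rounding,
  // then verify the declared precision is not exceeded.
  static BigDecimal ensureFit(final BigDecimal value, final int precision, final int scale) {
    // setScale with UNNECESSARY throws ArithmeticException if digits would be lost.
    final BigDecimal rescaled = value.setScale(scale, RoundingMode.UNNECESSARY);
    if (rescaled.precision() > precision) {
      throw new ArithmeticException(
          "Value " + value + " does not fit DECIMAL(" + precision + ", " + scale + ")");
    }
    return rescaled;
  }

  public static void main(String[] args) {
    System.out.println(ensureFit(new BigDecimal("10.000"), 6, 4)); // 10.0000
    ensureFit(new BigDecimal("10.12345"), 6, 4); // throws: digits would be lost
  }
}
```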
@@ -98,7 +98,7 @@ public Optional<JsonNode> getJsonValue() {
return jsonValue;
}

-  public Record withKey(final Object key) {
+  public Record withKeyValue(final Object key, final Object value) {
return new Record(
topicName,
key,
@@ -251,7 +251,7 @@ private void validateTopicData(

int i = 0;
while (actualIt.hasNext() && expectedIt.hasNext()) {
-      final Record expectedRecord = topicInfo.coerceRecordKey(expectedIt.next(), i);
+      final Record expectedRecord = topicInfo.coerceRecord(expectedIt.next(), i);
final ProducerRecord<?, ?> actualProducerRecord = actualIt.next();

validateCreatedMessage(
@@ -313,7 +313,7 @@ private void pipeRecordsFromProvidedInput(

final TopicInfo topicInfo = topicInfoCache.get(record.getTopicName());

-    final Record coerced = topicInfo.coerceRecordKey(record, inputRecordIndex);
+    final Record coerced = topicInfo.coerceRecord(record, inputRecordIndex);

processSingleRecord(
coerced.asProducerRecord(),
@@ -34,14 +34,14 @@
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.ValueFormat;
+import io.confluent.ksql.serde.kafka.KafkaFormat;
+import io.confluent.ksql.test.TestFrameworkException;
import io.confluent.ksql.test.serde.SerdeSupplier;
import io.confluent.ksql.test.utils.SerdeUtil;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
-import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Deserializer;
@@ -239,7 +239,7 @@ public Deserializer<?> getValueDeserializer() {
}

/**
-   * Coerce the key value to the correct type.
+   * Coerce the key & value to the correct type.
*
* <p>The type of the key loaded from the JSON test case file may not be the exact match on
* type, e.g. JSON will load a small number as an integer, but the key type of the source might
@@ -249,13 +249,14 @@ public Deserializer<?> getValueDeserializer() {
* @param msgIndex the index of the message, displayed in the error message
* @return a new Record with the correct key type.
*/
-  public Record coerceRecordKey(
+  public Record coerceRecord(
final Record record,
final int msgIndex
) {
try {
-      final Object coerced = keyCoercer().apply(record.rawKey());
-      return record.withKey(coerced);
+      final Object coercedKey = coerceKey(record.rawKey());
+      final Object coercedValue = coerceValue(record.value());
+      return record.withKeyValue(coercedKey, coercedValue);
} catch (final Exception e) {
throw new AssertionError(
"Topic '" + record.getTopicName() + "', message " + msgIndex
@@ -265,21 +266,21 @@ public Record coerceRecordKey(
}
}

-  private Function<Object, Object> keyCoercer() {
+  private Object coerceKey(final Object key) {
    if (schema.key().isEmpty()) {
-      // No key column - pass the key in as a string to allow tests to pass in data that should
-      // be ignored:
-      return key -> key == null ? null : String.valueOf(key);
+      // No key column
+      // - pass the key in as a string to allow tests to pass in data that should be ignored:
+      return key == null ? null : String.valueOf(key);
}

final SqlType keyType = schema
.key()
.get(0)
.type();

-    return key -> DefaultSqlValueCoercer.INSTANCE
+    return DefaultSqlValueCoercer.INSTANCE
        .coerce(key, keyType)
-        .orElseThrow(() -> new AssertionError("Invalid key value for topic " + topicName + "."
+        .orElseThrow(() -> new AssertionError("Invalid key for topic " + topicName + "."
+ System.lineSeparator()
+ "Expected KeyType: " + keyType
+ System.lineSeparator()
@@ -291,5 +292,37 @@ private Function<Object, Object> keyCoercer() {
))
.orElse(null);
}

+  private Object coerceValue(final Object value) {
+    // Only KAFKA format needs any value coercion at the moment:
+    if (!(valueFormat.getFormat() instanceof KafkaFormat)) {
+      return value;
+    }
+
+    if (schema.value().size() != 1) {
+      // Wrong column count:
+      // - pass the value as-is for negative testing:
+      return value == null ? null : String.valueOf(value);
+    }
+
+    final SqlType valueType = schema
+        .value()
+        .get(0)
+        .type();
+
+    return DefaultSqlValueCoercer.INSTANCE
+        .coerce(value, valueType)
+        .orElseThrow(() -> new AssertionError("Invalid value for topic " + topicName + "."
+            + System.lineSeparator()
+            + "Expected ValueType: " + valueType
+            + System.lineSeparator()
+            + "Actual ValueType: " + SchemaConverters.javaToSqlConverter()
+                .toSqlType(value.getClass())
+            + ", value: " + value + "."
+            + System.lineSeparator()
+            + "This is likely caused by the value type in the test-case not matching the schema."
+        ))
+        .orElse(null);
+  }
}
}
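The new `coerceValue` above mirrors the existing key coercion: Jackson loads a small JSON number as `Integer`, while the single value column of a KAFKA-format topic may be declared `BIGINT`, `DOUBLE`, or `DECIMAL`, so the raw value has to be widened before serialization. A toy illustration of the mismatch (hypothetical `coerce` helper, not ksqlDB's `DefaultSqlValueCoercer`):

```java
import java.math.BigDecimal;

public final class CoercionSketch {
  // Hypothetical widening coercion: JSON gives us the narrowest Java type,
  // but the declared SQL type may be wider.
  static Object coerce(final Object raw, final Class<?> target) {
    if (raw instanceof Number) {
      final Number n = (Number) raw;
      if (target == Long.class) return n.longValue();       // INT -> BIGINT
      if (target == Double.class) return n.doubleValue();   // INT -> DOUBLE
      if (target == BigDecimal.class) return new BigDecimal(n.toString()); // -> DECIMAL
    }
    return raw;
  }

  public static void main(String[] args) {
    final Object fromJson = 10; // Jackson loads "10" as Integer
    System.out.println(coerce(fromJson, Long.class).getClass()); // class java.lang.Long
  }
}
```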
@@ -0,0 +1,31 @@
package io.confluent.ksql.test.model;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import com.fasterxml.jackson.databind.node.DecimalNode;
import io.confluent.ksql.test.tools.Record;
import java.math.BigDecimal;
import java.util.Optional;
import org.junit.Test;

public class RecordNodeTest {

@Test
public void shouldUseExactDecimals() {
// Given:
final RecordNode node = new RecordNode(
"topic",
Optional.empty(),
new DecimalNode(new BigDecimal("10.000")),
Optional.empty(),
Optional.empty()
);

// When:
final Record result = node.build();

// Then:
assertThat(result.value(), is(new BigDecimal("10.000")));
}
}
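This test exercises `RecordNode`'s shared mapper, whose node factory is built with `withExactBigDecimals(true)`. That flag matters because, in the Jackson versions used here, the default `JsonNodeFactory` normalizes `BigDecimal`s by stripping trailing zeros. A small standalone sketch of the difference (behavior noted in comments is an assumption about Jackson 2.x defaults):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.math.BigDecimal;

public class ExactDecimalNodeSketch {
  public static void main(String[] args) {
    final BigDecimal d = new BigDecimal("10.000");

    // Default factory: numberNode strips trailing zeros (10.000 -> 1E+1),
    // losing the scale the test cases care about.
    final JsonNode lossy = JsonNodeFactory.instance.numberNode(d);
    System.out.println(lossy.decimalValue()); // 1E+1 with Jackson 2.x defaults

    // Exact factory: the BigDecimal is stored as-is, scale included.
    final JsonNode exact = JsonNodeFactory.withExactBigDecimals(true).numberNode(d);
    System.out.println(exact.decimalValue()); // 10.000
  }
}
```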
@@ -208,7 +208,7 @@ private void produceInputs(final Map<String, List<Record>> inputs) {
for (int idx = 0; idx < records.size(); idx++) {
final Record record = records.get(idx);

-      final Record coerced = topicInfo.coerceRecordKey(record, idx);
+      final Record coerced = topicInfo.coerceRecord(record, idx);

producer.send(new ProducerRecord<>(
topicName,
@@ -0,0 +1,23 @@
package io.confluent.ksql.test.tools;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasEntry;

import java.math.BigDecimal;
import java.util.Map;
import org.junit.Test;

public class TestJsonMapperTest {

@Test
public void shouldLoadExactDecimals() throws Exception {
// Given:
final String json = "{\"DEC\": 1.0000}";

// When:
final Map<?, ?> result = TestJsonMapper.INSTANCE.get().readValue(json, Map.class);

// Then:
assertThat(result, hasEntry("DEC", new BigDecimal("1.0000")));
}
}
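The `hasEntry` assertion here is stricter than it may look: `BigDecimal.equals` compares scale as well as numeric value, so `1.0000` only matches if the mapper preserved every trailing zero:

```java
import java.math.BigDecimal;

public class ScaleSensitivity {
  public static void main(String[] args) {
    final BigDecimal a = new BigDecimal("1.0000");
    final BigDecimal b = new BigDecimal("1.0");

    System.out.println(a.equals(b));         // false: equals is scale-sensitive
    System.out.println(a.compareTo(b) == 0); // true: numerically equal
  }
}
```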
@@ -0,0 +1,126 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (ID STRING KEY, DEC DECIMAL(6, 4)) WITH (KAFKA_TOPIC='test', VALUE_FORMAT='Avro');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ID` STRING KEY, `DEC` DECIMAL(6, 4)",
"topicName" : "test",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
}
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ID` STRING KEY, `DEC` DECIMAL(6, 4)",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
}
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"sourceSchema" : "`ID` STRING KEY, `DEC` DECIMAL(6, 4)"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "DEC AS DEC" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.error.classifier.regex" : ""
}
}