Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Decimal support #294

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ public class AvroConverter {
public static Object toAvro(JsonNode value, Schema schema) {
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
value = JsonNodeConverters.transformJsonNode(value, schema, JsonNodeConverters.withDecimalNodesAsText);
jsonMapper.writeValue(out, value);
DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
Object object = reader.read(
null, decoderFactory.jsonDecoder(schema, new ByteArrayInputStream(out.toByteArray())));
null, decoderFactory.jsonDecoder(schema, new ByteArrayInputStream(out.toByteArray())));
out.close();
return object;
} catch (IOException e) {
Expand Down Expand Up @@ -116,7 +117,9 @@ public static JsonNodeAndSize toJson(Object value) {
encoder.flush();
byte[] bytes = out.toByteArray();
out.close();
return new JsonNodeAndSize(jsonMapper.readTree(bytes), bytes.length);
JsonNode json = jsonMapper.readTree(bytes);
json = JsonNodeConverters.transformJsonNode(json, schema, JsonNodeConverters.withTextNodesAsDecimal);
return new JsonNodeAndSize(json, bytes.length);
} catch (IOException e) {
// These can be generated by Avro's JSON encoder, the output stream operations, and the
// Jackson ObjectMapper.readTree() call.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.confluent.kafkarest.converters;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

/**
 * Decodes the byte representation of an Avro {@code decimal} logical type (the big-endian
 * two's-complement bytes of the unscaled value) back into a {@link BigDecimal}.
 *
 * <p>Utility class: all members are static and it is not meant to be instantiated.
 */
public final class BigDecimalDecoder {

  /**
   * Charset used to turn a string back into raw bytes. ISO-8859-1 maps every char in
   * U+0000..U+00FF to the single byte with the same value, so no byte is lost or altered.
   */
  private static final Charset BYTES_AS_STRING_CHARSET = Charset.forName("ISO-8859-1");

  private BigDecimalDecoder() {
    // static helpers only
  }

  // see: https://github.com/apache/avro/blob/33d495840c896b693b7f37b5ec786ac1acacd3b4/lang/java/avro/src/main/java/org/apache/avro/Conversions.java#L79
  /**
   * Builds a decimal from the remaining bytes of {@code value}.
   *
   * @param value buffer whose remaining bytes hold the two's-complement unscaled value; its
   *     position and limit are left untouched
   * @param scale number of digits to the right of the decimal point
   * @return the decoded decimal
   * @throws NumberFormatException if {@code value} has no remaining bytes
   */
  public static BigDecimal fromBytes(ByteBuffer value, int scale) {
    byte[] bytes = new byte[value.remaining()];
    // Read through a duplicate so the caller's buffer is not consumed and can be read again.
    value.duplicate().get(bytes);
    return new BigDecimal(new BigInteger(bytes), scale);
  }

  /**
   * Builds a decimal from a string in which every char stands for one raw byte
   * (ISO-8859-1 mapping).
   *
   * @param encoded string whose chars are the bytes of the unscaled value
   * @param scale number of digits to the right of the decimal point
   * @return the decoded decimal
   */
  public static BigDecimal fromEncodedString(String encoded, int scale) {
    return fromBytes(ByteBuffer.wrap(encoded.getBytes(BYTES_AS_STRING_CHARSET)), scale);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.confluent.kafkarest.converters;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class BigDecimalEncoder {

/**
 * Returns the big-endian two's-complement bytes of the unscaled value of {@code bigDecimal}
 * after it has been brought to exactly {@code scale} fractional digits.
 *
 * @param bigDecimal value to encode
 * @param scale number of fractional digits the encoded value must have
 * @return bytes of the unscaled value
 * @throws ArithmeticException if {@code bigDecimal} cannot be represented at {@code scale}
 *     without rounding (meaning the scale is wrong for this value)
 */
public static byte[] toByteArray(BigDecimal bigDecimal, int scale) {
  // RoundingMode.UNNECESSARY makes setScale fail instead of silently dropping digits.
  BigDecimal rescaled = bigDecimal.setScale(scale, RoundingMode.UNNECESSARY);
  return rescaled.unscaledValue().toByteArray();
}


/**
 * Encodes {@code bigDecimal} as a string in which every char stands for one byte of the
 * unscaled value.
 *
 * <p>ISO-8859-1 is used because it maps each byte 0x00..0xFF to the char with the same code
 * point, so arbitrary binary data survives the round trip through a {@link String}. Multi-byte
 * charsets such as UTF-8 would replace byte sequences that are not valid in that charset.
 *
 * @param bigDecimal value to encode
 * @param scale number of fractional digits the encoded value must have
 * @param precision declared precision of the decimal; currently not used by the encoding
 * @return the bytes of the unscaled value, one char per byte
 * @throws ArithmeticException if {@code bigDecimal} cannot be represented at {@code scale}
 *     without rounding
 */
public static String toEncodedString(BigDecimal bigDecimal, int scale, int precision) {
  byte[] unscaledBytes = toByteArray(bigDecimal, scale);
  return new String(unscaledBytes, StandardCharsets.ISO_8859_1);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you use ISO-8859-1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the only charset that I found that could encode the bytes the way I wanted for a String, so that my tests passed. Unsure why this one works and not others, but hoping for Confluent to let me know


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.confluent.kafkarest.converters;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.avro.Schema;

/**
 * Callback that rewrites a single JSON node using the Avro schema that describes it.
 */
interface JsonNodeConverter {
/**
 * Converts one JSON node.
 *
 * @param jsonNode the node to convert
 * @param schema the Avro schema associated with {@code jsonNode}
 * @return the node to use in place of {@code jsonNode}
 */
JsonNode convert(JsonNode jsonNode, Schema schema);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.confluent.kafkarest.converters;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.*;
import org.apache.avro.Schema;

import java.math.BigDecimal;
import java.util.Iterator;
import java.util.Objects;

public class JsonNodeConverters {


/**
 * Converter that replaces a JSON number with a text node holding the encoded bytes of the
 * decimal's unscaled value, using the {@code scale} and {@code precision} properties of the
 * given schema. Nodes that are not numbers are returned unchanged.
 */
static final JsonNodeConverter withDecimalNodesAsText = new JsonNodeConverter() {
  @Override
  public JsonNode convert(JsonNode jsonNode, Schema schema) {
    // Accept every numeric node, not only doubles: a JSON value such as 5 is parsed as an
    // integer node and must be encoded as well.
    if (jsonNode == null || !jsonNode.isNumber()) {
      return jsonNode;
    }
    BigDecimal bd = jsonNode.decimalValue();
    // "scale" is optional for the decimal logical type and defaults to 0 when absent.
    int scale = schema.getJsonProps().containsKey("scale")
        ? schema.getJsonProps().get("scale").asInt()
        : 0;
    int precision = schema.getJsonProps().get("precision").asInt();
    String encodedBytes = BigDecimalEncoder.toEncodedString(bd, scale, precision);
    return new TextNode(encodedBytes);
  }
};

/**
 * Converter that replaces a text node holding the encoded bytes of a decimal's unscaled value
 * with a {@link DecimalNode}, using the {@code scale} property of the given schema. Nodes that
 * are not text nodes are returned unchanged.
 */
static final JsonNodeConverter withTextNodesAsDecimal = new JsonNodeConverter() {
  @Override
  public JsonNode convert(JsonNode jsonNode, Schema schema) {
    if (!(jsonNode instanceof TextNode)) {
      return jsonNode;
    }
    String encodedBigDecimal = jsonNode.asText();
    // "scale" is optional for the decimal logical type and defaults to 0 when absent.
    int scale = schema.getJsonProps().containsKey("scale")
        ? schema.getJsonProps().get("scale").asInt()
        : 0;
    BigDecimal decoded = BigDecimalDecoder.fromEncodedString(encodedBigDecimal, scale);
    return new DecimalNode(decoded);
  }
};

// private static JsonNode transformJsonNode(JsonNode jsonNode, Schema schema) {
/**
 * Walks {@code jsonNode} alongside {@code schema} and applies {@code jsonNodeConverter} to every
 * node whose schema is {@code bytes} with {@code logicalType} equal to {@code "decimal"}.
 *
 * <p>Records, unions and arrays are updated in place: converted children are written back into
 * the same {@code ObjectNode} / {@code ArrayNode}. Schema types not listed below (including MAP
 * and ENUM, see the TODO) are returned untouched.
 *
 * @param jsonNode the JSON value to transform
 * @param schema the Avro schema describing {@code jsonNode}
 * @param jsonNodeConverter conversion applied to decimal nodes
 * @return the converter's result when {@code schema} itself is a decimal, otherwise
 *     {@code jsonNode} (possibly modified in place)
 */
static JsonNode transformJsonNode(JsonNode jsonNode, Schema schema, JsonNodeConverter jsonNodeConverter) {
// Leaf case: the schema itself is a decimal-annotated bytes type.
if (schema.getType() == Schema.Type.BYTES && schema.getJsonProps().containsKey("logicalType") &&
Objects.equals(schema.getJsonProps().get("logicalType").asText(), "decimal")) {
return jsonNodeConverter.convert(jsonNode, schema);
} else if (schema.getType() == Schema.Type.RECORD) {
// Recurse into every field declared by the schema and store the result under the same name.
// NOTE(review): if a declared field is absent from the JSON, jsonNode.get(...) presumably
// returns null and is passed down the recursion - confirm this is handled as intended.
for (Schema.Field s : schema.getFields()) {
JsonNode transformed = transformJsonNode(jsonNode.get(s.name()), s.schema(), jsonNodeConverter);
((ObjectNode) jsonNode).set(s.name(), transformed);
}
} else if (schema.getType() == Schema.Type.UNION) {
// Only a {"bytes": <number>} wrapper is considered here.
// NOTE(review): a text-valued "bytes" entry does not satisfy isNumber(), so the
// text-to-decimal converter never runs for union members - confirm whether that is intended.
if (jsonNode.has("bytes") && jsonNode.get("bytes").isNumber()) {
for (Schema subSchema : schema.getTypes()) {
if (subSchema.getType() == Schema.Type.BYTES && subSchema.getJsonProps().containsKey("logicalType") &&
Objects.equals(subSchema.getJsonProps().get("logicalType").asText(), "decimal")) {
JsonNode transformed = transformJsonNode(jsonNode.get("bytes"), subSchema, jsonNodeConverter);
((ObjectNode) jsonNode).set("bytes", transformed);
}
}
}
} else if (schema.getType() == Schema.Type.ARRAY) {
// Transform each element against the array's element schema; i tracks the element index
// so the result can be written back at the same position.
Schema subSchema = schema.getElementType();
int i = 0;
for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext(); ) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use instead

for (JsonNode elem: jsonNode.elements()) {
  //
}

JsonNode elem = it.next();
JsonNode transformed = transformJsonNode(elem, subSchema, jsonNodeConverter);
((ArrayNode) jsonNode).set(i, transformed);
i += 1;
}
}
// TODO: More cases - missing MAP and ENUM
return jsonNode;
}
}
116 changes: 116 additions & 0 deletions src/test/java/io/confluent/kafkarest/unit/AvroConverterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.TextNode;

import io.confluent.kafkarest.converters.BigDecimalDecoder;
import io.confluent.kafkarest.converters.BigDecimalEncoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
Expand All @@ -29,6 +31,7 @@
import org.apache.avro.util.Utf8;
import org.junit.Test;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -61,6 +64,7 @@ public class AvroConverterTest {
+ " {\"name\": \"float\", \"type\": \"float\"},\n"
+ " {\"name\": \"double\", \"type\": \"double\"},\n"
+ " {\"name\": \"bytes\", \"type\": \"bytes\"},\n"
+ " {\"name\": \"decimal\", \"type\": { \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 5, \"scale\": 2} },\n"
+ " {\"name\": \"string\", \"type\": \"string\", \"aliases\": [\"string_alias\"]},\n"
+ " {\"name\": \"null_default\", \"type\": \"null\", \"default\": null},\n"
+ " {\"name\": \"boolean_default\", \"type\": \"boolean\", \"default\": false},\n"
Expand Down Expand Up @@ -106,6 +110,43 @@ public class AvroConverterTest {
+ "}"
);

/** Record schema with a single {@code decimal} field: bytes/decimal, precision 5, scale 2. */
private static final Schema decimalSchema = new Schema.Parser().parse(
"{\"type\": \"record\",\n"
+ " \"name\": \"testDecimal\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"decimal\", \"type\": {\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 } }\n"
+ "]}"
);


/**
 * Record schema whose {@code decimal} field is a union of {@code null} and a bytes/decimal
 * type (precision 5, scale 2).
 */
private static final Schema decimalUnionNullSchema = new Schema.Parser().parse(
"{\"type\": \"record\",\n"
+ " \"name\": \"testDecimal\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"decimal\", \"type\": [\"null\",{\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 }] }\n"
+ "]}"
);


/**
 * Record schema with a {@code nested} record field that itself contains a {@code decimal}
 * field (bytes/decimal, precision 5, scale 2).
 */
private static final Schema nestedDecimalSchema = new Schema.Parser().parse(
"{\"type\": \"record\",\n"
+ " \"name\": \"testDecimal\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"nested\", \"type\":\n"
+ " {\"type\": \"record\", \"name\":\"nestedRecord\", \"fields\":[\n"
+ " {\"name\": \"decimal\", \"type\": {\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 } }]}}\n"
+ "]}"
);


/** Array schema whose items are bytes/decimal values with precision 5 and scale 2. */
private static final Schema decimalArraySchema = new Schema.Parser().parse(
"{\"namespace\": \"namespace\",\n"
+ " \"type\": \"array\",\n"
+ " \"name\": \"test\",\n"
+ " \"items\": {\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 } \n"
+ "}"
);

@Test
public void testPrimitiveTypesToAvro() {
Object result = AvroConverter.toAvro(null, createPrimitiveSchema("null"));
Expand Down Expand Up @@ -188,6 +229,7 @@ public void testRecordToAvro() {
+ " \"float\": 23.4,\n"
+ " \"double\": 800.25,\n"
+ " \"bytes\": \"hello\",\n"
+ " \"decimal\": 123.45,\n"
+ " \"string\": \"string\",\n"
+ " \"null_default\": null,\n"
+ " \"boolean_default\": false,\n"
Expand All @@ -210,6 +252,7 @@ public void testRecordToAvro() {
assertEquals(800.25, resultRecord.get("double"));
assertEquals(EntityUtils.encodeBase64Binary("hello".getBytes()),
EntityUtils.encodeBase64Binary(((ByteBuffer) resultRecord.get("bytes")).array()));
assertEquals(new BigDecimal("123.45"), BigDecimalDecoder.fromBytes((ByteBuffer) resultRecord.get("decimal"), 2));
assertEquals("string", resultRecord.get("string").toString());
// Nothing to check with default values, just want to make sure an exception wasn't thrown
// when they values weren't specified for their fields.
Expand Down Expand Up @@ -263,6 +306,76 @@ public void testEnumToAvro() {
// serialization.
}

/**
 * Converts every two-decimal value from -100.00 to 100.99 to Avro and checks that the bytes
 * stored in the record decode back to the same decimal.
 */
@Test
public void testDecimalToAvro() {
  int scale = decimalSchema.getField("decimal").schema().getJsonProp("scale").asInt();

  for (int i = -100; i <= 100; i++) {
    for (int j = 0; j <= 99; j++) {
      // Build i + j/100 exactly from its unscaled integer; going through float/double would
      // introduce binary rounding error that then has to be rounded away again.
      BigDecimal expected = BigDecimal.valueOf(100L * i + j, 2);
      String decimal = expected.toString();

      Object result = AvroConverter.toAvro(
          TestUtils.jsonTree(String.format("{\"decimal\": %s}", decimal)),
          decimalSchema);

      ByteBuffer byteBuffer = ((ByteBuffer) ((GenericData.Record) result).get("decimal"));
      BigDecimal actual = BigDecimalDecoder.fromBytes(byteBuffer, scale);
      assertEquals(expected, actual);
    }
  }
}


/**
 * Converts a decimal wrapped as {@code {"bytes": ...}} for a null/decimal union field and
 * checks that the stored bytes decode back to the original value.
 */
@Test
public void testDecimalUnionNullToAvro() {
  final String literal = "123.45";
  final String payload = String.format("{\"decimal\": {\"bytes\": %s }}", literal);

  GenericData.Record record =
      (GenericData.Record) AvroConverter.toAvro(TestUtils.jsonTree(payload), decimalUnionNullSchema);
  ByteBuffer stored = (ByteBuffer) record.get("decimal");

  // The decimal type is the second branch of the ["null", decimal] union.
  Schema decimalBranch = decimalUnionNullSchema.getField("decimal").schema().getTypes().get(1);
  int declaredScale = decimalBranch.getJsonProp("scale").asInt();

  BigDecimal decoded = BigDecimalDecoder.fromBytes(stored, declaredScale);
  assertEquals(new BigDecimal(literal), decoded);
}

/**
 * Converts a decimal that sits inside a nested record and checks that the stored bytes decode
 * back to the original value.
 */
@Test
public void testNestedDecimalToAvro() {
  final String literal = "123.45";
  final int declaredScale = 2;
  final String payload = String.format("{\"nested\": {\"decimal\": %s }}", literal);

  GenericData.Record outer =
      (GenericData.Record) AvroConverter.toAvro(TestUtils.jsonTree(payload), nestedDecimalSchema);
  GenericData.Record inner = (GenericData.Record) outer.get("nested");
  ByteBuffer stored = (ByteBuffer) inner.get("decimal");

  BigDecimal decoded = BigDecimalDecoder.fromBytes(stored, declaredScale);
  assertEquals(new BigDecimal(literal), decoded);
}


/**
 * Converts a JSON array of decimals and checks that each stored element decodes back to the
 * original value.
 */
@Test
public void testDecimalArrayToAvro() {
  String json = "[123.45,555.55]";
  Object result = AvroConverter.toAvro(TestUtils.jsonTree(json), decimalArraySchema);
  // Take the scale from the schema's item type instead of repeating a literal.
  int scale = decimalArraySchema.getElementType().getJsonProp("scale").asInt();
  GenericData.Array<?> array = (GenericData.Array<?>) result;

  ByteBuffer byteBuffer0 = (ByteBuffer) array.get(0);
  assertEquals(new BigDecimal("123.45"), BigDecimalDecoder.fromBytes(byteBuffer0, scale));

  ByteBuffer byteBuffer1 = (ByteBuffer) array.get(1);
  assertEquals(new BigDecimal("555.55"), BigDecimalDecoder.fromBytes(byteBuffer1, scale));
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add more tests for different scales? You're really only testing a scale of 2.


@Test
public void testPrimitiveTypesToJson() {
Expand Down Expand Up @@ -313,6 +426,7 @@ public void testRecordToJson() {
.set("float", 23.4f)
.set("double", 800.25)
.set("bytes", ByteBuffer.wrap("bytes".getBytes()))
.set("decimal", ByteBuffer.wrap(BigDecimalEncoder.toByteArray(new BigDecimal("123.45"),2)))
.set("string", "string")
.build();

Expand All @@ -334,6 +448,8 @@ public void testRecordToJson() {
// The bytes value was created from an ASCII string, so Avro's encoding should just give that
// string back to us in the JSON-serialized version
assertEquals("bytes", result.json.get("bytes").textValue());
assertTrue(result.json.get("decimal").isBigDecimal());
assertEquals(new BigDecimal("123.45"), result.json.get("decimal").decimalValue());
assertTrue(result.json.get("string").isTextual());
assertEquals("string", result.json.get("string").textValue());
}
Expand Down