Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Decimal support #294

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* Provides conversion of JSON to/from Avro.
Expand Down Expand Up @@ -67,10 +68,16 @@ public class AvroConverter {
public static Object toAvro(JsonNode value, Schema schema) {
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
value = JsonNodeConverters.transformJsonNode(value, schema, new JsonNodeConverter() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont recall exactly how the JVM handles this (maybe it does optimally), but it might be more efficient to have your implementation of JsonNodeConverter defined somewhere rather than repeatedly created

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto for toJson.

@Override
public JsonNode convert(JsonNode jsonNode, Schema schema) {
return JsonNodeConverters.decimalToText(jsonNode, schema);
}
});
jsonMapper.writeValue(out, value);
DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
Object object = reader.read(
null, decoderFactory.jsonDecoder(schema, new ByteArrayInputStream(out.toByteArray())));
null, decoderFactory.jsonDecoder(schema, new ByteArrayInputStream(out.toByteArray())));
out.close();
return object;
} catch (IOException e) {
Expand Down Expand Up @@ -116,7 +123,14 @@ public static JsonNodeAndSize toJson(Object value) {
encoder.flush();
byte[] bytes = out.toByteArray();
out.close();
return new JsonNodeAndSize(jsonMapper.readTree(bytes), bytes.length);
JsonNode json = jsonMapper.readTree(bytes);
json = JsonNodeConverters.transformJsonNode(json, schema, new JsonNodeConverter() {
@Override
public JsonNode convert(JsonNode jsonNode, Schema schema) {
return JsonNodeConverters.textToDecimal(jsonNode, schema);
}
});
return new JsonNodeAndSize(json, bytes.length);
} catch (IOException e) {
// These can be generated by Avro's JSON encoder, the output stream operations, and the
// Jackson ObjectMapper.readTree() call.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.confluent.kafkarest.converters;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

public class BigDecimalDecoder {

// see: https://github.com/apache/avro/blob/33d495840c896b693b7f37b5ec786ac1acacd3b4/lang/java/avro/src/main/java/org/apache/avro/Conversions.java#L79
public static BigDecimal fromBytes(ByteBuffer value, int scale) {
byte[] bytes = new byte[value.remaining()];
value.get(bytes);
return new BigDecimal(new BigInteger(bytes), scale);
}

public static BigDecimal fromEncodedString(String encoded, int scale){
return fromBytes(ByteBuffer.wrap(encoded.getBytes(Charset.forName("ISO-8859-1"))), scale);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.confluent.kafkarest.converters;

import java.math.BigDecimal;
import java.nio.charset.Charset;

public class BigDecimalEncoder {

static public byte[] toByteArray(BigDecimal bigDecimal, int scale) {
// will throw an error if rounding is necessary (meaning scale is wrong)
return bigDecimal.setScale(scale, BigDecimal.ROUND_UNNECESSARY).unscaledValue().toByteArray();
}


static public String toEncodedString(BigDecimal bigDecimal, int scale, int precision) {
return new String(toByteArray(bigDecimal,scale), Charset.forName("ISO-8859-1"));
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you use ISO-8859-1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the only charset that I found that could encode the bytes the way I wanted for a String, so that my tests passed. Unsure why this one works and not others, but hoping for Confluent to let me know


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.confluent.kafkarest.converters;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.avro.Schema;

interface JsonNodeConverter {
JsonNode convert(JsonNode jsonNode, Schema schema);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.confluent.kafkarest.converters;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.*;
import org.apache.avro.Schema;

import java.math.BigDecimal;
import java.util.Iterator;
import java.util.Objects;

public class JsonNodeConverters {

static JsonNode textToDecimal(JsonNode jsonNode, Schema schema) {
if (jsonNode instanceof TextNode){
String encodedBigDecimal = jsonNode.asText();
int scale = schema.getJsonProps().get("scale").asInt();
BigDecimal bd = BigDecimalDecoder.fromEncodedString(encodedBigDecimal, scale);
return new DecimalNode(bd);
} else {
return jsonNode;
}
};

static JsonNode decimalToText(JsonNode jsonNode, Schema schema) {
if (jsonNode instanceof DoubleNode) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as a suggestion, I would make these methods more conditional in their names. For example, if the method was isDecimal (which encapsulated your instanceOf logic), then as a result of that condition being true, you'd do your conversion. False would lead to the function being an identity on the input jsonNode. The benefit of this is in AvroConverter, the code isn't read as "I am definitely turning this into text from a decimal", but more so "I will convert this into text iff it's a decimal". The same can be said for textToDecimal.

BigDecimal bd = jsonNode.decimalValue();
int scale = schema.getJsonProps().get("scale").asInt();
int precision = schema.getJsonProps().get("precision").asInt();
String bdBytesAsUtf8 = BigDecimalEncoder.toEncodedString(bd, scale, precision);
return new TextNode(bdBytesAsUtf8);
} else {
return jsonNode;
}
}


private void dummy() {
transformJsonNode(null, null, new JsonNodeConverter() {
@Override
public JsonNode convert(JsonNode jsonNode, Schema schema) {
return textToDecimal(jsonNode, schema);
}
});
}

// private static JsonNode transformJsonNode(JsonNode jsonNode, Schema schema) {
static JsonNode transformJsonNode(JsonNode jsonNode, Schema schema, JsonNodeConverter jsonNodeConverter) {
if (schema.getType() == Schema.Type.BYTES && schema.getJsonProps().containsKey("logicalType") &&
Objects.equals(schema.getJsonProps().get("logicalType").asText(), "decimal")) {
return jsonNodeConverter.convert(jsonNode, schema);
} else if (schema.getType() == Schema.Type.RECORD) {
for (Schema.Field s : schema.getFields()){
JsonNode transformed = transformJsonNode(jsonNode.get(s.name()), s.schema(), jsonNodeConverter);
((ObjectNode) jsonNode).set(s.name(), transformed);
}
} else if (schema.getType() == Schema.Type.UNION) {
if (jsonNode.has("bytes") && jsonNode.get("bytes").isNumber()) {
for (Schema subSchema : schema.getTypes()) {
if (subSchema.getType() == Schema.Type.BYTES && subSchema.getJsonProps().containsKey("logicalType") &&
Objects.equals(subSchema.getJsonProps().get("logicalType").asText(), "decimal")){
JsonNode transformed = transformJsonNode(jsonNode.get("bytes"), subSchema, jsonNodeConverter);
((ObjectNode) jsonNode).set("bytes", transformed);
}
}
}
} else if (schema.getType() == Schema.Type.ARRAY) {
Schema subSchema = schema.getElementType();
int i = 0;
for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext(); ) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use instead

for (JsonNode elem: jsonNode.elements()) {
  //
}

JsonNode elem = it.next();
JsonNode transformed = transformJsonNode(elem, subSchema, jsonNodeConverter);
((ArrayNode) jsonNode).set(i, transformed);
i += 1;
}
}
// TODO: More cases - missing MAP and ENUM
return jsonNode;
}
}
116 changes: 116 additions & 0 deletions src/test/java/io/confluent/kafkarest/unit/AvroConverterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.TextNode;

import io.confluent.kafkarest.converters.BigDecimalDecoder;
import io.confluent.kafkarest.converters.BigDecimalEncoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
Expand All @@ -29,6 +31,7 @@
import org.apache.avro.util.Utf8;
import org.junit.Test;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -61,6 +64,7 @@ public class AvroConverterTest {
+ " {\"name\": \"float\", \"type\": \"float\"},\n"
+ " {\"name\": \"double\", \"type\": \"double\"},\n"
+ " {\"name\": \"bytes\", \"type\": \"bytes\"},\n"
+ " {\"name\": \"decimal\", \"type\": { \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 5, \"scale\": 2} },\n"
+ " {\"name\": \"string\", \"type\": \"string\", \"aliases\": [\"string_alias\"]},\n"
+ " {\"name\": \"null_default\", \"type\": \"null\", \"default\": null},\n"
+ " {\"name\": \"boolean_default\", \"type\": \"boolean\", \"default\": false},\n"
Expand Down Expand Up @@ -106,6 +110,43 @@ public class AvroConverterTest {
+ "}"
);

private static final Schema decimalSchema = new Schema.Parser().parse(
"{\"type\": \"record\",\n"
+ " \"name\": \"testDecimal\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"decimal\", \"type\": {\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 } }\n"
+ "]}"
);


private static final Schema decimalUnionNullSchema = new Schema.Parser().parse(
"{\"type\": \"record\",\n"
+ " \"name\": \"testDecimal\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"decimal\", \"type\": [\"null\",{\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 }] }\n"
+ "]}"
);


private static final Schema nestedDecimalSchema = new Schema.Parser().parse(
"{\"type\": \"record\",\n"
+ " \"name\": \"testDecimal\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"nested\", \"type\":\n"
+ " {\"type\": \"record\", \"name\":\"nestedRecord\", \"fields\":[\n"
+ " {\"name\": \"decimal\", \"type\": {\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 } }]}}\n"
+ "]}"
);


private static final Schema decimalArraySchema = new Schema.Parser().parse(
"{\"namespace\": \"namespace\",\n"
+ " \"type\": \"array\",\n"
+ " \"name\": \"test\",\n"
+ " \"items\": {\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 } \n"
+ "}"
);

@Test
public void testPrimitiveTypesToAvro() {
Object result = AvroConverter.toAvro(null, createPrimitiveSchema("null"));
Expand Down Expand Up @@ -188,6 +229,7 @@ public void testRecordToAvro() {
+ " \"float\": 23.4,\n"
+ " \"double\": 800.25,\n"
+ " \"bytes\": \"hello\",\n"
+ " \"decimal\": 123.45,\n"
+ " \"string\": \"string\",\n"
+ " \"null_default\": null,\n"
+ " \"boolean_default\": false,\n"
Expand All @@ -210,6 +252,7 @@ public void testRecordToAvro() {
assertEquals(800.25, resultRecord.get("double"));
assertEquals(EntityUtils.encodeBase64Binary("hello".getBytes()),
EntityUtils.encodeBase64Binary(((ByteBuffer) resultRecord.get("bytes")).array()));
assertEquals(new BigDecimal("123.45"), BigDecimalDecoder.fromBytes((ByteBuffer) resultRecord.get("decimal"), 2));
assertEquals("string", resultRecord.get("string").toString());
// Nothing to check with default values, just want to make sure an exception wasn't thrown
// when they values weren't specified for their fields.
Expand Down Expand Up @@ -263,6 +306,76 @@ public void testEnumToAvro() {
// serialization.
}

@Test
public void testDecimalToAvro(){
// this has been tested for numbers ranging from -10000.99 to 10000.99
for (int i = -100; i <= 100; i++){
for (int j = 0; j <= 99; j++){
BigDecimal numberBigDecimal = new BigDecimal(i + (float) j / 100);
numberBigDecimal = numberBigDecimal .setScale(2, BigDecimal.ROUND_HALF_UP);
String decimal = numberBigDecimal.toString();

Object result = AvroConverter.toAvro(
TestUtils.jsonTree(String.format("{\"decimal\": %s}", decimal)),
decimalSchema);

ByteBuffer byteBuffer = ((ByteBuffer) ((GenericData.Record) result).get("decimal"));
int scale = decimalSchema.getField("decimal").schema().getJsonProp("scale").asInt();

BigDecimal expected = BigDecimalDecoder.fromBytes(byteBuffer, scale);
assertEquals(new BigDecimal(decimal), expected );
}
}

}


@Test
public void testDecimalUnionNullToAvro(){
String decimal = "123.45";
Object result = AvroConverter.toAvro(
TestUtils.jsonTree(String.format("{\"decimal\": {\"bytes\": %s }}", decimal)),
decimalUnionNullSchema);

ByteBuffer byteBuffer = ((ByteBuffer) ((GenericData.Record) result).get("decimal"));
int scale = decimalUnionNullSchema.getField("decimal").schema()
.getTypes().get(1).getJsonProp("scale").asInt();

BigDecimal expected = BigDecimalDecoder.fromBytes(byteBuffer, scale);
assertEquals(new BigDecimal(decimal), expected);

}

@Test
public void testNestedDecimalToAvro(){
String decimal = "123.45";

Object result = AvroConverter.toAvro(
TestUtils.jsonTree(String.format("{\"nested\": {\"decimal\": %s }}", decimal)),
nestedDecimalSchema);

ByteBuffer byteBuffer = (ByteBuffer) ((GenericData.Record) ((GenericData.Record) result).get("nested")).get("decimal");
int scale = 2;

BigDecimal expected = BigDecimalDecoder.fromBytes(byteBuffer, scale);
assertEquals(new BigDecimal(decimal), expected);

}


@Test
public void testDecimalArrayToAvro() {
String json = "[123.45,555.55]";
Object result = AvroConverter.toAvro(TestUtils.jsonTree(json), decimalArraySchema);
int scale = 2;

ByteBuffer byteBuffer0 = (ByteBuffer) ((GenericData.Array) result).get(0);
assertEquals(new BigDecimal("123.45"), BigDecimalDecoder.fromBytes(byteBuffer0,2));

ByteBuffer byteBuffer1 = (ByteBuffer) ((GenericData.Array) result).get(1);
assertEquals(new BigDecimal("555.55"), BigDecimalDecoder.fromBytes(byteBuffer1,2));
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add more test for different scales? You really only testing scale of 2


@Test
public void testPrimitiveTypesToJson() {
Expand Down Expand Up @@ -313,6 +426,7 @@ public void testRecordToJson() {
.set("float", 23.4f)
.set("double", 800.25)
.set("bytes", ByteBuffer.wrap("bytes".getBytes()))
.set("decimal", ByteBuffer.wrap(BigDecimalEncoder.toByteArray(new BigDecimal("123.45"),2)))
.set("string", "string")
.build();

Expand All @@ -334,6 +448,8 @@ public void testRecordToJson() {
// The bytes value was created from an ASCII string, so Avro's encoding should just give that
// string back to us in the JSON-serialized version
assertEquals("bytes", result.json.get("bytes").textValue());
assertTrue(result.json.get("decimal").isBigDecimal());
assertEquals(new BigDecimal("123.45"), result.json.get("decimal").decimalValue());
assertTrue(result.json.get("string").isTextual());
assertEquals("string", result.json.get("string").textValue());
}
Expand Down