diff --git a/src/main/java/io/confluent/kafkarest/converters/AvroConverter.java b/src/main/java/io/confluent/kafkarest/converters/AvroConverter.java index b67148bd89..c396adaacc 100644 --- a/src/main/java/io/confluent/kafkarest/converters/AvroConverter.java +++ b/src/main/java/io/confluent/kafkarest/converters/AvroConverter.java @@ -67,10 +67,11 @@ public class AvroConverter { public static Object toAvro(JsonNode value, Schema schema) { try { ByteArrayOutputStream out = new ByteArrayOutputStream(); + value = JsonNodeConverters.transformJsonNode(value, schema, JsonNodeConverters.withDecimalNodesAsText); jsonMapper.writeValue(out, value); DatumReader reader = new GenericDatumReader(schema); Object object = reader.read( - null, decoderFactory.jsonDecoder(schema, new ByteArrayInputStream(out.toByteArray()))); + null, decoderFactory.jsonDecoder(schema, new ByteArrayInputStream(out.toByteArray()))); out.close(); return object; } catch (IOException e) { @@ -116,7 +117,9 @@ public static JsonNodeAndSize toJson(Object value) { encoder.flush(); byte[] bytes = out.toByteArray(); out.close(); - return new JsonNodeAndSize(jsonMapper.readTree(bytes), bytes.length); + JsonNode json = jsonMapper.readTree(bytes); + json = JsonNodeConverters.transformJsonNode(json, schema, JsonNodeConverters.withTextNodesAsDecimal); + return new JsonNodeAndSize(json, bytes.length); } catch (IOException e) { // These can be generated by Avro's JSON encoder, the output stream operations, and the // Jackson ObjectMapper.readTree() call. 
// NOTE(review): reconstructed from a whitespace-mangled patch. In the repository each
// class lives in its own file under io.confluent.kafkarest.converters.

/**
 * Decodes Avro "decimal" logical-type values: a byte buffer holding the
 * two's-complement unscaled integer, combined with the scale declared by the schema.
 *
 * see: https://github.com/apache/avro/blob/33d495840c896b693b7f37b5ec786ac1acacd3b4/lang/java/avro/src/main/java/org/apache/avro/Conversions.java#L79
 */
public class BigDecimalDecoder {

  /**
   * Rebuilds a BigDecimal from Avro's binary representation of a decimal.
   *
   * @param value buffer positioned at the two's-complement unscaled value; fully consumed
   * @param scale the scale declared by the schema's "scale" property
   * @return the decoded decimal
   */
  public static BigDecimal fromBytes(ByteBuffer value, int scale) {
    byte[] bytes = new byte[value.remaining()];
    value.get(bytes);
    return new BigDecimal(new BigInteger(bytes), scale);
  }

  /**
   * Decodes a string produced by Avro's JSON encoding of a bytes value, which maps
   * each byte to the identical code point; ISO-8859-1 makes that mapping lossless.
   *
   * @param encoded ISO-8859-1 string form of the unscaled bytes
   * @param scale   the scale declared by the schema's "scale" property
   */
  public static BigDecimal fromEncodedString(String encoded, int scale) {
    return fromBytes(ByteBuffer.wrap(encoded.getBytes(StandardCharsets.ISO_8859_1)), scale);
  }
}

/**
 * Encodes BigDecimal values into Avro's "decimal" logical-type representation.
 *
 * NOTE(review): public in its own file (BigDecimalEncoder.java) in the repository;
 * declared package-private here only because this reconstruction keeps both classes
 * in one compilation unit.
 */
class BigDecimalEncoder {

  /**
   * Returns the two's-complement bytes of the value rescaled to {@code scale}.
   *
   * @throws ArithmeticException if the value cannot be represented at {@code scale}
   *         without rounding (i.e. the schema's scale is wrong for this value)
   */
  public static byte[] toByteArray(BigDecimal bigDecimal, int scale) {
    // RoundingMode.UNNECESSARY replaces the deprecated BigDecimal.ROUND_UNNECESSARY;
    // it still throws if rounding would be required.
    return bigDecimal.setScale(scale, RoundingMode.UNNECESSARY).unscaledValue().toByteArray();
  }

  /**
   * ISO-8859-1 string mirror of {@link #toByteArray}, matching Avro's JSON encoding
   * of bytes; inverse of {@code BigDecimalDecoder.fromEncodedString}.
   *
   * @param precision currently unused — kept for interface compatibility.
   *        TODO(review): consider validating bigDecimal.precision() &lt;= precision here.
   */
  public static String toEncodedString(BigDecimal bigDecimal, int scale, int precision) {
    return new String(toByteArray(bigDecimal, scale), StandardCharsets.ISO_8859_1);
  }
}
a/src/main/java/io/confluent/kafkarest/converters/JsonNodeConverter.java b/src/main/java/io/confluent/kafkarest/converters/JsonNodeConverter.java new file mode 100644 index 0000000000..a2d3d28d30 --- /dev/null +++ b/src/main/java/io/confluent/kafkarest/converters/JsonNodeConverter.java @@ -0,0 +1,8 @@ +package io.confluent.kafkarest.converters; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.avro.Schema; + +interface JsonNodeConverter { + JsonNode convert(JsonNode jsonNode, Schema schema); +} diff --git a/src/main/java/io/confluent/kafkarest/converters/JsonNodeConverters.java b/src/main/java/io/confluent/kafkarest/converters/JsonNodeConverters.java new file mode 100644 index 0000000000..5855d588d3 --- /dev/null +++ b/src/main/java/io/confluent/kafkarest/converters/JsonNodeConverters.java @@ -0,0 +1,76 @@ +package io.confluent.kafkarest.converters; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.*; +import org.apache.avro.Schema; + +import java.math.BigDecimal; +import java.util.Iterator; +import java.util.Objects; + +public class JsonNodeConverters { + + + static JsonNodeConverter withDecimalNodesAsText = new JsonNodeConverter() { + @Override + public JsonNode convert(JsonNode jsonNode, Schema schema) { + if (jsonNode instanceof DoubleNode) { + BigDecimal bd = jsonNode.decimalValue(); + int scale = schema.getJsonProps().get("scale").asInt(); + int precision = schema.getJsonProps().get("precision").asInt(); + String bdBytesAsUtf8 = BigDecimalEncoder.toEncodedString(bd, scale, precision); + return new TextNode(bdBytesAsUtf8); + } else { + return jsonNode; + } + } + }; + + static JsonNodeConverter withTextNodesAsDecimal = new JsonNodeConverter() { + @Override + public JsonNode convert(JsonNode jsonNode, Schema schema) { + if (jsonNode instanceof TextNode) { + String encodedBigDecimal = jsonNode.asText(); + int scale = schema.getJsonProps().get("scale").asInt(); + BigDecimal bd = 
BigDecimalDecoder.fromEncodedString(encodedBigDecimal, scale); + return new DecimalNode(bd); + } else { + return jsonNode; + } + } + }; + + // private static JsonNode transformJsonNode(JsonNode jsonNode, Schema schema) { + static JsonNode transformJsonNode(JsonNode jsonNode, Schema schema, JsonNodeConverter jsonNodeConverter) { + if (schema.getType() == Schema.Type.BYTES && schema.getJsonProps().containsKey("logicalType") && + Objects.equals(schema.getJsonProps().get("logicalType").asText(), "decimal")) { + return jsonNodeConverter.convert(jsonNode, schema); + } else if (schema.getType() == Schema.Type.RECORD) { + for (Schema.Field s : schema.getFields()) { + JsonNode transformed = transformJsonNode(jsonNode.get(s.name()), s.schema(), jsonNodeConverter); + ((ObjectNode) jsonNode).set(s.name(), transformed); + } + } else if (schema.getType() == Schema.Type.UNION) { + if (jsonNode.has("bytes") && jsonNode.get("bytes").isNumber()) { + for (Schema subSchema : schema.getTypes()) { + if (subSchema.getType() == Schema.Type.BYTES && subSchema.getJsonProps().containsKey("logicalType") && + Objects.equals(subSchema.getJsonProps().get("logicalType").asText(), "decimal")) { + JsonNode transformed = transformJsonNode(jsonNode.get("bytes"), subSchema, jsonNodeConverter); + ((ObjectNode) jsonNode).set("bytes", transformed); + } + } + } + } else if (schema.getType() == Schema.Type.ARRAY) { + Schema subSchema = schema.getElementType(); + int i = 0; + for (Iterator it = jsonNode.elements(); it.hasNext(); ) { + JsonNode elem = it.next(); + JsonNode transformed = transformJsonNode(elem, subSchema, jsonNodeConverter); + ((ArrayNode) jsonNode).set(i, transformed); + i += 1; + } + } + // TODO: More cases - missing MAP and ENUM + return jsonNode; + } +} diff --git a/src/test/java/io/confluent/kafkarest/unit/AvroConverterTest.java b/src/test/java/io/confluent/kafkarest/unit/AvroConverterTest.java index 1bc0c53595..2bea046897 100644 --- 
a/src/test/java/io/confluent/kafkarest/unit/AvroConverterTest.java +++ b/src/test/java/io/confluent/kafkarest/unit/AvroConverterTest.java @@ -20,6 +20,8 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.TextNode; +import io.confluent.kafkarest.converters.BigDecimalDecoder; +import io.confluent.kafkarest.converters.BigDecimalEncoder; import org.apache.avro.Schema; import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericData; @@ -29,6 +31,7 @@ import org.apache.avro.util.Utf8; import org.junit.Test; +import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; @@ -61,6 +64,7 @@ public class AvroConverterTest { + " {\"name\": \"float\", \"type\": \"float\"},\n" + " {\"name\": \"double\", \"type\": \"double\"},\n" + " {\"name\": \"bytes\", \"type\": \"bytes\"},\n" + + " {\"name\": \"decimal\", \"type\": { \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 5, \"scale\": 2} },\n" + " {\"name\": \"string\", \"type\": \"string\", \"aliases\": [\"string_alias\"]},\n" + " {\"name\": \"null_default\", \"type\": \"null\", \"default\": null},\n" + " {\"name\": \"boolean_default\", \"type\": \"boolean\", \"default\": false},\n" @@ -106,6 +110,43 @@ public class AvroConverterTest { + "}" ); + private static final Schema decimalSchema = new Schema.Parser().parse( + "{\"type\": \"record\",\n" + + " \"name\": \"testDecimal\",\n" + + " \"fields\": [\n" + + " {\"name\": \"decimal\", \"type\": {\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 } }\n" + + "]}" + ); + + + private static final Schema decimalUnionNullSchema = new Schema.Parser().parse( + "{\"type\": \"record\",\n" + + " \"name\": \"testDecimal\",\n" + + " \"fields\": [\n" + + " {\"name\": \"decimal\", \"type\": [\"null\",{\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 }] }\n" + + "]}" + ); + + + private static 
final Schema nestedDecimalSchema = new Schema.Parser().parse( + "{\"type\": \"record\",\n" + + " \"name\": \"testDecimal\",\n" + + " \"fields\": [\n" + + " {\"name\": \"nested\", \"type\":\n" + + " {\"type\": \"record\", \"name\":\"nestedRecord\", \"fields\":[\n" + + " {\"name\": \"decimal\", \"type\": {\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 } }]}}\n" + + "]}" + ); + + + private static final Schema decimalArraySchema = new Schema.Parser().parse( + "{\"namespace\": \"namespace\",\n" + + " \"type\": \"array\",\n" + + " \"name\": \"test\",\n" + + " \"items\": {\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 } \n" + + "}" + ); + @Test public void testPrimitiveTypesToAvro() { Object result = AvroConverter.toAvro(null, createPrimitiveSchema("null")); @@ -188,6 +229,7 @@ public void testRecordToAvro() { + " \"float\": 23.4,\n" + " \"double\": 800.25,\n" + " \"bytes\": \"hello\",\n" + + " \"decimal\": 123.45,\n" + " \"string\": \"string\",\n" + " \"null_default\": null,\n" + " \"boolean_default\": false,\n" @@ -210,6 +252,7 @@ public void testRecordToAvro() { assertEquals(800.25, resultRecord.get("double")); assertEquals(EntityUtils.encodeBase64Binary("hello".getBytes()), EntityUtils.encodeBase64Binary(((ByteBuffer) resultRecord.get("bytes")).array())); + assertEquals(new BigDecimal("123.45"), BigDecimalDecoder.fromBytes((ByteBuffer) resultRecord.get("decimal"), 2)); assertEquals("string", resultRecord.get("string").toString()); // Nothing to check with default values, just want to make sure an exception wasn't thrown // when they values weren't specified for their fields. @@ -263,6 +306,76 @@ public void testEnumToAvro() { // serialization. 
} + @Test + public void testDecimalToAvro(){ + // this has been tested for numbers ranging from -10000.99 to 10000.99 + for (int i = -100; i <= 100; i++){ + for (int j = 0; j <= 99; j++){ + BigDecimal numberBigDecimal = new BigDecimal(i + (float) j / 100); + numberBigDecimal = numberBigDecimal .setScale(2, BigDecimal.ROUND_HALF_UP); + String decimal = numberBigDecimal.toString(); + + Object result = AvroConverter.toAvro( + TestUtils.jsonTree(String.format("{\"decimal\": %s}", decimal)), + decimalSchema); + + ByteBuffer byteBuffer = ((ByteBuffer) ((GenericData.Record) result).get("decimal")); + int scale = decimalSchema.getField("decimal").schema().getJsonProp("scale").asInt(); + + BigDecimal expected = BigDecimalDecoder.fromBytes(byteBuffer, scale); + assertEquals(new BigDecimal(decimal), expected ); + } + } + + } + + + @Test + public void testDecimalUnionNullToAvro(){ + String decimal = "123.45"; + Object result = AvroConverter.toAvro( + TestUtils.jsonTree(String.format("{\"decimal\": {\"bytes\": %s }}", decimal)), + decimalUnionNullSchema); + + ByteBuffer byteBuffer = ((ByteBuffer) ((GenericData.Record) result).get("decimal")); + int scale = decimalUnionNullSchema.getField("decimal").schema() + .getTypes().get(1).getJsonProp("scale").asInt(); + + BigDecimal expected = BigDecimalDecoder.fromBytes(byteBuffer, scale); + assertEquals(new BigDecimal(decimal), expected); + + } + + @Test + public void testNestedDecimalToAvro(){ + String decimal = "123.45"; + + Object result = AvroConverter.toAvro( + TestUtils.jsonTree(String.format("{\"nested\": {\"decimal\": %s }}", decimal)), + nestedDecimalSchema); + + ByteBuffer byteBuffer = (ByteBuffer) ((GenericData.Record) ((GenericData.Record) result).get("nested")).get("decimal"); + int scale = 2; + + BigDecimal expected = BigDecimalDecoder.fromBytes(byteBuffer, scale); + assertEquals(new BigDecimal(decimal), expected); + + } + + + @Test + public void testDecimalArrayToAvro() { + String json = "[123.45,555.55]"; + Object result 
= AvroConverter.toAvro(TestUtils.jsonTree(json), decimalArraySchema); + int scale = 2; + + ByteBuffer byteBuffer0 = (ByteBuffer) ((GenericData.Array) result).get(0); + assertEquals(new BigDecimal("123.45"), BigDecimalDecoder.fromBytes(byteBuffer0,2)); + + ByteBuffer byteBuffer1 = (ByteBuffer) ((GenericData.Array) result).get(1); + assertEquals(new BigDecimal("555.55"), BigDecimalDecoder.fromBytes(byteBuffer1,2)); + } + @Test public void testPrimitiveTypesToJson() { @@ -313,6 +426,7 @@ public void testRecordToJson() { .set("float", 23.4f) .set("double", 800.25) .set("bytes", ByteBuffer.wrap("bytes".getBytes())) + .set("decimal", ByteBuffer.wrap(BigDecimalEncoder.toByteArray(new BigDecimal("123.45"),2))) .set("string", "string") .build(); @@ -334,6 +448,8 @@ public void testRecordToJson() { // The bytes value was created from an ASCII string, so Avro's encoding should just give that // string back to us in the JSON-serialized version assertEquals("bytes", result.json.get("bytes").textValue()); + assertTrue(result.json.get("decimal").isBigDecimal()); + assertEquals(new BigDecimal("123.45"), result.json.get("decimal").decimalValue()); assertTrue(result.json.get("string").isTextual()); assertEquals("string", result.json.get("string").textValue()); }