From d673027e58116d38a5fea09c98de6736a16bbaf8 Mon Sep 17 00:00:00 2001 From: Indhumathi27 Date: Mon, 3 Sep 2018 17:35:01 +0530 Subject: [PATCH] Fix decimal type for Avro --- ...sactionalCarbonTableWithAvroDataType.scala | 470 +++++++++++++++++- .../carbondata/sdk/file/AvroCarbonWriter.java | 77 ++- 2 files changed, 505 insertions(+), 42 deletions(-) diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala index 29aa2de9709..dc13b16dfaf 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala @@ -18,8 +18,14 @@ package org.apache.carbondata.spark.testsuite.createTable import java.io.File +import java.nio.ByteBuffer +import javax.xml.bind.DatatypeConverter + import scala.collection.mutable +import org.apache.avro.Conversions.DecimalConversion +import org.apache.avro.{LogicalTypes, Schema} +import org.apache.avro.generic.GenericData import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll @@ -46,6 +52,8 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef writerPath = writerPath.replace("\\", "/") + val decimalConversion = new DecimalConversion + override def beforeAll(): Unit = { sql("DROP TABLE IF EXISTS sdkOutputTable") CarbonProperties.getInstance() @@ -678,7 +686,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef | "name": "StudentActivity", | "fields": [ | { - | "name": "enum_field", "type": [{ + | "name": "union_field", "type": [{ | "namespace": "org.example.avro", | "name": "dec", | "type": "bytes", @@ -689,15 +697,27 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef | }] |}""".stripMargin - val json1 = - """{"enum_field":{"bytes":"1010"}}""".stripMargin - val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val decimalConversion = new DecimalConversion + val logicalType = LogicalTypes.decimal(10, 2) + val decimal = new java.math.BigDecimal("1010").setScale(2) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("union_field").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"union_field":{"bytes":"$data"}}""".stripMargin val record = testUtil.jsonToAvro(json1, schema1) + val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1)) + val avroRec = new GenericData. 
Record(nn) + avroRec.put("union_field", bytes1) + val writer = CarbonWriter.builder .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) - writer.write(record) + writer.write(avroRec) writer.close() sql( s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY @@ -711,16 +731,16 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) val schema1 = """ - |{"name": "address", + |{"name": "struct_field", | "type": "record", | "fields": [ - | { "name": "name", "type": "string"}, - | { "name": "age", "type": "float"}, - | { "name": "address", "type": { - | "type" : "record", "name" : "my_address", + | { "name": "record1", "type": "string"}, + | { "name": "record2", "type": "float"}, + | { "name": "struct_field_decimal", "type": { + | "type" : "record", "name" : "my_record", | "fields" : [ - | {"name": "street", "type": "string"}, - | {"name": "city", "type": {"type" : "bytes", + | {"name": "record3", "type": "string"}, + | {"name": "record4", "type": {"type" : "bytes", | "logicalType": "decimal", | "precision": 4, | "scale": 2 @@ -728,14 +748,46 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef |]} """.stripMargin - val json1 = """ {"name":"bob", "age":10.24, "address" : {"street":"abc", "city":"32"}} """ - val nn = new org.apache.avro.Schema.Parser().parse(schema1) + + val logicalType = LogicalTypes.decimal(4, 2) + val decimal1 = new java.math.BigDecimal("32").setScale(2) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal1, nn.getField("struct_field_decimal").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = s""" {"record1":"bob", "record2":10.24, "struct_field_decimal" : {"record3":"abc", "record4":"$data"}} """ val record = testUtil.jsonToAvro(json1, schema1) + val jsonData = new String(record.get(2).asInstanceOf[GenericData.Record].get(1) + .asInstanceOf[ByteBuffer].array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + val bytesValue = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(jsonData)) + val mySchema = + """ + |{"name": "struct_field_decimal", + | "type": "record", + | "fields": [ + | { "name": "record3", "type": "string"}, + | { "name": "record4", "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 4, + | "scale": 2 + | }} + |]} + """.stripMargin + val schema = new org.apache.avro.Schema.Parser().parse(mySchema) + val genericByteArray = new GenericData.Record(schema) + genericByteArray.put("record3", "abc") + genericByteArray.put("record4", bytesValue) + val avroRec = new GenericData.Record(nn) + avroRec.put("record1", "bob") + avroRec.put("record2", 10.24) + avroRec.put("struct_field_decimal", genericByteArray) + val writer = CarbonWriter.builder .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) - writer.write(record) + writer.write(avroRec) writer.close() sql( s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY @@ -761,11 +813,11 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef | "type": "int" | }, | { - | "name": "address", + | "name": "dec_fields", | "type": { | "type": "array", | "items": { - | "name": "street", + | "name": "dec_field", | "type": "bytes", | "logicalType": "decimal", | "precision": 4, @@ -774,14 +826,40 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef | } """.stripMargin - val json1: 
String = """ {"name": "bob","age": 10,"address": ["32", "42"]} """ - val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(4, 1) + val decimal1 = new java.math.BigDecimal("32").setScale(1) + val decimal2 = new java.math.BigDecimal("42").setScale(1) + //get unscaled 2's complement bytearray + val bytes1 = + decimalConversion.toBytes(decimal1, nn.getField("dec_fields").schema, logicalType) + val bytes2 = + decimalConversion.toBytes(decimal2, nn.getField("dec_fields").schema, logicalType) + val data1 = DatatypeConverter.printBase64Binary(bytes1.array()) + val data2 = DatatypeConverter.printBase64Binary(bytes2.array()) + val json1: String = s""" {"name": "bob","age": 10,"dec_fields":["$data1","$data2"]} """ val record = testUtil.jsonToAvro(json1, schema1) + val jsonData1 = new String(record.get(2).asInstanceOf[GenericData.Array[ByteBuffer]].get(0) + .array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + val jsonData2 = new String(record.get(2).asInstanceOf[GenericData.Array[ByteBuffer]].get(1) + .array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + val bytesValue1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(jsonData1)) + val bytesValue2 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(jsonData2)) + val genericByteArray = new GenericData.Array[ByteBuffer](2, + Schema.createArray(Schema.create(Schema.Type.BYTES))) + genericByteArray.add(bytesValue1) + genericByteArray.add(bytesValue2) + val avroRec = new GenericData.Record(nn) + avroRec.put("name", "bob") + avroRec.put("age", 10) + avroRec.put("dec_fields", genericByteArray) + val writer = CarbonWriter.builder .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) - writer.write(record) + writer.write(avroRec) writer.close() sql( s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY @@ -884,4 +962,356 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(1728000, Row(1728000)))) } + test("test logical type decimal through Json") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "id", + | "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 5, + | "scale": 2 + | } + |} + | ] + |}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(5, 2) + val decimal = new java.math.BigDecimal("12.8").setScale(2) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"id":"$data"}""".stripMargin + val record = testUtil.jsonToAvro(json1, schema1) + val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1)) + val avroRec = new GenericData. 
Record(nn) + avroRec.put("id", bytes1) + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(decimal))) + } + + test("test logical type decimal through Json with big decimal value") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "dec_field", + | "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 30, + | "scale": 10 + | } + |} + | ] + |}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(30, 10) + val decimal = new java.math.BigDecimal("12672346879023.845789").setScale(10) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"dec_field":"$data"}""".stripMargin + val record = testUtil.jsonToAvro(json1, schema1) + val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1)) + val avroRec = new GenericData. Record(nn) + avroRec.put("dec_field", bytes1) + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(decimal))) + } + + test("test logical type decimal through Json with negative decimal value") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "dec_field", + | "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 30, + | "scale": 6 + | } + |} + | ] + |}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(30, 6) + val decimal = new java.math.BigDecimal("-12672346879023.845").setScale(6) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"dec_field":"$data"}""".stripMargin + val record = testUtil.jsonToAvro(json1, schema1) + val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1)) + val avroRec = new GenericData. 
Record(nn) + avroRec.put("dec_field", bytes1) + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(decimal))) + } + + test("test logical type decimal through Avro") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "dec_field", + | "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 5, + | "scale": 2 + | } + |} + | ] + |}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(5, 2) + val decimal = new java.math.BigDecimal("12.8").setScale(2) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"dec_field":"$data"}""".stripMargin + val avroRec = new GenericData. Record(nn) + avroRec.put("dec_field", bytes) + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(decimal))) + } + + test("test logical type decimal with data having greater precision than specified precision") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "dec_field", + | "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 5, + | "scale": 2 + | } + |} + | ] + |}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(5, 2) + val decimal = new java.math.BigDecimal("1218").setScale(2) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"dec_field":"$data"}""".stripMargin + val avroRec = new GenericData. 
Record(nn) + avroRec.put("dec_field", bytes) + val exception1 = intercept[Exception] { + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + } + assert(exception1.getMessage + .contains("Data Loading failed as value Precision 6 is greater than specified Precision 5 in Avro Schema")) + } + + test("test union with multiple record type") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "test.avro", + | "type": "record", + | "name": "NewCar2", + | "fields": [ + | { + | "name": "optionalExtra", + | "type": ["null",{ + | "type":"record", + | "name":"Stereo", + | "fields" :[{ + | "name":"make", + | "type":"string" + | }, + | { + | "name":"speakers", + | "type":"int" + | }] + | },{ + | "type":"record", + | "name":"LeatherTrim", + | "fields":[{ + | "name":"colour", + | "type":"string" + | }] + | }], + | "default":null + | }] + | + |}""".stripMargin + + val json1 = + """{"optionalExtra":{"test.avro.LeatherTrim":{"colour":"ab"}}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(Row(null,null),Row("ab"))))) + } + + test("test union with multiple Enum type") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "test.avro", + | "type": "record", + | "name": "Union_data3", + | "fields": [ + | { + | "name": "enum_record", + | "type": + | ["long","null","string", + | {"type":"enum","name":"t1","symbols":["red","blue","yellow"]}, + | {"type":"enum","name":"t2","symbols":["sun","mon","tue","wed","thu","fri","sat"]}, + | "int" + | ]}] + |}""".stripMargin + + val json1 = + """{"enum_record":{"test.avro.t2":"sun"}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(null,null,null,"sun",null)))) + } + + test("test spark file format") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "union_field", "type": [{ + | "namespace": "org.example.avro", + | "name": "dec", + | "type": "bytes", + | "logicalType": "decimal", + | "precision": 10, + | "scale": 2 + | },"int"] + | }] + |}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val decimalConversion = new DecimalConversion + val logicalType = LogicalTypes.decimal(10, 2) + val decimal = new java.math.BigDecimal("1010").setScale(2) + //get unscaled 2's 
complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("union_field").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"union_field":{"bytes":"$data"}}""".stripMargin + val record = testUtil.jsonToAvro(json1, schema1) + val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1)) + val avroRec = new GenericData. Record(nn) + avroRec.put("union_field", bytes1) + + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + sql(s"create table sdkOutputTable(union_field struct) " + + s"using carbon options(path='$writerPath')") + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(decimal,null)))) + } + } diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java index 14dbe1633cd..dd70cc93c35 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.math.BigDecimal; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -31,7 +32,6 @@ import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; @@ -43,6 +43,7 @@ import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; import org.apache.carbondata.processing.loading.complexobjects.ArrayObject; import org.apache.carbondata.processing.loading.complexobjects.StructObject; +import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.avro.LogicalType; @@ -103,7 +104,7 @@ private Object[] avroToCsv(GenericData.Record avroRecord) { } private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { - Object out; + Object out = null; Schema.Type type = avroField.schema().getType(); LogicalType logicalType = avroField.schema().getLogicalType(); switch (type) { @@ -118,7 +119,7 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { while (iterator.hasNext()) { // size is 2 because map will have key and value Object[] mapChildObjects = new Object[2]; - Map.Entry mapEntry = (HashMap.Entry) iterator.next(); + Map.Entry mapEntry = (Map.Entry) iterator.next(); // evaluate key Object keyObject = avroFieldToObject( new Schema.Field(avroField.name(), Schema.create(Schema.Type.STRING), @@ -198,7 +199,10 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { if (unionField.getType().equals(Schema.Type.NULL)) { continue; } - if (checkFieldValueType(unionField.getType(), fieldValue)) { + // Union may not contain more than one schema with the same type, + // except for the named types record,fixed and enum + 
// hence check for schema also in case of union of multiple record or enum or fixed type + if (validateUnionFieldValue(unionField.getType(), fieldValue, unionField)) { values[j] = avroFieldToObjectForUnionType(unionField, fieldValue, avroField); break; } @@ -206,6 +210,15 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { } out = new StructObject(values); break; + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + if (logicalType instanceof LogicalTypes.Decimal) { + out = extractDecimalValue(fieldValue, + ((LogicalTypes.Decimal) avroField.schema().getLogicalType()).getScale(), + ((LogicalTypes.Decimal) avroField.schema().getLogicalType()).getPrecision()); + } + break; default: out = avroPrimitiveFieldToObject(type, logicalType, fieldValue); } @@ -218,9 +231,10 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { * * @param type * @param fieldValue + * @param unionField * @return */ - private boolean checkFieldValueType(Schema.Type type, Object fieldValue) { + private boolean validateUnionFieldValue(Schema.Type type, Object fieldValue, Schema unionField) { switch (type) { case INT: return (fieldValue instanceof Integer); @@ -235,7 +249,8 @@ private boolean checkFieldValueType(Schema.Type type, Object fieldValue) { case FLOAT: return (fieldValue instanceof Float); case RECORD: - return (fieldValue instanceof GenericData.Record); + return (fieldValue instanceof GenericData.Record && unionField + .equals(((GenericData.Record) fieldValue).getSchema())); case ARRAY: return (fieldValue instanceof GenericData.Array || fieldValue instanceof ArrayList); case BYTES: @@ -243,7 +258,8 @@ private boolean checkFieldValueType(Schema.Type type, Object fieldValue) { case MAP: return (fieldValue instanceof HashMap); case ENUM: - return (fieldValue instanceof GenericData.EnumSymbol); + return (fieldValue instanceof GenericData.EnumSymbol && unionField + .equals(((GenericData.EnumSymbol) fieldValue).getSchema())); default: return false; } @@ -251,7 +267,7 @@ private boolean checkFieldValueType(Schema.Type type, Object fieldValue) { private Object avroPrimitiveFieldToObject(Schema.Type type, LogicalType logicalType, Object fieldValue) { - Object out = null; + Object out; switch (type) { case INT: if (logicalType != null) { @@ -290,15 +306,6 @@ private Object avroPrimitiveFieldToObject(Schema.Type type, LogicalType logicalT // also carbon internally needs float as double out = Double.parseDouble(fieldValue.toString()); break; - case BYTES: - // DECIMAL type is defined in Avro as a BYTE type with the logicalType property - // set to "decimal" and a specified precision and scale - // As binary type is not supported yet,value will be null - if (logicalType instanceof LogicalTypes.Decimal) { - out = new BigDecimal(new String(((ByteBuffer) fieldValue).array(), - CarbonCommonConstants.DEFAULT_CHARSET_CLASS)); - } - break; case NULL: out = null; break; @@ -319,7 +326,7 @@ private Object avroPrimitiveFieldToObject(Schema.Type type, LogicalType logicalT */ private Object avroFieldToObjectForUnionType(Schema avroField, Object fieldValue, Schema.Field avroFields) { - Object out; + Object out = null; Schema.Type type = avroField.getType(); LogicalType logicalType = avroField.getLogicalType(); switch (type) { @@ -383,7 +390,7 @@ private Object avroFieldToObjectForUnionType(Schema avroField, Object fieldValue while (iterator.hasNext()) { // size is 2 because map will 
have key and value Object[] mapChildObjects = new Object[2]; - Map.Entry mapEntry = (HashMap.Entry) iterator.next(); + Map.Entry mapEntry = (Map.Entry) iterator.next(); // evaluate key Object keyObject = avroFieldToObject( new Schema.Field(avroFields.name(), Schema.create(Schema.Type.STRING), @@ -407,12 +414,32 @@ private Object avroFieldToObjectForUnionType(Schema avroField, Object fieldValue out = null; } break; + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + if (logicalType instanceof LogicalTypes.Decimal) { + out = extractDecimalValue(fieldValue, + ((LogicalTypes.Decimal) avroField.getLogicalType()).getScale(), + ((LogicalTypes.Decimal) avroField.getLogicalType()).getPrecision()); + } + break; default: out = avroPrimitiveFieldToObject(type, logicalType, fieldValue); } return out; } + private Object extractDecimalValue(Object fieldValue, int scale, int precision) { + BigDecimal dataValue = new BigDecimal(new BigInteger(((ByteBuffer) fieldValue).array()), scale); + if (!(dataValue.precision() > precision)) { + return dataValue; + } else { + throw new CarbonDataLoadingException( + "Data Loading failed as value Precision " + dataValue.precision() + + " is greater than specified Precision " + precision + " in Avro Schema"); + } + } + /** * converts avro schema to carbon schema required by carbonWriter * @@ -525,8 +552,10 @@ private static Field prepareFields(Schema.Field avroField) { int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision(); int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale(); return new Field(fieldName, DataTypes.createDecimalType(precision, scale)); + } else { + throw new UnsupportedOperationException( + "carbon not support " + type.toString() + " avro type yet"); } - return null; case NULL: return null; default: @@ -621,8 +650,10 @@ private static StructField prepareSubFields(String fieldName, Schema childSchema int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision(); int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale(); return new StructField(fieldName, DataTypes.createDecimalType(precision, scale)); + } else { + throw new UnsupportedOperationException( + "carbon not support " + type.toString() + " avro type yet"); } - return null; case NULL: return null; default: @@ -714,8 +745,10 @@ private static DataType getMappingDataTypeForCollectionRecord(Schema childSchema int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision(); int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale(); return DataTypes.createDecimalType(precision, scale); + } else { + throw new UnsupportedOperationException( + "carbon not support " + childSchema.getType().toString() + " avro type yet"); } - return null; case NULL: return null; default:
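
Note on the decimal handling introduced above: Avro's "decimal" logical type stores the unscaled value of a BigDecimal as a big-endian two's-complement byte array inside a bytes field. The tests build that byte array with Avro's DecimalConversion (and Base64-encode it for the JSON payload), while the new extractDecimalValue() in AvroCarbonWriter rebuilds the BigDecimal from those bytes and the schema's scale, rejecting values whose precision exceeds the declared precision. The following standalone Java sketch is illustrative only and not part of the patch (the class name and the use of IllegalArgumentException instead of CarbonDataLoadingException are my own choices); it shows that round trip under a decimal(5,2) schema:

    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.nio.ByteBuffer;

    import org.apache.avro.Conversions;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    public class AvroDecimalRoundTrip {
      public static void main(String[] args) {
        // A bytes field carrying decimal(5,2), as used by several tests above.
        Schema bytesSchema = Schema.create(Schema.Type.BYTES);
        LogicalTypes.Decimal decimalType = LogicalTypes.decimal(5, 2);
        decimalType.addToSchema(bytesSchema);

        // Write side: BigDecimal -> unscaled two's-complement bytes,
        // as the Scala tests do before Base64-encoding the JSON payload.
        BigDecimal value = new BigDecimal("12.8").setScale(2);
        ByteBuffer encoded =
            new Conversions.DecimalConversion().toBytes(value, bytesSchema, decimalType);

        // Read side: bytes -> BigDecimal, mirroring extractDecimalValue().
        int scale = decimalType.getScale();
        int precision = decimalType.getPrecision();
        BigDecimal decoded = new BigDecimal(new BigInteger(encoded.array()), scale);
        if (decoded.precision() > precision) {
          // AvroCarbonWriter throws CarbonDataLoadingException at this point.
          throw new IllegalArgumentException("Data Loading failed as value Precision "
              + decoded.precision() + " is greater than specified Precision " + precision
              + " in Avro Schema");
        }
        System.out.println(decoded); // prints 12.80
      }
    }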
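
A second note, on the union change: an Avro union may contain several branches of the same named kind (two records, two enums, or two fixed types), so the earlier check on Schema.Type alone could pick the wrong branch. validateUnionFieldValue() therefore compares the value's concrete schema with the union branch for RECORD and ENUM. The sketch below is illustrative only (schema names are borrowed from the "union with multiple record type" test); it demonstrates the branch selection:

    import java.util.Arrays;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;

    public class UnionBranchMatch {

      // Same idea as validateUnionFieldValue() for named types: the type tag alone is
      // ambiguous, so the concrete schema of the value must match the union branch.
      static boolean matchesBranch(Schema branch, Object value) {
        switch (branch.getType()) {
          case RECORD:
            return value instanceof GenericData.Record
                && branch.equals(((GenericData.Record) value).getSchema());
          case ENUM:
            return value instanceof GenericData.EnumSymbol
                && branch.equals(((GenericData.EnumSymbol) value).getSchema());
          default:
            return false;
        }
      }

      public static void main(String[] args) {
        Schema stereo = SchemaBuilder.record("Stereo").fields()
            .requiredString("make").requiredInt("speakers").endRecord();
        Schema leatherTrim = SchemaBuilder.record("LeatherTrim").fields()
            .requiredString("colour").endRecord();
        Schema union = Schema.createUnion(
            Arrays.asList(Schema.create(Schema.Type.NULL), stereo, leatherTrim));

        GenericData.Record trim = new GenericData.Record(leatherTrim);
        trim.put("colour", "ab");

        for (Schema branch : union.getTypes()) {
          System.out.println(branch.getName() + " -> " + matchesBranch(branch, trim));
          // null -> false, Stereo -> false, LeatherTrim -> true
        }
      }
    }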