From b588cb65564d26cdf55da7482ae7b1ee79173067 Mon Sep 17 00:00:00 2001 From: Indhumathi27 Date: Thu, 30 Aug 2018 14:50:06 +0530 Subject: [PATCH] [CARBONDATA-2876]Support Avro datatype conversion through SDK This PR supports following Avro DataTypes to carbon format through SDK. Avro datatypes include, 1. Avro Union 2. Avro Enum 3. Avro Logical type Decimal This closes #2671 --- ...sactionalCarbonTableWithAvroDataType.scala | 793 ++++++++++++++++++ .../carbondata/sdk/file/AvroCarbonWriter.java | 331 +++++++- 2 files changed, 1088 insertions(+), 36 deletions(-) create mode 100644 integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala new file mode 100644 index 00000000000..b50407cc877 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala @@ -0,0 +1,793 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File +import scala.collection.mutable + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.sdk.file.CarbonWriter + +/** + * Test class for Avro supported data types through SDK + */ +class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with BeforeAndAfterAll { + + + val badRecordAction = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION) + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./target/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + + writerPath = writerPath.replace("\\", "/") + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "force") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, badRecordAction) + } + + test("test enum") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": + | [{ + | "name": "id", + | "type": { + | "type": "enum", + | "name": "Suit", + | "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] + | } + | } + | ] + |}""".stripMargin + + val json1 = + """{"id":"HEARTS"}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("HEARTS"))) + } + + test("test enum with struct type") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val mySchema = + "{ " + + " \"name\": \"address\", " + + " \"type\": \"record\", " + + " \"fields\": [ " + + " { " + + " \"name\": \"name\", " + + " \"type\": \"string\" " + + " }, " + + " { " + + " \"name\": \"age\", " + + " \"type\": \"int\" " + + " }, " + + " { " + + " \"name\": \"address\", \"type\": {" + + " \"type\" : \"record\", \"name\" : \"my_address\"," + + " \"fields\" : [" + + " {\"name\": \"enumRec\", " + + " \"type\": { " + + " \"type\": \"enum\", " + + " \"name\": \"card\", " + + " \"symbols\": [\"SPADES\", \"HEARTS\", \"DIAMONDS\", \"CLUBS\"] " + + " } " + + "}]}" + + " } " + + " ] " + + "} " + + val json1 = "{\"name\":\"bob\", \"age\":10, \"address\": {\"enumRec\":\"SPADES\"}}" + + val nn = new org.apache.avro.Schema.Parser().parse(mySchema) + val record = testUtil.jsonToAvro(json1, mySchema) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("bob", 10, Row("SPADES")))) + } + + test("test enum with Array type") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val mySchema = + """ { + | "name": "address", + | "type": "record", + | "fields": [ + | { + | "name": "name", + | "type": "string" + | }, + | { + | "name": "age", + | "type": "int" + | }, + | { + | "name": "address", + | "type": { + | "type": "array", + | "items": { + | "name": "Suit", + | "type": "enum", + | "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] + | }}}] + | } + """.stripMargin + + val json: String = """ {"name": "bob","age": 10,"address": ["SPADES", "DIAMONDS"]} """ + + val nn = new org.apache.avro.Schema.Parser().parse(mySchema) + val record = testUtil.jsonToAvro(json, mySchema) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), + Seq(Row("bob", 10, mutable.WrappedArray.make(Array("SPADES", "DIAMONDS"))))) + } + + test("test union type long") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { "name": "first", "type": ["string", "int", "long"] } + | ] + |}""".stripMargin + + val json1 = + """{"first":{"long":10345}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(null, null, 10345)))) + } + + test("test union type boolean") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { "name": "first", "type": ["boolean", "int", "long"] } + | ] + |}""".stripMargin + + val json1 = + """{"first":{"boolean":true}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(true, null, null)))) + } + + test("test union type string") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { "name": "first", "type": ["string", "int", "long"] } + | ] + |}""".stripMargin + + val json1 = + """{"first":{"string":"abc"}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row("abc", null, null)))) + } + + test("test union type int") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { "name": "first", "type": ["string", "int", "long"] } + | ] + |}""".stripMargin + + val json1 = + """{"first":{"int":10}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(null, 10, null)))) + } + + test("test union type with null") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { "name": "first", "type": ["null", "int"] } + | ] + |}""".stripMargin + + val json1 = + """{"first":{"null":null}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(null)))) + } + + test("test union type with only type null") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { "name": "first", "type": ["null"] } + | ] + |}""".stripMargin + + val json1 = + """{"first":{"null":null}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + val exception1 = intercept[UnsupportedOperationException] { + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + } + assert(exception1.getMessage + .contains("Carbon do not support Avro UNION with only null type")) + } + + test("test union type with Enum") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "enum_field", "type": [{ + | "namespace": "org.example.avro", + | "name": "EnumField", + | "type": "enum", + | "symbols": [ + | "VAL_0", + | "VAL_1" + | ] + | },"null"], "default": null + | }] + |}""".stripMargin + + val json1 = + """{"enum_field":{"org.example.avro.EnumField":"VAL_0"}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row("VAL_0")))) + } + + test("test union type with Map") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "map_field", "type": [{ + | "namespace": "org.example.avro", + | "name": "mapField", + | "type": "map", + | "values":"string" + | },"int"], "default": null + | }] + |}""".stripMargin + + val json1 = + """{"map_field":{"map":{"street": "k-lane", "city": "bangalore"}}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + sql("select * from sdkOutputTable").show(false) + checkAnswer(sql("select * from sdkOutputTable"), Seq( + Row(Row(Map("city" -> "bangalore", "street" -> "k-lane"), null)))) + } + + test("test union type") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "struct_field", "type": [{ + | "namespace": "org.example.avro", + | "name": "structField", + | "type": "array", + | "items": { "name" : "name0", "type":"string"} + | },"int"], "default": null + | }] + |}""".stripMargin + + val json1 = + """{"struct_field":{"int":12}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), + Seq(Row(Row(mutable.WrappedArray.make(Array(null)), 12)))) + } + + test("test Struct of Union") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """ + |{"name": "address", + | "type": "record", + | "fields": [ + | { "name": "address", "type": { + | "type" : "record", "name" : "my_address", + | "fields" : [ + | {"name": "city", "type": ["string","int"]}]}} + |]} + """.stripMargin + + val json1 = + """{"address":{"city":{"int":1}}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + sql("describe formatted sdkOutputTable").show(false) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(Row(null, 1))))) + sql("insert into sdkOutputTable values('abc:12')") + sql("select address.city.city0 from sdkOutputTable").show(false) + } + + test("test Union with struct of array") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """ + |{"name": "address", + | "type": "record", + | "fields": [ + | { "name": "address", "type": { + | "type" : "record", "name" : "my_address", + | "fields" : [ + | {"name": "city", "type": ["string", { + | "type": "array", + | "name": "abc_name_0", + | "items": { + | "name": "_name_0", + | "type": "record", + | "fields": [ + | { + | "name": "app_id", + | "type": [ + | "null", + | "string" + | ] + | } + | ] + | } + | } + | ]}]}} + |]} + """.stripMargin + + val json1 = + """{ + |"address":{"city": + |{"array":[ + | { + | "app_id": { + | "string": "abc" + | }}] + | }}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), + Seq(Row(Row(Row(null, mutable.WrappedArray.make(Array(Row(Row("abc"))))))))) + } + + test("test union type with Array and Struct") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "type": "record", + | "namespace": "example.avro", + | "name": "array_union", + | "fields": [ + | { + | "name": "body", + | "type": { + | "name": "body", + | "type": "record", + | "fields": [ + | { + | "name": "abc", + | "type": [ + | "int", + | { + | "type": "array", + | "name": "abc_name_0", + | "items": { + | "name": "_name_0", + | "type": "record", + | "fields": [ + | { + | "name": "app_id", + | "type": [ + | "null", + | "string" + | ] + | }, + | { + | "name": "app_name", + | "type": [ + | "int", + | "float", + | "string" + | ] + | }, + | { + | "name": "app_key", + | "type": [ + | "null", + | "string" + | ] + | } + | ] + | } + | } + | ] + | } + | ] + | } + | } + | ] + |}""".stripMargin + + val json1 = + """{ + | "body": { + | "abc": { + | "array": [ + | { + | "app_id": { + | "string": "abc" + | }, + | "app_name": { + | "string": "bcd" + | }, + | "app_key": { + | "string": "cde" + | } + | } + | ] + | } + | } + |}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + sql("describe formatted sdkOutputTable").show(false) + checkAnswer(sql("select * from sdkOutputTable"), + Seq(Row(Row(Row(null, + mutable.WrappedArray.make(Array(Row(Row("abc"), Row(null, null, "bcd"), Row("cde"))))))))) + } + + test("test union type with Decimal") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "enum_field", "type": [{ + | "namespace": "org.example.avro", + | "name": "dec", + | "type": "bytes", + | "logicalType": "decimal", + | "precision": 10, + | "scale": 2 + | },"null"] + | }] + |}""".stripMargin + + val json1 = + """{"enum_field":{"bytes":"1010"}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkExistence(sql("select * from sdkOutputTable"), true, "1010.00") + } + + test("test logical type decimal with struct") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """ + |{"name": "address", + | "type": "record", + | "fields": [ + | { "name": "name", "type": "string"}, + | { "name": "age", "type": "float"}, + | { "name": "address", "type": { + | "type" : "record", "name" : "my_address", + | "fields" : [ + | {"name": "street", "type": "string"}, + | {"name": "city", "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 4, + | "scale": 2 + | }}]}} + |]} + """.stripMargin + + val json1 = """ {"name":"bob", "age":10.24, "address" : {"street":"abc", "city":"32"}} """ + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id struct) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkExistence(sql("select * from sdkOutputTable"), true, "32.00") + } + + test("test logical type decimal with Array") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """ { + | "name": "address", + | "type": "record", + | "fields": [ + | { + | "name": "name", + | "type": "string" + | }, + | { + | "name": "age", + | "type": "int" + | }, + | { + | "name": "address", + | "type": { + | "type": "array", + | "items": { + | "name": "street", + | "type": "bytes", + | "logicalType": "decimal", + | "precision": 4, + | "scale": 1 + | }}}] + | } + """.stripMargin + + val json1: String = """ {"name": "bob","age": 10,"address": ["32", "42"]} """ + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id struct) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkExistence(sql("select * from sdkOutputTable"), true, "32.0") + } + +} diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java index 64dfd429d45..7c1d9a27527 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java @@ -18,6 +18,8 @@ package org.apache.carbondata.sdk.file; import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -29,6 +31,7 @@ import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; @@ -46,6 +49,7 @@ import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; +import org.apache.avro.util.Utf8; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.JobID; @@ -103,42 +107,6 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { Schema.Type type = avroField.schema().getType(); LogicalType logicalType = avroField.schema().getLogicalType(); switch (type) { - case INT: - if (logicalType != null) { - if (logicalType instanceof LogicalTypes.Date) { - int dateIntValue = (int) fieldValue; - out = dateIntValue * DateDirectDictionaryGenerator.MILLIS_PER_DAY; - } else { - LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName()); - out = fieldValue; - } - } else { - out = fieldValue; - } - break; - case BOOLEAN: - case LONG: - if (logicalType != null && !(logicalType instanceof LogicalTypes.TimestampMillis)) { - if (logicalType instanceof LogicalTypes.TimestampMicros) { - long dateIntValue = (long) fieldValue; - out = dateIntValue / 1000L; - } else { - LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName()); - out = fieldValue; - } - } else { - out = fieldValue; - } - break; - case DOUBLE: - case STRING: - out = fieldValue; - break; - case FLOAT: - // direct conversion will change precision. So parse from string. - // also carbon internally needs float as double - out = Double.parseDouble(fieldValue.toString()); - break; case MAP: // Note: Avro object takes care of removing the duplicates so we should not handle it again // Map will be internally stored as Array> @@ -213,6 +181,124 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { } out = new ArrayObject(arrayChildObjects); break; + case UNION: + // Union type will be internally stored as Struct + // Fill data object only if fieldvalue is instance of datatype + // For other field datatypes, fill value as Null + List unionFields = avroField.schema().getTypes(); + int notNullUnionFieldsCount = 0; + for (Schema unionField : unionFields) { + if (!unionField.getType().equals(Schema.Type.NULL)) { + notNullUnionFieldsCount++; + } + } + Object[] values = new Object[notNullUnionFieldsCount]; + int j = 0; + for (Schema unionField : unionFields) { + if (unionField.getType().equals(Schema.Type.NULL)) { + continue; + } + if (checkFieldValueType(unionField.getType(), fieldValue)) { + values[j] = avroFieldToObjectForUnionType(unionField, fieldValue, avroField); + break; + } + j++; + } + out = new StructObject(values); + break; + default: + out = avroPrimitiveFieldToObject(type, logicalType, fieldValue); + } + return out; + } + + /** + * For Union type, fill data if Schema.Type is instance of fieldValue + * and return result + * + * @param type + * @param fieldValue + * @return + */ + private boolean checkFieldValueType(Schema.Type type, Object fieldValue) { + switch (type) { + case INT: + return (fieldValue instanceof Integer); + case BOOLEAN: + return (fieldValue instanceof Boolean); + case LONG: + return (fieldValue instanceof Long); + case DOUBLE: + return (fieldValue instanceof Double); + case STRING: + return (fieldValue instanceof Utf8 || fieldValue instanceof String); + case FLOAT: + return (fieldValue instanceof Float); + case RECORD: + return (fieldValue instanceof GenericData.Record); + case ARRAY: + return (fieldValue instanceof GenericData.Array || fieldValue instanceof ArrayList); + case BYTES: + return (fieldValue instanceof ByteBuffer); + case MAP: + return (fieldValue instanceof HashMap); + case ENUM: + return (fieldValue instanceof GenericData.EnumSymbol); + default: + return false; + } + } + + private Object avroPrimitiveFieldToObject(Schema.Type type, LogicalType logicalType, + Object fieldValue) { + Object out = null; + switch (type) { + case INT: + if (logicalType != null) { + if (logicalType instanceof LogicalTypes.Date) { + int dateIntValue = (int) fieldValue; + out = dateIntValue * DateDirectDictionaryGenerator.MILLIS_PER_DAY; + } else { + LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName()); + out = fieldValue; + } + } else { + out = fieldValue; + } + break; + case BOOLEAN: + case LONG: + if (logicalType != null && !(logicalType instanceof LogicalTypes.TimestampMillis)) { + if (logicalType instanceof LogicalTypes.TimestampMicros) { + long dateIntValue = (long) fieldValue; + out = dateIntValue / 1000L; + } else { + LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName()); + out = fieldValue; + } + } else { + out = fieldValue; + } + break; + case DOUBLE: + case STRING: + case ENUM: + out = fieldValue; + break; + case FLOAT: + // direct conversion will change precision. So parse from string. + // also carbon internally needs float as double + out = Double.parseDouble(fieldValue.toString()); + break; + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + // As binary type is not supported yet,value will be null + if (logicalType instanceof LogicalTypes.Decimal) { + out = new BigDecimal(new String(((ByteBuffer) fieldValue).array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS)); + } + break; case NULL: out = null; break; @@ -223,6 +309,110 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { return out; } + /** + * fill fieldvalue for union type + * + * @param avroField + * @param fieldValue + * @param avroFields + * @return + */ + private Object avroFieldToObjectForUnionType(Schema avroField, Object fieldValue, + Schema.Field avroFields) { + Object out; + Schema.Type type = avroField.getType(); + LogicalType logicalType = avroField.getLogicalType(); + switch (type) { + case RECORD: + if (fieldValue instanceof GenericData.Record) { + List fields = avroField.getFields(); + + Object[] structChildObjects = new Object[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + Object childObject = + avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i)); + if (childObject != null) { + structChildObjects[i] = childObject; + } + } + out = new StructObject(structChildObjects); + } else { + out = null; + } + break; + case ARRAY: + if (fieldValue instanceof GenericData.Array || fieldValue instanceof ArrayList) { + Object[] arrayChildObjects; + if (fieldValue instanceof GenericData.Array) { + int size = ((GenericData.Array) fieldValue).size(); + arrayChildObjects = new Object[size]; + for (int i = 0; i < size; i++) { + Object childObject = avroFieldToObject( + new Schema.Field(avroFields.name(), avroField.getElementType(), avroFields.doc(), + avroFields.defaultVal()), ((GenericData.Array) fieldValue).get(i)); + if (childObject != null) { + arrayChildObjects[i] = childObject; + } + } + } else { + int size = ((ArrayList) fieldValue).size(); + arrayChildObjects = new Object[size]; + for (int i = 0; i < size; i++) { + Object childObject = avroFieldToObject( + new Schema.Field(avroFields.name(), avroField.getElementType(), avroFields.doc(), + avroFields.defaultVal()), ((ArrayList) fieldValue).get(i)); + if (childObject != null) { + arrayChildObjects[i] = childObject; + } + } + } + out = new ArrayObject(arrayChildObjects); + } else { + out = null; + } + break; + case MAP: + // Note: Avro object takes care of removing the duplicates so we should not handle it again + // Map will be internally stored as Array> + if (fieldValue instanceof HashMap) { + Map mapEntries = (HashMap) fieldValue; + Object[] arrayMapChildObjects = new Object[mapEntries.size()]; + if (!mapEntries.isEmpty()) { + Iterator iterator = mapEntries.entrySet().iterator(); + int counter = 0; + while (iterator.hasNext()) { + // size is 2 because map will have key and value + Object[] mapChildObjects = new Object[2]; + Map.Entry mapEntry = (HashMap.Entry) iterator.next(); + // evaluate key + Object keyObject = avroFieldToObject( + new Schema.Field(avroFields.name(), Schema.create(Schema.Type.STRING), + avroFields.doc(), avroFields.defaultVal()), mapEntry.getKey()); + // evaluate value + Object valueObject = avroFieldToObject( + new Schema.Field(avroFields.name(), avroField.getValueType(), avroFields.doc(), + avroFields.defaultVal()), mapEntry.getValue()); + if (keyObject != null) { + mapChildObjects[0] = keyObject; + } + if (valueObject != null) { + mapChildObjects[1] = valueObject; + } + StructObject keyValueObject = new StructObject(mapChildObjects); + arrayMapChildObjects[counter++] = keyValueObject; + } + } + out = new ArrayObject(arrayMapChildObjects); + } else { + out = null; + } + break; + default: + out = avroPrimitiveFieldToObject(type, logicalType, fieldValue); + } + return out; + } + /** * converts avro schema to carbon schema required by carbonWriter * @@ -270,6 +460,7 @@ private static Field prepareFields(Schema.Field avroField) { } case DOUBLE: return new Field(fieldName, DataTypes.DOUBLE); + case ENUM: case STRING: return new Field(fieldName, DataTypes.STRING); case FLOAT: @@ -310,6 +501,32 @@ private static Field prepareFields(Schema.Field avroField) { } else { return null; } + case UNION: + int i = 0; + // Get union types and store as Struct + ArrayList unionFields = new ArrayList<>(); + for (Schema avroSubField : avroField.schema().getTypes()) { + if (!avroSubField.getType().equals(Schema.Type.NULL)) { + StructField unionField = prepareSubFields(avroField.name() + i++, avroSubField); + if (unionField != null) { + unionFields.add(unionField); + } + } + } + if (unionFields.isEmpty()) { + throw new UnsupportedOperationException( + "Carbon do not support Avro UNION with only null type"); + } + return new Field(fieldName, "struct", unionFields); + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + if (logicalType instanceof LogicalTypes.Decimal) { + int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision(); + int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale(); + return new Field(fieldName, DataTypes.createDecimalType(precision, scale)); + } + return null; case NULL: return null; default: @@ -343,6 +560,7 @@ private static StructField prepareSubFields(String fieldName, Schema childSchema } case DOUBLE: return new StructField(fieldName, DataTypes.DOUBLE); + case ENUM: case STRING: return new StructField(fieldName, DataTypes.STRING); case FLOAT: @@ -385,6 +603,26 @@ private static StructField prepareSubFields(String fieldName, Schema childSchema } else { return null; } + case UNION: + // recursively get the union types + int i = 0; + ArrayList structSubTypes = new ArrayList<>(); + for (Schema avroSubField : childSchema.getTypes()) { + StructField structField = prepareSubFields(fieldName + i++, avroSubField); + if (structField != null) { + structSubTypes.add(structField); + } + } + return (new StructField(fieldName, DataTypes.createStructType(structSubTypes))); + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + if (logicalType instanceof LogicalTypes.Decimal) { + int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision(); + int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale(); + return new StructField(fieldName, DataTypes.createDecimalType(precision, scale)); + } + return null; case NULL: return null; default: @@ -425,6 +663,7 @@ private static DataType getMappingDataTypeForCollectionRecord(Schema childSchema } case DOUBLE: return DataTypes.DOUBLE; + case ENUM: case STRING: return DataTypes.STRING; case FLOAT: @@ -457,6 +696,26 @@ private static DataType getMappingDataTypeForCollectionRecord(Schema childSchema } else { return null; } + case UNION: + int i = 0; + // recursively get the union types and create struct type + ArrayList unionFields = new ArrayList<>(); + for (Schema avroSubField : childSchema.getTypes()) { + StructField unionField = prepareSubFields(avroSubField.getName() + i++, avroSubField); + if (unionField != null) { + unionFields.add(unionField); + } + } + return DataTypes.createStructType(unionFields); + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + if (logicalType instanceof LogicalTypes.Decimal) { + int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision(); + int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale(); + return DataTypes.createDecimalType(precision, scale); + } + return null; case NULL: return null; default: