From 66ce1a437159c2d6ffe5bb173cb5dcb9b083b0c1 Mon Sep 17 00:00:00 2001 From: Indhumathi27 Date: Thu, 30 Aug 2018 14:50:06 +0530 Subject: [PATCH] [CARBONDATA-2876]Support Avro datatype conversion through SDK --- ...sactionalCarbonTableWithAvroDataType.scala | 793 ++++++++++++++++++ .../carbondata/sdk/file/AvroCarbonWriter.java | 331 +++++++- 2 files changed, 1088 insertions(+), 36 deletions(-) create mode 100644 integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala new file mode 100644 index 00000000000..b50407cc877 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala @@ -0,0 +1,793 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File +import scala.collection.mutable + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.sdk.file.CarbonWriter + +/** + * Test class for Avro supported data types through SDK + */ +class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with BeforeAndAfterAll { + + + val badRecordAction = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION) + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./target/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + + writerPath = writerPath.replace("\\", "/") + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "force") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, badRecordAction) + } + + test("test enum") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": + | [{ + | "name": "id", + | "type": { + | "type": "enum", + | "name": "Suit", + | "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] + | } + | } + | ] + |}""".stripMargin + + val json1 = + """{"id":"HEARTS"}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("HEARTS"))) + } + + test("test enum with struct type") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val mySchema = + "{ " + + " \"name\": \"address\", " + + " \"type\": \"record\", " + + " \"fields\": [ " + + " { " + + " \"name\": \"name\", " + + " \"type\": \"string\" " + + " }, " + + " { " + + " \"name\": \"age\", " + + " \"type\": \"int\" " + + " }, " + + " { " + + " \"name\": \"address\", \"type\": {" + + " \"type\" : \"record\", \"name\" : \"my_address\"," + + " \"fields\" : [" + + " {\"name\": \"enumRec\", " + + " \"type\": { " + + " \"type\": \"enum\", " + + " \"name\": \"card\", " + + " \"symbols\": [\"SPADES\", \"HEARTS\", \"DIAMONDS\", \"CLUBS\"] " + + " } " + + "}]}" + + " } " + + " ] " + + "} " + + val json1 = "{\"name\":\"bob\", \"age\":10, \"address\": {\"enumRec\":\"SPADES\"}}" + + val nn = new org.apache.avro.Schema.Parser().parse(mySchema) + val record = testUtil.jsonToAvro(json1, mySchema) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("bob", 10, Row("SPADES")))) + } + + test("test enum with Array type") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val mySchema = + """ { + | "name": "address", + | "type": "record", + | "fields": [ + | { + | "name": "name", + | "type": "string" + | }, + | { + | "name": "age", + | "type": "int" + | }, + | { + | "name": "address", + | "type": { + | "type": "array", + | "items": { + | "name": "Suit", + | "type": "enum", + | "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] + | }}}] + | } + """.stripMargin + + val json: String = """ {"name": "bob","age": 10,"address": ["SPADES", "DIAMONDS"]} """ + + val nn = new org.apache.avro.Schema.Parser().parse(mySchema) + val record = testUtil.jsonToAvro(json, mySchema) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), + Seq(Row("bob", 10, mutable.WrappedArray.make(Array("SPADES", "DIAMONDS"))))) + } + + test("test union type long") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { "name": "first", "type": ["string", "int", "long"] } + | ] + |}""".stripMargin + + val json1 = + """{"first":{"long":10345}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(null, null, 10345)))) + } + + test("test union type boolean") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { "name": "first", "type": ["boolean", "int", "long"] } + | ] + |}""".stripMargin + + val json1 = + """{"first":{"boolean":true}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(true, null, null)))) + } + + test("test union type string") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { "name": "first", "type": ["string", "int", "long"] } + | ] + |}""".stripMargin + + val json1 = + """{"first":{"string":"abc"}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row("abc", null, null)))) + } + + test("test union type int") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { "name": "first", "type": ["string", "int", "long"] } + | ] + |}""".stripMargin + + val json1 = + """{"first":{"int":10}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(null, 10, null)))) + } + + test("test union type with null") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { "name": "first", "type": ["null", "int"] } + | ] + |}""".stripMargin + + val json1 = + """{"first":{"null":null}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(null)))) + } + + test("test union type with only type null") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { "name": "first", "type": ["null"] } + | ] + |}""".stripMargin + + val json1 = + """{"first":{"null":null}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + val exception1 = intercept[UnsupportedOperationException] { + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + } + assert(exception1.getMessage + .contains("Carbon do not support Avro UNION with only null type")) + } + + test("test union type with Enum") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "enum_field", "type": [{ + | "namespace": "org.example.avro", + | "name": "EnumField", + | "type": "enum", + | "symbols": [ + | "VAL_0", + | "VAL_1" + | ] + | },"null"], "default": null + | }] + |}""".stripMargin + + val json1 = + """{"enum_field":{"org.example.avro.EnumField":"VAL_0"}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row("VAL_0")))) + } + + test("test union type with Map") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "map_field", "type": [{ + | "namespace": "org.example.avro", + | "name": "mapField", + | "type": "map", + | "values":"string" + | },"int"], "default": null + | }] + |}""".stripMargin + + val json1 = + """{"map_field":{"map":{"street": "k-lane", "city": "bangalore"}}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + sql("select * from sdkOutputTable").show(false) + checkAnswer(sql("select * from sdkOutputTable"), Seq( + Row(Row(Map("city" -> "bangalore", "street" -> "k-lane"), null)))) + } + + test("test union type") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "struct_field", "type": [{ + | "namespace": "org.example.avro", + | "name": "structField", + | "type": "array", + | "items": { "name" : "name0", "type":"string"} + | },"int"], "default": null + | }] + |}""".stripMargin + + val json1 = + """{"struct_field":{"int":12}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), + Seq(Row(Row(mutable.WrappedArray.make(Array(null)), 12)))) + } + + test("test Struct of Union") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """ + |{"name": "address", + | "type": "record", + | "fields": [ + | { "name": "address", "type": { + | "type" : "record", "name" : "my_address", + | "fields" : [ + | {"name": "city", "type": ["string","int"]}]}} + |]} + """.stripMargin + + val json1 = + """{"address":{"city":{"int":1}}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + sql("describe formatted sdkOutputTable").show(false) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(Row(null, 1))))) + sql("insert into sdkOutputTable values('abc:12')") + sql("select address.city.city0 from sdkOutputTable").show(false) + } + + test("test Union with struct of array") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """ + |{"name": "address", + | "type": "record", + | "fields": [ + | { "name": "address", "type": { + | "type" : "record", "name" : "my_address", + | "fields" : [ + | {"name": "city", "type": ["string", { + | "type": "array", + | "name": "abc_name_0", + | "items": { + | "name": "_name_0", + | "type": "record", + | "fields": [ + | { + | "name": "app_id", + | "type": [ + | "null", + | "string" + | ] + | } + | ] + | } + | } + | ]}]}} + |]} + """.stripMargin + + val json1 = + """{ + |"address":{"city": + |{"array":[ + | { + | "app_id": { + | "string": "abc" + | }}] + | }}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), + Seq(Row(Row(Row(null, mutable.WrappedArray.make(Array(Row(Row("abc"))))))))) + } + + test("test union type with Array and Struct") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "type": "record", + | "namespace": "example.avro", + | "name": "array_union", + | "fields": [ + | { + | "name": "body", + | "type": { + | "name": "body", + | "type": "record", + | "fields": [ + | { + | "name": "abc", + | "type": [ + | "int", + | { + | "type": "array", + | "name": "abc_name_0", + | "items": { + | "name": "_name_0", + | "type": "record", + | "fields": [ + | { + | "name": "app_id", + | "type": [ + | "null", + | "string" + | ] + | }, + | { + | "name": "app_name", + | "type": [ + | "int", + | "float", + | "string" + | ] + | }, + | { + | "name": "app_key", + | "type": [ + | "null", + | "string" + | ] + | } + | ] + | } + | } + | ] + | } + | ] + | } + | } + | ] + |}""".stripMargin + + val json1 = + """{ + | "body": { + | "abc": { + | "array": [ + | { + | "app_id": { + | "string": "abc" + | }, + | "app_name": { + | "string": "bcd" + | }, + | "app_key": { + | "string": "cde" + | } + | } + | ] + | } + | } + |}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + sql("describe formatted sdkOutputTable").show(false) + checkAnswer(sql("select * from sdkOutputTable"), + Seq(Row(Row(Row(null, + mutable.WrappedArray.make(Array(Row(Row("abc"), Row(null, null, "bcd"), Row("cde"))))))))) + } + + test("test union type with Decimal") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "enum_field", "type": [{ + | "namespace": "org.example.avro", + | "name": "dec", + | "type": "bytes", + | "logicalType": "decimal", + | "precision": 10, + | "scale": 2 + | },"null"] + | }] + |}""".stripMargin + + val json1 = + """{"enum_field":{"bytes":"1010"}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkExistence(sql("select * from sdkOutputTable"), true, "1010.00") + } + + test("test logical type decimal with struct") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """ + |{"name": "address", + | "type": "record", + | "fields": [ + | { "name": "name", "type": "string"}, + | { "name": "age", "type": "float"}, + | { "name": "address", "type": { + | "type" : "record", "name" : "my_address", + | "fields" : [ + | {"name": "street", "type": "string"}, + | {"name": "city", "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 4, + | "scale": 2 + | }}]}} + |]} + """.stripMargin + + val json1 = """ {"name":"bob", "age":10.24, "address" : {"street":"abc", "city":"32"}} """ + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id struct) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkExistence(sql("select * from sdkOutputTable"), true, "32.00") + } + + test("test logical type decimal with Array") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """ { + | "name": "address", + | "type": "record", + | "fields": [ + | { + | "name": "name", + | "type": "string" + | }, + | { + | "name": "age", + | "type": "int" + | }, + | { + | "name": "address", + | "type": { + | "type": "array", + | "items": { + | "name": "street", + | "type": "bytes", + | "logicalType": "decimal", + | "precision": 4, + | "scale": 1 + | }}}] + | } + """.stripMargin + + val json1: String = """ {"name": "bob","age": 10,"address": ["32", "42"]} """ + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(id struct) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkExistence(sql("select * from sdkOutputTable"), true, "32.0") + } + +} diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java index 64dfd429d45..0fdaf78e085 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java @@ -18,6 +18,8 @@ package org.apache.carbondata.sdk.file; import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -29,6 +31,7 @@ import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; @@ -46,6 +49,7 @@ import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; +import org.apache.avro.util.Utf8; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.JobID; @@ -103,42 +107,6 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { Schema.Type type = avroField.schema().getType(); LogicalType logicalType = avroField.schema().getLogicalType(); switch (type) { - case INT: - if (logicalType != null) { - if (logicalType instanceof LogicalTypes.Date) { - int dateIntValue = (int) fieldValue; - out = dateIntValue * DateDirectDictionaryGenerator.MILLIS_PER_DAY; - } else { - LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName()); - out = fieldValue; - } - } else { - out = fieldValue; - } - break; - case BOOLEAN: - case LONG: - if (logicalType != null && !(logicalType instanceof LogicalTypes.TimestampMillis)) { - if (logicalType instanceof LogicalTypes.TimestampMicros) { - long dateIntValue = (long) fieldValue; - out = dateIntValue / 1000L; - } else { - LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName()); - out = fieldValue; - } - } else { - out = fieldValue; - } - break; - case DOUBLE: - case STRING: - out = fieldValue; - break; - case FLOAT: - // direct conversion will change precision. So parse from string. - // also carbon internally needs float as double - out = Double.parseDouble(fieldValue.toString()); - break; case MAP: // Note: Avro object takes care of removing the duplicates so we should not handle it again // Map will be internally stored as Array> @@ -213,6 +181,124 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { } out = new ArrayObject(arrayChildObjects); break; + case UNION: + // Union type will be internally stored as Struct + // Fill data object only if fieldvalue is instance of datatype + // For other field datatypes, fill value as Null + List unionFields = avroField.schema().getTypes(); + int notNullUnionFieldsCount = 0; + for (Schema unionField : unionFields) { + if (!unionField.getType().equals(Schema.Type.NULL)) { + notNullUnionFieldsCount++; + } + } + Object[] values = new Object[notNullUnionFieldsCount]; + int j = 0; + for (Schema unionField : unionFields) { + if (!unionField.getType().equals(Schema.Type.NULL) && checkFieldValueType( + unionField.getType(), fieldValue)) { + values[j] = avroFieldToObjectForUnionType(unionField, fieldValue, avroField); + break; + } + if (notNullUnionFieldsCount != 1) { + j++; + } + } + out = new StructObject(values); + break; + default: + out = avroPrimitiveFieldToObject(type, logicalType, fieldValue); + } + return out; + } + + /** + * For Union type, fill data if Schema.Type is instance of fieldValue + * and return result + * + * @param type + * @param fieldValue + * @return + */ + private boolean checkFieldValueType(Schema.Type type, Object fieldValue) { + switch (type) { + case INT: + return (fieldValue instanceof Integer); + case BOOLEAN: + return (fieldValue instanceof Boolean); + case LONG: + return (fieldValue instanceof Long); + case DOUBLE: + return (fieldValue instanceof Double); + case STRING: + return (fieldValue instanceof Utf8 || fieldValue instanceof String); + case FLOAT: + return (fieldValue instanceof Float); + case RECORD: + return (fieldValue instanceof GenericData.Record); + case ARRAY: + return (fieldValue instanceof GenericData.Array || fieldValue instanceof ArrayList); + case BYTES: + return (fieldValue instanceof ByteBuffer); + case MAP: + return (fieldValue instanceof HashMap); + case ENUM: + return (fieldValue instanceof GenericData.EnumSymbol); + default: + return false; + } + } + + private Object avroPrimitiveFieldToObject(Schema.Type type, LogicalType logicalType, + Object fieldValue) { + Object out = null; + switch (type) { + case INT: + if (logicalType != null) { + if (logicalType instanceof LogicalTypes.Date) { + int dateIntValue = (int) fieldValue; + out = dateIntValue * DateDirectDictionaryGenerator.MILLIS_PER_DAY; + } else { + LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName()); + out = fieldValue; + } + } else { + out = fieldValue; + } + break; + case BOOLEAN: + case LONG: + if (logicalType != null && !(logicalType instanceof LogicalTypes.TimestampMillis)) { + if (logicalType instanceof LogicalTypes.TimestampMicros) { + long dateIntValue = (long) fieldValue; + out = dateIntValue / 1000L; + } else { + LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName()); + out = fieldValue; + } + } else { + out = fieldValue; + } + break; + case DOUBLE: + case STRING: + case ENUM: + out = fieldValue; + break; + case FLOAT: + // direct conversion will change precision. So parse from string. + // also carbon internally needs float as double + out = Double.parseDouble(fieldValue.toString()); + break; + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + // As binary type is not supported yet,value will be null + if (logicalType instanceof LogicalTypes.Decimal) { + out = new BigDecimal(new String(((ByteBuffer) fieldValue).array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS)); + } + break; case NULL: out = null; break; @@ -223,6 +309,110 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { return out; } + /** + * fill fieldvalue for union type + * + * @param avroField + * @param fieldValue + * @param avroFields + * @return + */ + private Object avroFieldToObjectForUnionType(Schema avroField, Object fieldValue, + Schema.Field avroFields) { + Object out; + Schema.Type type = avroField.getType(); + LogicalType logicalType = avroField.getLogicalType(); + switch (type) { + case RECORD: + if (fieldValue instanceof GenericData.Record) { + List fields = avroField.getFields(); + + Object[] structChildObjects = new Object[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + Object childObject = + avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i)); + if (childObject != null) { + structChildObjects[i] = childObject; + } + } + out = new StructObject(structChildObjects); + } else { + out = null; + } + break; + case ARRAY: + if (fieldValue instanceof GenericData.Array || fieldValue instanceof ArrayList) { + Object[] arrayChildObjects; + if (fieldValue instanceof GenericData.Array) { + int size = ((GenericData.Array) fieldValue).size(); + arrayChildObjects = new Object[size]; + for (int i = 0; i < size; i++) { + Object childObject = avroFieldToObject( + new Schema.Field(avroFields.name(), avroField.getElementType(), avroFields.doc(), + avroFields.defaultVal()), ((GenericData.Array) fieldValue).get(i)); + if (childObject != null) { + arrayChildObjects[i] = childObject; + } + } + } else { + int size = ((ArrayList) fieldValue).size(); + arrayChildObjects = new Object[size]; + for (int i = 0; i < size; i++) { + Object childObject = avroFieldToObject( + new Schema.Field(avroFields.name(), avroField.getElementType(), avroFields.doc(), + avroFields.defaultVal()), ((ArrayList) fieldValue).get(i)); + if (childObject != null) { + arrayChildObjects[i] = childObject; + } + } + } + out = new ArrayObject(arrayChildObjects); + } else { + out = null; + } + break; + case MAP: + // Note: Avro object takes care of removing the duplicates so we should not handle it again + // Map will be internally stored as Array> + if (fieldValue instanceof HashMap) { + Map mapEntries = (HashMap) fieldValue; + Object[] arrayMapChildObjects = new Object[mapEntries.size()]; + if (!mapEntries.isEmpty()) { + Iterator iterator = mapEntries.entrySet().iterator(); + int counter = 0; + while (iterator.hasNext()) { + // size is 2 because map will have key and value + Object[] mapChildObjects = new Object[2]; + Map.Entry mapEntry = (HashMap.Entry) iterator.next(); + // evaluate key + Object keyObject = avroFieldToObject( + new Schema.Field(avroFields.name(), Schema.create(Schema.Type.STRING), + avroFields.doc(), avroFields.defaultVal()), mapEntry.getKey()); + // evaluate value + Object valueObject = avroFieldToObject( + new Schema.Field(avroFields.name(), avroField.getValueType(), avroFields.doc(), + avroFields.defaultVal()), mapEntry.getValue()); + if (keyObject != null) { + mapChildObjects[0] = keyObject; + } + if (valueObject != null) { + mapChildObjects[1] = valueObject; + } + StructObject keyValueObject = new StructObject(mapChildObjects); + arrayMapChildObjects[counter++] = keyValueObject; + } + } + out = new ArrayObject(arrayMapChildObjects); + } else { + out = null; + } + break; + default: + out = avroPrimitiveFieldToObject(type, logicalType, fieldValue); + } + return out; + } + /** * converts avro schema to carbon schema required by carbonWriter * @@ -270,6 +460,7 @@ private static Field prepareFields(Schema.Field avroField) { } case DOUBLE: return new Field(fieldName, DataTypes.DOUBLE); + case ENUM: case STRING: return new Field(fieldName, DataTypes.STRING); case FLOAT: @@ -310,6 +501,32 @@ private static Field prepareFields(Schema.Field avroField) { } else { return null; } + case UNION: + int i = 0; + // Get union types and store as Struct + ArrayList unionFields = new ArrayList<>(); + for (Schema avroSubField : avroField.schema().getTypes()) { + if (!avroSubField.getType().equals(Schema.Type.NULL)) { + StructField unionField = prepareSubFields(avroField.name() + i++, avroSubField); + if (unionField != null) { + unionFields.add(unionField); + } + } + } + if (unionFields.isEmpty()) { + throw new UnsupportedOperationException( + "Carbon do not support Avro UNION with only null type"); + } + return new Field(fieldName, "struct", unionFields); + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + if (logicalType instanceof LogicalTypes.Decimal) { + int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision(); + int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale(); + return new Field(fieldName, DataTypes.createDecimalType(precision, scale)); + } + return null; case NULL: return null; default: @@ -343,6 +560,7 @@ private static StructField prepareSubFields(String fieldName, Schema childSchema } case DOUBLE: return new StructField(fieldName, DataTypes.DOUBLE); + case ENUM: case STRING: return new StructField(fieldName, DataTypes.STRING); case FLOAT: @@ -385,6 +603,26 @@ private static StructField prepareSubFields(String fieldName, Schema childSchema } else { return null; } + case UNION: + // recursively get the union types + int i = 0; + ArrayList structSubTypes = new ArrayList<>(); + for (Schema avroSubField : childSchema.getTypes()) { + StructField structField = prepareSubFields(fieldName + i++, avroSubField); + if (structField != null) { + structSubTypes.add(structField); + } + } + return (new StructField(fieldName, DataTypes.createStructType(structSubTypes))); + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + if (logicalType instanceof LogicalTypes.Decimal) { + int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision(); + int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale(); + return new StructField(fieldName, DataTypes.createDecimalType(precision, scale)); + } + return null; case NULL: return null; default: @@ -425,6 +663,7 @@ private static DataType getMappingDataTypeForCollectionRecord(Schema childSchema } case DOUBLE: return DataTypes.DOUBLE; + case ENUM: case STRING: return DataTypes.STRING; case FLOAT: @@ -457,6 +696,26 @@ private static DataType getMappingDataTypeForCollectionRecord(Schema childSchema } else { return null; } + case UNION: + int i = 0; + // recursively get the union types and create struct type + ArrayList unionFields = new ArrayList<>(); + for (Schema avroSubField : childSchema.getTypes()) { + StructField unionField = prepareSubFields(avroSubField.getName() + i++, avroSubField); + if (unionField != null) { + unionFields.add(unionField); + } + } + return DataTypes.createStructType(unionFields); + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + if (logicalType instanceof LogicalTypes.Decimal) { + int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision(); + int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale(); + return DataTypes.createDecimalType(precision, scale); + } + return null; case NULL: return null; default: