Skip to content

Commit

Permalink
Merge 08fc944 into b6bd90d
Browse files Browse the repository at this point in the history
  • Loading branch information
Indhumathi27 committed Sep 7, 2018
2 parents b6bd90d + 08fc944 commit 5f8423a
Show file tree
Hide file tree
Showing 2 changed files with 372 additions and 29 deletions.
Expand Up @@ -18,8 +18,14 @@
package org.apache.carbondata.spark.testsuite.createTable

import java.io.File
import java.nio.ByteBuffer
import javax.xml.bind.DatatypeConverter

import scala.collection.mutable

import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.generic.GenericData
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
Expand All @@ -46,6 +52,8 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef

writerPath = writerPath.replace("\\", "/")

val decimalConversion = new DecimalConversion

override def beforeAll(): Unit = {
sql("DROP TABLE IF EXISTS sdkOutputTable")
CarbonProperties.getInstance()
Expand Down Expand Up @@ -689,15 +697,27 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
| }]
|}""".stripMargin

val json1 =
"""{"enum_field":{"bytes":"1010"}}""".stripMargin

val nn = new org.apache.avro.Schema.Parser().parse(schema1)
val decimalConversion = new DecimalConversion
val logicalType = LogicalTypes.decimal(10, 2)
val decimal = new java.math.BigDecimal("1010").setScale(2)
//get unscaled 2's complement bytearray
val bytes =
decimalConversion.toBytes(decimal, nn.getField("enum_field").schema, logicalType)
val data = DatatypeConverter.printBase64Binary(bytes.array())
val json1 =
s"""{"enum_field":{"bytes":"$data"}}""".stripMargin
val record = testUtil.jsonToAvro(json1, schema1)
val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(),
CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
val avroRec = new GenericData. Record(nn)
avroRec.put("enum_field", bytes1)


val writer = CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
writer.write(record)
writer.write(avroRec)
writer.close()
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
Expand Down Expand Up @@ -728,14 +748,46 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
|]}
""".stripMargin

val json1 = """ {"name":"bob", "age":10.24, "address" : {"street":"abc", "city":"32"}} """

val nn = new org.apache.avro.Schema.Parser().parse(schema1)

val logicalType = LogicalTypes.decimal(4, 2)
val decimal1 = new java.math.BigDecimal("32").setScale(2)
//get unscaled 2's complement bytearray
val bytes =
decimalConversion.toBytes(decimal1, nn.getField("address").schema, logicalType)
val data = DatatypeConverter.printBase64Binary(bytes.array())
val json1 = s""" {"name":"bob", "age":10.24, "address" : {"street":"abc", "city":"$data"}} """
val record = testUtil.jsonToAvro(json1, schema1)

val jsonData = new String(record.get(2).asInstanceOf[GenericData.Record].get(1)
.asInstanceOf[ByteBuffer].array(),
CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
val bytesValue = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(jsonData))
val mySchema =
"""
|{"name": "address",
| "type": "record",
| "fields": [
| { "name": "street", "type": "string"},
| { "name": "city", "type": {"type" : "bytes",
| "logicalType": "decimal",
| "precision": 4,
| "scale": 2
| }}
|]}
""".stripMargin
val schema = new org.apache.avro.Schema.Parser().parse(mySchema)
val genericByteArray = new GenericData.Record(schema)
genericByteArray.put("street", "abc")
genericByteArray.put("city", bytesValue)
val avroRec = new GenericData.Record(nn)
avroRec.put("name", "bob")
avroRec.put("age", 10.24)
avroRec.put("address", genericByteArray)

val writer = CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
writer.write(record)
writer.write(avroRec)
writer.close()
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
Expand Down Expand Up @@ -774,14 +826,40 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
| }
""".stripMargin

val json1: String = """ {"name": "bob","age": 10,"address": ["32", "42"]} """

val nn = new org.apache.avro.Schema.Parser().parse(schema1)
val logicalType = LogicalTypes.decimal(4, 1)
val decimal1 = new java.math.BigDecimal("32").setScale(1)
val decimal2 = new java.math.BigDecimal("42").setScale(1)
//get unscaled 2's complement bytearray
val bytes1 =
decimalConversion.toBytes(decimal1, nn.getField("address").schema, logicalType)
val bytes2 =
decimalConversion.toBytes(decimal2, nn.getField("address").schema, logicalType)
val data1 = DatatypeConverter.printBase64Binary(bytes1.array())
val data2 = DatatypeConverter.printBase64Binary(bytes2.array())
val json1: String = s""" {"name": "bob","age": 10,"address":["$data1","$data2"]} """
val record = testUtil.jsonToAvro(json1, schema1)

val jsonData1 = new String(record.get(2).asInstanceOf[GenericData.Array[ByteBuffer]].get(0)
.array(),
CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
val jsonData2 = new String(record.get(2).asInstanceOf[GenericData.Array[ByteBuffer]].get(1)
.array(),
CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
val bytesValue1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(jsonData1))
val bytesValue2 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(jsonData2))
val genericByteArray = new GenericData.Array[ByteBuffer](2,
Schema.createArray(Schema.create(Schema.Type.BYTES)))
genericByteArray.add(bytesValue1)
genericByteArray.add(bytesValue2)
val avroRec = new GenericData.Record(nn)
avroRec.put("name", "bob")
avroRec.put("age", 10)
avroRec.put("address", genericByteArray)

val writer = CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
writer.write(record)
writer.write(avroRec)
writer.close()
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
Expand Down Expand Up @@ -884,4 +962,220 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(1728000, Row(1728000))))
}

test("test logical type decimal through Json") {
sql("drop table if exists sdkOutputTable")
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
val schema1 =
"""{
| "namespace": "com.apache.schema",
| "type": "record",
| "name": "StudentActivity",
| "fields": [
| {
| "name": "id",
| "type": {"type" : "bytes",
| "logicalType": "decimal",
| "precision": 5,
| "scale": 2
| }
|}
| ]
|}""".stripMargin
val nn = new org.apache.avro.Schema.Parser().parse(schema1)
val logicalType = LogicalTypes.decimal(5, 2)
val decimal = new java.math.BigDecimal("12.8").setScale(2)
//get unscaled 2's complement bytearray
val bytes =
decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType)
val data = DatatypeConverter.printBase64Binary(bytes.array())
val json1 =
s"""{"id":"$data"}""".stripMargin
val record = testUtil.jsonToAvro(json1, schema1)
val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(),
CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
val avroRec = new GenericData. Record(nn)
avroRec.put("id", bytes1)
val writer = CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
writer.write(avroRec)
writer.close()
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
|'carbondata' LOCATION
|'$writerPath' """.stripMargin)
checkExistence(sql("select * from sdkOutputTable"), true, "12.80")
}

test("test logical type decimal through Avro") {
sql("drop table if exists sdkOutputTable")
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
val schema1 =
"""{
| "namespace": "com.apache.schema",
| "type": "record",
| "name": "StudentActivity",
| "fields": [
| {
| "name": "id",
| "type": {"type" : "bytes",
| "logicalType": "decimal",
| "precision": 5,
| "scale": 2
| }
|}
| ]
|}""".stripMargin
val nn = new org.apache.avro.Schema.Parser().parse(schema1)
val logicalType = LogicalTypes.decimal(5, 2)
val decimal = new java.math.BigDecimal("12.8").setScale(2)
//get unscaled 2's complement bytearray
val bytes =
decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType)
val data = DatatypeConverter.printBase64Binary(bytes.array())
val json1 =
s"""{"id":"$data"}""".stripMargin
val avroRec = new GenericData. Record(nn)
avroRec.put("id", bytes)
val writer = CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
writer.write(avroRec)
writer.close()
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
|'carbondata' LOCATION
|'$writerPath' """.stripMargin)
checkExistence(sql("select * from sdkOutputTable"), true, "12.80")
}

test("test logical type decimal with data having greater precision") {
sql("drop table if exists sdkOutputTable")
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
val schema1 =
"""{
| "namespace": "com.apache.schema",
| "type": "record",
| "name": "StudentActivity",
| "fields": [
| {
| "name": "id",
| "type": {"type" : "bytes",
| "logicalType": "decimal",
| "precision": 5,
| "scale": 2
| }
|}
| ]
|}""".stripMargin
val nn = new org.apache.avro.Schema.Parser().parse(schema1)
val logicalType = LogicalTypes.decimal(5, 2)
val decimal = new java.math.BigDecimal("1218").setScale(2)
//get unscaled 2's complement bytearray
val bytes =
decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType)
val data = DatatypeConverter.printBase64Binary(bytes.array())
val json1 =
s"""{"id":"$data"}""".stripMargin
val avroRec = new GenericData. Record(nn)
avroRec.put("id", bytes)
val exception1 = intercept[Exception] {
val writer = CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
writer.write(avroRec)
writer.close()
}
assert(exception1.getMessage
.contains("Data Loading failed as value Precision 6 is greater than specified Precision 5 in Avro Schema"))
}

test("test union with multiple record type") {
sql("drop table if exists sdkOutputTable")
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
val schema1 =
"""{
| "namespace": "test.avro",
| "type": "record",
| "name": "NewCar2",
| "fields": [
| {
| "name": "optionalExtra",
| "type": ["null",{
| "type":"record",
| "name":"Stereo",
| "fields" :[{
| "name":"make",
| "type":"string"
| },
| {
| "name":"speakers",
| "type":"int"
| }]
| },{
| "type":"record",
| "name":"LeatherTrim",
| "fields":[{
| "name":"colour",
| "type":"string"
| }]
| }],
| "default":null
| }]
|
|}""".stripMargin

val json1 =
"""{"optionalExtra":{"test.avro.LeatherTrim":{"colour":"ab"}}}""".stripMargin

val nn = new org.apache.avro.Schema.Parser().parse(schema1)
val record = testUtil.jsonToAvro(json1, schema1)

val writer = CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
writer.write(record)
writer.close()
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
|'carbondata' LOCATION
|'$writerPath' """.stripMargin)
checkAnswer(sql("select * from sdkOutputTable"),
Seq(Row(Row(Row(null,null),Row("ab")))))
}

test("test union with multiple Enum type") {
sql("drop table if exists sdkOutputTable")
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
val schema1 =
"""{
| "namespace": "test.avro",
| "type": "record",
| "name": "Union_data3",
| "fields": [
| {
| "name": "emp_id",
| "type":
| ["long","null","string",
| {"type":"enum","name":"t1","symbols":["red","blue","yellow"]},
| {"type":"enum","name":"t2","symbols":["sun","mon","tue","wed","thu","fri","sat"]},
| "int"
| ]}]
|}""".stripMargin

val json1 =
"""{"emp_id":{"test.avro.t2":"sun"}}""".stripMargin

val nn = new org.apache.avro.Schema.Parser().parse(schema1)
val record = testUtil.jsonToAvro(json1, schema1)

val writer = CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
writer.write(record)
writer.close()
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
|'carbondata' LOCATION
|'$writerPath' """.stripMargin)
checkAnswer(sql("select * from sdkOutputTable"),
Seq(Row(Row(null,null,null,"sun",null))))
}

}

0 comments on commit 5f8423a

Please sign in to comment.