From 6e113694d55eed4943779100eee586a612529a49 Mon Sep 17 00:00:00 2001
From: Indhumathi27
Date: Fri, 11 May 2018 10:29:42 +0530
Subject: [PATCH] [CARBONDATA-2481] Adding SDV test cases for SDK writer

---
 integration/spark-common-cluster-test/pom.xml |  12 +
 .../sdv/generated/SDKwriterTestCase.scala     | 732 ++++++++++++++++++
 .../cluster/sdv/suite/SDVSuites.scala         |   1 +
 3 files changed, 745 insertions(+)
 create mode 100644 integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala

diff --git a/integration/spark-common-cluster-test/pom.xml b/integration/spark-common-cluster-test/pom.xml
index 44453b3a287..d8aecc2b162 100644
--- a/integration/spark-common-cluster-test/pom.xml
+++ b/integration/spark-common-cluster-test/pom.xml
@@ -68,6 +68,18 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-store-sdk</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>tech.allegro.schema.json2avro</groupId>
+      <artifactId>converter</artifactId>
+      <version>0.2.5</version>
+      <scope>test</scope>
+    </dependency>
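The two new test dependencies split cleanly: carbondata-store-sdk is the code under test (the file-based CarbonWriter driven throughout the new test class), and json2avro's converter turns JSON fixture strings into Avro GenericData.Record instances for the Avro write path. As a minimal sketch of what the SDK dependency is used for in the tests that follow (the output path and row values here are illustrative, not taken from the patch):

  import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

  val schema = Schema.parseJson("""[{"name":"string"},{"age":"int"}]""")
  val writer = CarbonWriter.builder()
    .outputPath("/tmp/sdk-writer-output")     // illustrative path
    .isTransactionalTable(false)              // SDK output carries no transactional metadata
    .buildWriterForCSVInput(schema)
  writer.write(Array[String]("bob", "25"))    // one CSV row; every value is passed as a string
  writer.close()
  // the tests below then mount such output as:
  //   CREATE EXTERNAL TABLE t STORED BY 'carbondata' LOCATION '<output path>'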
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
new file mode 100644
index 00000000000..012091d9193
--- /dev/null
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cluster.sdv.generated
+
+import java.util
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.avro
+import org.apache.commons.lang.CharEncoding
+import org.junit.Assert
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{AvroCarbonWriter, CarbonWriter, Schema}
+
+/**
+ * Test class for SDKwriterTestCase to verify all scenarios
+ */
+class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
+
+  val writerPath = s"$resourcesPath/SparkCarbonFileFormat/WriterOutput1/"
+
+  override def beforeEach(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkTable1")
+    sql("DROP TABLE IF EXISTS sdkTable2")
+    sql("DROP TABLE IF EXISTS table1")
+    cleanTestData()
+  }
+
+  override def afterEach(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkTable1")
+    sql("DROP TABLE IF EXISTS sdkTable2")
+    sql("DROP TABLE IF EXISTS table1")
+    cleanTestData()
+  }
+
+  def cleanTestData(): Unit = {
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+  }
+
+  def buildTestDataSingleFile(): Any = {
+    buildTestData(3, false, null)
+  }
+
+  def buildTestDataWithBadRecordForce(writerPath: String): Any = {
+    val options = Map("bAd_RECords_action" -> "FORCE").asJava
+    buildTestData(3, false, options)
+  }
+
+  def buildTestDataWithBadRecordFail(writerPath: String): Any = {
+    val options = Map("bAd_RECords_action" -> "FAIL").asJava
+    buildTestData(15001, false, options)
+  }
+
+  def buildTestData(rows: Int,
+      persistSchema: Boolean,
+      options: util.Map[String, String]): Any = {
+    buildTestData(rows, persistSchema, options, List("name"), writerPath)
+  }
+
+  // prepare sdk writer output
+  def buildTestData(rows: Int,
+      persistSchema: Boolean,
+      options: util.Map[String, String],
+      sortColumns: List[String],
+      writerPath: String): Any = {
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append(" {\"name\":\"string\"},\n")
+      .append(" {\"age\":\"int\"},\n")
+      .append(" {\"height\":\"double\"}\n")
+      .append("]")
+      .toString()
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        if (persistSchema) {
+          builder.persistSchemaFile(true)
+          builder
+            .sortBy(sortColumns.toArray)
+            .outputPath(writerPath)
+            .isTransactionalTable(false)
+            .uniqueIdentifier(System.currentTimeMillis)
+            .buildWriterForCSVInput(Schema.parseJson(schema))
+        } else if (options != null) {
+          builder.outputPath(writerPath)
+            .isTransactionalTable(false)
+            .sortBy(sortColumns.toArray)
+            .uniqueIdentifier(System.currentTimeMillis)
+            .withBlockSize(2)
+            .withLoadOptions(options)
+            .buildWriterForCSVInput(Schema.parseJson(schema))
+        } else {
+          builder.outputPath(writerPath)
+            .isTransactionalTable(false)
+            .sortBy(sortColumns.toArray)
+            .uniqueIdentifier(System.currentTimeMillis)
+            .withBlockSize(2)
+            .buildWriterForCSVInput(Schema.parseJson(schema))
+        }
+      var i = 0
+      while (i < rows) {
+        if ((options != null) && (i < 3)) {
+          // write a bad record: "abc" cannot be parsed as the double-typed height column
+          writer.write(Array[String]("abc" + i, String.valueOf(i.toDouble / 2), "abc"))
+        } else {
+          writer.write(Array[String]("abc" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+        }
+        i += 1
+      }
+      if (options != null) {
+        // keep one valid record, else no carbondata file is generated at all
+        writer.write(Array[String]("abc" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+      }
+      writer.close()
+    } catch {
+      // do not silently swallow failures; surface them as runtime exceptions
+      case ex: Exception => throw new RuntimeException(ex)
+    }
+  }
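buildTestData above is the single data factory for all CSV-based scenarios; the only branching is whether a schema file is persisted and whether load options are attached. Distilled to the options branch, as a self-contained sketch (the output path is illustrative; the mixed-case option key is copied from the helpers above, which suggests, though the patch itself does not state it, that load-option names are matched case-insensitively):

  import scala.collection.JavaConverters._
  import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

  val schemaJson = """[{"name":"string"},{"age":"int"},{"height":"double"}]"""
  val badRecordOptions = Map("bAd_RECords_action" -> "FORCE").asJava
  val writer = CarbonWriter.builder()
    .outputPath("/tmp/WriterOutput1")         // illustrative; the tests use writerPath
    .isTransactionalTable(false)
    .sortBy(Array("name"))                    // same sort column as the default call above
    .uniqueIdentifier(System.currentTimeMillis)
    .withBlockSize(2)
    .withLoadOptions(badRecordOptions)
    .buildWriterForCSVInput(Schema.parseJson(schemaJson))
  writer.write(Array[String]("abc0", "0.0", "not-a-double"))  // bad height, handled per action
  writer.close()

Rows that cannot be parsed against the declared schema are then handled according to bad_records_action, which the bad-record scenarios further down exercise end to end.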
+
+  def buildTestDataWithBadRecordIgnore(writerPath: String): Any = {
+    val options = Map("bAd_RECords_action" -> "IGNORE").asJava
+    buildTestData(3, false, options)
+  }
+
+  def buildTestDataWithBadRecordRedirect(writerPath: String): Any = {
+    val options = Map("bAd_RECords_action" -> "REDIRECT").asJava
+    buildTestData(3, false, options)
+  }
+
+  def deleteFile(path: String, extension: String): Unit = {
+    val file: CarbonFile = FileFactory
+      .getCarbonFile(path, FileFactory.getFileType(path))
+
+    for (eachDir <- file.listFiles) {
+      if (!eachDir.isDirectory) {
+        if (eachDir.getName.endsWith(extension)) {
+          CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+        }
+      } else {
+        deleteFile(eachDir.getPath, extension)
+      }
+    }
+  }
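deleteFile recursively removes files with a given extension so individual tests can simulate a corrupted or incomplete SDK output directory. The same CarbonFile API can be pointed at writerPath to inspect what the writer actually produced; a small sketch (the helper name is mine, not part of the patch):

  def listSdkOutput(path: String): Unit = {
    val dir = FileFactory.getCarbonFile(path, FileFactory.getFileType(path))
    for (entry <- dir.listFiles) {
      if (entry.isDirectory) {
        listSdkOutput(entry.getPath)
      } else {
        // expect .carbondata fact files plus .carbonindex files
        println(entry.getName)
      }
    }
  }

The failure test further below deletes both extensions (CarbonCommonConstants.FACT_FILE_EXT and UPDATE_INDEX_FILE_EXT) to prove that an external table cannot be created over a location holding no Carbon files.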
+
+  test("test create External Table with WriterPath") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
+      Row("abc1", 1, 0.5),
+      Row("abc2", 2, 1.0)))
+  }
+
+  test("test create External Table with Comment") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable comment 'this is comment' STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
+      Row("abc1", 1, 0.5),
+      Row("abc2", 2, 1.0)))
+  }
+
+  test("test create External Table and test files written from sdk writer") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable(name string,age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
+      Row("abc1", 1, 0.5),
+      Row("abc2", 2, 1.0)))
+
+    checkAnswer(sql("select name from sdkTable"), Seq(Row("abc0"),
+      Row("abc1"),
+      Row("abc2")))
+
+    checkAnswer(sql("select age from sdkTable"), Seq(Row(0), Row(1), Row(2)))
+    checkAnswer(sql("select * from sdkTable where age > 1 and age < 8"),
+      Seq(Row("abc2", 2, 1.0)))
+
+    checkAnswer(sql("select * from sdkTable where name = 'abc2'"),
+      Seq(Row("abc2", 2, 1.0)))
+
+    checkAnswer(sql("select * from sdkTable where name like '%b%' limit 2"),
+      Seq(Row("abc0", 0, 0.0),
+        Row("abc1", 1, 0.5)))
+
+    checkAnswer(sql("select sum(age) from sdkTable where name like 'abc%'"), Seq(Row(3)))
+    checkAnswer(sql("select count(*) from sdkTable where name like 'abc%' "), Seq(Row(3)))
+    checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(3)))
+  }
+
+  test("test create External Table and test insert into external table") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable(name string,age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql(s"""select count(*) from sdkTable where age = 1"""),
+      Seq(Row(1)))
+
+    sql("insert into sdkTable select 'def0',1,5.5")
+    sql("insert into sdkTable select 'def1',5,6.6")
+
+    checkAnswer(sql(s"""select count(*) from sdkTable where age = 1"""),
+      Seq(Row(2)))
+  }
+
+  test("test create External Table and test insert into normal table with different schema") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql("DROP TABLE IF EXISTS table1")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    sql(
+      "create table if not exists table1 (name string, age int) STORED BY 'carbondata'")
+    sql("insert into table1 select * from sdkTable")
+    checkAnswer(sql("select * from table1"), Seq(Row("abc0", 0),
+      Row("abc1", 1),
+      Row("abc2", 2)))
+  }
+
+  test("test Insert into External Table from another External Table with Same Schema") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable1")
+    sql("DROP TABLE IF EXISTS sdkTable2")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable1(name string,age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable2(name string,age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("insert into sdkTable1 select * from sdkTable2")
+    checkAnswer(sql("select count(*) from sdkTable1"), Seq(Row(6)))
+  }
+
+  test("test create External Table with Schema with partition, external table should " +
+       "ignore schema and partition") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable(name string) PARTITIONED BY (age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
+      Row("abc1", 1, 0.5),
+      Row("abc2", 2, 1.0)))
+  }
+
+  test("test External Table with insert overwrite") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql("DROP TABLE IF EXISTS table1")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable(name string) PARTITIONED BY (age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
+      Row("abc1", 1, 0.5),
+      Row("abc2", 2, 1.0)))
+
+    sql(
+      "create table if not exists table1 (name string, age int, height double) STORED BY 'org" +
+      ".apache.carbondata.format'")
+    sql(s"""insert into table1 values ("aaaaa", 12, 20)""")
+
+    checkAnswer(sql(s"""select count(*) from sdkTable where age = 1"""),
+      Seq(Row(1)))
+
+    sql("insert overwrite table sdkTable select * from table1")
+
+    checkAnswer(sql(s"""select count(*) from sdkTable where age = 1"""),
+      Seq(Row(0)))
+  }
+
+  test("test create External Table with Table properties should ignore tblproperties") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable(name string,age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' TBLPROPERTIES('sort_scope'='batch_sort') """.stripMargin)
+
+    checkExistence(sql("Describe formatted sdkTable "), false, "batch_sort")
+  }
+
+  test("Read sdk writer output file and test without carbondata and carbonindex files should fail")
+  {
+    buildTestDataSingleFile()
+    deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    deleteFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+
+    val exception = intercept[Exception] {
+      // the location now holds no carbondata/carbonindex files, so table creation must fail
+      sql(
+        s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+           |'$writerPath' """.stripMargin)
+    }
+    assert(exception.getMessage()
+      .contains("Operation not allowed: Invalid table path provided:"))
+  }
+
+  test("test create External Table and test CTAS") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
+      Row("abc1", 1, 0.5),
+      Row("abc2", 2, 1.0)))
+
+    sql("create table table1 stored by 'carbondata' as select * from sdkTable")
+
+    checkAnswer(sql("select * from table1"), Seq(Row("abc0", 0, 0.0),
+      Row("abc1", 1, 0.5),
+      Row("abc2", 2, 1.0)))
+  }
+
+  test("test create External Table and test JOIN on External Tables") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql("DROP TABLE IF EXISTS sdkTable1")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable1 STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkTable JOIN sdkTable1 on (sdkTable.age=sdkTable1.age)"),
+      Seq(Row("abc0", 0, 0.0, "abc0", 0, 0.0),
+        Row("abc1", 1, 0.5, "abc1", 1, 0.5),
+        Row("abc2", 2, 1.0, "abc2", 2, 1.0)))
+    checkAnswer(sql(
+      "select * from sdkTable LEFT OUTER JOIN sdkTable1 on (sdkTable.age=sdkTable1.age)"),
+      Seq(Row("abc0", 0, 0.0, "abc0", 0, 0.0),
+        Row("abc1", 1, 0.5, "abc1", 1, 0.5),
+        Row("abc2", 2, 1.0, "abc2", 2, 1.0)))
+    checkAnswer(sql(
+      "select * from sdkTable RIGHT OUTER JOIN sdkTable1 on (sdkTable.age=sdkTable1.age)"),
+      Seq(Row("abc0", 0, 0.0, "abc0", 0, 0.0),
+        Row("abc1", 1, 0.5, "abc1", 1, 0.5),
+        Row("abc2", 2, 1.0, "abc2", 2, 1.0)))
+  }
+
+  test("test create external table and test bad record") {
+    // 1. Action = FORCE
+    buildTestDataWithBadRecordForce(writerPath)
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkTable"), Seq(
+      Row("abc0", null, null),
+      Row("abc1", null, null),
+      Row("abc2", null, null),
+      Row("abc3", 3, 1.5)))
+
+    sql("DROP TABLE sdkTable")
+    cleanTestData()
+
+    // 2. Action = REDIRECT
+    buildTestDataWithBadRecordRedirect(writerPath)
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkTable"), Seq(
+      Row("abc3", 3, 1.5)))
+
+    sql("DROP TABLE sdkTable")
+    cleanTestData()
+
+    // 3. Action = IGNORE
+    buildTestDataWithBadRecordIgnore(writerPath)
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkTable"), Seq(
+      Row("abc3", 3, 1.5)))
+  }
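Together with the FAIL helper defined near the top of the class, the scenario above covers all four bad_records_action modes. Summarizing what the assertions establish for the three deliberately bad rows plus the one valid record appended by buildTestData (the one-line mode descriptions are my reading of the assertions, not text from the patch):

  // FORCE:    unparsable values become null      -> 4 rows, bad rows kept with null age/height
  // REDIRECT: bad rows diverted to a raw file    -> 1 row, only the valid record remains
  // IGNORE:   bad rows dropped silently          -> 1 row, only the valid record remains
  // FAIL:     the write itself raises an error   -> no queryable output
  val expectedWithForce = Seq(
    Row("abc0", null, null),
    Row("abc1", null, null),
    Row("abc2", null, null),
    Row("abc3", 3, 1.5))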
+
+  def buildAvroTestDataStructType(): Any = {
+    buildAvroTestDataStruct(3, null)
+  }
+
+  def buildAvroTestDataStruct(rows: Int,
+      options: util.Map[String, String]): Any = {
+
+    val mySchema =
+      """
+        |{"name": "address",
+        | "type": "record",
+        | "fields": [
+        |  { "name": "name", "type": "string"},
+        |  { "name": "age", "type": "int"},
+        |  { "name": "address", "type": {
+        |    "type" : "record", "name" : "my_address",
+        |    "fields" : [
+        |      {"name": "street", "type": "string"},
+        |      {"name": "city", "type": "string"}]}}
+        |]}
+      """.stripMargin
+
+    val json = """ {"name":"bob", "age":10, "address" : {"street":"abc", "city":"bang"}} """
+    writeFilesWithAvroWriter(rows, mySchema, json)
+  }
+
+  def buildAvroTestDataBothStructArrayType(): Any = {
+    buildAvroTestDataStructWithArrayType(3, null)
+  }
+
+  def buildAvroTestDataStructWithArrayType(rows: Int,
+      options: util.Map[String, String]): Any = {
+
+    val mySchema =
+      """
+        |{
+        |  "name": "address",
+        |  "type": "record",
+        |  "fields": [
+        |    { "name": "name", "type": "string"},
+        |    { "name": "age", "type": "int"},
+        |    {
+        |      "name": "address",
+        |      "type": {
+        |        "type" : "record",
+        |        "name" : "my_address",
+        |        "fields" : [
+        |          {"name": "street", "type": "string"},
+        |          {"name": "city", "type": "string"}
+        |        ]}
+        |    },
+        |    {"name" :"doorNum",
+        |     "type" : {
+        |       "type" :"array",
+        |       "items":{
+        |         "name" :"EachdoorNums",
+        |         "type" : "int",
+        |         "default":-1
+        |       }}
+        |    }]}
+      """.stripMargin
+
+    val json =
+      """ {"name":"bob", "age":10,
+        |"address" : {"street":"abc", "city":"bang"},
+        |"doorNum" : [1,2,3,4]}""".stripMargin
+    writeFilesWithAvroWriter(rows, mySchema, json)
+  }
+
+  // converts the JSON fixture to an Avro GenericData.Record and writes it `rows` times
+  private def writeFilesWithAvroWriter(rows: Int,
+      mySchema: String,
+      json: String): Unit = {
+    val nn = new avro.Schema.Parser().parse(mySchema)
+    val converter = new JsonAvroConverter
+    val record = converter
+      .convertToGenericDataRecord(json.getBytes(CharEncoding.UTF_8), nn)
+
+    try {
+      val writer = CarbonWriter.builder
+        .outputPath(writerPath).isTransactionalTable(false)
+        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+      var i = 0
+      while (i < rows) {
+        writer.write(record)
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        Assert.fail(e.getMessage)
+    }
+  }
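writeFilesWithAvroWriter is the pivot of all the Avro scenarios: json2avro converts a JSON fixture into a GenericData.Record, and the SDK's Avro writer persists it. The same flow reduced to a free-standing sketch (the schema, row, and output path here are illustrative, not taken from the patch):

  import org.apache.avro
  import org.apache.commons.lang.CharEncoding
  import tech.allegro.schema.json2avro.converter.JsonAvroConverter
  import org.apache.carbondata.sdk.file.CarbonWriter

  val avroSchema = new avro.Schema.Parser().parse(
    """{"name": "person", "type": "record", "fields": [
      |  {"name": "name", "type": "string"},
      |  {"name": "age", "type": "int"}]}""".stripMargin)
  val record = new JsonAvroConverter().convertToGenericDataRecord(
    """{"name": "bob", "age": 10}""".getBytes(CharEncoding.UTF_8), avroSchema)
  val writer = CarbonWriter.builder()
    .outputPath("/tmp/avro-sdk-output")       // illustrative location
    .isTransactionalTable(false)
    .buildWriterForAvroInput(avroSchema)
  writer.write(record)
  writer.close()

Nested records and arrays in the Avro schema surface as struct and array columns once the output is mounted as an external table, which is what the read tests below assert with nested Row and WrappedArray values.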
|{"street":"jkl","city":"city4"}]} """.stripMargin + WriteFilesWithAvroWriter(rows, mySchema, json) + } + + def buildAvroTestDataStructOfArrayType(): Any = { + buildAvroTestDataStructOfArray(3, null) + } + + def buildAvroTestDataStructOfArray(rows: Int, + options: util.Map[String, String]): Any = { + + val mySchema = + """ { + | "name": "address", + | "type": "record", + | "fields": [ + | { + | "name": "name", + | "type": "string" + | }, + | { + | "name": "age", + | "type": "int" + | }, + | { + | "name": "address", + | "type": { + | "type": "record", + | "name": "my_address", + | "fields": [ + | { + | "name": "street", + | "type": "string" + | }, + | { + | "name": "city", + | "type": "string" + | }, + | { + | "name": "doorNum", + | "type": { + | "type": "array", + | "items": { + | "name": "EachdoorNums", + | "type": "int", + | "default": -1 + | } + | } + | } + | ] + | } + | } + | ] + |} """.stripMargin + + val json = + """ { + | "name": "bob", + | "age": 10, + | "address": { + | "street": "abc", + | "city": "bang", + | "doorNum": [ + | 1, + | 2, + | 3, + | 4 + | ] + | } + |} """.stripMargin + WriteFilesWithAvroWriter(rows, mySchema, json) + } + + test("Read sdk writer Avro output Record Type for nontransactional table") { + buildAvroTestDataStructType() + assert(FileFactory.getCarbonFile(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkTable") + sql( + s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + checkAnswer(sql("select * from sdkTable"), Seq( + Row("bob", 10, Row("abc", "bang")), + Row("bob", 10, Row("abc", "bang")), + Row("bob", 10, Row("abc", "bang")))) + + } + + test("Read sdk writer Avro output with both Array and Struct Type for nontransactional table") { + buildAvroTestDataBothStructArrayType() + assert(FileFactory.getCarbonFile(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkTable") + sql( + s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkTable"), Seq( + Row("bob", 10, Row("abc", "bang"), mutable.WrappedArray.newBuilder[Int].+=(1, 2, 3, 4)), + Row("bob", 10, Row("abc", "bang"), mutable.WrappedArray.newBuilder[Int].+=(1, 2, 3, 4)), + Row("bob", 10, Row("abc", "bang"), mutable.WrappedArray.newBuilder[Int].+=(1, 2, 3, 4)))) + } + + test("Read sdk writer Avro output with Array of struct for external table") { + buildAvroTestDataArrayOfStructType() + assert(FileFactory.getCarbonFile(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkTable") + sql( + s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + checkAnswer(sql(s"""select count(*) from sdkTable"""), + Seq(Row(3))) + } + + test("Read sdk writer Avro output with struct of Array for nontransactional table") { + buildAvroTestDataStructOfArrayType() + assert(FileFactory.getCarbonFile(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkTable") + sql( + s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + checkAnswer(sql(s"""select count(*) from sdkTable"""), + Seq(Row(3))) + } +} diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala index 2f7d98bcfea..c5aceaad4d1 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala +++ 
@@ -150,6 +150,7 @@ class SDVSuites3 extends Suites with BeforeAndAfterAll {
                               new LuceneTestCase ::
                               new TimeSeriesPreAggregateTestCase ::
                               new TestPartitionWithGlobalSort ::
+                              new SDKwriterTestCase ::
                               new SetParameterTestCase ::
                               new PartitionWithPreAggregateTestCase :: Nil