From 6fb74dc6d3ca931e433401cf382a1b98fc8007d2 Mon Sep 17 00:00:00 2001 From: Indhumathi27 Date: Mon, 3 Sep 2018 10:17:20 +0530 Subject: [PATCH] [CARBONDATA-2876]AVRO datatype support through SDK --- ...sactionalCarbonTableWithAvroDataType.scala | 94 +++++++++++++++++++ .../carbondata/sdk/file/AvroCarbonWriter.java | 24 ++--- 2 files changed, 106 insertions(+), 12 deletions(-) diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala index 7616ea34b3a..29aa2de9709 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala @@ -790,4 +790,98 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef checkExistence(sql("select * from sdkOutputTable"), true, "32.0") } + test("test logical type time-millis") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "id", + | "type": {"type" : "int", "logicalType": "time-millis"} + | }, + | { + | "name": "course_details", + | "type": { + | "name": "course_details", + | "type": "record", + | "fields": [ + | { + | "name": "course_struct_course_time", + | "type": {"type" : "int", "logicalType": "time-millis"} + | } + | ] + | } + | } + | ] + |}""".stripMargin + + val json1 = + """{"id": 172800,"course_details": { "course_struct_course_time":172800}}""".stripMargin + + val nn = new 
org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(172800, Row(172800)))) + } + + test("test logical type time-micros") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "id", + | "type": {"type" : "long", "logicalType": "time-micros"} + | }, + | { + | "name": "course_details", + | "type": { + | "name": "course_details", + | "type": "record", + | "fields": [ + | { + | "name": "course_struct_course_time", + | "type": {"type" : "long", "logicalType": "time-micros"} + | } + | ] + | } + | } + | ] + |}""".stripMargin + + val json1 = + """{"id": 1728000,"course_details": { "course_struct_course_time":1728000}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(1728000, Row(1728000)))) + } + } diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java index 7c1d9a27527..d1e936e97db 100644 --- 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java @@ -445,8 +445,8 @@ private static Field prepareFields(Schema.Field avroField) { if (logicalType instanceof LogicalTypes.Date) { return new Field(fieldName, DataTypes.DATE); } else { - LOGGER.warn("Unsupported logical type. Considering Data Type as INT for " + childSchema - .getName()); + // INT values whose logical type is not date (e.g. time-millis) + // are mapped to the carbon INT data type return new Field(fieldName, DataTypes.INT); } case LONG: @@ -454,8 +454,8 @@ private static Field prepareFields(Schema.Field avroField) { || logicalType instanceof LogicalTypes.TimestampMicros) { return new Field(fieldName, DataTypes.TIMESTAMP); } else { - LOGGER.warn("Unsupported logical type. Considering Data Type as LONG for " + childSchema - .getName()); + // LONG values not matched by the timestamp check above (e.g. time-micros) + // are mapped to the carbon LONG data type return new Field(fieldName, DataTypes.LONG); } case DOUBLE: @@ -545,8 +545,8 @@ private static StructField prepareSubFields(String fieldName, Schema childSchema if (logicalType instanceof LogicalTypes.Date) { return new StructField(fieldName, DataTypes.DATE); } else { - LOGGER.warn("Unsupported logical type. Considering Data Type as INT for " + childSchema - .getName()); + // INT values whose logical type is not date (e.g. time-millis) + // are mapped to the carbon INT data type return new StructField(fieldName, DataTypes.INT); } case LONG: @@ -554,8 +554,8 @@ private static StructField prepareSubFields(String fieldName, Schema childSchema || logicalType instanceof LogicalTypes.TimestampMicros) { return new StructField(fieldName, DataTypes.TIMESTAMP); } else { - LOGGER.warn("Unsupported logical type. 
Considering Data Type as LONG for " + childSchema - .getName()); + // LONG values not matched by the timestamp check above (e.g. time-micros) + // are mapped to the carbon LONG data type return new StructField(fieldName, DataTypes.LONG); } case DOUBLE: @@ -641,8 +641,8 @@ private static DataType getMappingDataTypeForCollectionRecord(Schema childSchema if (logicalType instanceof LogicalTypes.Date) { return DataTypes.DATE; } else { - LOGGER.warn("Unsupported logical type. Considering Data Type as INT for " + childSchema - .getName()); + // INT values whose logical type is not date (e.g. time-millis) + // are mapped to the carbon INT data type return DataTypes.INT; } } else { @@ -654,8 +654,8 @@ private static DataType getMappingDataTypeForCollectionRecord(Schema childSchema || logicalType instanceof LogicalTypes.TimestampMicros) { return DataTypes.TIMESTAMP; } else { - LOGGER.warn("Unsupported logical type. Considering Data Type as LONG for " + childSchema - .getName()); + // LONG values not matched by the timestamp check above (e.g. time-micros) + // are mapped to the carbon LONG data type return DataTypes.LONG; } } else {