diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index ac52728f9dc..fc4704edb24 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -597,6 +597,10 @@ public static String unquoteChar(String parseStr) { */ public static String delimiterConverter(String delimiter) { switch (delimiter) { + case "\\001": + case "\\002": + case "\\003": + case "\\004": case "|": case "*": case ".": diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index dbd2f0e92e2..16486d0c51b 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -37,6 +37,7 @@ import org.apache.carbondata.core.util.ObjectSerializationUtil; import org.apache.carbondata.core.util.ThreadLocalSessionInfo; import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; +import org.apache.carbondata.processing.loading.ComplexDelimitersEnum; import org.apache.carbondata.processing.loading.DataLoadExecutor; import org.apache.carbondata.processing.loading.TableProcessingOperations; import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper; @@ -338,11 +339,19 @@ public static CarbonLoadModel getLoadModel(Configuration conf) throws IOException { SKIP_EMPTY_LINE, carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE))); - String complexDelim = conf.get(COMPLEX_DELIMITERS, "\\\001" + "," + "\\\002"); + String complexDelim = conf.get(COMPLEX_DELIMITERS); + if (null == complexDelim) { + complexDelim = ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_1.value() + "," + + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_2.value() + "," + + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_3.value(); + } String[] split = complexDelim.split(","); - model.setComplexDelimiterLevel1(split[0]); - if (split.length > 1) { - model.setComplexDelimiterLevel2(split[1]); + model.setComplexDelimiter(split[0]); + if (split.length > 2) { + model.setComplexDelimiter(split[1]); + model.setComplexDelimiter(split[2]); + } else if (split.length > 1) { + model.setComplexDelimiter(split[1]); } model.setDateFormat( conf.get(
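For reference, the defaulting logic above now produces a three-level delimiter list when COMPLEX_DELIMITERS is not configured, then splits it on commas. A minimal sketch of that resolve-and-split behaviour, assuming the three enum levels resolve to the non-printing characters \u0001, \u0002 and \u0003 (the object and method names below are illustrative, not CarbonData APIs):

```scala
// Minimal sketch of the defaulting/splitting in getLoadModel above.
// ASSUMPTION: the three ComplexDelimitersEnum levels resolve to \u0001,
// \u0002 and \u0003; DelimiterDefaults/resolve are illustrative names only.
object DelimiterDefaults {
  val levels = Seq("\u0001", "\u0002", "\u0003")

  // Fall back to the three defaults when the configuration key is absent,
  // then split on comma exactly as the Java code does.
  def resolve(configured: Option[String]): Array[String] =
    configured.getOrElse(levels.mkString(",")).split(",")

  def main(args: Array[String]): Unit = {
    assert(resolve(None).sameElements(Array("\u0001", "\u0002", "\u0003")))
    assert(resolve(Some("$,:")).sameElements(Array("$", ":")))
  }
}
```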
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala new file mode 100644 index 00000000000..a00c2ce266a --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.spark.testsuite.createTable.TestCreateDDLForComplexMapType + +import java.io.{BufferedWriter, File, FileWriter} +import java.util + +import au.com.bytecode.opencsv.CSVWriter +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk + +import scala.collection.JavaConversions._ + +class TestCreateDDLForComplexMapType extends QueryTest with BeforeAndAfterAll { + private val conf: Configuration = new Configuration(false) + + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + + val path = s"$rootPath/integration/spark-common-test/src/test/resources/maptest2.csv" + + private def checkForLocalDictionary(dimensionRawColumnChunks: util + .List[DimensionRawColumnChunk]): Boolean = { + var isLocalDictionaryGenerated = false + import scala.collection.JavaConversions._ + isLocalDictionaryGenerated = dimensionRawColumnChunks + .filter(dimensionRawColumnChunk => dimensionRawColumnChunk.getDataChunkV3 + .isSetLocal_dictionary).size > 0 + isLocalDictionaryGenerated + } + + def createCSVFile(): Unit = { + val out = new BufferedWriter(new FileWriter(path)); + val writer = new CSVWriter(out); + + val employee1 = Array("1\u0002Nalla\u00012\u0002Singh\u00011\u0002Gupta\u00014\u0002Kumar") + + val employee2 = Array("10\u0002Nallaa\u000120\u0002Sissngh\u0001100\u0002Gusspta\u000140" + + "\u0002Kumar") + + var listOfRecords = List(employee1, employee2) + + writer.writeAll(listOfRecords) + out.close() + } + + override def beforeAll(): Unit = { + createCSVFile() + sql("DROP TABLE IF EXISTS carbon") + } + + override def afterAll(): Unit = { + new File(path).delete() + } + + test("Single Map One Level") { + sql("DROP TABLE IF EXISTS carbon") + sql( + s""" + | CREATE TABLE carbon( + | mapField map<int,string> + | ) + | STORED BY 'carbondata' + | """ + .stripMargin) + val desc = sql( + s""" + | Describe Formatted + | carbon + | """.stripMargin).collect() + assert(desc(0).get(1).asInstanceOf[String].trim.equals("map<int,string>")) + } + + test("Single Map with Two Nested Level") { + sql("DROP TABLE IF EXISTS carbon") + sql( + s""" + | CREATE TABLE carbon( + | mapField map<int,map<int,string>> + | ) + | STORED BY + |'carbondata' + |""" + .stripMargin) + val desc = sql( + s""" + | Describe Formatted + | carbon + | """.stripMargin).collect() + assert(desc(0).get(1).asInstanceOf[String].trim.equals("map<int,map<int,string>>")) + } + + test("Map Type with array type as value") { + sql("DROP TABLE IF EXISTS carbon") + sql( + s""" + | CREATE TABLE carbon( + | mapField map<int,array<int>> + | ) + | STORED BY 'carbondata' + | + """ + .stripMargin) + val desc = sql( + s""" + | Describe Formatted + | carbon + | """.stripMargin).collect() + assert(desc(0).get(1).asInstanceOf[String].trim.equals("map<int,array<int>>")) + }
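The insert statements in the load tests that follow all use the same one-level wire format: '\001' separates map entries and '\002' separates a key from its value (the createCSVFile helper above writes the same layout with \u0001/\u0002 escapes). A tiny hypothetical encoder, not part of the test suite, that produces that layout:

```scala
// Hypothetical helper, not part of CarbonData: builds the string layout the
// following inserts use ('\u0001' between entries, '\u0002' inside an entry).
def encodeMap[K, V](entries: Seq[(K, V)]): String =
  entries.map { case (k, v) => s"$k\u0002$v" }.mkString("\u0001")

// Seq(1 -> "Nalla", 2 -> "Singh") becomes "1\u0002Nalla\u00012\u0002Singh"
assert(encodeMap(Seq(1 -> "Nalla", 2 -> "Singh")) == "1\u0002Nalla\u00012\u0002Singh")
```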
+ test("Map Type with struct type as value") { + sql("DROP TABLE IF EXISTS carbon") + sql( + s""" + | CREATE TABLE carbon( + | mapField map<int,struct<key:int,val:int>> + | ) + | STORED BY + | 'carbondata' + | """ + .stripMargin) + val desc = sql( + s""" + | Describe Formatted + | carbon + | """.stripMargin).collect() + assert(desc(0).get(1).asInstanceOf[String].trim + .equals("map<int,struct<key:int,val:int>>")) + } + + test("Map Type as child to struct type") { + sql("DROP TABLE IF EXISTS carbon") + sql( + s""" + | CREATE TABLE carbon( + | mapField struct<key:int,val:map<int,int>> + | ) + | STORED BY + |'carbondata' """ + .stripMargin) + val desc = sql( + s""" + | Describe Formatted + | carbon + | """.stripMargin).collect() + assert(desc(0).get(1).asInstanceOf[String].trim + .equals("struct<key:int,val:map<int,int>>")) + } + + test("Map Type as child to array type") { + sql("DROP TABLE IF EXISTS carbon") + sql( + s""" + | CREATE TABLE carbon( + | mapField array<map<int,int>> + | ) + | STORED BY 'carbondata' + | """ + .stripMargin) + val desc = sql( + s""" + | Describe Formatted + | carbon """.stripMargin).collect() + assert(desc(0).get(1).asInstanceOf[String].trim.equals("array<map<int,int>>")) + sql("insert into carbon values('1\0032\0022\0033\001100\003200\002200\003300')") + sql("select * from carbon").show(false) + } + + test("Test Load data in map") { + sql("DROP TABLE IF EXISTS carbon") + sql( + s""" + | CREATE TABLE carbon( + | mapField map<int,string> + | ) + | STORED BY 'carbondata' + | """ + .stripMargin) + val desc = sql( + s""" + | Describe Formatted + | carbon + | """.stripMargin).collect() + sql("insert into carbon values('1\002Nalla\0012\002Singh\0013\002Gupta\0014\002Kumar')") + checkAnswer(sql("select * from carbon"), Seq( + Row(Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar")))) + } + + test("Test Load data in map with empty value") { + sql("DROP TABLE IF EXISTS carbon") + sql( + s""" + | CREATE TABLE carbon( + | mapField map<int,string> + | ) + | STORED BY 'carbondata' + | """ + .stripMargin) + val desc = sql( + s""" + | Describe Formatted + | carbon + | """.stripMargin).collect() + sql("insert into carbon values('1\002Nalla\0012\002\0013\002Gupta\0014\002Kumar')") + checkAnswer(sql("select * from carbon"), Seq( + Row(Map(1 -> "Nalla", 2 -> "", 3 -> "Gupta", 4 -> "Kumar")))) + } + + // Support this for Map type + test("Test Load data in map with dictionary include") { + sql("DROP TABLE IF EXISTS carbon") + sql( + s""" + | CREATE TABLE carbon( + | mapField map<int,string> + | ) + | STORED BY 'carbondata' + | TBLPROPERTIES('DICTIONARY_INCLUDE'='mapField') + | """ + .stripMargin) + sql("insert into carbon values('1\002Nalla\0012\002Singh\0013\002Gupta')") + sql("select * from carbon").show(false) + //checkAnswer(sql("select * from carbon"), Seq( + //Row(Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar")))) + } + + test("Test Load data in map with partition columns") { + sql("DROP TABLE IF EXISTS carbon") + val exception = intercept[AnalysisException]( + sql( + s""" + | CREATE TABLE carbon( + | a INT, + | mapField array<int>, + | b STRING + | ) + | PARTITIONED BY (mp map<int,string>) + | STORED BY 'carbondata' + | """ + .stripMargin) + ) + assertResult("Cannot use map<int,string> for partition column;")(exception.getMessage()) + }
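Reading rows back, the expected Map values in the checkAnswer calls above imply that on duplicate keys the first occurrence wins (the duplicate-key tests further down assert exactly that). A sketch of the matching decode step, assuming every entry is a well-formed key/value pair:

```scala
// Sketch only: inverse of the one-level encoding, keeping the FIRST
// occurrence of a duplicate key, as the duplicate-key tests below expect.
def decodeMap(encoded: String): Map[String, String] =
  encoded.split("\u0001").foldLeft(Map.empty[String, String]) { (acc, entry) =>
    val Array(k, v) = entry.split("\u0002", 2) // assumes one '\u0002' per entry
    if (acc.contains(k)) acc else acc + (k -> v)
  }

assert(decodeMap("1\u0002Nalla\u00012\u0002Singh\u00011\u0002Gupta") ==
  Map("1" -> "Nalla", "2" -> "Singh"))
```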
mapField[1]='abc'") + checkAnswer(sql("select * from carbon"), Seq( + Row(1, Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar")))) + + } + + test("Test Compaction blocking") { + sql("DROP TABLE IF EXISTS carbon") + + sql( + s""" + | CREATE TABLE carbon( + | a INT, + | mapField array + | ) + | STORED BY 'carbondata' + | """ + .stripMargin) + + val exception = intercept[UnsupportedOperationException]( + sql("ALTER table carbon compact 'minor'") + ) + assertResult("Compaction is unsupported for Table containing Complex Columns")(exception + .getMessage()) + } + + test("Test Load duplicate keys data in map") { + sql("DROP TABLE IF EXISTS carbon") + sql( + s""" + | CREATE TABLE carbon( + | mapField map + | ) + | STORED BY 'carbondata' + | """ + .stripMargin) + val desc = sql( + s""" + | Describe Formatted + | carbon + | """.stripMargin).collect() + sql("insert into carbon values('1\002Nalla\0012\002Singh\0011\002Gupta\0014\002Kumar')") + checkAnswer(sql("select * from carbon"), Seq( + Row(Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar")))) + } + + test("Test Load data in map of map") { + sql("DROP TABLE IF EXISTS carbon") + sql( + s""" + | CREATE TABLE carbon( + | mapField map> + | ) + | STORED BY + |'carbondata' """ + .stripMargin) + sql( + "insert into carbon values('manish\0021\004nalla\0032\004gupta\001kunal\0021\004kapoor\0032" + + "\004sharma')") + checkAnswer(sql("select * from carbon"), Seq( + Row(Map("manish" -> Map(1 -> "nalla", 2 -> "gupta"), + "kunal" -> Map(1 -> "kapoor", 2 -> "sharma"))))) + } + + test("Test Load duplicate keys data in map of map") { + sql("DROP TABLE IF EXISTS carbon") + sql( + s""" + | CREATE TABLE carbon( + | mapField map> + | ) + | STORED BY + |'carbondata' + |""" + .stripMargin) + sql( + "insert into carbon values('manish\0021\004nalla\0031\004gupta\001kunal\0021\004kapoor\0032" + + "\004sharma')") + checkAnswer(sql("select * from carbon"), Seq( + Row(Map("manish" -> Map(1 -> "nalla"), + "kunal" -> Map(1 -> "kapoor", 2 -> "sharma"))))) + } + + test("Test Create table as select with map") { + sql("DROP TABLE IF EXISTS carbon") + sql("DROP TABLE IF EXISTS carbon1") + sql( + s""" + | CREATE TABLE carbon( + | mapField map + | ) + | STORED BY 'carbondata' + | """ + .stripMargin) + sql("insert into carbon values('1\002Nalla\0012\002Singh\0013\002Gupta\0014\002Kumar')") + sql( + s""" + | CREATE TABLE carbon1 + | AS + | Select * + | From carbon + | """ + .stripMargin) + checkAnswer(sql("select * from carbon1"), Seq( + Row(Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar")))) + } + + test("Test Create table with double datatype in map") { + sql("DROP TABLE IF EXISTS carbon") + sql( + s""" + | CREATE TABLE carbon( + | mapField map + | ) + | STORED BY 'carbondata' + | """ + .stripMargin) + sql( + "insert into carbon values('1.23\002Nalla\0012.34\002Singh\0013.67676\002Gupta\0013.67676" + + "\002Kumar')") + checkAnswer(sql("select * from carbon"), Seq( + Row(Map(1.23 -> "Nalla", 2.34 -> "Singh", 3.67676 -> "Gupta")))) + } + + test("Load Map data from CSV File") { + sql("DROP TABLE IF EXISTS carbon") + sql( + s""" + | CREATE TABLE carbon( + | mapField map + | ) + | STORED BY 'carbondata' + | """ + .stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$path' + | INTO TABLE carbon OPTIONS( + | 'header' = 'false') + """.stripMargin) + checkAnswer(sql("select * from carbon"), Seq( + Row(Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar")), + Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar")) + )) + } + + test("Sort Column table property 
+ test("Sort Column table property blocking for Map type") { + sql("DROP TABLE IF EXISTS carbon") + val exception1 = intercept[Exception] { + sql( + s""" + | CREATE TABLE carbon( + | mapField map<int,string> + | ) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='mapField') + | """ + .stripMargin) + } + assert(exception1.getMessage + .contains( + "sort_columns is unsupported for map datatype column: mapfield")) + } + +} \ No newline at end of file diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala index d23c844d8b4..d332a5a6061 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala @@ -21,6 +21,7 @@ import java.io.{File, PrintWriter} import java.util import java.util.Collections +import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest import org.scalatest.{BeforeAndAfterAll, Ignore} @@ -136,6 +137,22 @@ class LocalDictionarySupportLoadTableTest extends QueryTest with BeforeAndAfterAll { assert(checkForLocalDictionary(getDimRawChunk(2))) } + test("test local dictionary generation for map type") { + sql("drop table if exists local2") + sql( + "CREATE TABLE local2(name map<string,string>) STORED BY 'carbondata' tblproperties" + + "('local_dictionary_enable'='true','local_dictionary_include'='name')") + sql( + "insert into local2 values('Manish\002Nalla\001Manish\002Gupta\001Shardul\002Singh" + + "\001Vishal\002Kumar')") + checkAnswer(sql("select * from local2"), Seq( + Row(Map("Manish" -> "Nalla", "Shardul" -> "Singh", "Vishal" -> "Kumar")))) + assert(!checkForLocalDictionary(getDimRawChunk(0))) + assert(!checkForLocalDictionary(getDimRawChunk(1))) + assert(checkForLocalDictionary(getDimRawChunk(2))) + assert(checkForLocalDictionary(getDimRawChunk(3))) + } + test("test local dictionary data validation") { sql("drop table if exists local_query_enable") sql("drop table if exists local_query_disable")
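The diffs that follow replace the two getComplexDelimiterLevelN getters with a single ordered list, and setComplexDelimiter appends one level per call; that is also why the updated dictionary test cases later in this patch call it twice in succession. A stand-in sketch of that accessor contract (this is not the real CarbonLoadModel):

```scala
// Stand-in sketch, NOT the real CarbonLoadModel: an ordered delimiter list
// where each setComplexDelimiter call appends the next level.
import java.util.{ArrayList => JArrayList, List => JList}

class LoadModelSketch {
  private val complexDelimiters: JList[String] = new JArrayList[String]()
  def setComplexDelimiter(delimiter: String): Unit = complexDelimiters.add(delimiter)
  def getComplexDelimiters: JList[String] = complexDelimiters
}

val model = new LoadModelSketch
Seq("\u0001", "\u0002", "\u0003").foreach(model.setComplexDelimiter)
assert(model.getComplexDelimiters.get(2) == "\u0003") // delimiterLevel3 below
```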
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 3752eef1a2d..f7249b88213 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -339,8 +339,9 @@ class NewRddIterator(rddIter: Iterator[Row], private val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) private val dateFormat = new SimpleDateFormat(dateFormatString) - private val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 - private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 + private val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0) + private val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1) + private val delimiterLevel3 = carbonLoadModel.getComplexDelimiters.get(2) private val serializationNullFormat = carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) import scala.collection.JavaConverters._ @@ -388,8 +389,9 @@ class LazyRddIterator(serializer: SerializerInstance, .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) private val dateFormat = new SimpleDateFormat(dateFormatString) - private val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 - private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 + private val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0) + private val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1) + private val delimiterLevel3 = carbonLoadModel.getComplexDelimiters.get(2) private val serializationNullFormat = carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) // the order of fields in dataframe and createTable may be different, here we need to know whether diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala index 71447e93d37..ca9b4afded4 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -643,6 +643,7 @@ object CarbonScalaUtil { !x.dataType.get.equalsIgnoreCase("STRING") && !x.dataType.get.equalsIgnoreCase("VARCHAR") && !x.dataType.get.equalsIgnoreCase("STRUCT") && + !x.dataType.get.equalsIgnoreCase("MAP") && !x.dataType.get.equalsIgnoreCase("ARRAY"))) { val errormsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE column: " + dictColm.trim + diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 3ac2d2b2ea0..c9715734258 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -114,6 +114,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { protected val MULTILINE = carbonKeyWord("MULTILINE") protected val COMPLEX_DELIMITER_LEVEL_1 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_1") protected val COMPLEX_DELIMITER_LEVEL_2 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_2") + protected val COMPLEX_DELIMITER_LEVEL_3 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_3") protected val OPTIONS = carbonKeyWord("OPTIONS") protected val OUTPATH = carbonKeyWord("OUTPATH") protected val OVERWRITE = carbonKeyWord("OVERWRITE") @@ -915,7 +916,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { * @param dimensionDatatype */ def isDetectAsDimentionDatatype(dimensionDatatype: String): Boolean = { - val dimensionType = Array("string", "array", "struct", "timestamp", "date", "char") + val dimensionType = Array("string", "array", "struct", "map", "timestamp", "date", "char") dimensionType.exists(x => dimensionDatatype.toLowerCase.contains(x)) }
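isDetectAsDimentionDatatype is a plain substring check over the type name, so adding "map" makes any map-bearing complex type (including nested ones) register as a dimension. A self-contained rendering of that predicate as changed above:

```scala
// Self-contained rendering of the substring-based dimension check above,
// with "map" now included in the dimension type list.
def isDimensionDatatype(dataType: String): Boolean = {
  val dimensionType = Array("string", "array", "struct", "map", "timestamp", "date", "char")
  dimensionType.exists(x => dataType.toLowerCase.contains(x))
}

assert(isDimensionDatatype("map<int,string>"))
assert(isDimensionDatatype("array<map<int,int>>"))
assert(!isDimensionDatatype("int"))
```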
"BAD_RECORDS_ACTION", - "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT", "BAD_RECORD_PATH", - "BATCH_SORT_SIZE_INMB", "GLOBAL_SORT_PARTITIONS", "SINGLE_PASS", - "IS_EMPTY_DATA_BAD_RECORD", "HEADER", "TIMESTAMPFORMAT", "SKIP_EMPTY_LINE", - "SORT_COLUMN_BOUNDS", "LOAD_MIN_SIZE_INMB" + val supportedOptions = Seq("DELIMITER", + "QUOTECHAR", + "FILEHEADER", + "ESCAPECHAR", + "MULTILINE", + "COMPLEX_DELIMITER_LEVEL_1", + "COMPLEX_DELIMITER_LEVEL_2", + "COMPLEX_DELIMITER_LEVEL_3", + "COLUMNDICT", + "SERIALIZATION_NULL_FORMAT", + "BAD_RECORDS_LOGGER_ENABLE", + "BAD_RECORDS_ACTION", + "ALL_DICTIONARY_PATH", + "MAXCOLUMNS", + "COMMENTCHAR", + "DATEFORMAT", + "BAD_RECORD_PATH", + "BATCH_SORT_SIZE_INMB", + "GLOBAL_SORT_PARTITIONS", + "SINGLE_PASS", + "IS_EMPTY_DATA_BAD_RECORD", + "HEADER", + "TIMESTAMPFORMAT", + "SKIP_EMPTY_LINE", + "SORT_COLUMN_BOUNDS", + "LOAD_MIN_SIZE_INMB" ) var isSupported = true val invalidOptions = StringBuilder.newBuilder @@ -1291,13 +1311,16 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { Field("unknown", Some("struct"), Some("unknown"), Some(e1)) } + // Map is represented as Map> protected lazy val mapFieldType: Parser[Field] = (MAP ^^^ "map") ~> "<" ~> primitiveFieldType ~ ("," ~> nestedType) <~ ">" ^^ { case key ~ value => Field("unknown", Some("map"), Some("unknown"), Some(List( - Field("key", key.dataType, Some("key"), key.children), - Field("value", value.dataType, Some("value"), value.children)))) + Field("val", Some("struct"), Some("unknown"), + Some(List( + Field("key", key.dataType, Some("key"), key.children), + Field("value", value.dataType, Some("value"), value.children))))))) } protected lazy val measureCol: Parser[Field] = diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala index 3d8170e4ca7..184cc1d77a4 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import java.util import java.util.Date import scala.collection.JavaConverters._ @@ -77,10 +78,10 @@ class CarbonAppendableStreamSink( conf.set(entry._1, entry._2) } // properties below will be used for default CarbonStreamParser - conf.set("carbon_complex_delimiter_level_1", - carbonLoadModel.getComplexDelimiterLevel1) - conf.set("carbon_complex_delimiter_level_2", - carbonLoadModel.getComplexDelimiterLevel2) + val complexDelimiters = carbonLoadModel.getComplexDelimiters + conf.set("carbon_complex_delimiter_level_1", complexDelimiters.get(0)) + conf.set("carbon_complex_delimiter_level_2", complexDelimiters.get(1)) + conf.set("carbon_complex_delimiter_level_3", complexDelimiters.get(2)) conf.set( DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT, carbonLoadModel.getSerializationNullFormat().split(",")(1)) diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 7e2e7d96c6f..50b34444944 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -988,8 +988,8 @@ object CarbonDataRDDFactory { // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) { // input data from DataFrame - val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 - val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 + val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0) + val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1) val serializationNullFormat = carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) dataFrame.get.rdd.map { row => diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index af406bb7b3a..38d2f721b36 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -69,7 +69,7 @@ import org.apache.carbondata.core.util._ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.events.exception.PreEventException -import org.apache.carbondata.processing.loading.TableProcessingOperations +import org.apache.carbondata.processing.loading.{ComplexDelimitersEnum, TableProcessingOperations} import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import org.apache.carbondata.processing.loading.exception.NoRetryException import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption} @@ -188,11 +188,13 @@ case class CarbonLoadDataCommand( val carbonLoadModel = new CarbonLoadModel() val tableProperties = table.getTableInfo.getFactTable.getTableProperties val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava) + optionsFinal + .put("complex_delimiter_level_4", + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_4.value()) optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope", carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)))) - optionsFinal .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table)) val factPath = if (dataFrame.isDefined) { diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index ca39931793c..713561b35e2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -128,7 +128,7 @@ case class CarbonCreateTableCommand( if (partitionInfo != null && partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) { // Restrict dictionary encoding on partition columns. 
- // TODO Need to decide wherher it is required + // TODO Need to decide whether it is required val dictionaryOnPartitionColumn = partitionInfo.getColumnSchemaList.asScala.exists{p => p.hasEncoding(Encoding.DICTIONARY) && !p.hasEncoding(Encoding.DIRECT_DICTIONARY) diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala index 08c149b2191..ed5486b8510 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala @@ -55,8 +55,8 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll { carbonLoadModel.setFactFilePath(filePath) carbonLoadModel.setCsvHeader(header) carbonLoadModel.setCsvDelimiter(",") - carbonLoadModel.setComplexDelimiterLevel1("$") - carbonLoadModel.setComplexDelimiterLevel2(":") + carbonLoadModel.setComplexDelimiter("$") + carbonLoadModel.setComplexDelimiter(":") carbonLoadModel.setAllDictPath(allDictFilePath) carbonLoadModel.setSerializationNullFormat( TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N") diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala index 060afcacf9b..69248d6aa80 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala @@ -165,8 +165,8 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft carbonLoadModel.setFactFilePath(filePath) carbonLoadModel.setCsvHeader(header) carbonLoadModel.setCsvDelimiter(csvDelimiter) - carbonLoadModel.setComplexDelimiterLevel1("$") - carbonLoadModel.setComplexDelimiterLevel2(":") + carbonLoadModel.setComplexDelimiter("$") + carbonLoadModel.setComplexDelimiter(":") carbonLoadModel.setColDictFilePath(extColFilePath) carbonLoadModel.setQuoteChar("\""); carbonLoadModel.setSerializationNullFormat( diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala index f00da2d22e0..21cad071a09 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala @@ -42,7 +42,6 @@ case class StreamData(id: Integer, name: String, city: String, salary: java.lang register: String, updated: String, file: FileElement) -@Ignore class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll { private val spark = sqlContext.sparkSession @@ -420,266 +419,264 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll { continueSeconds = 20, generateBadRecords = true, badRecordAction = "force", - autoHandoff = true + autoHandoff = false ) - - // non-filter -// val result = sql("select * from streaming1.stream_table_filter_complex order by id, name").collect() -// assert(result != null) -// assert(result.length == 55) -// // check one row of streaming data -// assert(result(0).isNullAt(0)) -// 
assert(result(0).getString(1) == "") -// assert(result(0).getStruct(9).isNullAt(1)) -// // check one row of batch loading -// assert(result(50).getInt(0) == 100000001) -// assert(result(50).getString(1) == "batch_1") -// assert(result(50).getStruct(9).getInt(1) == 20) + val result = sql("select * from streaming1.stream_table_filter_complex order by id, name").collect() + assert(result != null) + assert(result.length == 55) + // check one row of streaming data + assert(result(0).isNullAt(0)) + assert(result(0).getString(1) == "") + assert(result(0).getStruct(9).isNullAt(1)) + // check one row of batch loading + assert(result(50).getInt(0) == 100000001) + assert(result(50).getString(1) == "batch_1") + assert(result(50).getStruct(9).getInt(1) == 20) sql("select * from streaming1.stream_table_filter_complex where id = 1").show // filter -// checkAnswer( -// sql("select * from stream_table_filter_complex where id = 1"), -// Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where id > 49 and id < 100000002"), -// Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)), -// Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where id between 50 and 100000001"), -// Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)), -// Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where name = 'name_3'"), -// Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where name like '%me_3%' and id < 30"), -// Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3)))) -// -// checkAnswer(sql("select count(*) from stream_table_filter_complex where name like '%ame%'"), -// Seq(Row(49))) -// -// checkAnswer(sql("select count(*) from stream_table_filter_complex where name like '%batch%'"), -// Seq(Row(5))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where name >= 'name_3' and id < 4"), -// Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 
10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where city = 'city_1'"), -// Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)), -// Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where city like '%ty_1%' and ( id < 10 or id >= 100000001)"), -// Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)), -// Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20)))) -// -// checkAnswer(sql("select count(*) from stream_table_filter_complex where city like '%city%'"), -// Seq(Row(54))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where city > 'city_09' and city < 'city_10'"), -// Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)), -// Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where city between 'city_09' and 'city_1'"), -// Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)), -// Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where salary = 90000"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where salary > 80000 and salary <= 100000"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), -// Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_10", 
"school_1010")), 10)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where salary between 80001 and 90000"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where tax = 0.04 and id < 100"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where tax >= 0.04 and id < 100"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where tax < 0.05 and tax > 0.02 and id < 100"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where tax between 0.02 and 0.04 and id < 100"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where percent = 80.04 and id < 100"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where percent >= 80.04 and id < 100"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where percent < 80.05 and percent > 80.02 and id < 100"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where percent between 80.02 and 80.05 and id < 100"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where birthday between '1990-01-04' and '1990-01-05'"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), 
Row(wrap(Array("school_9", "school_99")), 9)), -// Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)), -// Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where birthday = '1990-01-04'"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), -// Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where birthday > '1990-01-03' and birthday <= '1990-01-04'"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), -// Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where birthday between '1990-01-04' and '1990-01-05'"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), -// Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)), -// Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where register = '2010-01-04 10:01:01'"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), -// Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), 
Row(wrap(Array("school_9", "school_99")), 9)), -// Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), -// Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")),50)), -// Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where updated = '2010-01-04 10:01:01'"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), -// Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), -// Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"), -// Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), -// Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)), -// Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where id is null order by name"), -// Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)), -// Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, 
Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where name = ''"), -// Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where id is null and name <> ''"), -// Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where city = ''"), -// Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where id is null and city <> ''"), -// Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where salary is null"), -// Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where id is null and salary is not null"), -// Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where tax is null"), -// Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where id is null and tax is not null"), -// Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where percent is null"), -// Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where id is null and salary is not null"), -// Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where birthday is null"), -// Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where id is null and birthday is not null"), -// Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) -// -// checkAnswer( -// sql("select * from stream_table_filter_complex where register is null"), -// Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)), -// Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 
6))))
-//
-//    checkAnswer(
-//      sql("select * from stream_table_filter_complex where id is null and register is not null"),
-//      Seq())
-//
-//    checkAnswer(
-//      sql("select * from stream_table_filter_complex where updated is null"),
-//      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
-//
-//    checkAnswer(
-//      sql("select * from stream_table_filter_complex where id is null and updated is not null"),
-//      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
-//
-//    // agg
-//    checkAnswer(
-//      sql("select count(*), max(id), min(name), cast(avg(file.age) as integer), sum(file.age) " +
-//        "from stream_table_filter_complex where id >= 2 and id <= 100000004"),
-//      Seq(Row(51, 100000004, "batch_1", 27, 1406)))
-//
-//    checkAnswer(
-//      sql("select city, count(id), sum(id), cast(avg(file.age) as integer), " +
-//        "max(salary), min(salary) " +
-//        "from stream_table_filter_complex " +
-//        "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " +
-//        "and city <> '' " +
-//        "group by city " +
-//        "order by city"),
-//      Seq(Row("city_1", 2, 100000002, 10, 10000.0, 0.1),
-//        Row("city_2", 1, 100000002, 30, 0.2, 0.2),
-//        Row("city_3", 2, 100000006, 21, 30000.0, 0.3)))
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id = 1"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id > 49 and id < 100000002"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id between 50 and 100000001"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name = 'name_3'"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name like '%me_3%' and id < 30"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
+
+    checkAnswer(sql("select count(*) from stream_table_filter_complex where name like '%ame%'"),
+      Seq(Row(49)))
+
+    checkAnswer(sql("select count(*) from stream_table_filter_complex where name like '%batch%'"),
+      Seq(Row(5)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name >= 'name_3' and id < 4"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city = 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city like '%ty_1%' and ( id < 10 or id >= 100000001)"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(sql("select count(*) from stream_table_filter_complex where city like '%city%'"),
+      Seq(Row(54)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city > 'city_09' and city < 'city_10'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city between 'city_09' and 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary = 90000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary > 80000 and salary <= 100000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_10", "school_1010")), 10))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary between 80001 and 90000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax = 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax >= 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax < 0.05 and tax > 0.02 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax between 0.02 and 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent = 80.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent >= 80.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent < 80.05 and percent > 80.02 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent between 80.02 and 80.05 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday = '1990-01-04'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday > '1990-01-03' and birthday <= '1990-01-04'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday between '1990-01-04' and '1990-01-05'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register = '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated = '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null order by name"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)),
+        Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name = ''"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and name <> ''"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city = ''"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and city <> ''"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and salary is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and tax is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and percent is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and birthday is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)),
+        Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and register is not null"),
+      Seq())
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and updated is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    // agg
+    checkAnswer(
+      sql("select count(*), max(id), min(name), cast(avg(file.age) as integer), sum(file.age) " +
+        "from stream_table_filter_complex where id >= 2 and id <= 100000004"),
+      Seq(Row(51, 100000004, "batch_1", 27, 1406)))
+
+    checkAnswer(
+      sql("select city, count(id), sum(id), cast(avg(file.age) as integer), " +
+        "max(salary), min(salary) " +
+        "from stream_table_filter_complex " +
+        "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " +
+        "and city <> '' " +
+        "group by city " +
+        "order by city"),
+      Seq(Row("city_1", 2, 100000002, 10, 10000.0, 0.1),
+        Row("city_2", 1, 100000002, 30, 0.2, 0.2),
+        Row("city_3", 2, 100000006, 21, 30000.0, 0.3)))
   }
 
   def createWriteSocketThread(
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/ComplexDelimitersEnum.java b/processing/src/main/java/org/apache/carbondata/processing/loading/ComplexDelimitersEnum.java
new file mode 100644
index 00000000000..bc196e15df6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/ComplexDelimitersEnum.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+public enum ComplexDelimitersEnum {
+
+  COMPLEX_DELIMITERS_LEVEL_1("\001"),
+
+  COMPLEX_DELIMITERS_LEVEL_2("\002"),
+
+  COMPLEX_DELIMITERS_LEVEL_3("\003"),
+
+  COMPLEX_DELIMITERS_LEVEL_4("\004");
+
+  private String value;
+
+  ComplexDelimitersEnum(String value) {
+    this.value = value;
+  }
+
+  public String value() {
+    return value;
+  }
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 89d09fe23ad..6fe89a24dc7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -221,9 +221,10 @@ public static CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel lo
     configuration.setHeader(loadModel.getCsvHeaderColumns());
     configuration.setSegmentId(loadModel.getSegmentId());
     configuration.setTaskNo(loadModel.getTaskNo());
-    configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS,
-        new String[] { loadModel.getComplexDelimiterLevel1(),
-            loadModel.getComplexDelimiterLevel2() });
+    String[] complexDelimiters = new String[loadModel.getComplexDelimiters().size()];
+    loadModel.getComplexDelimiters().toArray(complexDelimiters);
+    configuration
+        .setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS, complexDelimiters);
     configuration.setDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
         loadModel.getSerializationNullFormat().split(",")[1]);
     configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP,
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index e15fb5d919b..aecc52ef812 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -65,8 +65,7 @@ public class CarbonLoadModel implements Serializable {
   private String csvHeader;
   private String[] csvHeaderColumns;
   private String csvDelimiter;
-  private String complexDelimiterLevel1;
-  private String complexDelimiterLevel2;
+  private ArrayList<String> complexDelimiters;
   private List<LoadMetadataDetails> loadMetadataDetails;
   private transient SegmentUpdateStatusManager segmentUpdateStatusManager;
@@ -276,20 +275,14 @@ public void setCsvDelimiter(String csvDelimiter) {
     this.csvDelimiter = csvDelimiter;
   }
 
-  public String getComplexDelimiterLevel1() {
-    return complexDelimiterLevel1;
+  public void setComplexDelimiter(String delimiter) {
+    checkAndInitializeComplexDelimiterList();
+    this.complexDelimiters.add(delimiter);
   }
 
-  public void setComplexDelimiterLevel1(String complexDelimiterLevel1) {
-    this.complexDelimiterLevel1 = complexDelimiterLevel1;
-  }
-
-  public String getComplexDelimiterLevel2() {
-    return complexDelimiterLevel2;
-  }
-
-  public void setComplexDelimiterLevel2(String complexDelimiterLevel2) {
-    this.complexDelimiterLevel2 = complexDelimiterLevel2;
+  public ArrayList<String> getComplexDelimiters() {
+    checkAndInitializeComplexDelimiterList();
+    return complexDelimiters;
   }
 
   public String getAllDictPath() {
@@ -441,8 +434,7 @@ public CarbonLoadModel getCopyWithTaskNo(String taskNo) {
     copy.csvHeader = csvHeader;
     copy.csvHeaderColumns = csvHeaderColumns;
     copy.csvDelimiter = csvDelimiter;
-    copy.complexDelimiterLevel1 = complexDelimiterLevel1;
-    copy.complexDelimiterLevel2 = complexDelimiterLevel2;
+    copy.complexDelimiters = complexDelimiters;
     copy.carbonDataLoadSchema = carbonDataLoadSchema;
     copy.blocksID = blocksID;
     copy.taskNo = taskNo;
@@ -500,8 +492,7 @@ public CarbonLoadModel getCopyWithPartition(String header, String delimiter) {
     copyObj.csvHeader = header;
     copyObj.csvHeaderColumns = csvHeaderColumns;
     copyObj.csvDelimiter = delimiter;
-    copyObj.complexDelimiterLevel1 = complexDelimiterLevel1;
-    copyObj.complexDelimiterLevel2 = complexDelimiterLevel2;
+    copyObj.complexDelimiters = complexDelimiters;
     copyObj.blocksID = blocksID;
     copyObj.taskNo = taskNo;
     copyObj.factTimeStamp = factTimeStamp;
@@ -631,7 +622,16 @@ public void setFactTimeStamp(long factTimeStamp) {
   }
 
   public String[] getDelimiters() {
-    return new String[] { complexDelimiterLevel1, complexDelimiterLevel2 };
+    checkAndInitializeComplexDelimiterList();
+    String[] delimiters = new String[complexDelimiters.size()];
+    delimiters = complexDelimiters.toArray(delimiters);
+    return delimiters;
+  }
+
+  private void checkAndInitializeComplexDelimiterList() {
+    if (null == complexDelimiters) {
+      complexDelimiters = new ArrayList<>();
+    }
   }
 
   /**
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 4a293044957..d02348d6121 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -164,6 +164,8 @@ public void build(
     String delimeter = optionsFinal.get("delimiter");
     String complex_delimeter_level1 = optionsFinal.get("complex_delimiter_level_1");
     String complex_delimeter_level2 = optionsFinal.get("complex_delimiter_level_2");
+    String complex_delimeter_level3 = optionsFinal.get("complex_delimiter_level_3");
+    String complex_delimeter_level4 = optionsFinal.get("complex_delimiter_level_4");
     String all_dictionary_path = optionsFinal.get("all_dictionary_path");
     String column_dict = optionsFinal.get("columndict");
     validateDateTimeFormat(timestampformat, "TimestampFormat");
@@ -257,11 +259,14 @@ public void build(
     if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
         complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
-        delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
+        delimeter.equalsIgnoreCase(complex_delimeter_level2) ||
+        delimeter.equalsIgnoreCase(complex_delimeter_level3)) {
       throw new InvalidLoadOptionException("Field Delimiter and Complex types delimiter are same");
     } else {
-      carbonLoadModel.setComplexDelimiterLevel1(complex_delimeter_level1);
-      carbonLoadModel.setComplexDelimiterLevel2(complex_delimeter_level2);
+      carbonLoadModel.setComplexDelimiter(complex_delimeter_level1);
+      carbonLoadModel.setComplexDelimiter(complex_delimeter_level2);
+      carbonLoadModel.setComplexDelimiter(complex_delimeter_level3);
+      carbonLoadModel.setComplexDelimiter(complex_delimeter_level4);
     }
     // set local dictionary path, and dictionary file extension
     carbonLoadModel.setAllDictPath(all_dictionary_path);
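The `build()` hunk above rejects a load whose CSV field delimiter collides with a complex-type delimiter (and level 1 with level 2). A minimal, generalized sketch of that rule in Scala follows; the object and method names are stand-ins, not Carbon APIs, and the real code checks only the specific pairs shown in the hunk:

```scala
object DelimiterValidationSketch {
  // Mirrors the spirit of CarbonLoadModelBuilder.build(): no complex level may
  // equal the CSV field delimiter, and adjacent levels must differ from each other.
  def validate(fieldDelim: String, complexLevels: Seq[String]): Unit = {
    val clash = complexLevels.exists(_.equalsIgnoreCase(fieldDelim)) ||
      complexLevels.sliding(2).exists { case Seq(a, b) => a.equalsIgnoreCase(b) }
    if (clash) {
      throw new IllegalArgumentException(
        "Field Delimiter and Complex types delimiter are same")
    }
  }

  def main(args: Array[String]): Unit = {
    // The defaults from ComplexDelimitersEnum pass:
    validate(",", Seq("\u0001", "\u0002", "\u0003", "\u0004"))
    // Reusing the field delimiter as level 1 is rejected:
    try validate(",", Seq(",", "\u0002", "\u0003", "\u0004"))
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```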
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index 2c5fa8bb583..759cf04e639 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -32,6 +32,7 @@
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.ComplexDelimitersEnum;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 import org.apache.carbondata.processing.util.CarbonLoaderUtil;
@@ -111,13 +112,17 @@ public static Map<String, String> fillOptionWithDefaultValue(
         "all_dictionary_path",
         Maps.getOrDefault(options, "all_dictionary_path", ""));
 
-    optionsFinal.put(
-        "complex_delimiter_level_1",
-        Maps.getOrDefault(options,"complex_delimiter_level_1", "\\\001"));
+    optionsFinal.put("complex_delimiter_level_1",
+        Maps.getOrDefault(options, "complex_delimiter_level_1",
+            ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_1.value()));
 
-    optionsFinal.put(
-        "complex_delimiter_level_2",
-        Maps.getOrDefault(options, "complex_delimiter_level_2", "\\\002"));
+    optionsFinal.put("complex_delimiter_level_2",
+        Maps.getOrDefault(options, "complex_delimiter_level_2",
+            ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_2.value()));
+
+    optionsFinal.put("complex_delimiter_level_3",
+        Maps.getOrDefault(options, "complex_delimiter_level_3",
+            ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_3.value()));
 
     optionsFinal.put(
         "dateformat",
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
index 39648691c83..6ffea4fd3db 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.processing.loading.parser;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -23,6 +24,7 @@
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.processing.loading.parser.impl.ArrayParserImpl;
+import org.apache.carbondata.processing.loading.parser.impl.MapParserImpl;
 import org.apache.carbondata.processing.loading.parser.impl.PrimitiveParserImpl;
 import org.apache.carbondata.processing.loading.parser.impl.StructParserImpl;
@@ -35,7 +37,8 @@ public final class CarbonParserFactory {
    * @param complexDelimiters
    * @return
    */
-  public static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
+  public static GenericParser createParser(CarbonColumn carbonColumn,
+      ArrayList<String> complexDelimiters,
       String nullFormat) {
     return createParser(carbonColumn, complexDelimiters, nullFormat, 0);
   }
@@ -51,23 +54,33 @@ public static GenericParser createParser(CarbonColumn carbonColumn, String[] com
    *          delimiters
    * @return GenericParser
    */
-  private static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
-      String nullFormat, int depth) {
+  private static GenericParser createParser(CarbonColumn carbonColumn,
+      ArrayList<String> complexDelimiters, String nullFormat, int depth) {
     DataType dataType = carbonColumn.getDataType();
-    if (DataTypes.isArrayType(dataType) || DataTypes.isMapType(dataType)) {
+    if (DataTypes.isArrayType(dataType)) {
       List<CarbonDimension> listOfChildDimensions =
           ((CarbonDimension) carbonColumn).getListOfChildDimensions();
       // Create array parser with complex delimiter
-      ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters[depth], nullFormat);
+      ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters.get(depth), nullFormat);
       for (CarbonDimension dimension : listOfChildDimensions) {
         arrayParser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
       }
       return arrayParser;
+    } else if (DataTypes.isMapType(dataType)) {
+      List<CarbonDimension> listOfChildDimensions =
+          ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+      // Create map parser with complex delimiter and key-value delimiter
+      MapParserImpl mapParser = new MapParserImpl(complexDelimiters.get(depth), nullFormat,
+          complexDelimiters.get(depth + 1));
+      for (CarbonDimension dimension : listOfChildDimensions) {
+        mapParser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
+      }
+      return mapParser;
     } else if (DataTypes.isStructType(dataType)) {
       List<CarbonDimension> dimensions =
          ((CarbonDimension) carbonColumn).getListOfChildDimensions();
       // Create struct parser with complex delimiter
-      StructParserImpl parser = new StructParserImpl(complexDelimiters[depth], nullFormat);
+      StructParserImpl parser = new StructParserImpl(complexDelimiters.get(depth), nullFormat);
       for (CarbonDimension dimension : dimensions) {
         parser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
       }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
index 492ea80746f..c27f0fa5a1f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.processing.loading.parser.impl;
 
-import java.io.BufferedWriter;
 import java.util.regex.Pattern;
 
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -33,11 +32,11 @@
  */
 public class ArrayParserImpl implements ComplexParser<ArrayObject> {
 
-  private Pattern pattern;
+  protected Pattern pattern;
 
-  private GenericParser child;
+  protected GenericParser child;
 
-  private String nullFormat;
+  protected String nullFormat;
 
   public ArrayParserImpl(String delimiter, String nullFormat) {
     pattern = Pattern.compile(CarbonUtil.delimiterConverter(delimiter));
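The factory above hands `complexDelimiters.get(depth)` to each collection parser and recurses with `depth + 1`, which is why a map consumes two levels at its own depth (an entry separator plus the key-value separator of its struct child) and a third level serves any collection nested inside the value. A minimal Scala sketch of that nesting with the default level 1/2/3 delimiters, using plain string splitting rather than the real parser classes (the object name is made up):

```scala
object NestedDelimiterSketch {
  def main(args: Array[String]): Unit = {
    // Encodes {1 -> [a, b], 2 -> [c]} for a hypothetical map<int, array<string>> column.
    val raw = "1\u0002a\u0003b\u00012\u0002c"
    val entries = raw.split("\u0001", -1)        // level 1: separates map entries
    val parsed = entries.map { e =>
      val Array(k, v) = e.split("\u0002", 2)     // level 2: separates key from value
      k -> v.split("\u0003", -1).toList          // level 3: separates array elements
    }
    println(parsed.toMap)                        // Map(1 -> List(a, b), 2 -> List(c))
  }
}
```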
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
new file mode 100644
index 00000000000..e6814f8628a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.parser.impl;
+
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
+
+import org.apache.commons.lang.ArrayUtils;
+
+
+public class MapParserImpl extends ArrayParserImpl {
+
+  private String keyValueDelimiter;
+
+  public MapParserImpl(String delimiter, String nullFormat, String keyValueDelimiter) {
+    super(delimiter, nullFormat);
+    this.keyValueDelimiter = keyValueDelimiter;
+  }
+
+  // The key of a map will always be a PRIMITIVE type, so a Set works fine here.
+  // Only the first occurrence of a key is added; the remaining duplicates are skipped/ignored.
+  @Override public ArrayObject parse(Object data) {
+    if (data != null) {
+      String value = data.toString();
+      if (!value.isEmpty() && !value.equals(nullFormat)) {
+        String[] split = pattern.split(value, -1);
+        if (ArrayUtils.isNotEmpty(split)) {
+          ArrayList<Object> array = new ArrayList<>();
+          Set<Object> set = new HashSet<>();
+          for (int i = 0; i < split.length; i++) {
+            Object currKey = split[i].split(keyValueDelimiter)[0];
+            if (set.add(currKey)) {
+              array.add(child.parse(split[i]));
+            }
+          }
+          return new ArrayObject(array.toArray());
+        }
+      }
+    }
+    return null;
+  }
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
index 00d8420e448..d0fe30ba557 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
@@ -16,6 +16,8 @@
  */
 package org.apache.carbondata.processing.loading.parser.impl;
 
+import java.util.ArrayList;
+
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
@@ -34,8 +36,12 @@ public class RowParserImpl implements RowParser {
   private int numberOfColumns;
 
   public RowParserImpl(DataField[] output, CarbonDataLoadConfiguration configuration) {
-    String[] complexDelimiters =
+    String[] tempComplexDelimiters =
        (String[]) configuration.getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS);
+    ArrayList<String> complexDelimiters = new ArrayList<>();
+    for (int i = 0; i < tempComplexDelimiters.length; i++) {
+      complexDelimiters.add(tempComplexDelimiters[i]);
+    }
     String nullFormat =
         configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
             .toString();
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index c9adcdf2efe..1241504fec9 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -184,6 +184,7 @@ public CarbonWriterBuilder withLoadOptions(Map<String, String> options) {
           !option.equalsIgnoreCase("timestampformat") &&
           !option.equalsIgnoreCase("complex_delimiter_level_1") &&
           !option.equalsIgnoreCase("complex_delimiter_level_2") &&
+          !option.equalsIgnoreCase("complex_delimiter_level_3") &&
          !option.equalsIgnoreCase("quotechar") &&
          !option.equalsIgnoreCase("escapechar")) {
         throw new IllegalArgumentException("Unsupported option:" + option
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
index 5a888ef41d4..4dcb3ce29f6 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
@@ -41,6 +41,7 @@ class RowStreamParserImp extends CarbonStreamParser {
   var dateFormat: SimpleDateFormat = null
   var complexDelimiterLevel1: String = null
   var complexDelimiterLevel2: String = null
+  var complexDelimiterLevel3: String = null
   var serializationNullFormat: String = null
 
   override def initialize(configuration: Configuration, structType: StructType): Unit = {
@@ -54,6 +55,7 @@ class RowStreamParserImp extends CarbonStreamParser {
       this.configuration.get(CarbonCommonConstants.CARBON_DATE_FORMAT))
     this.complexDelimiterLevel1 = this.configuration.get("carbon_complex_delimiter_level_1")
     this.complexDelimiterLevel2 = this.configuration.get("carbon_complex_delimiter_level_2")
+    this.complexDelimiterLevel3 = this.configuration.get("carbon_complex_delimiter_level_3")
     this.serializationNullFormat =
       this.configuration.get(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
   }
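MapParserImpl above keeps only the first occurrence of each map key. A minimal standalone Scala sketch of that behaviour with the default entry and key-value delimiters; the object and variable names are illustrative, not Carbon APIs:

```scala
object MapDedupSketch {
  def main(args: Array[String]): Unit = {
    val entrySep = "\u0001"    // COMPLEX_DELIMITERS_LEVEL_1
    val kvSep = "\u0002"       // COMPLEX_DELIMITERS_LEVEL_2
    // Same shape as the rows in the new map-type test CSV:
    // key 1 appears twice, so its second value ("Gupta") must be dropped.
    val raw = Seq("1" + kvSep + "Nalla", "2" + kvSep + "Singh", "1" + kvSep + "Gupta")
      .mkString(entrySep)
    val result = scala.collection.mutable.LinkedHashMap[String, String]()
    for (entry <- raw.split(entrySep)) {
      val Array(key, value) = entry.split(kvSep, 2)
      // Like MapParserImpl's Set.add guard: the first occurrence of a key wins.
      if (!result.contains(key)) result(key) = value
    }
    println(result)   // e.g. Map(1 -> Nalla, 2 -> Singh)
  }
}
```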