From 80094ca20d83ab7aea7d89ff95152a5eb88a7425 Mon Sep 17 00:00:00 2001 From: Shubh18s Date: Thu, 20 Dec 2018 16:47:32 +0530 Subject: [PATCH] Fix PreAggregate Datamap issue --- .../core/constants/CarbonCommonConstants.java | 8 +--- docs/configuration-parameters.md | 3 +- .../TestPreAggCreateCommand.scala | 42 ---------------- .../TestNonTransactionalCarbonTable.scala | 48 ------------------- .../spark/sql/test/util/QueryTest.scala | 2 +- .../preaaggregate/PreAggregateUtil.scala | 21 +++++--- .../sql/optimizer/CarbonLateDecodeRule.scala | 23 +++++---- 7 files changed, 31 insertions(+), 116 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 387bf3b90ff..50743c796a5 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1445,16 +1445,10 @@ private CarbonCommonConstants() { @CarbonProperty(dynamicConfigurable = true) public static final String SUPPORT_DIRECT_QUERY_ON_DATAMAP = - "carbon.query.directQueryOnDataMap.enabled"; + "carbon.query.directQueryOnDataMap.enabled"; public static final String SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "false"; - @CarbonProperty - public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP = - "carbon.query.validate.direct.query.on.datamap"; - - public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "true"; - @CarbonProperty public static final String CARBON_SHOW_DATAMAPS = "carbon.query.show.datamaps"; diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md index db21c6ae062..105b7680cd4 100644 --- a/docs/configuration-parameters.md +++ b/docs/configuration-parameters.md @@ -135,7 +135,6 @@ This section provides the details of all the configurations required for the Car | carbon.custom.block.distribution | false | CarbonData has its own scheduling algorithm to suggest to Spark on how many tasks needs to be launched and how much work each task need to do in a Spark cluster for any query on CarbonData. When this configuration is true, CarbonData would distribute the available blocks to be scanned among the available number of cores. For Example:If there are 10 blocks to be scanned and only 3 tasks can be run(only 3 executor cores available in the cluster), CarbonData would combine blocks as 4,3,3 and give it to 3 tasks to run. **NOTE:** When this configuration is false, as per the ***carbon.task.distribution*** configuration, each block/blocklet would be given to each task. | | enable.query.statistics | false | CarbonData has extensive logging which would be useful for debugging issues related to performance or hard to locate issues. This configuration when made ***true*** would log additional query statistics information to more accurately locate the issues being debugged.**NOTE:** Enabling this would log more debug information to log files, there by increasing the log files size significantly in short span of time. It is advised to configure the log files size, retention of log files parameters in log4j properties appropriately. Also extensive logging is an increased IO operation and hence over all query performance might get reduced. Therefore it is recommended to enable this configuration only for the duration of debugging. | | enable.unsafe.in.query.processing | false | CarbonData supports unsafe operations of Java to avoid GC overhead for certain operations. This configuration enables to use unsafe functions in CarbonData while scanning the data during query. | -| carbon.query.validate.direct.query.on.datamap | true | CarbonData supports creating pre-aggregate table datamaps as an independent tables. For some debugging purposes, it might be required to directly query from such datamap tables. This configuration allows to query on such datamaps. | | carbon.max.driver.threads.for.block.pruning | 4 | Number of threads used for driver pruning when the carbon files are more than 100k Maximum memory. This configuration can used to set number of threads between 1 to 4. | | carbon.heap.memory.pooling.threshold.bytes | 1048576 | CarbonData supports unsafe operations of Java to avoid GC overhead for certain operations. Using unsafe, memory can be allocated on Java Heap or off heap. This configuration controls the allocation mechanism on Java HEAP. If the heap memory allocations of the given size is greater or equal than this value,it should go through the pooling mechanism. But if set this size to -1, it should not go through the pooling mechanism. Default value is 1048576(1MB, the same as Spark). Value to be specified in bytes. | | carbon.push.rowfilters.for.vector | false | When enabled complete row filters will be handled by carbon in case of vector. If it is disabled then only page level pruning will be done by carbon and row level filtering will be done by spark for vector. And also there are scan optimizations in carbon to avoid multiple data copies when this parameter is set to false. There is no change in flow for non-vector based queries. | @@ -211,7 +210,7 @@ RESET | carbon.options.sort.scope | Specifies how the current data load should be sorted with. **NOTE:** Refer to [Data Loading Configuration](#data-loading-configuration)#carbon.sort.scope for detailed information. | | carbon.options.global.sort.partitions | | | carbon.options.serialization.null.format | Default Null value representation in the data being loaded. **NOTE:** Refer to [Data Loading Configuration](#data-loading-configuration)#carbon.options.serialization.null.format for detailed information. | -| carbon.query.directQueryOnDataMap.enabled | Specifies whether datamap can be queried directly. This is useful for debugging purposes.**NOTE: **Refer to [Query Configuration](#query-configuration)#carbon.query.validate.direct.query.on.datamap for detailed information. | +| carbon.query.directQueryOnDataMap.enabled | Specifies whether datamap can be queried directly. This is useful for debugging purposes.**NOTE: **Refer to [Query Configuration](#query-configuration) for detailed information. | **Examples:** diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala index c039c0f7a53..afadb477d6d 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala @@ -487,48 +487,6 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { executorService.shutdown() } - test("support set carbon.query.directQueryOnDataMap.enabled=true") { - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - val testData = s"$rootPath/integration/spark-common-test/src/test/resources/sample.csv" - sql("drop table if exists mainTable") - sql( - s""" - | CREATE TABLE mainTable - | (id Int, - | name String, - | city String, - | age Int) - | STORED BY 'org.apache.carbondata.format' - """.stripMargin) - - sql( - s""" - | LOAD DATA LOCAL INPATH '$testData' - | into table mainTable - """.stripMargin) - - sql( - s""" - | create datamap preagg_sum on table mainTable - | using 'preaggregate' - | as select id,sum(age) from mainTable group by id - """.stripMargin) - - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "true") - - sql("set carbon.query.directQueryOnDataMap.enabled=true") - checkAnswer(sql("select count(*) from maintable_preagg_sum"), Row(4)) - sql("set carbon.query.directQueryOnDataMap.enabled=false") - val exception: Exception = intercept[AnalysisException] { - sql("select count(*) from maintable_preagg_sum").collect() - } - assert(exception.getMessage.contains("Query On DataMap not supported")) - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "false") - } - class QueryTask(query: String) extends Callable[String] { override def call(): String = { var result = "SUCCESS" diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index 8cd72eb90f2..06d41b1c29c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -2537,54 +2537,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { FileUtils.deleteDirectory(new File(writerPath)) } - test("check varchar with trailing space") { - FileUtils.deleteDirectory(new File(writerPath)) - val fields: Array[Field] = new Array[Field](8) - - fields(0) = new Field("Event_ID", DataTypes.STRING) - fields(1) = new Field("Event_Time", DataTypes.TIMESTAMP) - fields(2) = new Field("subject", DataTypes.VARCHAR) - fields(3) = new Field("From_Email", DataTypes.STRING) - fields(4) = new Field("To_Email", DataTypes.createArrayType(DataTypes.STRING)) - fields(5) = new Field("CC_Email", DataTypes.createArrayType(DataTypes.STRING)) - fields(6) = new Field("BCC_Email", DataTypes.createArrayType(DataTypes.STRING)) - fields(7) = new Field("messagebody ", DataTypes.VARCHAR) - - var options = Map("bAd_RECords_action" -> "FORCE").asJava - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "EEE, d MMM yyyy HH:mm:ss Z") - - val writer: CarbonWriter = CarbonWriter.builder - .outputPath(writerPath) - .withCsvInput(new Schema(fields)).writtenBy("TestNonTransactionalCarbonTable").build() - - writer - .write(Array("aaa", - "Fri, 4 May 2001 13:51:00 -0700 (PDT)", - "Re", - "cdn@ws.com", - "sd#er", - "sd", - "sds", - "ew")) - writer.close() - - sql("drop table if exists test") - sql( - s"""CREATE TABLE test using carbon options('long_string_columns'='subject,messagebody') - |LOCATION '$writerPath'""" - .stripMargin) - checkAnswer(sql("select * from test"), Seq(Row("aaa", - Timestamp.valueOf("2001-05-04 13:51:00.0"), - "Re", - "cdn@ws.com", - mutable.WrappedArray.make(Array("sd#er")), - mutable.WrappedArray.make(Array("sd")), - mutable.WrappedArray.make(Array("sds")), - "ew"))) - FileUtils.deleteDirectory(new File(writerPath)) - } - def generateCarbonData(builder :CarbonWriterBuilder): Unit ={ val fields = new Array[Field](3) fields(0) = new Field("name", DataTypes.STRING) diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala index 5a26dd58922..52e4e4ee435 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala @@ -42,7 +42,7 @@ class QueryTest extends PlanTest { Locale.setDefault(Locale.US) CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "false") + .addProperty(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, "true") /** * Runs the plan and makes sure the answer contains all of the keywords, or the diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index 319f84b6cba..e5db3488a36 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -626,22 +626,29 @@ object PreAggregateUtil { // schema ordinal should be considered columns.sortBy(_.getSchemaOrdinal).foreach { a => if (a.getAggFunction.nonEmpty) { - aggregateColumns += s"${a.getAggFunction match { - case "count" => "sum" - case _ => a.getAggFunction}}(${a.getColumnName})" + aggregateColumns += s"${ + a.getAggFunction match { + case "count" => "sum" + case _ => a.getAggFunction + } + }(${ a.getColumnName })" } else { groupingExpressions += a.getColumnName - aggregateColumns+= a.getColumnName + aggregateColumns += a.getColumnName } } + val groupByString = if (groupingExpressions.nonEmpty) { + s" group by ${ groupingExpressions.mkString(",") }" + } else { "" } s"select ${ aggregateColumns.mkString(",") } " + - s"from $databaseName.${ tableSchema.getTableName }" + - s" group by ${ groupingExpressions.mkString(",") }" + s"from $databaseName.${ tableSchema.getTableName }" + groupByString } - /** + /** CarbonProperties.getInstance().addProperty(CarbonCommonConstants + * .VALIDATE_DIRECT_QUERY_ON_DATAMAP, "true") * Below method will be used to get the select query when rollup policy is * applied in case of timeseries table + * * @param tableSchema main data map schema * @param selectedDataMapSchema selected data map schema for rollup * @return select query based on rolloup diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala index 36eb9ce3390..2b1c94b7504 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala @@ -106,22 +106,27 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { def validateQueryDirectlyOnDataMap(relations: Seq[CarbonDecoderRelation]): Unit = { var isPreAggDataMapExists = false // first check if pre aggregate data map exists or not - relations.foreach{relation => + relations.foreach { relation => if (relation.carbonRelation.carbonTable.isChildDataMap) { isPreAggDataMapExists = true } } - val validateQuery = CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, - CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE).toBoolean var isThrowException = false - // if validate query is enabled and relation contains pre aggregate data map - if (validateQuery && isPreAggDataMapExists) { + // if relation contains pre aggregate data map + if (isPreAggDataMapExists) { val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo if (null != carbonSessionInfo) { - val supportQueryOnDataMap = CarbonProperties.getInstance.getProperty( - CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, - CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE).toBoolean + lazy val sessionPropertyValue = CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, + CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE) + // Check if property is set in thread params which would mean this is an internal call + // (from load or compaction) and should be of highest priority. Otherwise get from + // session(like user has dynamically given the value using set command). If not found in + // session then look for the property in carbon.properties file, otherwise use default + // value 'false'. + val supportQueryOnDataMap = CarbonEnv + .getThreadParam(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, + sessionPropertyValue).toBoolean if (!supportQueryOnDataMap) { isThrowException = true }