From aad9aabf960dce5227ef8e59a56c25c0972d221c Mon Sep 17 00:00:00 2001
From: Shubh18s
Date: Thu, 20 Dec 2018 16:47:32 +0530
Subject: [PATCH] [CARBONDATA-3189] Fix PreAggregate Datamap Issue

Problem - Load and select queries were failing on a table with a preaggregate datamap.

Cause - Previously, if direct query on datamap was not enabled in the thread params, no further check was performed against the session or global configuration.

Solution - First check whether the thread param for Direct Query On Datamap is enabled. If it is not enabled, check the session params and then the global (carbon.properties) configuration.

This closes #3010
---
 .../core/constants/CarbonCommonConstants.java |  6 ---
 docs/configuration-parameters.md              |  3 +-
 .../TestPreAggCreateCommand.scala             | 42 -------------------
 .../spark/sql/test/util/QueryTest.scala       |  2 +-
 .../preaaggregate/PreAggregateUtil.scala      |  8 ++--
 .../sql/optimizer/CarbonLateDecodeRule.scala  | 23 ++++++----
 6 files changed, 21 insertions(+), 63 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8d0a4d962fc..c1ef940033d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1449,12 +1449,6 @@ private CarbonCommonConstants() {
 
   public static final String SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "false";
 
-  @CarbonProperty
-  public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP =
-      "carbon.query.validate.direct.query.on.datamap";
-
-  public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "true";
-
   @CarbonProperty
   public static final String CARBON_SHOW_DATAMAPS = "carbon.query.show.datamaps";
 
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index db21c6ae062..105b7680cd4 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -135,7 +135,6 @@ This section provides the details of all the configurations required for the Car
 | carbon.custom.block.distribution | false | CarbonData has its own scheduling algorithm to suggest to Spark on how many tasks needs to be launched and how much work each task need to do in a Spark cluster for any query on CarbonData. When this configuration is true, CarbonData would distribute the available blocks to be scanned among the available number of cores. For Example:If there are 10 blocks to be scanned and only 3 tasks can be run(only 3 executor cores available in the cluster), CarbonData would combine blocks as 4,3,3 and give it to 3 tasks to run. **NOTE:** When this configuration is false, as per the ***carbon.task.distribution*** configuration, each block/blocklet would be given to each task. |
 | enable.query.statistics | false | CarbonData has extensive logging which would be useful for debugging issues related to performance or hard to locate issues. This configuration when made ***true*** would log additional query statistics information to more accurately locate the issues being debugged.**NOTE:** Enabling this would log more debug information to log files, there by increasing the log files size significantly in short span of time. It is advised to configure the log files size, retention of log files parameters in log4j properties appropriately. Also extensive logging is an increased IO operation and hence over all query performance might get reduced. Therefore it is recommended to enable this configuration only for the duration of debugging. |
 | enable.unsafe.in.query.processing | false | CarbonData supports unsafe operations of Java to avoid GC overhead for certain operations. This configuration enables to use unsafe functions in CarbonData while scanning the data during query. |
-| carbon.query.validate.direct.query.on.datamap | true | CarbonData supports creating pre-aggregate table datamaps as an independent tables. For some debugging purposes, it might be required to directly query from such datamap tables. This configuration allows to query on such datamaps. |
 | carbon.max.driver.threads.for.block.pruning | 4 | Number of threads used for driver pruning when the carbon files are more than 100k Maximum memory. This configuration can used to set number of threads between 1 to 4. |
 | carbon.heap.memory.pooling.threshold.bytes | 1048576 | CarbonData supports unsafe operations of Java to avoid GC overhead for certain operations. Using unsafe, memory can be allocated on Java Heap or off heap. This configuration controls the allocation mechanism on Java HEAP. If the heap memory allocations of the given size is greater or equal than this value,it should go through the pooling mechanism. But if set this size to -1, it should not go through the pooling mechanism. Default value is 1048576(1MB, the same as Spark). Value to be specified in bytes. |
 | carbon.push.rowfilters.for.vector | false | When enabled complete row filters will be handled by carbon in case of vector. If it is disabled then only page level pruning will be done by carbon and row level filtering will be done by spark for vector. And also there are scan optimizations in carbon to avoid multiple data copies when this parameter is set to false. There is no change in flow for non-vector based queries. |
@@ -211,7 +210,7 @@ RESET
 | carbon.options.sort.scope | Specifies how the current data load should be sorted with. **NOTE:** Refer to [Data Loading Configuration](#data-loading-configuration)#carbon.sort.scope for detailed information. |
 | carbon.options.global.sort.partitions | |
 | carbon.options.serialization.null.format | Default Null value representation in the data being loaded. **NOTE:** Refer to [Data Loading Configuration](#data-loading-configuration)#carbon.options.serialization.null.format for detailed information. |
-| carbon.query.directQueryOnDataMap.enabled | Specifies whether datamap can be queried directly. This is useful for debugging purposes.**NOTE: **Refer to [Query Configuration](#query-configuration)#carbon.query.validate.direct.query.on.datamap for detailed information. |
+| carbon.query.directQueryOnDataMap.enabled | Specifies whether datamap can be queried directly. This is useful for debugging purposes.**NOTE: **Refer to [Query Configuration](#query-configuration) for detailed information. 
| **Examples:** diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala index d70e1796ece..4149f6e3341 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala @@ -483,48 +483,6 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { executorService.shutdown() } - test("support set carbon.query.directQueryOnDataMap.enabled=true") { - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - val testData = s"$rootPath/integration/spark-common-test/src/test/resources/sample.csv" - sql("drop table if exists mainTable") - sql( - s""" - | CREATE TABLE mainTable - | (id Int, - | name String, - | city String, - | age Int) - | STORED BY 'org.apache.carbondata.format' - """.stripMargin) - - sql( - s""" - | LOAD DATA LOCAL INPATH '$testData' - | into table mainTable - """.stripMargin) - - sql( - s""" - | create datamap preagg_sum on table mainTable - | using 'preaggregate' - | as select id,sum(age) from mainTable group by id - """.stripMargin) - - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "true") - - sql("set carbon.query.directQueryOnDataMap.enabled=true") - checkAnswer(sql("select count(*) from maintable_preagg_sum"), Row(4)) - sql("set carbon.query.directQueryOnDataMap.enabled=false") - val exception: Exception = intercept[AnalysisException] { - sql("select count(*) from maintable_preagg_sum").collect() - } - assert(exception.getMessage.contains("Query On DataMap not supported")) - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "false") - } - class QueryTask(query: String) extends Callable[String] { override def call(): String = { var result = "SUCCESS" diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala index 411d5a35345..e01853660fb 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala @@ -42,7 +42,7 @@ class QueryTest extends PlanTest { Locale.setDefault(Locale.US) CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "false") + .addProperty(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, "true") /** * Runs the plan and makes sure the answer contains all of the keywords, or the diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index 319f84b6cba..4e5b764f3ed 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -631,12 +631,14 @@ object PreAggregateUtil { case _ => 
a.getAggFunction}}(${a.getColumnName})" } else { groupingExpressions += a.getColumnName - aggregateColumns+= a.getColumnName + aggregateColumns += a.getColumnName } } + val groupByString = if (groupingExpressions.nonEmpty) { + s" group by ${ groupingExpressions.mkString(",") }" + } else { "" } s"select ${ aggregateColumns.mkString(",") } " + - s"from $databaseName.${ tableSchema.getTableName }" + - s" group by ${ groupingExpressions.mkString(",") }" + s"from $databaseName.${ tableSchema.getTableName }" + groupByString } /** diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala index 0f350b96d53..3986839da90 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala @@ -106,22 +106,27 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { def validateQueryDirectlyOnDataMap(relations: Seq[CarbonDecoderRelation]): Unit = { var isPreAggDataMapExists = false // first check if pre aggregate data map exists or not - relations.foreach{relation => + relations.foreach { relation => if (relation.carbonRelation.carbonTable.isChildDataMap) { isPreAggDataMapExists = true } } - val validateQuery = CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, - CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE).toBoolean var isThrowException = false - // if validate query is enabled and relation contains pre aggregate data map - if (validateQuery && isPreAggDataMapExists) { + // if relation contains pre aggregate data map + if (isPreAggDataMapExists) { val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo if (null != carbonSessionInfo) { - val supportQueryOnDataMap = CarbonProperties.getInstance.getProperty( - CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, - CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE).toBoolean + lazy val sessionPropertyValue = CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, + CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE) + // Check if property is set in thread params which would mean this is an internal call + // (from load or compaction) and should be of highest priority. Otherwise get from + // session(like user has dynamically given the value using set command). If not found in + // session then look for the property in carbon.properties file, otherwise use default + // value 'false'. + val supportQueryOnDataMap = CarbonEnv + .getThreadParam(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, + sessionPropertyValue).toBoolean if (!supportQueryOnDataMap) { isThrowException = true }
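Note on the resolution order introduced in CarbonLateDecodeRule above: the thread param (set internally by load or compaction) has the highest priority, then the session param set via the SET command, then carbon.properties, and finally the default of "false". The snippet below is a minimal, self-contained Scala sketch of that priority chain; the three Map values are hypothetical stand-ins for CarbonEnv thread params, session params, and the carbon.properties file, and are not the actual CarbonData APIs.

```scala
object DirectQueryOnDataMapResolution {

  // Hypothetical stand-ins for the three configuration scopes the fix consults.
  val threadParams: Map[String, String] = Map.empty      // set internally by load/compaction
  val sessionParams: Map[String, String] = Map(
    "carbon.query.directQueryOnDataMap.enabled" -> "true") // set by the user via SET command
  val carbonProperties: Map[String, String] = Map.empty  // values read from carbon.properties

  val key = "carbon.query.directQueryOnDataMap.enabled"
  val defaultValue = "false"

  // Thread param wins; otherwise fall back to session, then the
  // carbon.properties value, then the default of "false".
  def supportDirectQueryOnDataMap: Boolean = {
    val sessionOrGlobal = sessionParams.get(key)
      .orElse(carbonProperties.get(key))
      .getOrElse(defaultValue)
    threadParams.getOrElse(key, sessionOrGlobal).toBoolean
  }

  def main(args: Array[String]): Unit = {
    // With only the session value set to "true", a direct select on the
    // datamap table is allowed; remove it and the default "false" applies.
    println(s"direct query on datamap allowed: $supportDirectQueryOnDataMap")
  }
}
```

This mirrors why internal load and compaction calls, which set the thread param before querying the child table, are no longer rejected even when the user-facing property is left at its default of false.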