From 3cd48b9a81c38dc0d3dc13a89c37bd27dc1054eb Mon Sep 17 00:00:00 2001
From: akashrn5
Date: Mon, 27 May 2019 12:28:00 +0530
Subject: [PATCH] Fix select * failure when MV datamap is enabled

Problem:
For `select * ... limit n` the Project node is removed from the plan during
optimization, so the Limit sits directly on the LogicalRelation. That plan
shape was not turned into a select node and the query failed when an MV
datamap was enabled.

Solution:
Handle Limit over LogicalRelation in ModularPatterns and ExtractSelectModule
and build the select node from the relation output.

---
 .../mv/rewrite/MVCreateTestCase.scala         | 18 ++++++++++++++++++
 .../mv/plans/modular/ModularPatterns.scala    | 10 ++++++++++
 .../util/Logical2ModularExtractions.scala     |  7 +++++++
 3 files changed, 35 insertions(+)

diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 5016bbe5369..5e060d733f1 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -954,6 +954,23 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists all_table")
   }
 
+  test("test select * and distinct when MV is enabled") {
+    sql("drop table if exists limit_fail")
+    sql("CREATE TABLE limit_fail (empname String, designation String, doj Timestamp,workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int)STORED BY 'org.apache.carbondata.format'")
+    sql(s"LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE limit_fail OPTIONS" +
+        "('DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+    sql("create datamap limit_fail_dm1 using 'mv' as select empname,designation from limit_fail")
+    try {
+      val df = sql("select distinct(empname) from limit_fail limit 10")
+      // select * with limit must execute without error; there is no catch block, so any
+      // exception propagates and the test report shows the real cause.
+      sql("select * from limit_fail limit 10").show()
+      assert(verifyMVDataMap(df.queryExecution.analyzed, "limit_fail_dm1"))
+    } finally {
+      sql("drop datamap if exists limit_fail_dm1")
+    }
+  }
+
   def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
     val tables = logicalPlan collect {
       case l: LogicalRelation => l.catalogTable.get
@@ -971,6 +988,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table IF EXISTS fact_streaming_table1")
     sql("drop table IF EXISTS fact_streaming_table2")
     sql("drop table IF EXISTS fact_table_parquet")
+    sql("drop table if exists limit_fail")
   }
 
   override def afterAll {
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
index a4116d90cbd..30857c8e80b 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.mv.plans.modular
 
 import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, PredicateHelper, _}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.mv.plans.{Pattern, _}
 import org.apache.carbondata.mv.plans.modular.Flags._
@@ -118,6 +119,15 @@ abstract class ModularPatterns extends Modularizer[ModularPlan] {
           makeSelectModule(output, input, predicate, aliasmap, joinedge, flags,
             children.map(modularizeLater), Seq(Seq(limitExpr)) ++ fspec1, wspec)
 
+        // For `select * ... limit n` the Project node is removed from the plan, so the Limit
+        // plan itself is passed to ExtractSelectModule to build the select node.
+        case limit@Limit(limitExpr, _: LogicalRelation) =>
+          val ExtractSelectModule(output, input, predicate, aliasmap, joinedge, children, flags1,
+            fspec1, wspec) = limit
+          val flags = flags1.setFlag(LIMIT)
+          makeSelectModule(output, input, predicate, aliasmap, joinedge, flags,
+            children.map(modularizeLater), Seq(Seq(limitExpr)) ++ fspec1, wspec)
+
         case Limit(
           limitExpr,
           ExtractSelectModule(output, input, predicate, aliasmap, joinedge, children, flags1,
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
index 06525758c4d..abc43ba9b6b 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
@@ -132,6 +132,13 @@ object ExtractSelectModule extends PredicateHelper {
             s"\n right child ${ right }")
         }
 
+      // When select * is executed with limit, the ColumnPruning rule removes the Project node
+      // from the plan during optimization. If the child of Limit is a relation, build the
+      // select node directly from the relation output so that a modular plan can be made.
+      case Limit(_, lr: LogicalRelation) =>
+        (lr.output, lr.output, Nil, Nil, Seq(lr), true, Map.empty, NoFlags, Seq.empty,
+          Seq.empty)
+
       case other =>
         (other.output, other.output, Nil, Nil, Seq(other), false, Map.empty, NoFlags, Seq.empty, Seq
           .empty)