Skip to content

Commit

Permalink
Merge 32e4094 into 2ecf30c
Browse files Browse the repository at this point in the history
  • Loading branch information
qiuchenjian committed Feb 2, 2019
2 parents 2ecf30c + 32e4094 commit 9daed6f
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Expression, NamedExpression, ScalaUDF, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Coalesce, Expression, NamedExpression, ScalaUDF, SortOrder}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
import org.apache.spark.sql.execution.command.{Field, TableModel, TableNewProcessor}
Expand Down Expand Up @@ -246,7 +246,7 @@ object MVHelper {

def getAttributeMap(subsumer: Seq[NamedExpression],
subsume: Seq[NamedExpression]): Map[AttributeKey, NamedExpression] = {
if (subsumer.length == subsume.length) {
if (subsumer.length >= subsume.length) {
subsume.zip(subsumer).flatMap { case (left, right) =>
var tuples = left collect {
case attr: AttributeReference =>
Expand All @@ -260,7 +260,7 @@ object MVHelper {
Seq((AttributeKey(left), createAttrReference(right, left.name))) ++ tuples
}.toMap
} else {
throw new UnsupportedOperationException("Cannot create mapping with unequal sizes")
throw new UnsupportedOperationException("Cannot create mapping with less sizes")
}
}

Expand Down Expand Up @@ -437,6 +437,8 @@ object MVHelper {
case Alias(agg@AggregateExpression(fun@Count(Seq(child)), _, _, _), name) =>
val uFun = Sum(right)
Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = left.exprId)
case Alias(coalesce@Coalesce(_), name) =>
Alias(coalesce.copy(), name)(exprId = left.exprId)
case _ =>
if (left.name != right.name) Alias(right, left.name)(exprId = left.exprId) else right
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,30 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper
}

object GroupbyGroupbyNoChildDelta extends DefaultMatchPattern {
def tryMatch(gb_2a: GroupBy,
gb_2q: GroupBy,
isGroupingEmR: Boolean,
isInheritTableRelation: Boolean) : Seq[ModularPlan] = {
val aliasMap = AttributeMap(gb_2a.outputList.collect { case a: Alias =>
(a.toAttribute, a)})
if (isGroupingEmR) {
Utils.tryMatch(
gb_2a, gb_2q, aliasMap).flatMap {
case g: GroupBy =>
Some(g.copy(child = g.child.withNewChildren(
g.child.children.map {
case modular.Select(_, _, _, _, _, _, _, _, _, _) => gb_2a;
case other => other
}), dataMapTableRelation =
if (isInheritTableRelation) gb_2a.dataMapTableRelation
else g.dataMapTableRelation
));
case _ => None}.map(Seq(_)).getOrElse(Nil)
} else {
Nil
}
}

def apply(
subsumer: ModularPlan,
subsumee: ModularPlan,
Expand Down Expand Up @@ -398,24 +422,10 @@ object GroupbyGroupbyNoChildDelta extends DefaultMatchPattern {

Seq(gb_2a.copy(outputList = oList))
} else {
Nil
tryMatch(gb_2a, gb_2q, isGroupingEmR, true)
}
} else {
val aliasMap = AttributeMap(gb_2a.outputList.collect { case a: Alias =>
(a.toAttribute, a)})
if (isGroupingEmR) {
Utils.tryMatch(
gb_2a, gb_2q, aliasMap).flatMap {
case g: GroupBy =>
Some(g.copy(child = g.child.withNewChildren(
g.child.children.map {
case modular.Select(_, _, _, _, _, _, _, _, _, _) => gb_2a;
case other => other
})));
case _ => None}.map(Seq(_)).getOrElse(Nil)
} else {
Nil
}
tryMatch(gb_2a, gb_2q, isGroupingEmR, false)
}

case _ => Nil
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.mv.rewrite

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

class MVCoalesceTestCase extends QueryTest with BeforeAndAfterAll {
override def beforeAll(): Unit = {
drop()
sql("create table coalesce_test_main(id int,name string,height int,weight int) " +
"using carbondata")
sql("insert into coalesce_test_main select 1,'tom',170,130")
sql("insert into coalesce_test_main select 2,'tom',170,130")
sql("insert into coalesce_test_main select 3,'lily',160,100")
}

def drop(): Unit = {
sql("drop table if exists coalesce_test_main")
}

test("test mv table with coalesce expression") {
sql("drop datamap if exists coalesce_test_main_mv")
sql("create datamap coalesce_test_main_mv using 'mv' as " +
"select sum(id) as sum_id,sum(height) as sum_height,name as myname from coalesce_test_main group by name")
sql("rebuild datamap coalesce_test_main_mv")

var frame = sql("select coalesce(sum(id),12) as sumid,sum(height) as sum_height from coalesce_test_main group by name")
assert(verifyMVDataMap(frame.queryExecution.analyzed, "coalesce_test_main_mv"))
checkAnswer(frame, Seq(Row(3, 340), Row(3, 160)))

frame = sql("select coalesce(sum(id),12) as sumid,sum(height) as sum_height from coalesce_test_main " +
"where name = 'tom' group by name")
assert(verifyMVDataMap(frame.queryExecution.analyzed, "coalesce_test_main_mv"))
checkAnswer(frame, Seq(Row(3, 340)))

sql("drop datamap if exists coalesce_test_main_mv")
}

def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
val tables = logicalPlan collect {
case l: LogicalRelation => l.catalogTable.get
}
tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
}

override def afterAll(): Unit ={
drop
}
}

0 comments on commit 9daed6f

Please sign in to comment.