
[CARBONDATA-3309] MV datamap supports Spark 2.1
[Problem]
The MV datamap doesn't support the Spark 2.1 version, so we need to support it.

[Solution]
The following are the modification points; all MV test cases pass on the Spark 2.1 version.

Classes we can't access in the Spark 2.1 version:
(1). org.apache.spark.internal.Logging
(2). org.apache.spark.sql.internal.SQLConf
Solution: Create classes that extend the above classes; a sketch follows below.
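
A minimal sketch of such a shim (assuming it sits in a source tree compiled into the carbon project; the exact file in this patch may differ): declaring the trait inside the org.apache.spark package makes the private[spark] internal trait visible, and MV code then imports the shim instead.

package org.apache.spark

// Shim for Spark 2.1: org.apache.spark.internal.Logging is private[spark],
// so this public trait re-exports it from inside the org.apache.spark package.
trait Logging extends org.apache.spark.internal.Logging

// The SQLConf shim follows the same idea in the org.apache.spark.sql package, e.g.
//   package org.apache.spark.sql
//   class SQLConf extends org.apache.spark.sql.internal.SQLConf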

Classes that the Spark 2.1 version doesn't have:
(1). org.apache.spark.sql.catalyst.plans.logical.Subquery
(2). org.apache.spark.sql.catalyst.catalog.HiveTableRelation (defined in interface.scala)
Solution: For HiveTableRelation, use CatalogRelation instead and don't use it in LogicalPlanSignatureGenerator; for Subquery, move the code to the carbon project.

The method we can't access in the Spark 2.1 version:
(1). sparkSession.sessionState.catalog.lookupRelation
Solution: Add this access as a method of SparkSQLUtil; a sketch follows below.
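
A rough sketch of that helper (hedged; the signature in the patch may differ): because SparkSession.sessionState is not public in Spark 2.1, a utility object placed under the org.apache.spark.sql namespace exposes it, and callers such as MVDataMapProvider then go through SparkSQLUtil.sessionState(sparkSession).catalog.lookupRelation(...), as the diff below shows.

package org.apache.spark.sql.util

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SessionState

object SparkSQLUtil {

  // Accessible here because this object lives inside the org.apache.spark.sql
  // namespace, which is what sparkSession.sessionState requires on Spark 2.1.
  def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
}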

Classes whose interfaces changed:
(1). org.apache.spark.sql.catalyst.expressions.SortOrder
(2). org.apache.spark.sql.catalyst.expressions.Cast
(3). org.apache.spark.sql.catalyst.plans.logical.Statistics
Solution: Adapt to the new interfaces; a Cast-matching sketch follows below.
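
For example, Cast gained a timeZoneId argument after Spark 2.1, so pattern matching on Cast(expr, dataType, timeZoneId) does not compile against 2.1. A hedged sketch of the CarbonExpressions.MatchCast extractor used in the diff (the real implementation may differ) matches on the Cast node itself rather than its constructor pattern:

import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression}
import org.apache.spark.sql.types.DataType

object MatchCast {
  // Matches Cast(attribute, dataType) regardless of whether Cast takes two
  // arguments (Spark 2.1) or three (Spark 2.2+, with timeZoneId).
  def unapply(expr: Expression): Option[(Attribute, DataType)] = expr match {
    case c: Cast if c.child.isInstanceOf[Attribute] =>
      Some((c.child.asInstanceOf[Attribute], c.dataType))
    case _ => None
  }
}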

Methods and config entries that the Spark 2.1 version doesn't have:
(1). normalizeExprId and canonicalized of org.apache.spark.sql.catalyst.plans.QueryPlan
(2). CASE_SENSITIVE of SQLConf
(3). STARSCHEMA_DETECTION of SQLConf
Solution: Don't use normalizeExprId, canonicalized, CASE_SENSITIVE, or STARSCHEMA_DETECTION; a version-split helper is sketched below.
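
One way to picture this (a sketch that assumes the helper is compiled once per Spark profile): the Spark 2.1 copy of SparkSQLUtil leaves expression ids untouched, while the Spark 2.2+ copy can delegate to QueryPlan.normalizeExprId.

import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, NamedExpression}

object SparkSQLUtil {
  // Spark 2.1 variant: QueryPlan.normalizeExprId does not exist, so the
  // canonicalization step degrades to a no-op and returns the expression as-is.
  // The Spark 2.2+ variant (assumed) would return QueryPlan.normalizeExprId(r, input).
  def invokeQueryPlannormalizeExprId(r: NamedExpression, input: AttributeSeq): NamedExpression = r
}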

Some logical plan optimization rules that the Spark 2.1 version doesn't have:
(1). SimplifyCreateMapOps
(2). SimplifyCreateArrayOps
(3). SimplifyCreateStructOps
(4). RemoveRedundantProject
(5). RemoveRedundantAliases
(6). PullupCorrelatedPredicates
(7). ReplaceDeduplicateWithAggregate
(8). EliminateView
Solution: Delete the rules or move their code to the carbon project; a rule-factory sketch follows below.
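
A sketch of how one of these rules can be hidden behind a factory (names follow the diff; the Spark 2.2+ variant is assumed to return the built-in rule object directly): on Spark 2.1 the factory hands BirdcageOptimizer a pass-through rule, so the optimizer never references a class that 2.1 lacks.

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object SparkSQLUtil {
  // Spark 2.1 variant: EliminateView does not exist, so return a no-op rule;
  // the Spark 2.2+ source tree can simply return EliminateView here.
  def getEliminateViewObj(): Rule[LogicalPlan] = new Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan
  }
}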

Generate the instances in SparkSQLUtil to adapt to the Spark 2.1 version.

Query SQL passes the MV check in the Spark 2.1 version (CarbonSessionState).

This closes #3150
qiuchenjian authored and ravipesala committed May 21, 2019
1 parent c2d4b3e commit 4d7c8ad
Showing 30 changed files with 481 additions and 81 deletions.
@@ -81,7 +81,7 @@ class MVDataMapProvider(
val identifier = dataMapSchema.getRelationIdentifier
val logicalPlan =
new FindDataSourceTable(sparkSession).apply(
sparkSession.sessionState.catalog.lookupRelation(
SparkSQLUtil.sessionState(sparkSession).catalog.lookupRelation(
TableIdentifier(identifier.getTableName,
Some(identifier.getDatabaseName)))) match {
case s: SubqueryAlias => s.child
@@ -547,7 +547,7 @@ object MVHelper {
relation,
aliasMap,
keepAlias = false)
SortOrder(expressions.head, s.direction, s.sameOrderExpressions)
SortOrder(expressions.head, s.direction)
}
// In case of limit it goes to other.
case other => other
@@ -17,7 +17,7 @@

package org.apache.carbondata.mv.rewrite

import org.apache.spark.internal.Logging
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.trees.TreeNode

abstract class MatchPattern[MatchingPlan <: TreeNode[MatchingPlan]] extends Logging {
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.FindDataSourceTable
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.util.SparkSQLUtil

import org.apache.carbondata.core.datamap.DataMapCatalog
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
@@ -34,6 +35,7 @@ import org.apache.carbondata.mv.plans.modular.{Flags, ModularPlan, ModularRelati
import org.apache.carbondata.mv.plans.util.Signature
import org.apache.carbondata.mv.session.MVSession


/** Holds a summary logical plan */
private[mv] case class SummaryDataset(signature: Option[Signature],
plan: LogicalPlan,
@@ -114,7 +116,8 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
mvSession.sessionState.optimizer.execute(planToRegister)).next().semiHarmonized
val signature = modularPlan.signature
val identifier = dataMapSchema.getRelationIdentifier
val output = new FindDataSourceTable(sparkSession).apply(sparkSession.sessionState.catalog
val output = new FindDataSourceTable(sparkSession)
.apply(SparkSQLUtil.sessionState(sparkSession).catalog
.lookupRelation(TableIdentifier(identifier.getTableName, Some(identifier.getDatabaseName))))
.output
val relation = ModularRelation(identifier.getDatabaseName,
@@ -20,15 +20,15 @@ package org.apache.carbondata.mv.rewrite
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.scalatest.BeforeAndAfter

import org.apache.carbondata.mv.testutil.ModularPlanTest
import org.apache.spark.sql.util.SparkSQLUtil

class TestSQLSuite extends ModularPlanTest with BeforeAndAfter {
import org.apache.carbondata.mv.rewrite.matching.TestSQLBatch._

val spark = sqlContext
val testHive = sqlContext.sparkSession
val hiveClient = spark.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient()
val hiveClient = SparkSQLUtil.sessionState(spark.sparkSession).catalog.asInstanceOf[CarbonSessionCatalog].getClient()

ignore("protypical mqo rewrite test") {

@@ -20,8 +20,8 @@ package org.apache.carbondata.mv.rewrite
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.scalatest.BeforeAndAfter

import org.apache.carbondata.mv.testutil.ModularPlanTest
import org.apache.spark.sql.util.SparkSQLUtil
//import org.apache.spark.sql.catalyst.SQLBuilder
import java.io.{File, PrintWriter}

@@ -31,7 +31,7 @@ class Tpcds_1_4_Suite extends ModularPlanTest with BeforeAndAfter {

val spark = sqlContext
val testHive = sqlContext.sparkSession
val hiveClient = spark.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient()
val hiveClient = SparkSQLUtil.sessionState(spark.sparkSession).catalog.asInstanceOf[CarbonSessionCatalog].getClient()

test("test using tpc-ds queries") {

@@ -17,9 +17,9 @@

package org.apache.carbondata.mv.expressions.modular

import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, AttributeSet, Expression, ExprId, LeafExpression, NamedExpression, OuterReference, PlanExpression, Predicate, Unevaluable}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SparkSQLUtil

import org.apache.carbondata.mv.plans.modular.ModularPlan

@@ -53,7 +53,8 @@ abstract class ModularSubquery(
def canonicalize(attrs: AttributeSeq): ModularSubquery = {
// Normalize the outer references in the subquery plan.
val normalizedPlan = plan.transformAllExpressions {
case OuterReference(r) => OuterReference(QueryPlan.normalizeExprId(r, attrs))
case OuterReference(r) => OuterReference(SparkSQLUtil.
invokeQueryPlannormalizeExprId(r, attrs))
}
withNewPlan(normalizedPlan).canonicalized.asInstanceOf[ModularSubquery]
}
@@ -80,7 +81,7 @@ case class ScalarModularSubquery(

override lazy val canonicalized: Expression = {
ScalarModularSubquery(
plan.canonicalized,
plan.canonicalizedDef,
children.map(_.canonicalized),
ExprId(0))
}
@@ -122,7 +123,7 @@ case class ModularListQuery(

override lazy val canonicalized: Expression = {
ModularListQuery(
plan.canonicalized,
plan.canonicalizedDef,
children.map(_.canonicalized),
ExprId(0))
}
@@ -153,7 +154,7 @@ case class ModularExists(

override lazy val canonicalized: Expression = {
ModularExists(
plan.canonicalized,
plan.canonicalizedDef,
children.map(_.canonicalized),
ExprId(0))
}
@@ -19,7 +19,8 @@ package org.apache.carbondata.mv.plans.modular

import scala.collection._

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Cast, Divide, ExprId, Literal, NamedExpression}
import org.apache.spark.sql.CarbonExpressions.MatchCast
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Divide, ExprId, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate._

trait AggregatePushDown { // self: ModularPlan =>
@@ -106,12 +107,11 @@ trait AggregatePushDown { // self: ModularPlan =>
} else {
Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
}
case sum@AggregateExpression(Sum(Cast(expr, dataType, timeZoneId)), _, false, _)
if expr.isInstanceOf[Attribute] =>
case sum@AggregateExpression(Sum(cast@MatchCast(expr, dataType)), _, false, _) =>
val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
.asInstanceOf[Attribute]
if (fact.outputSet.contains(tAttr)) {
val sum1 = AggregateExpression(Sum(Cast(tAttr, dataType, timeZoneId)), sum.mode, false)
val sum1 = AggregateExpression(Sum(cast), sum.mode, false)
val alias = Alias(sum1, sum1.toString)()
val tSum = AggregateExpression(Sum(alias.toAttribute), sum.mode, false, sum.resultId)
val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
@@ -20,7 +20,7 @@ package org.apache.carbondata.mv.plans.modular
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.SQLConf

import org.apache.carbondata.mv.plans
import org.apache.carbondata.mv.plans._
@@ -20,13 +20,13 @@ package org.apache.carbondata.mv.plans.modular
import scala.collection._
import scala.collection.mutable.{HashMap, MultiMap}

import org.apache.spark.internal.Logging
import org.apache.spark.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.{JoinType, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.util.SparkSQLUtil

import org.apache.carbondata.mv.plans._
@@ -45,6 +45,10 @@ abstract class ModularPlan

lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved

def canonicalizedDef: ModularPlan = {
canonicalized
}

def childrenResolved: Boolean = children.forall(_.resolved)

private var statsCache: Option[Statistics] = None
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.util.SparkSQLUtil

import org.apache.carbondata.mv.plans.modular.Flags._
@@ -47,14 +47,7 @@ case class ModularRelation(databaseName: String,
override def computeStats(spark: SparkSession, conf: SQLConf): Statistics = {
val plan = spark.table(s"${ databaseName }.${ tableName }").queryExecution.optimizedPlan
val stats = SparkSQLUtil.invokeStatsMethod(plan, conf)
val output = outputList.map(_.toAttribute)
val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
table => AttributeMap(table.output.zip(output))
}
val rewrites = mapSeq(0)
val attributeStats = AttributeMap(stats.attributeStats.iterator
.map { pair => (rewrites(pair._1), pair._2) }.toSeq)
Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, stats.hints)
SparkSQLUtil.getStatisticsObj(outputList, plan, stats)
}

override def output: Seq[Attribute] = outputList.map(_.toAttribute)
@@ -155,10 +148,6 @@ case class HarmonizedRelation(source: ModularPlan) extends LeafNode {
val stats = SparkSQLUtil.invokeStatsMethod(plan, conf)
val output = source.asInstanceOf[GroupBy].child.children(0).asInstanceOf[ModularRelation]
.outputList.map(_.toAttribute)
val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
table => AttributeMap(table.output.zip(output))
}
val rewrites = mapSeq.head
val aliasMap = AttributeMap(
source.asInstanceOf[GroupBy].outputList.collect {
case a@Alias(ar: Attribute, _) => (ar, a.toAttribute)
@@ -167,12 +156,7 @@
case a@Alias(AggregateExpression(Last(ar: Attribute, _), _, _, _), _) =>
(ar, a.toAttribute)
})
val aStatsIterator = stats.attributeStats.iterator.map { pair => (rewrites(pair._1), pair._2) }
val attributeStats =
AttributeMap(
aStatsIterator.map(pair => (aliasMap.get(pair._1).getOrElse(pair._1), pair._2)).toSeq)

Statistics(stats.sizeInBytes, None, attributeStats, stats.hints)
SparkSQLUtil.getStatisticsObj(output, plan, stats, Option(aliasMap))
}

override def output: Seq[Attribute] = source.output
@@ -17,7 +17,7 @@

package org.apache.carbondata.mv.plans.modular

import org.apache.spark.internal.Logging
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.{Exists, ListQuery, ScalarSubquery}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -23,13 +23,13 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
import org.apache.spark.sql.catalyst.rules.{RuleExecutor, _}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.util.SparkSQLUtil

object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {

val conf = new SQLConf()
.copy(SQLConf.CASE_SENSITIVE -> true, SQLConf.STARSCHEMA_DETECTION -> true)

protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)

def batches: Seq[Batch] = {
@@ -40,7 +40,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
Batch(
"Finish Analysis", Once,
EliminateSubqueryAliases,
EliminateView,
SparkSQLUtil.getEliminateViewObj(),
ReplaceExpressions,
ComputeCurrentTime,
// GetCurrentDatabase(sessionCatalog),
@@ -59,7 +59,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
CombineUnions) ::
Batch(
"Pullup Correlated Expressions", Once,
PullupCorrelatedPredicates) ::
SparkSQLUtil.getPullupCorrelatedPredicatesObj()) ::
Batch(
"Subquery", Once,
OptimizeSubqueries) ::
@@ -107,7 +107,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
SimplifyCaseConversionExpressions,
RewriteCorrelatedScalarSubquery,
EliminateSerialization,
RemoveRedundantAliases,
SparkSQLUtil.getRemoveRedundantAliasesObj(),
RemoveRedundantProject,
SimplifyCreateStructOps,
SimplifyCreateArrayOps,
@@ -17,16 +17,17 @@

package org.apache.carbondata.mv.plans.util

import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference,
AttributeSet, Expression, NamedExpression, PredicateHelper}
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression, PredicateHelper}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.LogicalRelation

import org.apache.carbondata.mv.plans.modular.Flags._
import org.apache.carbondata.mv.plans.modular.JoinEdge



/**
* SelectModule is extracted from logical plan of SPJG query. All join conditions
* filter, and project operators in a single Aggregate-less subtree of logical plan
@@ -335,13 +336,13 @@ object ExtractTableModule extends PredicateHelper {
def unapply(plan: LogicalPlan): Option[ReturnType] = {
plan match {
// uncomment for cloudera1 version
// case m: CatalogRelation =>
// Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
// Seq.empty)
// uncomment for apache version
// case m: CatalogRelation =>
// Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
// Seq.empty)
// uncomment for apache version
case m: HiveTableRelation =>
Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
Seq.empty)
Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
Seq.empty)
case l: LogicalRelation =>
val tableIdentifier = l.catalogTable.map(_.identifier)
val database = tableIdentifier.map(_.database).flatten.getOrElse(null)
@@ -17,6 +17,7 @@

package org.apache.carbondata.mv.plans.util

import org.apache.spark.sql.CarbonExpressions.MatchCastExpression
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, BitwiseAnd, Cast, Expression, Grouping, GroupingID, Literal, NamedExpression, ShiftRight}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.types.{ByteType, IntegerType}
@@ -398,10 +399,10 @@ trait SQLBuildDSL {
// it back.
case ar: AttributeReference if ar == gid => GroupingID(Nil)
case ar: AttributeReference if groupByAttrMap.contains(ar) => groupByAttrMap(ar)
case a@Cast(
case a@MatchCastExpression(
BitwiseAnd(
ShiftRight(ar: AttributeReference, Literal(value: Any, IntegerType)),
Literal(1, IntegerType)), ByteType, None) if ar == gid =>
Literal(1, IntegerType)), ByteType) if ar == gid =>
// for converting an expression to its original SQL format grouping(col)
val idx = groupByExprs.length - 1 - value.asInstanceOf[Int]
groupByExprs.lift(idx).map(Grouping).getOrElse(a)
@@ -220,15 +220,6 @@ class SQLBuilder private(
}
}
}

object RemoveCasts extends Rule[ModularPlan] {
def apply(tree: ModularPlan): ModularPlan = {
tree transformAllExpressions {
case Cast(e, dataType, _) => e
}
}
}

}

object SQLBuilder {
@@ -17,7 +17,7 @@

package org.apache.carbondata.mv.plans.util

import org.apache.spark.internal.Logging
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.trees.TreeNode

case class Signature(groupby: Boolean = true, datasets: Set[String] = Set.empty)
