Merge b401a73 into 8f0ec97
ravipesala committed Jul 10, 2019
2 parents 8f0ec97 + b401a73 · commit d4b0967
Showing 3 changed files with 84 additions and 38 deletions.
File 1 of 3:

@@ -16,26 +16,28 @@
  */
 package org.apache.spark.sql.hive
 
+import java.util.concurrent.Callable
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
 import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
 
-import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.format.TableInfo
@@ -154,14 +156,22 @@ class InMemorySessionCatalog(
   }
 
   override def lookupRelation(name: TableIdentifier): LogicalPlan = {
-    val rtnRelation = super.lookupRelation(name)
+    var rtnRelation = super.lookupRelation(name)
     val isRelationRefreshed =
-      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
+      CarbonSessionUtil.refreshRelationAndSetStats(rtnRelation, name)(sparkSession)
     if (isRelationRefreshed) {
-      super.lookupRelation(name)
-    } else {
-      rtnRelation
+      rtnRelation = super.lookupRelation(name)
+      // Reset the stats after lookup.
+      CarbonSessionUtil.refreshRelationAndSetStats(rtnRelation, name)(sparkSession)
     }
+    rtnRelation
   }
 
+
+  override def getCachedPlan(t: QualifiedTableName,
+      c: Callable[LogicalPlan]): LogicalPlan = {
+    val plan = super.getCachedPlan(t, c)
+    CarbonSessionUtil.updateCachedPlan(plan)
+  }
 
   /**
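The second refreshRelationAndSetStats call above is deliberate: when the relation was refreshed, the fresh super.lookupRelation result carries a newly built CatalogTable that may again hold Hive's statistics, so the stats are cleared once more after the lookup. Clearing them matters because Spark sizes relations from CatalogTable.stats when present. A rough sketch of the fallback this relies on (assuming Spark 2.3-era catalyst APIs; plannedSizeInBytes is an illustrative helper, not CarbonData code):

import org.apache.spark.sql.catalyst.catalog.CatalogTable

// With stats = None, Spark falls back to a configured default size
// (spark.sql.defaultSizeInBytes) rather than trusting Hive's numbers,
// which are wrong for carbon tables.
def plannedSizeInBytes(table: CatalogTable, defaultSizeInBytes: BigInt): BigInt =
  table.stats.map(_.sizeInBytes).getOrElse(defaultSizeInBytes)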
File 2 of 3:

@@ -16,16 +16,18 @@
  */
 package org.apache.spark.sql.hive
 
+import java.util.concurrent.Callable
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}

@@ -34,7 +36,7 @@ import org.apache.spark.sql.internal.{SQLConf, SessionState}
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
-import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
@@ -86,14 +88,21 @@ class CarbonHiveSessionCatalog(
     CarbonEnv.init
 
   override def lookupRelation(name: TableIdentifier): LogicalPlan = {
-    val rtnRelation = super.lookupRelation(name)
+    var rtnRelation = super.lookupRelation(name)
     val isRelationRefreshed =
-      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
+      CarbonSessionUtil.refreshRelationAndSetStats(rtnRelation, name)(sparkSession)
     if (isRelationRefreshed) {
-      super.lookupRelation(name)
-    } else {
-      rtnRelation
+      rtnRelation = super.lookupRelation(name)
+      // Reset the stats after lookup.
+      CarbonSessionUtil.refreshRelationAndSetStats(rtnRelation, name)(sparkSession)
     }
+    rtnRelation
+  }
+
+  override def getCachedPlan(t: QualifiedTableName,
+      c: Callable[LogicalPlan]): LogicalPlan = {
+    val plan = super.getCachedPlan(t, c)
+    CarbonSessionUtil.updateCachedPlan(plan)
   }
 
   /**
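This file receives the same lookupRelation rework and getCachedPlan hook as InMemorySessionCatalog above. For context, Spark's SessionCatalog keeps resolved table plans in a Guava-backed cache keyed by qualified table name, and getCachedPlan is the accessor that consults it, so overriding it intercepts both cache hits and freshly loaded plans. A minimal sketch of that caching pattern (plain Guava with simplified types; not Spark or CarbonData code):

import java.util.concurrent.Callable

import com.google.common.cache.{Cache, CacheBuilder}

// Guava cache in the shape SessionCatalog uses; String stands in for
// QualifiedTableName and LogicalPlan.
val planCache: Cache[String, String] =
  CacheBuilder.newBuilder().maximumSize(1000).build[String, String]()

val plan = planCache.get("db.t1", new Callable[String] {
  override def call(): String = "freshly resolved plan" // loader runs only on a miss
})
// An overriding getCachedPlan can post-process `plan` (hit or miss) before the
// caller uses it, which is what CarbonSessionUtil.updateCachedPlan does.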
File 3 of 3:

@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql.catalyst.TableIdentifier
+import java.util.concurrent.Callable
+
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}

@@ -29,9 +31,10 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.util.SparkTypeConverter
-
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
 /**
  * This class refreshes the relation from the cache if the carbon table in the
  * carbon catalog is not the same as the cached carbon relation's carbon table.
@@ -48,16 +51,43 @@ object CarbonSessionUtil {
    * @param sparkSession
    * @return
    */
-  def refreshRelation(rtnRelation: LogicalPlan, name: TableIdentifier)
+  def refreshRelationAndSetStats(rtnRelation: LogicalPlan, name: TableIdentifier)
     (sparkSession: SparkSession): Boolean = {
     var isRelationRefreshed = false
+
+    /**
+     * Sets the stats to None if the table is a carbon table.
+     */
+    def setStatsNone(catalogTable: CatalogTable): Unit = {
+      catalogTable.provider match {
+        case Some(provider)
+          if provider.equals("org.apache.spark.sql.CarbonSource") ||
+             provider.equalsIgnoreCase("carbondata") =>
+          // Set the stats to None for a carbon table: we do not expect any stats
+          // from Hive, and Hive gives wrong stats for carbon tables.
+          catalogTable.stats match {
+            case Some(stats) =>
+              CarbonReflectionUtils.setFieldToCaseClass(catalogTable, "stats", None)
+            case _ =>
+          }
+          isRelationRefreshed =
+            CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
+        case _ =>
+      }
+    }
+
     rtnRelation match {
       case SubqueryAlias(_,
-        MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)
-      ) =>
+        MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable)) =>
         isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
-      case MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+        if (catalogTable.isInstanceOf[Option[CatalogTable]]) {
+          catalogTable.asInstanceOf[Option[CatalogTable]].foreach(setStatsNone)
+        }
+      case MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable) =>
         isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+        if (catalogTable.isInstanceOf[Option[CatalogTable]]) {
+          catalogTable.asInstanceOf[Option[CatalogTable]].foreach(setStatsNone)
+        }
       case SubqueryAlias(_, relation) if
         relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
         relation.getClass.getName
@@ -70,21 +100,7 @@
           "tableMeta",
           relation
         ).asInstanceOf[CatalogTable]
-        catalogTable.provider match {
-          case Some(provider)
-            if provider.equals("org.apache.spark.sql.CarbonSource") ||
-               provider.equalsIgnoreCase("carbondata") =>
-            // Update stats to none in case of carbon table as we are not expecting any stats from
-            // Hive. Hive gives wrong stats for carbon table.
-            catalogTable.stats match {
-              case Some(stats) =>
-                CarbonReflectionUtils.setFieldToCaseClass(catalogTable, "stats", None)
-              case _ =>
-            }
-            isRelationRefreshed =
-              CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
-          case _ =>
-        }
+        setStatsNone(catalogTable)
       case _ =>
     }
     isRelationRefreshed
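CatalogTable is a case class, so its stats field cannot be reassigned directly; the update goes through CarbonReflectionUtils.setFieldToCaseClass instead. That helper's implementation is not part of this commit; a hypothetical sketch of how such a helper can work (the mechanics are an assumption, and CarbonData's real code may differ):

// Hypothetical sketch: overwrite an immutable case-class field in place via
// Java reflection. Setting a non-static final instance field is permitted
// once setAccessible(true) succeeds.
def setFieldToCaseClass(target: AnyRef, fieldName: String, value: Any): Unit = {
  val field = target.getClass.getDeclaredField(fieldName)
  field.setAccessible(true)
  field.set(target, value) // e.g. setFieldToCaseClass(catalogTable, "stats", None)
}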
@@ -159,4 +175,15 @@
       StructType(colArray))
   }
 
+  def updateCachedPlan(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case sa @ SubqueryAlias(_,
+        MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable)) =>
+        sa.copy(child = sa.child.asInstanceOf[LogicalRelation].copy())
+      case MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+        plan.asInstanceOf[LogicalRelation].copy()
+      case _ => plan
+    }
+  }
+
 }
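updateCachedPlan hands callers a copy of any cached carbon LogicalRelation instead of the shared instance, so each lookup works on its own plan node; a plausible reading is that this keeps per-query adjustments, such as the stats reset above, from leaking through Spark's plan cache. Case-class copy semantics make that cheap; a minimal analogy in plain Scala (not Spark code):

// copy() hands each consumer a fresh node with the same field values, so
// changes to the copy leave the cached original untouched.
case class Relation(table: String, var computedStats: Option[Long] = None)

val cached = Relation("t1")
val served = cached.copy()
served.computedStats = Some(42L)
assert(cached.computedStats.isEmpty)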
