[CARBONDATA-2569] Change the strategy of Search mode throw exception and run sparkSQL

Search mode throws an exception but the test case still passes; please check the JIRA for details.

This closes #2357
xubo245 authored and ravipesala committed Jun 11, 2018
1 parent 041603d commit 83ee2c4
Showing 1 changed file with 8 additions and 4 deletions.
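
For context, the second hunk below replaces the silent fallback to SparkSQL inside the search-mode exception handler with a rethrow, so a failure in search mode surfaces instead of being masked by a passing test. A minimal sketch of the old versus new handling follows; the names runSearch and runWithSparkSQL are illustrative placeholders, not the actual CarbonSession internals.

// Minimal sketch of the changed strategy, assuming placeholder methods:
// instead of swallowing a search-mode failure and silently falling back,
// the error is logged and rethrown so the caller sees it.
object SearchModeSketch {
  // Placeholder for the search-mode execution path; always fails here.
  def runSearch(sql: String): Seq[String] =
    throw new UnsupportedOperationException(s"search mode cannot run: $sql")

  // Placeholder for the normal SparkSQL execution path.
  def runWithSparkSQL(sql: String): Seq[String] = Seq(s"sparksql result for: $sql")

  def execute(sql: String, searchModeEnabled: Boolean): Seq[String] = {
    if (searchModeEnabled) {
      try {
        runSearch(sql)
      } catch {
        case e: Exception =>
          // Old behavior: log and return runWithSparkSQL(sql) here.
          // New behavior: log the failure and rethrow it.
          println(s"Exception when executing search mode: ${e.getMessage}")
          throw e
      }
    } else {
      runWithSparkSQL(sql)
    }
  }

  def main(args: Array[String]): Unit = {
    println(execute("select * from t", searchModeEnabled = false))
  }
}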
@@ -37,9 +37,8 @@ import org.apache.spark.sql.profiler.{Profiler, SQLStart}
 import org.apache.spark.util.{CarbonReflectionUtils, Utils}
 
 import org.apache.carbondata.common.annotations.InterfaceAudience
-import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.scan.expression.LiteralExpression
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.store.SparkCarbonStore
@@ -101,8 +100,8 @@ class CarbonSession(@transient val sc: SparkContext,
       } catch {
         case e: Exception =>
           logError(String.format(
-            "Exception when executing search mode: %s, fallback to SparkSQL", e.getMessage))
-          new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
+            "Exception when executing search mode: %s", e.getMessage))
+          throw e;
       }
     } else {
       new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
@@ -171,19 +170,24 @@
    */
   private def trySearchMode(qe: QueryExecution, sse: SQLStart): DataFrame = {
     val analyzed = qe.analyzed
+    val LOG: LogService = LogServiceFactory.getLogService(this.getClass.getName)
     analyzed match {
       case _@Project(columns, _@Filter(expr, s: SubqueryAlias))
         if s.child.isInstanceOf[LogicalRelation] &&
            s.child.asInstanceOf[LogicalRelation].relation
              .isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        LOG.info(s"Search service started and supports filter: ${sse.sqlText}")
         runSearch(analyzed, columns, expr, s.child.asInstanceOf[LogicalRelation])
       case gl@GlobalLimit(_, ll@LocalLimit(_, p@Project(columns, _@Filter(expr, s: SubqueryAlias))))
         if s.child.isInstanceOf[LogicalRelation] &&
            s.child.asInstanceOf[LogicalRelation].relation
              .isInstanceOf[CarbonDatasourceHadoopRelation] =>
         val logicalRelation = s.child.asInstanceOf[LogicalRelation]
+        LOG.info(s"Search service started and supports limit: ${sse.sqlText}")
         runSearch(analyzed, columns, expr, logicalRelation, gl.maxRows, ll.maxRows)
       case _ =>
+        LOG.info(s"Search service started, but don't support: ${sse.sqlText}," +
+          s" and will run it with SparkSQL")
         new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
     }
   }
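
The third hunk adds per-branch logging to trySearchMode, which dispatches on the analyzed logical plan: a filtered projection directly over a Carbon relation goes to the search service, the same shape wrapped in a limit goes to the search service with row limits, and anything else falls back to SparkSQL. A toy sketch of that dispatch under a simplified plan ADT (SimplePlan and its cases are stand-ins, not Spark's Catalyst classes or CarbonData code):

// Toy model of the trySearchMode dispatch; SimplePlan stands in for Spark's
// Catalyst logical plan and is not part of CarbonData or Spark.
sealed trait SimplePlan
case class CarbonScan(table: String) extends SimplePlan
case class Filter(condition: String, child: SimplePlan) extends SimplePlan
case class Project(columns: Seq[String], child: SimplePlan) extends SimplePlan
case class Limit(n: Int, child: SimplePlan) extends SimplePlan

object TrySearchModeSketch {
  def dispatch(plan: SimplePlan): String = plan match {
    // Filtered projection directly over a Carbon table: handled by search mode.
    case Project(cols, Filter(cond, CarbonScan(t))) =>
      s"search: select ${cols.mkString(", ")} from $t where $cond"
    // Same shape wrapped in a limit: handled by search mode with a row cap.
    case Limit(n, Project(cols, Filter(cond, CarbonScan(t)))) =>
      s"search(limit=$n): select ${cols.mkString(", ")} from $t where $cond"
    // Anything else is not supported by the search service; run it with SparkSQL.
    case other =>
      s"sparksql fallback for: $other"
  }

  def main(args: Array[String]): Unit = {
    println(dispatch(Project(Seq("a"), Filter("a > 1", CarbonScan("t")))))
    println(dispatch(Limit(10, Project(Seq("a"), Filter("a > 1", CarbonScan("t"))))))
  }
}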
