Skip to content

Commit

Permalink
Merge 349c912 into ca32374
Browse files Browse the repository at this point in the history
  • Loading branch information
Shubh18s committed Dec 27, 2018
2 parents ca32374 + 349c912 commit 94bbf7a
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1445,16 +1445,10 @@ private CarbonCommonConstants() {

@CarbonProperty(dynamicConfigurable = true)
public static final String SUPPORT_DIRECT_QUERY_ON_DATAMAP =
"carbon.query.directQueryOnDataMap.enabled";
"carbon.query.directQueryOnDataMap.enabled";

public static final String SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "false";

@CarbonProperty
public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP =
"carbon.query.validate.direct.query.on.datamap";

public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "true";

@CarbonProperty
public static final String CARBON_SHOW_DATAMAPS = "carbon.query.show.datamaps";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,48 +487,6 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
executorService.shutdown()
}

test("support set carbon.query.directQueryOnDataMap.enabled=true") {
val rootPath = new File(this.getClass.getResource("/").getPath
+ "../../../..").getCanonicalPath
val testData = s"$rootPath/integration/spark-common-test/src/test/resources/sample.csv"
sql("drop table if exists mainTable")
sql(
s"""
| CREATE TABLE mainTable
| (id Int,
| name String,
| city String,
| age Int)
| STORED BY 'org.apache.carbondata.format'
""".stripMargin)

sql(
s"""
| LOAD DATA LOCAL INPATH '$testData'
| into table mainTable
""".stripMargin)

sql(
s"""
| create datamap preagg_sum on table mainTable
| using 'preaggregate'
| as select id,sum(age) from mainTable group by id
""".stripMargin)

CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "true")

sql("set carbon.query.directQueryOnDataMap.enabled=true")
checkAnswer(sql("select count(*) from maintable_preagg_sum"), Row(4))
sql("set carbon.query.directQueryOnDataMap.enabled=false")
val exception: Exception = intercept[AnalysisException] {
sql("select count(*) from maintable_preagg_sum").collect()
}
assert(exception.getMessage.contains("Query On DataMap not supported"))
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "false")
}

class QueryTask(query: String) extends Callable[String] {
override def call(): String = {
var result = "SUCCESS"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class QueryTest extends PlanTest {
Locale.setDefault(Locale.US)

CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "false")
.addProperty(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, "true")

/**
* Runs the plan and makes sure the answer contains all of the keywords, or the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,22 +626,29 @@ object PreAggregateUtil {
// schema ordinal should be considered
columns.sortBy(_.getSchemaOrdinal).foreach { a =>
if (a.getAggFunction.nonEmpty) {
aggregateColumns += s"${a.getAggFunction match {
case "count" => "sum"
case _ => a.getAggFunction}}(${a.getColumnName})"
aggregateColumns += s"${
a.getAggFunction match {
case "count" => "sum"
case _ => a.getAggFunction
}
}(${ a.getColumnName })"
} else {
groupingExpressions += a.getColumnName
aggregateColumns+= a.getColumnName
aggregateColumns += a.getColumnName
}
}
val groupByString = if (groupingExpressions.nonEmpty) {
s" group by ${ groupingExpressions.mkString(",") }"
} else { "" }
s"select ${ aggregateColumns.mkString(",") } " +
s"from $databaseName.${ tableSchema.getTableName }" +
s" group by ${ groupingExpressions.mkString(",") }"
s"from $databaseName.${ tableSchema.getTableName }" + groupByString
}

/**
/** CarbonProperties.getInstance().addProperty(CarbonCommonConstants
* .VALIDATE_DIRECT_QUERY_ON_DATAMAP, "true")
* Below method will be used to get the select query when rollup policy is
* applied in case of timeseries table
*
* @param tableSchema main data map schema
* @param selectedDataMapSchema selected data map schema for rollup
* @return select query based on rolloup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,22 +106,27 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
def validateQueryDirectlyOnDataMap(relations: Seq[CarbonDecoderRelation]): Unit = {
var isPreAggDataMapExists = false
// first check if pre aggregate data map exists or not
relations.foreach{relation =>
relations.foreach { relation =>
if (relation.carbonRelation.carbonTable.isChildDataMap) {
isPreAggDataMapExists = true
}
}
val validateQuery = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP,
CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE).toBoolean
var isThrowException = false
// if validate query is enabled and relation contains pre aggregate data map
if (validateQuery && isPreAggDataMapExists) {
// if relation contains pre aggregate data map
if (isPreAggDataMapExists) {
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
if (null != carbonSessionInfo) {
val supportQueryOnDataMap = CarbonProperties.getInstance.getProperty(
CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP,
CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE).toBoolean
lazy val sessionPropertyValue = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP,
CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE)
// Check if property is set in thread params which would mean this is an internal call
// (from load or compaction) and should be of highest priority. Otherwise get from
// session(like user has dynamically given the value using set command). If not found in
// session then look for the property in carbon.properties file, otherwise use default
// value 'false'.
val supportQueryOnDataMap = CarbonEnv
.getThreadParam(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP,
sessionPropertyValue).toBoolean
if (!supportQueryOnDataMap) {
isThrowException = true
}
Expand Down

0 comments on commit 94bbf7a

Please sign in to comment.