
Fix PreAggregate Datamap issue
Shubh18s committed Dec 27, 2018
1 parent 0e2921b commit 80094ca
Showing 7 changed files with 31 additions and 116 deletions.
@@ -1445,16 +1445,10 @@ private CarbonCommonConstants() {

@CarbonProperty(dynamicConfigurable = true)
public static final String SUPPORT_DIRECT_QUERY_ON_DATAMAP =
"carbon.query.directQueryOnDataMap.enabled";
"carbon.query.directQueryOnDataMap.enabled";

public static final String SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "false";

@CarbonProperty
public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP =
"carbon.query.validate.direct.query.on.datamap";

public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "true";

@CarbonProperty
public static final String CARBON_SHOW_DATAMAPS = "carbon.query.show.datamaps";

3 changes: 1 addition & 2 deletions docs/configuration-parameters.md
@@ -135,7 +135,6 @@ This section provides the details of all the configurations required for the Car
| carbon.custom.block.distribution | false | CarbonData has its own scheduling algorithm to suggest to Spark how many tasks need to be launched and how much work each task needs to do in a Spark cluster for any query on CarbonData. When this configuration is true, CarbonData distributes the available blocks to be scanned among the available number of cores. For example: if there are 10 blocks to be scanned and only 3 tasks can be run (only 3 executor cores available in the cluster), CarbonData combines the blocks as 4,3,3 and gives them to 3 tasks to run. **NOTE:** When this configuration is false, as per the ***carbon.task.distribution*** configuration, each block/blocklet is given to each task. |
| enable.query.statistics | false | CarbonData has extensive logging which is useful for debugging performance problems or hard-to-locate issues. When set to ***true***, this configuration logs additional query statistics information to more accurately locate the issues being debugged. **NOTE:** Enabling this writes more debug information to the log files, thereby increasing the log file size significantly in a short span of time. It is advised to configure the log file size and log retention parameters in the log4j properties appropriately. Extensive logging also increases IO, and hence overall query performance might be reduced. It is therefore recommended to enable this configuration only for the duration of debugging. |
| enable.unsafe.in.query.processing | false | CarbonData supports unsafe operations of Java to avoid GC overhead for certain operations. This configuration enables to use unsafe functions in CarbonData while scanning the data during query. |
| carbon.query.validate.direct.query.on.datamap | true | CarbonData supports creating pre-aggregate datamaps as independent tables. For some debugging purposes, it might be required to query such datamap tables directly. This configuration allows querying such datamaps. |
| carbon.max.driver.threads.for.block.pruning | 4 | Number of threads used for driver-side pruning when there are more than 100k carbon files. This configuration can be used to set the number of threads between 1 and 4. |
| carbon.heap.memory.pooling.threshold.bytes | 1048576 | CarbonData supports unsafe operations of Java to avoid GC overhead for certain operations. Using unsafe, memory can be allocated on the Java heap or off heap. This configuration controls the allocation mechanism on the Java heap. If a heap memory allocation of the given size is greater than or equal to this value, it goes through the pooling mechanism; if this is set to -1, pooling is not used. The default value is 1048576 (1MB, the same as Spark). The value is to be specified in bytes. |
| carbon.push.rowfilters.for.vector | false | When enabled, complete row filters are handled by CarbonData for vector reads. When disabled, only page-level pruning is done by CarbonData and row-level filtering is done by Spark for vector reads; there are also scan optimizations in CarbonData that avoid multiple data copies when this parameter is set to false. There is no change in the flow for non-vector based queries. |
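For reference, system-level parameters like those above are typically placed in the `carbon.properties` file; a minimal sketch with the default values from the table (illustrative, not a recommended configuration):

```properties
# Illustrative carbon.properties fragment; values mirror the defaults above.
carbon.custom.block.distribution=false
enable.query.statistics=false
carbon.max.driver.threads.for.block.pruning=4
carbon.heap.memory.pooling.threshold.bytes=1048576
carbon.push.rowfilters.for.vector=false
```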
@@ -211,7 +210,7 @@ RESET
| carbon.options.sort.scope | Specifies how the current data load should be sorted. **NOTE:** Refer to [Data Loading Configuration](#data-loading-configuration)#carbon.sort.scope for detailed information. |
| carbon.options.global.sort.partitions | |
| carbon.options.serialization.null.format | Default null value representation in the data being loaded. **NOTE:** Refer to [Data Loading Configuration](#data-loading-configuration)#carbon.options.serialization.null.format for detailed information. |
| carbon.query.directQueryOnDataMap.enabled | Specifies whether a datamap can be queried directly. This is useful for debugging purposes. **NOTE:** Refer to [Query Configuration](#query-configuration)#carbon.query.validate.direct.query.on.datamap for detailed information. |
| carbon.query.directQueryOnDataMap.enabled | Specifies whether a datamap can be queried directly. This is useful for debugging purposes. **NOTE:** Refer to [Query Configuration](#query-configuration) for detailed information. |
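As a usage sketch, the dynamic property can be toggled per session via the `SET` command (this assumes a CarbonData-enabled Spark session `spark`; the datamap table name mirrors the test removed in this commit and is illustrative):

```scala
// Toggle direct queries on a pre-aggregate datamap for the current session.
spark.sql("set carbon.query.directQueryOnDataMap.enabled=true")
spark.sql("select count(*) from maintable_preagg_sum").show() // allowed
spark.sql("set carbon.query.directQueryOnDataMap.enabled=false")
// The same query now fails with "Query On DataMap not supported".
```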

**Examples:**

@@ -487,48 +487,6 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
executorService.shutdown()
}

test("support set carbon.query.directQueryOnDataMap.enabled=true") {
val rootPath = new File(this.getClass.getResource("/").getPath
+ "../../../..").getCanonicalPath
val testData = s"$rootPath/integration/spark-common-test/src/test/resources/sample.csv"
sql("drop table if exists mainTable")
sql(
s"""
| CREATE TABLE mainTable
| (id Int,
| name String,
| city String,
| age Int)
| STORED BY 'org.apache.carbondata.format'
""".stripMargin)

sql(
s"""
| LOAD DATA LOCAL INPATH '$testData'
| into table mainTable
""".stripMargin)

sql(
s"""
| create datamap preagg_sum on table mainTable
| using 'preaggregate'
| as select id,sum(age) from mainTable group by id
""".stripMargin)

CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "true")

sql("set carbon.query.directQueryOnDataMap.enabled=true")
checkAnswer(sql("select count(*) from maintable_preagg_sum"), Row(4))
sql("set carbon.query.directQueryOnDataMap.enabled=false")
val exception: Exception = intercept[AnalysisException] {
sql("select count(*) from maintable_preagg_sum").collect()
}
assert(exception.getMessage.contains("Query On DataMap not supported"))
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "false")
}

class QueryTask(query: String) extends Callable[String] {
override def call(): String = {
var result = "SUCCESS"
@@ -2537,54 +2537,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
FileUtils.deleteDirectory(new File(writerPath))
}

test("check varchar with trailing space") {
FileUtils.deleteDirectory(new File(writerPath))
val fields: Array[Field] = new Array[Field](8)

fields(0) = new Field("Event_ID", DataTypes.STRING)
fields(1) = new Field("Event_Time", DataTypes.TIMESTAMP)
fields(2) = new Field("subject", DataTypes.VARCHAR)
fields(3) = new Field("From_Email", DataTypes.STRING)
fields(4) = new Field("To_Email", DataTypes.createArrayType(DataTypes.STRING))
fields(5) = new Field("CC_Email", DataTypes.createArrayType(DataTypes.STRING))
fields(6) = new Field("BCC_Email", DataTypes.createArrayType(DataTypes.STRING))
fields(7) = new Field("messagebody ", DataTypes.VARCHAR)

var options = Map("bAd_RECords_action" -> "FORCE").asJava
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "EEE, d MMM yyyy HH:mm:ss Z")

val writer: CarbonWriter = CarbonWriter.builder
.outputPath(writerPath)
.withCsvInput(new Schema(fields)).writtenBy("TestNonTransactionalCarbonTable").build()

writer
.write(Array("aaa",
"Fri, 4 May 2001 13:51:00 -0700 (PDT)",
"Re",
"cdn@ws.com",
"sd#er",
"sd",
"sds",
"ew"))
writer.close()

sql("drop table if exists test")
sql(
s"""CREATE TABLE test using carbon options('long_string_columns'='subject,messagebody')
|LOCATION '$writerPath'"""
.stripMargin)
checkAnswer(sql("select * from test"), Seq(Row("aaa",
Timestamp.valueOf("2001-05-04 13:51:00.0"),
"Re",
"cdn@ws.com",
mutable.WrappedArray.make(Array("sd#er")),
mutable.WrappedArray.make(Array("sd")),
mutable.WrappedArray.make(Array("sds")),
"ew")))
FileUtils.deleteDirectory(new File(writerPath))
}

def generateCarbonData(builder :CarbonWriterBuilder): Unit ={
val fields = new Array[Field](3)
fields(0) = new Field("name", DataTypes.STRING)
@@ -42,7 +42,7 @@ class QueryTest extends PlanTest {
Locale.setDefault(Locale.US)

CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "false")
.addProperty(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, "true")

/**
* Runs the plan and makes sure the answer contains all of the keywords, or the
@@ -626,22 +626,29 @@ object PreAggregateUtil {
// schema ordinal should be considered
columns.sortBy(_.getSchemaOrdinal).foreach { a =>
if (a.getAggFunction.nonEmpty) {
aggregateColumns += s"${a.getAggFunction match {
case "count" => "sum"
case _ => a.getAggFunction}}(${a.getColumnName})"
aggregateColumns += s"${
a.getAggFunction match {
case "count" => "sum"
case _ => a.getAggFunction
}
}(${ a.getColumnName })"
} else {
groupingExpressions += a.getColumnName
aggregateColumns+= a.getColumnName
aggregateColumns += a.getColumnName
}
}
val groupByString = if (groupingExpressions.nonEmpty) {
s" group by ${ groupingExpressions.mkString(",") }"
} else { "" }
s"select ${ aggregateColumns.mkString(",") } " +
s"from $databaseName.${ tableSchema.getTableName }" +
s" group by ${ groupingExpressions.mkString(",") }"
s"from $databaseName.${ tableSchema.getTableName }" + groupByString
}
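The change above makes the GROUP BY clause optional when a pre-aggregate datamap has no grouping columns. A simplified, self-contained sketch of the fixed query generation (not the actual PreAggregateUtil signature):

```scala
// Builds the pre-aggregate select query; emits GROUP BY only when
// grouping expressions exist. Names here are illustrative.
def buildSelect(table: String,
                aggregateColumns: Seq[String],
                groupingExpressions: Seq[String]): String = {
  val groupByString =
    if (groupingExpressions.nonEmpty) {
      s" group by ${groupingExpressions.mkString(",")}"
    } else {
      ""
    }
  s"select ${aggregateColumns.mkString(",")} from $table" + groupByString
}
```

For `buildSelect("db.mainTable", Seq("id", "sum(age)"), Seq("id"))` this yields `select id,sum(age) from db.mainTable group by id`, while an aggregate-only datamap produces no trailing GROUP BY clause.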

/**
* Below method will be used to get the select query when rollup policy is
* applied in case of timeseries table
*
* @param tableSchema main data map schema
* @param selectedDataMapSchema selected data map schema for rollup
* @return select query based on rollup
@@ -106,22 +106,27 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
def validateQueryDirectlyOnDataMap(relations: Seq[CarbonDecoderRelation]): Unit = {
var isPreAggDataMapExists = false
// first check if pre aggregate data map exists or not
relations.foreach{relation =>
relations.foreach { relation =>
if (relation.carbonRelation.carbonTable.isChildDataMap) {
isPreAggDataMapExists = true
}
}
val validateQuery = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP,
CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE).toBoolean
var isThrowException = false
// if validate query is enabled and relation contains pre aggregate data map
if (validateQuery && isPreAggDataMapExists) {
// if relation contains pre aggregate data map
if (isPreAggDataMapExists) {
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
if (null != carbonSessionInfo) {
val supportQueryOnDataMap = CarbonProperties.getInstance.getProperty(
CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP,
CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE).toBoolean
lazy val sessionPropertyValue = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP,
CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE)
// Check if property is set in thread params which would mean this is an internal call
// (from load or compaction) and should be of highest priority. Otherwise get from
// session(like user has dynamically given the value using set command). If not found in
// session then look for the property in carbon.properties file, otherwise use default
// value 'false'.
val supportQueryOnDataMap = CarbonEnv
.getThreadParam(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP,
sessionPropertyValue).toBoolean
if (!supportQueryOnDataMap) {
isThrowException = true
}
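The priority chain described in the comment above (thread-local params for internal load/compaction calls first, then session-level `SET` values, then `carbon.properties`, then the default) amounts to a generic fallback lookup; the names below are illustrative, not CarbonData APIs:

```scala
// Resolve a property by priority: thread params > session params >
// carbon.properties file > default value. Illustrative sketch only.
def resolveProperty(threadParams: Map[String, String],
                    sessionParams: Map[String, String],
                    fileProps: Map[String, String],
                    key: String,
                    default: String): String =
  threadParams.getOrElse(key,
    sessionParams.getOrElse(key,
      fileProps.getOrElse(key, default)))
```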
