Skip to content

Commit

Permalink
Optimize for preaggregate and timeseries datamap
Browse files Browse the repository at this point in the history
Only show user-specified datamap properties. The preaggregate datamap
does not support DMPROPERTIES, so its properties output should be empty.
  • Loading branch information
xuchuanyin committed Jun 26, 2018
1 parent 5cea0ea commit 4c4294c
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,12 @@ public class DataMapProperty {
* property internally
*/
public static final String DEFERRED_REBUILD = "_internal.deferred.rebuild";
/**
* for internal property 'CHILD_SELECT_QUERY'
*/
public static final String CHILD_SELECT_QUERY = "CHILD_SELECT QUERY";
/**
* for internal property 'QUERYTYPE'
*/
public static final String QUERY_TYPE = "QUERYTYPE";
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.carbondata.core.metadata.schema.BucketingInfo;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonUtil;

Expand Down Expand Up @@ -279,13 +280,12 @@ public DataMapSchema buildChildSchema(String dataMapName, String className, Stri
new RelationIdentifier(databaseName, tableName, tableId);
Map<String, String> properties = new HashMap<>();
if (queryString != null) {
properties.put(
"CHILD_SELECT QUERY",
properties.put(DataMapProperty.CHILD_SELECT_QUERY,
CarbonUtil.encodeToString(queryString.trim().getBytes(
// replace = to with & as hive metastore does not allow = inside. For base 64
// only = is allowed as special character , so replace with &
CarbonCommonConstants.DEFAULT_CHARSET)).replace("=", "&"));
properties.put("QUERYTYPE", queryType);
properties.put(DataMapProperty.QUERY_TYPE, queryType);
}
DataMapSchema dataMapSchema = new DataMapSchema(dataMapName, className);
dataMapSchema.setProperties(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
val datamapName2 = "bloomdatamap2"
val datamapName3 = "bloomdatamap3"
sql(s"drop table if exists $tableName")
// for index datamap
sql(s"create table $tableName (a string, b string, c string) stored by 'carbondata'")
sql(
s"""
Expand All @@ -227,12 +228,43 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
| with deferred rebuild
| DMPROPERTIES ('index_columns'='c')
""".stripMargin)
val result = sql(s"show datamap on table $tableName").cache()
var result = sql(s"show datamap on table $tableName").cache()
checkAnswer(sql(s"show datamap on table $tableName"),
Seq(Row(datamapName, "bloomfilter", s"default.$tableName", "'_internal.deferred.rebuild'='false', 'bloom_fpp'='0.001', 'bloom_size'='32000', 'index_columns'='a'"),
Row(datamapName2, "bloomfilter", s"default.$tableName", "'_internal.deferred.rebuild'='false', 'index_columns'='b'"),
Row(datamapName3, "bloomfilter", s"default.$tableName", "'_internal.deferred.rebuild'='true', 'index_columns'='c'")))
Seq(Row(datamapName, "bloomfilter", s"default.$tableName", "'bloom_fpp'='0.001', 'bloom_size'='32000', 'index_columns'='a'"),
Row(datamapName2, "bloomfilter", s"default.$tableName", "'index_columns'='b'"),
Row(datamapName3, "bloomfilter", s"default.$tableName", "'index_columns'='c'")))
result.unpersist()
sql(s"drop table if exists $tableName")

// for timeseries datamap
sql(s"CREATE TABLE $tableName(mytime timestamp, name string, age int) STORED BY 'org.apache.carbondata.format'")
sql(
s"""
| CREATE DATAMAP agg0_hour ON TABLE $tableName
| USING 'timeSeries'
| DMPROPERTIES (
| 'EVENT_TIME'='mytime',
| 'HOUR_GRANULARITY'='1')
| AS SELECT mytime, SUM(age) FROM $tableName
| GROUP BY mytime
""".stripMargin)
checkAnswer(sql(s"show datamap on table $tableName"),
Seq(Row("agg0_hour", "timeSeries", s"default.${tableName}_agg0_hour", "'event_time'='mytime', 'hour_granularity'='1'")))
sql(s"drop table if exists $tableName")

    // for preaggregate datamap, the properties output is empty
sql(s"CREATE TABLE $tableName(id int, name string, city string, age string)" +
s" STORED BY 'org.apache.carbondata.format'")
sql (
s"""
| CREATE DATAMAP agg0 ON TABLE $tableName USING 'preaggregate' AS
| SELECT name,
| count(age)
| FROM $tableName GROUP BY name
| """.stripMargin)
checkAnswer(sql(s"show datamap on table $tableName"),
Seq(Row("agg0", "preaggregate", s"default.${tableName}_agg0", "")))
sql(s"drop table if exists $tableName")
}

test("test if preaggregate load is successfull for hivemetastore") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.spark.sql.types.StringType

import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty}
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema

/**
Expand Down Expand Up @@ -66,11 +67,23 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])

private def convertToRow(schemaList: util.List[DataMapSchema]) = {
if (schemaList != null && schemaList.size() > 0) {
schemaList.asScala.map { s =>
val relationIdentifier = s.getRelationIdentifier
val table = relationIdentifier.getDatabaseName + "." + relationIdentifier.getTableName
val dmPropertieStr = s.getProperties.asScala.map(p => s"'${p._1}'='${p._2}'").toSeq
.sorted.mkString(", ")
schemaList.asScala
.map { s =>
val relationIdentifier = s.getRelationIdentifier
val table = relationIdentifier.getDatabaseName + "." + relationIdentifier.getTableName
          // preaggregate datamap does not support user-specified properties, therefore we return an empty string
val dmPropertieStr = if (s.getProviderName.equalsIgnoreCase(
DataMapClassProvider.PREAGGREGATE.getShortName)) {
""
} else {
s.getProperties.asScala
// ignore internal used property
.filter(p => !p._1.equalsIgnoreCase(DataMapProperty.DEFERRED_REBUILD) &&
!p._1.equalsIgnoreCase(DataMapProperty.CHILD_SELECT_QUERY) &&
!p._1.equalsIgnoreCase(DataMapProperty.QUERY_TYPE))
.map(p => s"'${ p._1 }'='${ p._2 }'").toSeq
.sorted.mkString(", ")
}
Row(s.getDataMapName, s.getProviderName, table, dmPropertieStr)
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAl
import org.apache.spark.sql.CarbonExpressions.MatchCastExpression
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSeq, Cast, Expression, ExprId, NamedExpression, ScalaUDF}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSeq, Cast, ExprId, Expression, NamedExpression, ScalaUDF}
import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, _}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
Expand All @@ -40,6 +40,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty
import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema, TableSchema}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.util.CarbonUtil
Expand Down Expand Up @@ -869,7 +870,7 @@ object PreAggregateUtil {
def getChildQuery(aggDataMapSchema: AggregationDataMapSchema): String = {
new String(
CarbonUtil.decodeStringToBytes(
aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY").replace("&", "=")),
aggDataMapSchema.getProperties.get(DataMapProperty.CHILD_SELECT_QUERY).replace("&", "=")),
CarbonCommonConstants.DEFAULT_CHARSET)
}

Expand Down

0 comments on commit 4c4294c

Please sign in to comment.