Skip to content

Commit

Permalink
[CARBONDATA-1713] Fixed Aggregate query on main table fails after cre…
Browse files Browse the repository at this point in the history
…ating pre-aggregate table

Problem: When the columns in a select query are written in upper case, pre-aggregate table selection fails.
Solution: Convert the column names to lower case before lookup, because the table columns are stored in lower case.

This closes apache#1501
  • Loading branch information
kumarvishal09 authored and anubhav100 committed Jun 22, 2018
1 parent 29b0761 commit 1b0a036
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
sql("create datamap agg6 on table mainTable using 'preaggregate' as select name,min(age) from mainTable group by name")
sql("create datamap agg7 on table mainTable using 'preaggregate' as select name,max(age) from mainTable group by name")
sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
sql("create table if not exists lineitem(L_SHIPDATE string,L_SHIPMODE string,L_SHIPINSTRUCT string,L_RETURNFLAG string,L_RECEIPTDATE string,L_ORDERKEY string,L_PARTKEY string,L_SUPPKEY string,L_LINENUMBER int,L_QUANTITY double,L_EXTENDEDPRICE double,L_DISCOUNT double,L_TAX double,L_LINESTATUS string,L_COMMITDATE string,L_COMMENT string) STORED BY 'org.apache.carbondata.format'TBLPROPERTIES ('table_blocksize'='128','NO_INVERTED_INDEX'='L_SHIPDATE,L_SHIPMODE,L_SHIPINSTRUCT,L_RETURNFLAG,L_RECEIPTDATE,L_ORDERKEY,L_PARTKEY,L_SUPPKEY','sort_columns'='')")
sql("create datamap agr_lineitem ON TABLE lineitem USING 'preaggregate' as select L_RETURNFLAG,L_LINESTATUS,sum (L_QUANTITY),sum(L_EXTENDEDPRICE) from lineitem group by L_RETURNFLAG, L_LINESTATUS")
}


Expand Down Expand Up @@ -139,6 +141,11 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
}

// Runs an aggregate query on lineitem that spells its columns in upper case and
// hands the analyzed plan to preAggTableValidator together with the expected
// pre-aggregate table name "lineitem_agr_lineitem".
test("test PreAggregate table selection 19") {
val analyzedPlan = sql("select L_RETURNFLAG,L_LINESTATUS,sum(L_QUANTITY),sum(L_EXTENDEDPRICE) from lineitem group by L_RETURNFLAG, L_LINESTATUS").queryExecution.analyzed
preAggTableValidator(analyzedPlan, "lineitem_agr_lineitem")
}

def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={
var isValidPlan = false
plan.transform {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,15 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
aggFunction: String = ""): AttributeReference = {
val aggregationDataMapSchema = dataMapSchema.asInstanceOf[AggregationDataMapSchema];
val columnSchema = if (aggFunction.isEmpty) {
aggregationDataMapSchema.getChildColByParentColName(attributeReference.name)
aggregationDataMapSchema.getChildColByParentColName(attributeReference.name.toLowerCase)
} else {
aggregationDataMapSchema.getAggChildColByParent(attributeReference.name, aggFunction)
aggregationDataMapSchema.getAggChildColByParent(attributeReference.name.toLowerCase,
aggFunction.toLowerCase)
}
// here column schema cannot be null, if it is null then aggregate table selection
// logic has some problem
if (null == columnSchema) {
throw new AnalysisException("Column doesnot exists in Pre Aggregate table")
throw new AnalysisException("Column does not exists in Pre Aggregate table")
}
// finding the child attribute from child logical relation
childCarbonRelation.attributeMap.find(p => p._2.name.equals(columnSchema.getColumnName)).get._2
Expand Down Expand Up @@ -725,13 +726,15 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
dataType: String = "",
isChangedDataType: Boolean = false,
isFilterColumn: Boolean = false): QueryColumn = {
val columnSchema = carbonTable.getColumnByName(tableName, columnName).getColumnSchema
val columnSchema = carbonTable.getColumnByName(tableName,
columnName.toLowerCase).getColumnSchema
if (isChangedDataType) {
new QueryColumn(columnSchema, columnSchema.getDataType.getName, aggFunction, isFilterColumn)
new QueryColumn(columnSchema, columnSchema.getDataType.getName,
aggFunction.toLowerCase, isFilterColumn)
} else {
new QueryColumn(columnSchema,
CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType),
aggFunction, isFilterColumn)
aggFunction.toLowerCase, isFilterColumn)
}
}
}
Expand Down

0 comments on commit 1b0a036

Please sign in to comment.