Skip to content
Permalink
Browse files
[CARBONDATA-4330] Incremental Dataload of Average aggregate in MV
Why is this PR needed?
Currently, whenever an MV is created with an average aggregate, a full
refresh is done, meaning the whole MV is reloaded whenever new segments
are added. This slows down loading. With incremental data load, only the
newly added segments are loaded to the MV.

What changes were proposed in this PR?
If avg is present, rewrite the query to use the sum and count of the
columns when creating the MV, and derive avg from them.
Refer: https://docs.google.com/document/d/1kPEMCX50FLZcmyzm6kcIQtUH9KXWDIqh-Hco7NkTp80/edit

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes

This closes #4257
  • Loading branch information
ShreelekhyaG authored and Indhumathi27 committed Apr 28, 2022
1 parent 46b62cf commit 45acd67ed742d89d539ec0351f77f08c7762e7de
Showing 11 changed files with 430 additions and 45 deletions.
@@ -56,6 +56,12 @@ public class MVSchema implements Serializable, Writable {
*/
private String query;

/**
* Internally rewritten SQL query string.
* For an MV in incremental refresh mode that uses avg, the original query is rewritten
* with sum and count columns, and the rewritten query is stored here.
*/
private String modifiedQuery;

/**
* Properties provided by user
*/
@@ -99,10 +105,18 @@ public String getQuery() {
return query;
}

/**
* Returns the internally rewritten query string of this schema.
*
* @return the value of the modifiedQuery field; null if it has not been set
*/
public String getModifiedQuery() {
  return this.modifiedQuery;
}

/**
* Stores the given query string in the query field of this schema.
*
* @param query query string to store
*/
public void setQuery(String query) {
this.query = query;
}

/**
* Stores the given string in the modifiedQuery field of this schema.
*
* @param modifiedQuery internally rewritten query string to store
*/
public void setModifiedQuery(String modifiedQuery) {
this.modifiedQuery = modifiedQuery;
}

/**
* Returns the properties map held by this schema.
*
* @return the properties field as-is (no copy is made here)
*/
public Map<String, String> getProperties() {
return properties;
}
@@ -73,7 +73,7 @@ class TestCreateIndexForCleanAndDeleteSegment extends QueryTest with BeforeAndAf
assert(preDeleteSegmentsByDate == postDeleteSegmentsByDate)
val result = sql("show materialized views on table delete_segment_by_id").collectAsList()
assert(result.get(0).get(2).toString.equalsIgnoreCase("ENABLED"))
assert(result.get(0).get(3).toString.equalsIgnoreCase("full"))
assert(result.get(0).get(3).toString.equalsIgnoreCase("incremental"))
assert(result.get(0).get(4).toString.equalsIgnoreCase("on_commit"))
dryRun = sql("clean files for table delete_segment_by_id" +
" OPTIONS('dryrun'='true', 'force'='true')").collect()
@@ -111,6 +111,26 @@ case class MVCatalogInSpark(session: SparkSession)
// So setting back to current database.
session.catalog.setCurrentDatabase(currentDatabase)
}
// The MV query is modified by replacing avg with sum and count columns.
// Here, create modifiedLogicalPlan from the modified query so that, even though the MV was
// not created with sum or count columns, those columns can still be derived.
// For example, consider the MV creation statement:
// create materialized view mv1 as select empname, avg(salary) from source group by empname;
// If the user then queries:
// Select empname, sum(salary) from source group by empname;
// modifiedLogicalPlan can be used for the query matching and rewrite steps.
val modifiedLogicalPlan = if (mvSchema.getModifiedQuery != null &&
!mvSchema.getModifiedQuery
.equalsIgnoreCase(mvSchema.getQuery)) {
try {
session.catalog.setCurrentDatabase(mvSchema.getIdentifier.getDatabaseName)
MVHelper.dropDummyFunction(MVQueryParser.getQueryPlan(mvSchema.getModifiedQuery, session))
} finally {
session.catalog.setCurrentDatabase(currentDatabase)
}
} else {
logicalPlan
}
val mvSignature = SimpleModularizer.modularize(
BirdcageOptimizer.execute(logicalPlan)).next().semiHarmonized.signature
val mvIdentifier = mvSchema.getIdentifier
@@ -143,6 +163,7 @@ case class MVCatalogInSpark(session: SparkSession)
mvSignature,
mvSchema,
logicalPlan,
modifiedLogicalPlan,
MVPlanWrapper(modularPlan, mvSchema))
}
}
@@ -172,7 +193,7 @@ case class MVCatalogInSpark(session: SparkSession)
val modularPlan = SimpleModularizer.modularize(
BirdcageOptimizer.execute(logicalPlan)).next().semiHarmonized
val signature = modularPlan.signature
viewSchemas += MVSchemaWrapper(signature, null, logicalPlan, null)
viewSchemas += MVSchemaWrapper(signature, null, logicalPlan, logicalPlan, null)
}
}
}
@@ -166,11 +166,15 @@ object MVRefresher {
newLoadName: String,
segmentMap: java.util.Map[String, java.util.List[String]],
session: SparkSession): Boolean = {
val query = viewSchema.getQuery
val isFullRefresh = !viewSchema.isRefreshIncremental
var query = viewSchema.getQuery
if (!isFullRefresh) {
// query is modified internally if average aggregate is used with incremental load.
query = viewSchema.getModifiedQuery
}
if (query != null) {
val viewIdentifier = viewSchema.getIdentifier
val updatedQuery = MVQueryParser.getQuery(query, session)
val isFullRefresh = !viewSchema.isRefreshIncremental
// Set specified segments for incremental load
val segmentMapIterator = segmentMap.entrySet().iterator()
while (segmentMapIterator.hasNext) {
@@ -27,4 +27,5 @@ case class MVSchemaWrapper(
viewSignature: Option[Signature],
viewSchema: MVSchema,
logicalPlan: LogicalPlan,
modifiedLogicalPlan: LogicalPlan,
modularPlan: ModularPlan)
@@ -139,13 +139,42 @@ case class CarbonCreateMVCommand(

override protected def opName: String = "CREATE MATERIALIZED VIEW"

/**
 * Checks whether the given plan contains an aggregate expression whose
 * aggregate function is Average.
 *
 * @param logicalPlan plan whose expressions are inspected
 * @return true if at least one AggregateExpression wraps an Average function
 */
def checkIfAvgAggregatePresent(logicalPlan: LogicalPlan): Boolean = {
  var avgFound = false
  // The traversal is used only to visit expressions; each matched expression is
  // handed back unchanged and the transformed plan is discarded.
  logicalPlan.transformAllExpressions {
    case aggExpr: AggregateExpression =>
      if (aggExpr.aggregateFunction.isInstanceOf[Average]) {
        avgFound = true
      }
      aggExpr
  }
  avgFound
}

private def doCreate(session: SparkSession,
tableIdentifier: TableIdentifier,
viewManager: MVManagerInSpark,
viewCatalog: MVCatalogInSpark): MVSchema = {
val logicalPlan = MVHelper.dropDummyFunction(
var logicalPlan = MVHelper.dropDummyFunction(
MVQueryParser.getQueryPlan(queryString, session))
// check if mv with same query already exists
val relatedTables = getRelatedTables(logicalPlan)
val viewRefreshMode = if (checkIsQueryNeedFullRefresh(logicalPlan) ||
checkIsHasNonCarbonTable(relatedTables)) {
MVProperty.REFRESH_MODE_FULL
} else {
MVProperty.REFRESH_MODE_INCREMENTAL
}
var modifiedQueryString = queryString
if (viewRefreshMode.equalsIgnoreCase(MVProperty.REFRESH_MODE_INCREMENTAL) &&
checkIfAvgAggregatePresent(logicalPlan)) {
// Check if average aggregate is used and derive logical plan from modified query string.
modifiedQueryString = MVQueryParser.checkForAvgAndModifySql(queryString)
logicalPlan = MVHelper.dropDummyFunction(
MVQueryParser.getQueryPlan(modifiedQueryString, session))
}
// check if mv with same query already exists
val mvSchemaWrapper = viewCatalog.getMVWithSameQueryPresent(logicalPlan)
if (mvSchemaWrapper.nonEmpty) {
val mvWithSameQuery = mvSchemaWrapper.get.viewSchema.getIdentifier.getTableName
@@ -154,7 +183,6 @@ case class CarbonCreateMVCommand(
}
val modularPlan = checkQuery(logicalPlan)
val viewSchema = getOutputSchema(logicalPlan)
val relatedTables = getRelatedTables(logicalPlan)
val relatedTableList = toCarbonTables(session, relatedTables)
val inputCols = logicalPlan.output.map(x =>
x.name
@@ -193,13 +221,6 @@ case class CarbonCreateMVCommand(
}
relatedTableNames.add(table.getTableName)
}

val viewRefreshMode = if (checkIsQueryNeedFullRefresh(logicalPlan) ||
checkIsHasNonCarbonTable(relatedTables)) {
MVProperty.REFRESH_MODE_FULL
} else {
MVProperty.REFRESH_MODE_INCREMENTAL
}
val viewRefreshTriggerMode = if (deferredRefresh) {
MVProperty.REFRESH_TRIGGER_MODE_ON_MANUAL
} else {
@@ -325,6 +346,9 @@ case class CarbonCreateMVCommand(
schema.setTimeSeries(true)
}
schema.setQuery(queryString)
if (!viewRefreshMode.equals(MVProperty.REFRESH_MODE_FULL)) {
schema.setModifiedQuery(modifiedQueryString)
}
try {
viewManager.createSchema(schema.getIdentifier.getDatabaseName, schema)
} catch {
@@ -564,14 +588,7 @@ case class CarbonCreateMVCommand(
var needFullRefresh = false
logicalPlan.transformAllExpressions {
case alias: Alias => alias
case aggregate: AggregateExpression =>
// If average function present then go for full refresh
val reload = aggregate.aggregateFunction match {
case _: Average => true
case _ => false
}
needFullRefresh = reload || needFullRefresh
aggregate
case aggregate: AggregateExpression => aggregate
case cast: Cast =>
needFullRefresh = cast.child.find {
case _: AggregateExpression => false

0 comments on commit 45acd67

Please sign in to comment.