HIVE-28050: Disable Incremental non aggregated materialized view rebuild in presence of delete operations #5053

Merged: 3 commits, Feb 12, 2024
@@ -44,17 +44,13 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveRelOptMaterialization;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTezModelRelMetadataProvider;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveInBetweenExpandRule;
-import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.ColumnPropagationException;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveAggregateInsertDeleteIncrementalRewritingRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveAggregateInsertIncrementalRewritingRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveAggregatePartitionIncrementalRewritingRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveInsertOnlyScanWriteIdRule;
-import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveJoinInsertDeleteIncrementalRewritingRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveJoinInsertIncrementalRewritingRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializationRelMetadataProvider;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializedViewRule;
@@ -322,8 +318,7 @@ private RelNode applyRecordIncrementalRebuildPlan(
       }
       return applyAggregateInsertDeleteIncremental(basePlan, mdProvider, executorProvider);
     } else {
-      return applyJoinInsertDeleteIncremental(
-          basePlan, mdProvider, executorProvider, optCluster, calcitePreMVRewritingPlan);
+      return calcitePreMVRewritingPlan;
Contributor Author: @amansinha100 This is the point where we fall back to full rebuild.

      }
    } else {
      return calcitePreMVRewritingPlan;
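To make the fallback point in the comment above concrete, here is a minimal standalone sketch of the decision this hunk implements. It is not the Hive source: the enum values mirror MaterializationRebuildMode from this patch, but the class name and the boolean parameters are hypothetical stand-ins for the planner's analysis of the view definition and the source-table events.

```java
// Illustrative model of the rebuild-mode choice after HIVE-28050 (not Hive source).
// With delete events present, only aggregated materialized views keep an
// incremental rewrite; non-aggregated (join-only) views now fall back to a full
// INSERT OVERWRITE rebuild by returning the pre-rewriting plan.
enum RebuildMode {
  INSERT_OVERWRITE_REBUILD,        // full rebuild (the new fallback)
  AGGREGATE_INSERT_REBUILD,
  AGGREGATE_INSERT_DELETE_REBUILD,
  JOIN_INSERT_REBUILD
}

final class RebuildModeChooser {
  // hasAggregate / hasDeleteOperations are hypothetical flags.
  static RebuildMode choose(boolean hasAggregate, boolean hasDeleteOperations) {
    if (!hasDeleteOperations) {
      // Insert-only incremental paths are unchanged by this patch.
      return hasAggregate ? RebuildMode.AGGREGATE_INSERT_REBUILD
                          : RebuildMode.JOIN_INSERT_REBUILD;
    }
    return hasAggregate
        ? RebuildMode.AGGREGATE_INSERT_DELETE_REBUILD
        : RebuildMode.INSERT_OVERWRITE_REBUILD; // was JOIN_INSERT_DELETE_REBUILD
  }
}
```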
@@ -373,21 +368,6 @@ private RelNode applyAggregateInsertIncremental(
}
}

-  private RelNode applyJoinInsertDeleteIncremental(
-      RelNode basePlan, RelMetadataProvider mdProvider, RexExecutor executorProvider, RelOptCluster optCluster,
-      RelNode calcitePreMVRewritingPlan) {
-    basePlan = applyIncrementalRebuild(
-        basePlan, mdProvider, executorProvider, HiveJoinInsertDeleteIncrementalRewritingRule.INSTANCE);
-    mvRebuildMode = MaterializationRebuildMode.JOIN_INSERT_DELETE_REBUILD;
-    try {
-      return new HiveJoinInsertDeleteIncrementalRewritingRule.FilterPropagator(
-          HiveRelFactories.HIVE_BUILDER.create(optCluster, null)).propagate(basePlan);
-    } catch (ColumnPropagationException ex) {
-      LOG.warn("Exception while propagating column " + VirtualColumn.ROWISDELETED.getName(), ex);
-      return calcitePreMVRewritingPlan;
-    }
-  }

private RelNode applyJoinInsertIncremental(
RelNode basePlan, RelMetadataProvider mdProvider, RexExecutor executorProvider) {
mvRebuildMode = MaterializationRebuildMode.JOIN_INSERT_REBUILD;
@@ -479,9 +459,6 @@ protected ASTNode fixUpAfterCbo(ASTNode originalAst, ASTNode newAst, CalcitePlan
case AGGREGATE_INSERT_DELETE_REBUILD:
fixUpASTAggregateInsertDeleteIncrementalRebuild(fixedAST, getMaterializedViewASTBuilder());
return fixedAST;
-      case JOIN_INSERT_DELETE_REBUILD:
-        fixUpASTJoinInsertDeleteIncrementalRebuild(fixedAST, getMaterializedViewASTBuilder());
-        return fixedAST;

Reviewer: This will end up throwing the UnsupportedOperationException. Don't we want to fall back to the full rebuild?

Contributor Author: At this point the CBO plan is already transformed to an AST, hence we cannot fall back. However, JOIN_INSERT_DELETE_REBUILD is never set in the code any more.

Reviewer: Ok, just for my reference: for the non-group-by queries that this patch is addressing, could you point to where the fallback to the full rebuild (using insert-overwrite) happens?

Contributor: Since JOIN_INSERT_DELETE_REBUILD is not used anymore, why not remove it altogether?

Contributor Author: Removed.

default:
throw new UnsupportedOperationException("No materialized view rebuild exists for mode " + mvRebuildMode);
}
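The exhaustiveness point raised in the thread above can be seen in a toy version of this dispatch. A hedged standalone sketch, not the Hive source (class name, enum nesting, and return strings are illustrative): once no code path assigns the join insert-delete mode, its case is unreachable, and any stray value lands on the default throw rather than on a silent fallback.

```java
// Toy model of the AST fix-up dispatch discussed above (illustrative only).
final class AstFixUp {
  enum Mode { AGGREGATE_INSERT_DELETE_REBUILD, JOIN_INSERT_REBUILD, JOIN_INSERT_DELETE_REBUILD }

  static String fixUp(Mode mode) {
    switch (mode) {
      case AGGREGATE_INSERT_DELETE_REBUILD:
        return "MERGE-equivalent multi-insert AST";
      case JOIN_INSERT_REBUILD:
        return "INSERT INTO AST";
      default:
        // After this patch nothing assigns JOIN_INSERT_DELETE_REBUILD, so a case
        // for it would be dead code; removing both the case and the constant
        // (as done later in the patch) keeps the enum and the switch in sync.
        throw new UnsupportedOperationException("No rebuild exists for mode " + mode);
    }
  }
}
```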
@@ -752,90 +729,6 @@ private void fixUpASTJoinInsertIncrementalRebuild(ASTNode newAST) throws SemanticException
destParent.insertChild(childIndex, newChild);
}

-  private void fixUpASTJoinInsertDeleteIncrementalRebuild(ASTNode newAST, MaterializedViewASTBuilder astBuilder)
-      throws SemanticException {
-    // Replace INSERT OVERWRITE by MERGE equivalent rewriting.
-    // Here we need to do this complex AST rewriting that generates the same plan
-    // that a MERGE clause would generate because CBO does not support MERGE yet.
-    // TODO: Support MERGE as first class member in CBO to simplify this logic.
-    // 1) Replace INSERT OVERWRITE by INSERT
-    ASTNode insertNode = new ASTSearcher().simpleBreadthFirstSearch(
-        newAST, HiveParser.TOK_QUERY, HiveParser.TOK_INSERT);
-    ASTNode destinationNode = (ASTNode) insertNode.getChild(0);
-    ASTNode newInsertInto = (ASTNode) ParseDriver.adaptor.create(
-        HiveParser.TOK_INSERT_INTO, "TOK_INSERT_INTO");
-    newInsertInto.addChildren(destinationNode.getChildren());
-    ASTNode destinationParentNode = (ASTNode) destinationNode.getParent();
-    int childIndex = destinationNode.childIndex;
-    destinationParentNode.deleteChild(childIndex);
-    destinationParentNode.insertChild(childIndex, newInsertInto);
-    // 1.1) Extract name as we will need it afterwards:
-    // TOK_DESTINATION TOK_TAB TOK_TABNAME <materialization_name>
-    ASTNode materializationNode = new ASTSearcher().simpleBreadthFirstSearch(
-        newInsertInto, HiveParser.TOK_INSERT_INTO, HiveParser.TOK_TAB, HiveParser.TOK_TABNAME);
-
-    ASTNode subqueryNodeInputROJ = new ASTSearcher().simpleBreadthFirstSearch(
-        newAST, HiveParser.TOK_QUERY, HiveParser.TOK_FROM, HiveParser.TOK_RIGHTOUTERJOIN,
-        HiveParser.TOK_SUBQUERY);
-    ASTNode selectNodeInputROJ = new ASTSearcher().simpleBreadthFirstSearch(
-        subqueryNodeInputROJ, HiveParser.TOK_SUBQUERY, HiveParser.TOK_QUERY,
-        HiveParser.TOK_INSERT, HiveParser.TOK_SELECT);
-    astBuilder.createAcidSortNodes(TableName.getDbTable(
-            materializationNode.getChild(0).getText(),
-            materializationNode.getChild(1).getText()))
-        .forEach(astNode -> ParseDriver.adaptor.addChild(selectNodeInputROJ, astNode));
-
-    ASTNode whereClauseInInsert = findWhereClause(insertNode);
-
-    // 2) Add filter condition to Insert
-    // Modifying filter condition. The incremental rewriting rule generated an OR
-    // clause where first disjunct contains the condition for the DELETE branch.
-    // TOK_WHERE
-    //    or
-    //       .                      <- DISJUNCT FOR <DELETE>
-    //          TOK_TABLE_OR_COL
-    //             $hdt$_0
-    //          ROW__IS__DELETED
-    //       TOK_FUNCTION           <- DISJUNCT FOR <INSERT>
-    //          isnull
-    //             .
-    //                TOK_TABLE_OR_COL
-    //                   $hdt$_0
-    //                ROW__IS__DELETED
-    if (whereClauseInInsert.getChild(0).getType() != HiveParser.KW_OR) {
-      throw new SemanticException("OR clause expected below TOK_WHERE in incremental rewriting");
-    }
-    // We bypass the OR clause and select the first disjunct
-    int indexDelete;
-    int indexInsert;
-    if (whereClauseInInsert.getChild(0).getChild(0).getType() == HiveParser.KW_AND) {
-      indexDelete = 0;
-      indexInsert = 1;
-    } else if (whereClauseInInsert.getChild(0).getChild(1).getType() == HiveParser.KW_AND) {
-      indexDelete = 1;
-      indexInsert = 0;
-    } else {
-      throw new SemanticException("Unexpected condition in incremental rewriting");
-    }
-    ASTNode deletePredicate =
-        (ASTNode) ParseDriver.adaptor.dupTree(whereClauseInInsert.getChild(0).getChild(indexDelete));
-    ASTNode newCondInInsert = (ASTNode) whereClauseInInsert.getChild(0).getChild(indexInsert);
-    ParseDriver.adaptor.setChild(whereClauseInInsert, 0, newCondInInsert);
-
-    addDeleteBranch(insertNode, subqueryNodeInputROJ, deletePredicate, astBuilder);
-
-    // 3) Add sort node to delete branch
-    ASTNode sortNode = astBuilder.createSortNodes(
-        astBuilder.createAcidSortNodes((ASTNode) subqueryNodeInputROJ.getChild(1)));
-    ParseDriver.adaptor.addChild(insertNode.getParent().getChild(2), sortNode);
-
-    // 4) Now we set some tree properties related to multi-insert
-    // operation with INSERT/UPDATE
-    ctx.setOperation(Context.Operation.MERGE);
-    ctx.addDestNamePrefix(1, Context.DestClausePrefix.INSERT);
-    ctx.addDestNamePrefix(2, Context.DestClausePrefix.DELETE);
-  }

@Override
protected boolean allowOutputMultipleTimes() {
return true;

This file was deleted.

@@ -15978,8 +15978,7 @@ protected enum MaterializationRebuildMode {
INSERT_OVERWRITE_REBUILD,
AGGREGATE_INSERT_REBUILD,
AGGREGATE_INSERT_DELETE_REBUILD,
-    JOIN_INSERT_REBUILD,
-    JOIN_INSERT_DELETE_REBUILD
+    JOIN_INSERT_REBUILD
}

/**