Skip to content

Commit

Permalink
HIVE-28050: Disable Incremental non aggregated materialized view rebu…
Browse files Browse the repository at this point in the history
…ild in presence of delete operations (Krisztian Kasa, reviewed by Stamatis Zampetakis, Aman Sinha)
  • Loading branch information
kasakrisz committed Feb 12, 2024
1 parent 32a7927 commit 7bff622
Show file tree
Hide file tree
Showing 13 changed files with 509 additions and 1,167 deletions.
Expand Up @@ -44,17 +44,13 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveRelOptMaterialization;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTezModelRelMetadataProvider;
import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveInBetweenExpandRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.ColumnPropagationException;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveAggregateInsertDeleteIncrementalRewritingRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveAggregateInsertIncrementalRewritingRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveAggregatePartitionIncrementalRewritingRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveInsertOnlyScanWriteIdRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveJoinInsertDeleteIncrementalRewritingRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveJoinInsertIncrementalRewritingRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializationRelMetadataProvider;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializedViewRule;
Expand Down Expand Up @@ -322,8 +318,7 @@ private RelNode applyRecordIncrementalRebuildPlan(
}
return applyAggregateInsertDeleteIncremental(basePlan, mdProvider, executorProvider);
} else {
return applyJoinInsertDeleteIncremental(
basePlan, mdProvider, executorProvider, optCluster, calcitePreMVRewritingPlan);
return calcitePreMVRewritingPlan;
}
} else {
return calcitePreMVRewritingPlan;
Expand Down Expand Up @@ -373,21 +368,6 @@ private RelNode applyAggregateInsertIncremental(
}
}

private RelNode applyJoinInsertDeleteIncremental(
RelNode basePlan, RelMetadataProvider mdProvider, RexExecutor executorProvider, RelOptCluster optCluster,
RelNode calcitePreMVRewritingPlan) {
basePlan = applyIncrementalRebuild(
basePlan, mdProvider, executorProvider, HiveJoinInsertDeleteIncrementalRewritingRule.INSTANCE);
mvRebuildMode = MaterializationRebuildMode.JOIN_INSERT_DELETE_REBUILD;
try {
return new HiveJoinInsertDeleteIncrementalRewritingRule.FilterPropagator(
HiveRelFactories.HIVE_BUILDER.create(optCluster, null)).propagate(basePlan);
} catch (ColumnPropagationException ex) {
LOG.warn("Exception while propagating column " + VirtualColumn.ROWISDELETED.getName(), ex);
return calcitePreMVRewritingPlan;
}
}

private RelNode applyJoinInsertIncremental(
RelNode basePlan, RelMetadataProvider mdProvider, RexExecutor executorProvider) {
mvRebuildMode = MaterializationRebuildMode.JOIN_INSERT_REBUILD;
Expand Down Expand Up @@ -479,9 +459,6 @@ protected ASTNode fixUpAfterCbo(ASTNode originalAst, ASTNode newAst, CalcitePlan
case AGGREGATE_INSERT_DELETE_REBUILD:
fixUpASTAggregateInsertDeleteIncrementalRebuild(fixedAST, getMaterializedViewASTBuilder());
return fixedAST;
case JOIN_INSERT_DELETE_REBUILD:
fixUpASTJoinInsertDeleteIncrementalRebuild(fixedAST, getMaterializedViewASTBuilder());
return fixedAST;
default:
throw new UnsupportedOperationException("No materialized view rebuild exists for mode " + mvRebuildMode);
}
Expand Down Expand Up @@ -752,90 +729,6 @@ private void fixUpASTJoinInsertIncrementalRebuild(ASTNode newAST) throws Semanti
destParent.insertChild(childIndex, newChild);
}

private void fixUpASTJoinInsertDeleteIncrementalRebuild(ASTNode newAST, MaterializedViewASTBuilder astBuilder)
throws SemanticException {
// Replace INSERT OVERWRITE by MERGE equivalent rewriting.
// Here we need to do this complex AST rewriting that generates the same plan
// that a MERGE clause would generate because CBO does not support MERGE yet.
// TODO: Support MERGE as first class member in CBO to simplify this logic.
// 1) Replace INSERT OVERWRITE by INSERT
ASTNode insertNode = new ASTSearcher().simpleBreadthFirstSearch(
newAST, HiveParser.TOK_QUERY, HiveParser.TOK_INSERT);
ASTNode destinationNode = (ASTNode) insertNode.getChild(0);
ASTNode newInsertInto = (ASTNode) ParseDriver.adaptor.create(
HiveParser.TOK_INSERT_INTO, "TOK_INSERT_INTO");
newInsertInto.addChildren(destinationNode.getChildren());
ASTNode destinationParentNode = (ASTNode) destinationNode.getParent();
int childIndex = destinationNode.childIndex;
destinationParentNode.deleteChild(childIndex);
destinationParentNode.insertChild(childIndex, newInsertInto);
// 1.1) Extract name as we will need it afterwards:
// TOK_DESTINATION TOK_TAB TOK_TABNAME <materialization_name>
ASTNode materializationNode = new ASTSearcher().simpleBreadthFirstSearch(
newInsertInto, HiveParser.TOK_INSERT_INTO, HiveParser.TOK_TAB, HiveParser.TOK_TABNAME);

ASTNode subqueryNodeInputROJ = new ASTSearcher().simpleBreadthFirstSearch(
newAST, HiveParser.TOK_QUERY, HiveParser.TOK_FROM, HiveParser.TOK_RIGHTOUTERJOIN,
HiveParser.TOK_SUBQUERY);
ASTNode selectNodeInputROJ = new ASTSearcher().simpleBreadthFirstSearch(
subqueryNodeInputROJ, HiveParser.TOK_SUBQUERY, HiveParser.TOK_QUERY,
HiveParser.TOK_INSERT, HiveParser.TOK_SELECT);
astBuilder.createAcidSortNodes(TableName.getDbTable(
materializationNode.getChild(0).getText(),
materializationNode.getChild(1).getText()))
.forEach(astNode -> ParseDriver.adaptor.addChild(selectNodeInputROJ, astNode));

ASTNode whereClauseInInsert = findWhereClause(insertNode);

// 2) Add filter condition to Insert
// Modifying filter condition. The incremental rewriting rule generated an OR
// clause where first disjunct contains the condition for the DELETE branch.
// TOK_WHERE
// or
// . <- DISJUNCT FOR <DELETE>
// TOK_TABLE_OR_COL
// $hdt$_0
// ROW__IS__DELETED
// TOK_FUNCTION <- DISJUNCT FOR <INSERT>
// isnull
// .
// TOK_TABLE_OR_COL
// $hdt$_0
// ROW__IS__DELETED
if (whereClauseInInsert.getChild(0).getType() != HiveParser.KW_OR) {
throw new SemanticException("OR clause expected below TOK_WHERE in incremental rewriting");
}
// We bypass the OR clause and select the first disjunct
int indexDelete;
int indexInsert;
if (whereClauseInInsert.getChild(0).getChild(0).getType() == HiveParser.KW_AND) {
indexDelete = 0;
indexInsert = 1;
} else if (whereClauseInInsert.getChild(0).getChild(1).getType() == HiveParser.KW_AND) {
indexDelete = 1;
indexInsert = 0;
} else {
throw new SemanticException("Unexpected condition in incremental rewriting");
}
ASTNode deletePredicate =
(ASTNode) ParseDriver.adaptor.dupTree(whereClauseInInsert.getChild(0).getChild(indexDelete));
ASTNode newCondInInsert = (ASTNode) whereClauseInInsert.getChild(0).getChild(indexInsert);
ParseDriver.adaptor.setChild(whereClauseInInsert, 0, newCondInInsert);

addDeleteBranch(insertNode, subqueryNodeInputROJ, deletePredicate, astBuilder);

// 3) Add sort node to delete branch
ASTNode sortNode = astBuilder.createSortNodes(
astBuilder.createAcidSortNodes((ASTNode) subqueryNodeInputROJ.getChild(1)));
ParseDriver.adaptor.addChild(insertNode.getParent().getChild(2), sortNode);

// 4) Now we set some tree properties related to multi-insert
// operation with INSERT/UPDATE
ctx.setOperation(Context.Operation.MERGE);
ctx.addDestNamePrefix(1, Context.DestClausePrefix.INSERT);
ctx.addDestNamePrefix(2, Context.DestClausePrefix.DELETE);
}

@Override
protected boolean allowOutputMultipleTimes() {
return true;
Expand Down

This file was deleted.

Expand Up @@ -15978,8 +15978,7 @@ protected enum MaterializationRebuildMode {
INSERT_OVERWRITE_REBUILD,
AGGREGATE_INSERT_REBUILD,
AGGREGATE_INSERT_DELETE_REBUILD,
JOIN_INSERT_REBUILD,
JOIN_INSERT_DELETE_REBUILD
JOIN_INSERT_REBUILD
}

/**
Expand Down

0 comments on commit 7bff622

Please sign in to comment.