HIVE-24167: TPC-DS query 14 fails while generating plan for the filter #5077

Open · wants to merge 7 commits into base: master (changes from 6 commits)
@@ -44,6 +44,7 @@
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelShuttle;
import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;
import org.apache.hadoop.hive.ql.optimizer.signature.RelTreeSignature.RelTreeSignatureWriter;
import org.apache.hadoop.hive.ql.plan.ColStatistics;

import com.google.common.collect.ImmutableList;
@@ -202,6 +203,12 @@ public HiveTableScan copyIncludingTable(RelDataType newRowtype) {
// Also include partition list key to trigger cost evaluation even if an
// expression was already generated.
@Override public RelWriter explainTerms(RelWriter pw) {
if (pw instanceof RelTreeSignatureWriter) {
return super.explainTerms(pw)
.item("tableScanTrait", this.tableScanTrait)
.itemIf("fromVersion", ((RelOptHiveTable) table).getHiveTableMD().getVersionIntervalFrom(),
isNotBlank(((RelOptHiveTable) table).getHiveTableMD().getVersionIntervalFrom()));
@okumin (Contributor, Author) commented on Feb 11, 2024:
I added some attributes that could differentiate two HiveTableScans. We may not need these here.

There is one potential problem in my mind. HiveTableScan doesn't retain the equivalents of TableScanDesc#getPredicateString or TableScanDesc#getRowLimit, so we can't unify ASTNodes or RelNodes of HiveTableScan based on signatures; otherwise, Operators would be over-unified later.
Currently, we link only HiveFilter RelNodes using RelTreeSignatures (we link HiveTableScan with its ASTNode, but don't link it with the signature), so the existence of TableScanDesc#getPredicateString doesn't matter to us.
Also, CBO is disabled when TABLESAMPLE is used, so we can say TableScanDesc#getRowLimit doesn't cause an issue in the world of RelNodes. I expect we could push the row count down to HiveTableScan once we support TABLESAMPLE in CBO.
So, in my current understanding, this PR doesn't cause an immediate problem.
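
For illustration, a minimal, self-contained sketch of the over-unification risk described above (all names are hypothetical; this is not Hive code): a signature that omits reader-side filtering makes two scans that read different row sets indistinguishable.

import java.util.HashMap;
import java.util.Map;

// Minimal model of signature-based grouping. The signature deliberately omits
// the pushed-down predicate, mirroring the fact that HiveTableScan retains no
// equivalent of TableScanDesc#getPredicateString.
public class OverUnificationDemo {
  static String signature(String table) {
    return "HiveTableScan(table=[[" + table + "]])";
  }

  public static void main(String[] args) {
    String filteredScan = signature("default, src");   // reader filters key = '5'
    String unfilteredScan = signature("default, src"); // no reader-side filter

    // Grouping by signature silently merges the two scans into one group,
    // although they would observe different runtime row counts.
    Map<String, Integer> groups = new HashMap<>();
    groups.merge(filteredScan, 1, Integer::sum);
    groups.merge(unfilteredScan, 1, Integer::sum);
    System.out.println(groups.size()); // 1 group for 2 distinct scans
  }
}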

}
return super.explainTerms(pw)
.itemIf("qbid:alias", concatQbIDAlias, this.useQBIdInDigest)
.itemIf("htColumns", this.neededColIndxsFrmReloptHT, pw.getDetailLevel() == SqlExplainLevel.DIGEST_ATTRIBUTES)
@@ -277,8 +277,8 @@ private ASTNode convert() throws CalciteSemanticException {
if (where != null) {
ASTNode cond = where.getCondition().accept(new RexVisitor(schema, false, root.getCluster().getRexBuilder()));
hiveAST.where = ASTBuilder.where(cond);
planMapper.link(cond, where);
planMapper.link(cond, RelTreeSignature.of(where));
planMapper.link(cond, where);
@okumin (Contributor, Author) commented:
We link RelTreeSignature first so that we can safely unify multiple filters.

A member replied:
cool! :D
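
To make the ordering argument concrete, here is a simplified model of value-keyed equivalence groups (hypothetical code, not Hive's actual PlanMapper API): because a RelTreeSignature compares by value, linking the signature first lets a second, structurally identical filter land in the existing group instead of opening a conflicting one.

import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;

// Simplified equivalence-group model (hypothetical, not Hive's PlanMapper):
// signature keys compare by value, so two distinct filter conditions with
// identical subtrees resolve to the same group and are unified safely.
public class LinkOrderDemo {
  static final Map<Object, Set<Object>> groupOf = new IdentityHashMap<>();
  static final Map<String, Set<Object>> groupBySignature = new HashMap<>();

  static void link(Object node, String signature) {
    Set<Object> group = groupBySignature.computeIfAbsent(signature, s -> new HashSet<>());
    group.add(node);
    groupOf.put(node, group);
  }

  public static void main(String[] args) {
    Object condA = new Object(); // ASTNode of the first CTE's filter
    Object condB = new Object(); // ASTNode of the second, identical filter
    link(condA, "HiveFilter(condition=[<>($0, '100')])");
    link(condB, "HiveFilter(condition=[<>($0, '100')])");
    System.out.println(groupOf.get(condA) == groupOf.get(condB)); // true: unified
  }
}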

}

/*
@@ -85,15 +85,15 @@ private String relSignature(RelNode rel) {
}
final StringWriter sw = new StringWriter();
final RelWriter planWriter =
new NonRecursiveRelWriterImpl(
new RelTreeSignatureWriter(
new PrintWriter(sw), SqlExplainLevel.EXPPLAN_ATTRIBUTES, false);
rel.explain(planWriter);
return sw.toString();
}

static class NonRecursiveRelWriterImpl extends RelWriterImplCopy {
public static class RelTreeSignatureWriter extends RelWriterImplCopy {

public NonRecursiveRelWriterImpl(PrintWriter pw, SqlExplainLevel detailLevel, boolean withIdPrefix) {
public RelTreeSignatureWriter(PrintWriter pw, SqlExplainLevel detailLevel, boolean withIdPrefix) {
super(pw, detailLevel, withIdPrefix);
}

ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (13 additions, 2 deletions)
@@ -1046,11 +1046,16 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Obje
}
}
if (nd instanceof TableScanOperator) {
TableScanOperator ts = (TableScanOperator) nd;
// If the tablescan operator is making use of filtering capabilities of readers then
// we will not see the actual incoming rowcount which was processed - so we may not use it for relNodes
TableScanOperator ts = (TableScanOperator) nd;
if (ts.getConf().getPredicateString() != null) {
planMapper.link(ts, new OperatorStats.MayNotUseForRelNodes());
invalidateForRelNodes(ts, false);
}
// If sampling is configured, the table scan could be canceled in the middle. We avoid using runtime stats
// for HiveTableScan and its descendants as it is not pushed down to HiveTableScan RelNodes
if (ts.getConf().getRowLimit() >= 0) {
invalidateForRelNodes(ts, true);
}
}
return null;
@@ -1074,6 +1079,12 @@ private void mark(Operator<?> op) {
planMapper.link(op, new OperatorStats.IncorrectRuntimeStatsMarker());
}

private void invalidateForRelNodes(Operator<?> op, boolean recursive) {
planMapper.link(op, new OperatorStats.MayNotUseForRelNodes());
if (recursive) {
op.getChildOperators().forEach(child -> invalidateForRelNodes(child, true));
}
}
}

private void markOperatorsWithUnstableRuntimeStats(OptimizeTezProcContext procCtx) throws SemanticException {
@@ -37,6 +37,7 @@
import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignatureFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import org.apache.hadoop.hive.ql.optimizer.signature.RelTreeSignature;

/**
* Enables to connect related objects to eachother.
@@ -45,8 +46,11 @@
*/
public class PlanMapper {

Set<EquivGroup> groups = new HashSet<>();
private Map<Object, EquivGroup> objectMap = new CompositeMap<>(OpTreeSignature.class, AuxOpTreeSignature.class);
private final Set<EquivGroup> groups = new HashSet<>();
private final Map<Object, EquivGroup> objectMap = new CompositeMap<>(
OpTreeSignature.class,
AuxOpTreeSignature.class,
RelTreeSignature.class);

/**
* Specialized class which can compare by identity or value; based on the key type.
ql/src/test/queries/clientpositive/cbo_cte_materialization.q (28 additions)
@@ -0,0 +1,28 @@
--! qt:dataset:src

set hive.optimize.cte.materialize.threshold=1;
set hive.optimize.cte.materialize.full.aggregate.only=false;
A contributor commented:
set hive.query.planmapper.link.relnodes=false;

Setting this property makes the test fail with the same RuntimeException: equivalence mapping violation.

@okumin (Contributor, Author) replied:

I'm checking the whole execution and what the flag does. With hive.query.planmapper.link.relnodes disabled, no aux signature is created and no merge across Operators happens. I feel that is not consistent with the description, "Whether to link Calcite nodes to runtime statistics."
I guess the direct problem would be resolved if we skipped linking RelNodes with Operators when hive.query.planmapper.link.relnodes=false is configured. That sounds more consistent with the description.

I'm putting my additional notes here. I tried to draft some to-be solutions, but it may not be very easy.
https://docs.google.com/document/d/1LCST23cSBZglBzjhnCqHlpcLv6xrXRBxU_wszSDAk9w/edit?usp=sharing

@okumin (Contributor, Author) replied:

I guess hive.query.planmapper.link.relnodes can be implemented like this. @kgyrtkirk might have more context.
okumin@cdab2c1
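
A rough sketch of what that guard could look like (an assumption drawn from the comment above, not the verified contents of okumin@cdab2c1; the ConfVars constant name is hypothetical):

// At the linking site in convert() (sketch only):
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_PLANMAPPER_LINK_RELNODES)) {
  // Only attach value-based RelNode signatures when the flag is enabled.
  planMapper.link(cond, RelTreeSignature.of(where));
}
planMapper.link(cond, where);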

@okumin (Contributor, Author) commented on May 13, 2024:

I found that this fails if we disable hive.query.planmapper.link.relnodes.

create table src (key string, value string);

set hive.cbo.fallback.strategy=NEVER;
set hive.query.planmapper.link.relnodes=false;
set hive.optimize.ppd=false;

EXPLAIN
SELECT * FROM src WHERE key = '5'
UNION ALL
SELECT * FROM src WHERE key = '5';

@zabetak As a potential workaround, I'm wondering if it makes sense to relax the validation and disable the features PlanMapper is involved in when an "equivalence mapping violation" happens.
I presume PlanMapper succeeds in 99.9% of cases, as most qtests succeed. Currently, the remaining 0.1% fail outright, which sounds like an excessive penalty. We can keep the validation for qtests so that we minimize the risk of degradation.
My feeling is that it could be possible to decrease the 0.1% to 0.05%, but it could be tough to reach 0% with a single patch.
okumin@dd5be15
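
A rough sketch of the relaxed-validation idea (hypothetical; the exception handling site, test-mode flag, and logger are assumptions): keep the strict check for qtests, but degrade gracefully in production instead of failing the whole query.

try {
  planMapper.link(cond, RelTreeSignature.of(where));
} catch (RuntimeException e) { // "equivalence mapping violation"
  if (inQTestMode) {
    throw e; // stay strict in qtests to catch regressions early
  }
  // Otherwise skip PlanMapper-dependent features (e.g. runtime-stats reuse).
  LOG.warn("PlanMapper linking failed; disabling runtime stats reuse for this query", e);
}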


EXPLAIN CBO
WITH materialized_cte AS (
SELECT key, value FROM src WHERE key != '100'
),
another_materialized_cte AS (
SELECT key, value FROM src WHERE key != '100'
)
SELECT a.key, a.value, b.key, b.value
FROM materialized_cte a
JOIN another_materialized_cte b ON a.key = b.key
ORDER BY a.key;

EXPLAIN
WITH materialized_cte AS (
SELECT key, value FROM src WHERE key != '100'
),
another_materialized_cte AS (
SELECT key, value FROM src WHERE key != '100'
)
SELECT a.key, a.value, b.key, b.value
FROM materialized_cte a
JOIN another_materialized_cte b ON a.key = b.key
ORDER BY a.key;
ql/src/test/queries/clientpositive/perf/cbo_query14.q (1 deletion)
@@ -1,4 +1,3 @@
--! qt:disabled:HIVE-24167
set hive.mapred.mode=nonstrict;
-- start query 1 in stream 0 using template query14.tpl and seed 1819994127
explain cbo
ql/src/test/queries/clientpositive/perf/query14.q (1 deletion)
@@ -1,4 +1,3 @@
--! qt:disabled:HIVE-24167
set hive.mapred.mode=nonstrict;
-- start query 1 in stream 0 using template query14.tpl and seed 1819994127
explain
ql/src/test/results/clientpositive/llap/cbo_cte_materialization.q.out (246 additions)
@@ -0,0 +1,246 @@
PREHOOK: query: EXPLAIN CBO
WITH materialized_cte AS (
SELECT key, value FROM src WHERE key != '100'
),
another_materialized_cte AS (
SELECT key, value FROM src WHERE key != '100'
)
SELECT a.key, a.value, b.key, b.value
FROM materialized_cte a
JOIN another_materialized_cte b ON a.key = b.key
ORDER BY a.key
PREHOOK: type: QUERY
PREHOOK: Input: default@another_materialized_cte
PREHOOK: Input: default@materialized_cte
#### A masked pattern was here ####
POSTHOOK: query: EXPLAIN CBO
WITH materialized_cte AS (
SELECT key, value FROM src WHERE key != '100'
),
another_materialized_cte AS (
SELECT key, value FROM src WHERE key != '100'
)
SELECT a.key, a.value, b.key, b.value
FROM materialized_cte a
JOIN another_materialized_cte b ON a.key = b.key
ORDER BY a.key
POSTHOOK: type: QUERY
POSTHOOK: Input: default@another_materialized_cte
POSTHOOK: Input: default@materialized_cte
#### A masked pattern was here ####
CBO PLAN:
HiveSortLimit(sort0=[$0], dir0=[ASC])
HiveProject(key=[$0], value=[$1], key0=[$2], value0=[$3])
HiveJoin(condition=[=($0, $2)], joinType=[inner], algorithm=[none], cost=[not available])
HiveProject(key=[$0], value=[$1])
HiveFilter(condition=[IS NOT NULL($0)])
HiveTableScan(table=[[default, materialized_cte]], table:alias=[a])
HiveProject(key=[$0], value=[$1])
HiveFilter(condition=[IS NOT NULL($0)])
HiveTableScan(table=[[default, another_materialized_cte]], table:alias=[b])

PREHOOK: query: EXPLAIN
WITH materialized_cte AS (
SELECT key, value FROM src WHERE key != '100'
),
another_materialized_cte AS (
SELECT key, value FROM src WHERE key != '100'
)
SELECT a.key, a.value, b.key, b.value
FROM materialized_cte a
JOIN another_materialized_cte b ON a.key = b.key
ORDER BY a.key
PREHOOK: type: QUERY
PREHOOK: Input: default@another_materialized_cte
PREHOOK: Input: default@materialized_cte
#### A masked pattern was here ####
POSTHOOK: query: EXPLAIN
WITH materialized_cte AS (
SELECT key, value FROM src WHERE key != '100'
),
another_materialized_cte AS (
SELECT key, value FROM src WHERE key != '100'
)
SELECT a.key, a.value, b.key, b.value
FROM materialized_cte a
JOIN another_materialized_cte b ON a.key = b.key
ORDER BY a.key
POSTHOOK: type: QUERY
POSTHOOK: Input: default@another_materialized_cte
POSTHOOK: Input: default@materialized_cte
#### A masked pattern was here ####
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-7 depends on stages: Stage-2, Stage-0, Stage-5, Stage-3
Stage-0 depends on stages: Stage-1
Stage-4 is a root stage
Stage-5 depends on stages: Stage-4
Stage-3 depends on stages: Stage-4
Stage-6 depends on stages: Stage-7

STAGE PLANS:
Stage: Stage-1
Tez
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: src
filterExpr: (key <> '100') (type: boolean)
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator
predicate: (key <> '100') (type: boolean)
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: key (type: string), value (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.materialized_cte
Execution mode: vectorized, llap
LLAP IO: all inputs

Stage: Stage-2
Dependency Collection

Stage: Stage-7
Tez
#### A masked pattern was here ####
Edges:
Reducer 4 <- Map 3 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 3
Map Operator Tree:
TableScan
alias: a
filterExpr: key is not null (type: boolean)
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator
predicate: key is not null (type: boolean)
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: key (type: string), value (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: _col0 (type: string)
null sort order: z
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: string)
Execution mode: vectorized, llap
LLAP IO: all inputs
Map 6
Map Operator Tree:
TableScan
alias: b
filterExpr: key is not null (type: boolean)
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator
predicate: key is not null (type: boolean)
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: key (type: string), value (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: _col0 (type: string)
null sort order: z
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: string)
Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 4
Execution mode: llap
Reduce Operator Tree:
Merge Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col0 (type: string)
1 _col0 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3
Statistics: Num rows: 791 Data size: 281596 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: _col0 (type: string)
null sort order: z
sort order: +
Statistics: Num rows: 791 Data size: 281596 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string)
Reducer 5
Execution mode: vectorized, llap
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string), VALUE._col1 (type: string), VALUE._col2 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3
Statistics: Num rows: 791 Data size: 281596 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
Statistics: Num rows: 791 Data size: 281596 Basic stats: COMPLETE Column stats: COMPLETE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Move Operator
files:
hdfs directory: true
#### A masked pattern was here ####

Stage: Stage-4
Tez
#### A masked pattern was here ####
Vertices:
Map 2
Map Operator Tree:
TableScan
alias: src
filterExpr: (key <> '100') (type: boolean)
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator
predicate: (key <> '100') (type: boolean)
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: key (type: string), value (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.another_materialized_cte
Execution mode: vectorized, llap
LLAP IO: all inputs

Stage: Stage-5
Dependency Collection

Stage: Stage-3
Move Operator
files:
hdfs directory: true
#### A masked pattern was here ####

Stage: Stage-6
Fetch Operator
limit: -1
Processor Tree:
ListSink