Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HIVE-24231: Enhance shared work optimizer to merge scans with filters on both sides #1553

Merged
merged 63 commits into apache:master from HIVE-swo-dppunion on Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
af0863b
existing work
kgyrtkirk Oct 5, 2020
92ce37b
add semi;
kgyrtkirk Oct 5, 2020
3d5607d
smi
kgyrtkirk Oct 5, 2020
59c3d1f
x
kgyrtkirk Oct 5, 2020
758c900
remove exception
kgyrtkirk Oct 5, 2020
1ee6e17
update tpcds
kgyrtkirk Oct 5, 2020
d67efb6
remove typo
kgyrtkirk Oct 5, 2020
38470ef
update
kgyrtkirk Oct 6, 2020
4bb84bf
disable dppu
kgyrtkirk Oct 6, 2020
afd8a66
add comment
kgyrtkirk Oct 6, 2020
6c54baa
add comment+fix
kgyrtkirk Oct 6, 2020
e889d4b
update mergeablecheck
kgyrtkirk Oct 6, 2020
4a87c2f
q.out updates
kgyrtkirk Oct 6, 2020
7d1584d
update
kgyrtkirk Oct 6, 2020
b43cb82
en-x
kgyrtkirk Oct 6, 2020
261fdcc
Revert "en-x"
kgyrtkirk Oct 6, 2020
c35896f
Revert "Revert "en-x""
kgyrtkirk Oct 6, 2020
34c4423
accept q.out; small updates
kgyrtkirk Oct 6, 2020
0320b9a
accept dyn-op
kgyrtkirk Oct 7, 2020
8c41061
update messages
kgyrtkirk Oct 7, 2020
8d33c6e
updates
kgyrtkirk Oct 7, 2020
e5b14a1
downstreammerge
kgyrtkirk Oct 7, 2020
dda3523
fixes to downstream merge
kgyrtkirk Oct 7, 2020
6f4faea
remove fixme
kgyrtkirk Oct 7, 2020
9b18ff9
fix npe
kgyrtkirk Oct 7, 2020
ad629eb
remove downstream-merge
kgyrtkirk Oct 7, 2020
d1a62b4
accepted q.out changes
kgyrtkirk Oct 7, 2020
610b789
fix bug
kgyrtkirk Oct 7, 2020
94608ec
back to 0 diff
kgyrtkirk Oct 7, 2020
d4f41a3
fix
kgyrtkirk Oct 7, 2020
06cfe04
accept q.outs
kgyrtkirk Oct 7, 2020
8d7b4f5
accept subquery_in
kgyrtkirk Oct 7, 2020
15e5eff
Revert "remove downstream-merge"
kgyrtkirk Oct 7, 2020
a97ac51
Revert "Revert "remove downstream-merge""
kgyrtkirk Oct 7, 2020
e102149
q.out updates
kgyrtkirk Oct 7, 2020
aaccbdc
fix union
kgyrtkirk Oct 7, 2020
9838961
x
kgyrtkirk Oct 7, 2020
90607e8
q5 back
kgyrtkirk Oct 7, 2020
5656c7d
changes relating to removal of downstream-merge
kgyrtkirk Oct 8, 2020
199fee4
fix recompilation in SWO
kgyrtkirk Oct 8, 2020
831c708
Merge remote-tracking branch 'apache/master' into HIVE-swo-dppunion
kgyrtkirk Oct 8, 2020
18678dc
remove logger changes
kgyrtkirk Oct 8, 2020
f672130
remove ws change
kgyrtkirk Oct 8, 2020
b0d9aef
remove non-finished test
kgyrtkirk Oct 8, 2020
e9bb991
cleanup
kgyrtkirk Oct 8, 2020
92f2951
archive testresult-xmls
kgyrtkirk Oct 8, 2020
c210b8e
archive results for each split
kgyrtkirk Oct 9, 2020
6abc1ac
create test-results.tgz archive
kgyrtkirk Oct 9, 2020
b4b9ce1
fix typo
kgyrtkirk Oct 9, 2020
31ddb97
use surefire M5
kgyrtkirk Oct 9, 2020
053281c
Merge remote-tracking branch 'apache/master' into HIVE-swo-dppunion
kgyrtkirk Oct 12, 2020
520598f
upgrade to M5 in other parts
kgyrtkirk Oct 12, 2020
35fccf2
remove HIVE-24624 related stuff
kgyrtkirk Oct 12, 2020
8468572
remove feature disable from subq tests
kgyrtkirk Oct 12, 2020
b336b84
remove set from explain
kgyrtkirk Oct 12, 2020
9e07afd
update qfiles; go back to simpler changes
kgyrtkirk Oct 12, 2020
e775b74
tests back to master state
kgyrtkirk Oct 13, 2020
a54fc6a
cleanup
kgyrtkirk Oct 13, 2020
c3087ae
removal of ()
kgyrtkirk Oct 13, 2020
70e4ac2
removal of ()
kgyrtkirk Oct 13, 2020
acec785
cleanup; move functions; address review comments
kgyrtkirk Oct 13, 2020
7fa1eef
use adoptchildren
kgyrtkirk Oct 13, 2020
c1e35d3
add doc to mode
kgyrtkirk Oct 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Expand Up @@ -2592,6 +2592,9 @@ public static enum ConfVars {
"When shared work optimizer is enabled, whether we should reuse the cache for the broadcast side\n" +
"of mapjoin operators that share same broadcast input. Requires hive.optimize.shared.work\n" +
"to be set to true. Tez only."),
// Config key and default are unchanged; only the help text is fixed
// (the original "will enable to merge" phrasing was ungrammatical).
HIVE_SHARED_WORK_DPPUNION_OPTIMIZATION("hive.optimize.shared.work.dppunion", true,
    "Enables dpp ops unioning. This optimization merges multiple table scans that differ only in their "
        + "dynamic filters into a single scan (with a more complex filter)"),
HIVE_COMBINE_EQUIVALENT_WORK_OPTIMIZATION("hive.combine.equivalent.work.optimization", true, "Whether to " +
"combine equivalent work objects during physical optimization.\n This optimization looks for equivalent " +
"work objects and combines them if they meet certain preconditions. Spark only."),
Expand Down
417 changes: 274 additions & 143 deletions ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java

Large diffs are not rendered by default.

Expand Up @@ -60,4 +60,8 @@ public void setShouldRemove(boolean shouldRemove) {
this.shouldRemove = shouldRemove;
}
}

/**
 * Replaces the {@link TableScanOperator} tracked by this object.
 *
 * @param newTs the table scan operator to track from now on
 */
public void setTableScan(TableScanOperator newTs) {
  this.ts = newTs;
}
}
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.optimizer.ConstantPropagateProcFactory;
import org.apache.hadoop.hive.ql.parse.SemanticException;
Expand All @@ -42,12 +43,12 @@
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
Expand Down Expand Up @@ -1060,4 +1061,79 @@ public static boolean isStructUDF(ExprNodeDesc columnDesc) {
return false;
}

/**
 * Builds the conjunction (AND) of the given expressions.
 *
 * @param semijoinExprNodes expressions to combine; must not be null, may be empty
 * @return {@code null} for an empty list, the single element for a singleton list,
 *         otherwise an AND expression over all the elements
 * @throws UDFArgumentException if the AND expression cannot be instantiated
 */
public static ExprNodeDesc conjunction(List<ExprNodeDesc> semijoinExprNodes) throws UDFArgumentException {
  switch (semijoinExprNodes.size()) {
  case 0:
    return null;
  case 1:
    return semijoinExprNodes.get(0);
  default:
    return ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPAnd(), semijoinExprNodes);
  }
}

/**
 * Builds the conjunction (AND) of {@code exprNode} and the given expressions.
 *
 * @param semijoinExprNodes expressions to combine; may be null or empty
 * @param exprNode an extra operand placed at the head of the conjunction; may be null
 * @return {@code exprNode} (possibly null) when the list is null/empty, the single
 *         operand when only one remains, otherwise an AND over all operands
 * @throws UDFArgumentException if the AND expression cannot be instantiated
 */
public static ExprNodeDesc conjunction(List<ExprNodeDesc> semijoinExprNodes, ExprNodeDesc exprNode)
    throws UDFArgumentException {
  if (semijoinExprNodes == null || semijoinExprNodes.isEmpty()) {
    return exprNode;
  }
  // Work on a copy: the previous implementation inserted exprNode at the head of the
  // caller's list with add(0, ...), silently mutating the argument.
  List<ExprNodeDesc> operands = new ArrayList<ExprNodeDesc>(semijoinExprNodes);
  if (exprNode != null) {
    operands.add(0, exprNode);
  }
  if (operands.size() > 1) {
    return ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPAnd(), operands);
  }
  return operands.get(0);
}

/**
 * Builds the disjunction (OR) of the two expressions, flattening nested ORs
 * and dropping duplicate operands.
 *
 * @param e1 first expression; may be null
 * @param e2 second expression; may be null
 * @return the other operand when one is null, {@code e1} when both are the same,
 *         otherwise an OR over the deduplicated decomposition of both
 * @throws UDFArgumentException if the OR expression cannot be instantiated
 */
public static ExprNodeDesc disjunction(ExprNodeDesc e1, ExprNodeDesc e2) throws UDFArgumentException {
  if (e1 == null || e2 == null) {
    return (e1 == null) ? e2 : e1;
  }
  if (e1.isSame(e2)) {
    return e1;
  }
  List<ExprNodeDesc> parts = new ArrayList<ExprNodeDesc>();
  disjunctiveDecomposition(e1, parts);
  disjunctiveDecomposition(e2, parts);
  return disjunction(parts);
}

/**
 * Builds the disjunction (OR) of the given expressions.
 *
 * @param operands expressions to combine; must not be null, may be empty
 * @return {@code null} for an empty list, the single element for a singleton list,
 *         otherwise an OR expression over all the elements
 * @throws UDFArgumentException if the OR expression cannot be instantiated
 */
public static ExprNodeDesc disjunction(List<ExprNodeDesc> operands) throws UDFArgumentException {
  switch (operands.size()) {
  case 0:
    return null;
  case 1:
    return operands.get(0);
  default:
    return ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPOr(), operands);
  }
}

/**
 * Recursively flattens {@code expr} into its top-level OR operands, appending each
 * operand to {@code operands} unless an {@link ExprNodeDesc#isSame(Object)} duplicate
 * is already present.
 *
 * @param expr expression to decompose
 * @param operands accumulator for the decomposed operands (mutated in place)
 */
public static void disjunctiveDecomposition(ExprNodeDesc expr, List<ExprNodeDesc> operands) {
  if (isOr(expr)) {
    // Recurse into each child so nested ORs are fully flattened.
    for (ExprNodeDesc child : expr.getChildren()) {
      disjunctiveDecomposition(child, operands);
    }
    return;
  }
  boolean alreadyPresent = false;
  for (ExprNodeDesc existing : operands) {
    if (existing.isSame(expr)) {
      alreadyPresent = true;
      break;
    }
  }
  if (!alreadyPresent) {
    operands.add(expr);
  }
}

/**
 * Returns whether {@code expr} is an OR function call.
 *
 * @param expr expression to inspect
 * @return true iff the expression is a generic function call whose UDF is
 *         {@link GenericUDFOPOr}
 */
public static boolean isOr(ExprNodeDesc expr) {
  return expr instanceof ExprNodeGenericFuncDesc
      && ((ExprNodeGenericFuncDesc) expr).getGenericUDF() instanceof GenericUDFOPOr;
}

}
139 changes: 139 additions & 0 deletions ql/src/test/queries/clientpositive/sharedwork_semi.q
@@ -0,0 +1,139 @@
-- Test for the shared-work dppunion optimization (HIVE-24231): merging table
-- scans that differ only in their dynamic (semijoin) filters.
set hive.explain.user=true;
set hive.optimize.index.filter=true;
set hive.auto.convert.join=true;
set hive.vectorized.execution.enabled=true;

-- Setup: a large fact table and a small dimension table.
drop table if exists x1_store_sales;
drop table if exists x1_date_dim;

create table x1_store_sales
(
ss_sold_date_sk int,
ss_item_sk int
)
stored as orc;

create table x1_date_dim
(
d_date_sk int,
d_month_seq int,
d_year int,
d_moy int
)
stored as orc;

insert into x1_date_dim values (1,1,2000,1),
(2,2,2001,2),
(3,2,2001,3),
(4,2,2001,4),
(5,2,2001,5),
(6,2,2001,6),
(7,2,2001,7),
(8,2,2001,8);

insert into x1_store_sales values (1,1),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10),(11,11);

-- Fake statistics so the optimizer treats x1_store_sales as the big table and
-- considers it worth a semijoin reduction.
alter table x1_store_sales update statistics set(
'numRows'='123456',
'rawDataSize'='1234567');

alter table x1_date_dim update statistics set(
'numRows'='28',
'rawDataSize'='81449');


-- Force semijoin reduction with bloom filters even at these small sizes.
set hive.auto.convert.join.noconditionaltask.size=1;
set hive.tez.dynamic.partition.pruning=true;
set hive.tez.dynamic.semijoin.reduction=true;
set hive.optimize.index.filter=true;
set hive.tez.bigtable.minsize.semijoin.reduction=1;
set hive.tez.min.bloom.filter.entries=1;
set hive.tez.bloom.filter.factor=1.0f;
set hive.explain.user=false;

-- Baseline: dppunion disabled, each union branch keeps its own scan.
set hive.optimize.shared.work.dppunion=false;

select 'expected to see a plan in which ts scan could be shared by combining semijoin conditions';
-- note: this plan should involve a semijoin reduction
explain
select sum(s.ss_item_sk)
from
x1_store_sales s
,x1_date_dim d
where
1=1
and s.ss_sold_date_sk=d.d_date_sk
and d.d_moy=3
union
select sum(s.ss_item_sk)
from
x1_store_sales s
,x1_date_dim d
where
1=1
and s.ss_sold_date_sk=d.d_date_sk
and d.d_moy=5
;

-- Run the query so results can be compared with the optimized plan below.
select sum(s.ss_item_sk)
from
x1_store_sales s
,x1_date_dim d
where
1=1
and s.ss_sold_date_sk=d.d_date_sk
and d.d_moy=3
union
select sum(s.ss_item_sk)
from
x1_store_sales s
,x1_date_dim d
where
1=1
and s.ss_sold_date_sk=d.d_date_sk
and d.d_moy=5
;


-- Same query with dppunion enabled: the two scans of x1_store_sales should be
-- merged into one scan with a combined (disjunctive) dynamic filter.
set hive.optimize.shared.work.dppunion=true;

select 'expected to see a plan in which x1_store_sales(s) is only scanned once';
explain
select sum(s.ss_item_sk)
from
x1_store_sales s
,x1_date_dim d
where
1=1
and s.ss_sold_date_sk=d.d_date_sk
and d.d_moy=3
union
select sum(s.ss_item_sk)
from
x1_store_sales s
,x1_date_dim d
where
1=1
and s.ss_sold_date_sk=d.d_date_sk
and d.d_moy=5
;

-- Results must match the baseline run above.
select sum(s.ss_item_sk)
from
x1_store_sales s
,x1_date_dim d
where
1=1
and s.ss_sold_date_sk=d.d_date_sk
and d.d_moy=3
union
select sum(s.ss_item_sk)
from
x1_store_sales s
,x1_date_dim d
where
1=1
and s.ss_sold_date_sk=d.d_date_sk
and d.d_moy=5
;