Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HIVE-26968: Compare DPP sources when compare and gather parent operators #3981

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,7 @@ private static boolean areMergeableExtendedCheck(ParseContext pctx, SharedWorkOp
if (!bs.get(j)) {
// If not visited yet
Operator<?> dppOp2 = dppsOp2.get(j);
if (compareAndGatherOps(pctx, dppOp1, dppOp2) != null) {
if (compareAndGatherOps(pctx, optimizerCache, dppOp1, dppOp2) != null) {
// The DPP operator/branch are equal
bs.set(j);
break;
Expand Down Expand Up @@ -1258,7 +1258,7 @@ private static boolean areMergeableExcludeSemijoinsExtendedCheck(ParseContext pc
if (!bs.get(j)) {
// If not visited yet
Operator<?> dppOp2 = dppsOp2.get(j);
if (compareAndGatherOps(pctx, dppOp1, dppOp2) != null) {
if (compareAndGatherOps(pctx, optimizerCache, dppOp1, dppOp2) != null) {
// The DPP operator/branch are equal
bs.set(j);
break;
Expand Down Expand Up @@ -1488,7 +1488,7 @@ private static SharedResult extractSharedOptimizationInfo(ParseContext pctx,
}
// Compare input
List<Operator<?>> removeOpsForCurrentInput =
compareAndGatherOps(pctx, parentOp1, parentOp2);
compareAndGatherOps(pctx, optimizerCache, parentOp1, parentOp2);
if (removeOpsForCurrentInput == null) {
// Inputs are not the same, bail out
break;
Expand Down Expand Up @@ -1625,22 +1625,35 @@ private static void removeBranch(Operator<?> currentOp, Set<Operator<?>> branche
}

/**
 * Entry point for comparing two operators and gathering operators along the way.
 *
 * <p>Allocates a fresh result list and delegates to the recursive overload with
 * gathering enabled. The change shown here additionally forwards the
 * {@code optimizerCache} argument to that overload.
 *
 * @param pctx parse context handed through to the recursive comparison
 * @param optimizerCache shared work optimizer cache handed through to the recursive comparison
 * @param op1 first operator to compare
 * @param op2 second operator to compare
 * @return the list filled in by the recursive overload, or {@code null} when that
 *         overload reports that the two operators are not mergeable
 * @throws SemanticException declared by this method and by the recursive overload it calls
 */
private static List<Operator<?>> compareAndGatherOps(ParseContext pctx,
Operator<?> op1, Operator<?> op2) throws SemanticException {
SharedWorkOptimizerCache optimizerCache, Operator<?> op1, Operator<?> op2) throws SemanticException {
List<Operator<?>> result = new ArrayList<>();
boolean mergeable = compareAndGatherOps(pctx, op1, op2, result, true);
boolean mergeable = compareAndGatherOps(pctx, optimizerCache, op1, op2, result, true);
if (!mergeable) {
// null (rather than an empty list) signals "not mergeable" to the callers
return null;
}
return result;
}

private static boolean compareAndGatherOps(ParseContext pctx, Operator<?> op1, Operator<?> op2,
List<Operator<?>> result, boolean gather) throws SemanticException {
private static boolean compareAndGatherOps(ParseContext pctx,
SharedWorkOptimizerCache optimizerCache,
Operator<?> op1,
Operator<?> op2,
List<Operator<?>> result,
boolean gather) throws SemanticException {
if (!compareOperator(pctx, op1, op2)) {
LOG.debug("Operators not equal: {} and {}", op1, op2);
return false;
}

if (op1 instanceof TableScanOperator) {
Boolean areMergeable =
areMergeableExtendedCheck(pctx, optimizerCache, (TableScanOperator) op1, (TableScanOperator) op2);
if (!areMergeable) {
LOG.debug("Operators have different DPP parent: {} and {}", op1, op2);
return false;
}
}

if (gather && op2.getChildOperators().size() > 1) {
// If the second operator has more than one child, we stop gathering
gather = false;
Expand All @@ -1660,7 +1673,7 @@ private static boolean compareAndGatherOps(ParseContext pctx, Operator<?> op1, O
Operator<?> op1ParentOp = op1ParentOperators.get(i);
Operator<?> op2ParentOp = op2ParentOperators.get(i);
boolean mergeable =
compareAndGatherOps(pctx, op1ParentOp, op2ParentOp, result, gather);
compareAndGatherOps(pctx, optimizerCache, op1ParentOp, op2ParentOp, result, gather);
if (!mergeable) {
return false;
}
Expand Down
64 changes: 64 additions & 0 deletions ql/src/test/queries/clientpositive/sharedwork_dpp.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
-- Remove any existing copies of the three test tables.
drop table if exists x_date_dim;
drop table if exists x_item;
drop table if exists x_store_sales;

-- Date dimension: the queries in this file join it to x_store_sales on d_date_sk
-- and filter on d_year.
create table x_date_dim (d_date_sk bigint, d_year int);

-- Item dimension: the queries in this file join it to x_store_sales on i_item_sk.
create table x_item (i_item_sk bigint, i_product_name string);

-- Sales table, partitioned by the date key ss_sold_date_sk.
create table x_store_sales (ss_item_sk bigint, dummy string) partitioned by (ss_sold_date_sk bigint);
-- In terms of size, x_store_sales should be the biggest table and x_date_dim the smallest,
-- so the contents below are chosen accordingly.

-- One date row per year from 1999 to 2001.
insert into table x_date_dim
values (1, 1999), (2, 2000), (3, 2001);

-- Three items with keys 1 to 3.
insert into table x_item values
(1, 'white snow'),
(2, 'solid steel'),
(3, 'dim cloud');

-- Item 1 is sold on date 1 and item 2 on dates 2 and 3. The long dummy string pads every
-- row so that this table is larger than the two dimension tables.
insert into table x_store_sales (ss_item_sk, dummy, ss_sold_date_sk)
values
(1, 'Dummy content just to make this table size the bigger among others', 1),
(2, 'Dummy content just to make this table size the bigger among others', 2),
(2, 'Dummy content just to make this table size the bigger among others', 3);

-- Join order: x_store_sales (TS[0], TS[19]) should be joined with x_item (TS[1], TS[20])
-- before being joined with x_date_dim (TS[2], TS[21]).
-- hive.cbo.enable is set to false in order to arrange the joins in that order.
set hive.cbo.enable=false;
-- NOTE(review): the remaining settings presumably turn on map-join conversion and the shared
-- work optimizer (including its extended and parallel-edge modes) - confirm against the Hive
-- configuration reference.
set hive.auto.convert.join=true;
set hive.optimize.shared.work=true;
set hive.optimize.shared.work.extended=true;
set hive.optimize.shared.work.parallel.edge.support=true;

-- Plan check. The CTE base is referenced twice (cs1 and cs2), joined on item_sk, and each
-- reference is filtered on a different year (2000 versus 2001).
-- NOTE(review): presumably this is the HIVE-26968 scenario, where the two branches look alike
-- but are fed by different dynamic partition pruning sources - confirm against the expected
-- plan output.
explain
with base as (
select
i_product_name product_name, i_item_sk item_sk, d_year year
from x_store_sales, x_item, x_date_dim
where
ss_item_sk = i_item_sk and ss_sold_date_sk = d_date_sk
group by i_product_name, i_item_sk, d_year
)
select cs1.product_name, cs1.year, cs2.year
from base cs1, base cs2
where
cs1.item_sk = cs2.item_sk and
cs1.year = 2000 and
cs2.year = 2001;

-- Same query without explain, executed to check the result. With the rows inserted by this
-- script, only item 2 (solid steel) has sales in both 2000 and 2001, so a single row
-- (solid steel, 2000, 2001) is expected.
with base as (
select
i_product_name product_name, i_item_sk item_sk, d_year year
from x_store_sales, x_item, x_date_dim
where
ss_item_sk = i_item_sk and ss_sold_date_sk = d_date_sk
group by i_product_name, i_item_sk, d_year
)
select cs1.product_name, cs1.year, cs2.year
from base cs1, base cs2
where
cs1.item_sk = cs2.item_sk and
cs1.year = 2000 and
cs2.year = 2001;