Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ STAGE PLANS:
minReductionHashAggr: 0.95
mode: hash
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
Statistics: Num rows: 1 Data size: 512 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
null sort order:
sort order:
Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
Statistics: Num rows: 1 Data size: 512 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
Execution mode: vectorized
Reducer 2
Expand Down Expand Up @@ -172,11 +172,11 @@ STAGE PLANS:
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, 1, expectedEntries=20)
mode: final
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
Statistics: Num rows: 1 Data size: 512 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
null sort order:
sort order:
Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
Statistics: Num rows: 1 Data size: 512 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)

Stage: Stage-0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,10 +476,10 @@ private void generateEventOperatorPlan(DynamicListContext ctx, ParseContext pars
keyExprs.add(key);

// group by requires "ArrayList", don't ask.
ArrayList<String> outputNames = new ArrayList<String>();
List<String> outputNames = new ArrayList<String>();
outputNames.add(HiveConf.getColumnInternalName(0));

ArrayList<ColumnInfo> selectColInfos = new ArrayList<ColumnInfo>();
List<ColumnInfo> selectColInfos = new ArrayList<ColumnInfo>();
selectColInfos.add(new ColumnInfo(outputNames.get(0), key.getTypeInfo(), "", false));

// project the relevant key column
Expand All @@ -503,7 +503,7 @@ private void generateEventOperatorPlan(DynamicListContext ctx, ParseContext pars
HiveConf.getFloatVar(parseContext.getConf(),
ConfVars.HIVEMAPAGGRHASHMINREDUCTIONLOWERBOUND);

ArrayList<ExprNodeDesc> groupByExprs = new ArrayList<ExprNodeDesc>();
List<ExprNodeDesc> groupByExprs = new ArrayList<ExprNodeDesc>();
ExprNodeDesc groupByExpr =
new ExprNodeColumnDesc(key.getTypeInfo(), outputNames.get(0), null, false);
groupByExprs.add(groupByExpr);
Expand All @@ -513,7 +513,7 @@ private void generateEventOperatorPlan(DynamicListContext ctx, ParseContext pars
new ArrayList<AggregationDesc>(), false, groupByMemoryUsage, memoryThreshold,
minReductionHashAggr, minReductionHashAggrLowerBound, null, false, -1, true);

ArrayList<ColumnInfo> groupbyColInfos = new ArrayList<ColumnInfo>();
List<ColumnInfo> groupbyColInfos = new ArrayList<ColumnInfo>();
groupbyColInfos.add(new ColumnInfo(outputNames.get(0), key.getTypeInfo(), "", false));

GroupByOperator groupByOp = (GroupByOperator) OperatorFactory.getAndMakeChild(
Expand Down Expand Up @@ -583,7 +583,7 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
keyExprs.add(key);

// group by requires "ArrayList", don't ask.
ArrayList<String> outputNames = new ArrayList<String>();
List<String> outputNames = new ArrayList<String>();

// project the relevant key column
SelectDesc select = new SelectDesc(keyExprs, outputNames);
Expand All @@ -592,7 +592,7 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
ColumnInfo columnInfo = parentOfRS.getSchema().getColumnInfo(internalColName);
columnInfo = new ColumnInfo(columnInfo);
outputNames.add(internalColName);
ArrayList<ColumnInfo> signature = new ArrayList<ColumnInfo>();
List<ColumnInfo> signature = new ArrayList<ColumnInfo>();
signature.add(columnInfo);
RowSchema rowSchema = new RowSchema(signature);

Expand Down Expand Up @@ -627,12 +627,12 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
// Add min/max and bloom filter aggregations
List<ObjectInspector> aggFnOIs = new ArrayList<ObjectInspector>();
aggFnOIs.add(key.getWritableObjectInspector());
ArrayList<ExprNodeDesc> params = new ArrayList<ExprNodeDesc>();
List<ExprNodeDesc> params = new ArrayList<ExprNodeDesc>();
params.add(
new ExprNodeColumnDesc(key.getTypeInfo(), outputNames.get(0),
"", false));

ArrayList<AggregationDesc> aggs = new ArrayList<AggregationDesc>();
List<AggregationDesc> aggs = new ArrayList<AggregationDesc>();
try {
AggregationDesc min = new AggregationDesc("min",
FunctionRegistry.getGenericUDAFEvaluator("min", aggFnOIs, false, false),
Expand Down Expand Up @@ -666,7 +666,7 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
}

// Create the Group by Operator
ArrayList<String> gbOutputNames = new ArrayList<String>();
List<String> gbOutputNames = new ArrayList<String>();
gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(0));
gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(1));
gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(2));
Expand All @@ -675,35 +675,26 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
null, false, -1, false);

ArrayList<ColumnInfo> groupbyColInfos = new ArrayList<ColumnInfo>();
List<ColumnInfo> groupbyColInfos = new ArrayList<ColumnInfo>();
groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(0), key.getTypeInfo(), "", false));
groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(1), key.getTypeInfo(), "", false));
groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(2), key.getTypeInfo(), "", false));
groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(2), TypeInfoFactory.binaryTypeInfo, "", false));

GroupByOperator groupByOp = (GroupByOperator)OperatorFactory.getAndMakeChild(
groupBy, new RowSchema(groupbyColInfos), selectOp);

groupByOp.setColumnExprMap(new HashMap<String, ExprNodeDesc>());

// Get the column names of the aggregations for reduce sink
int colPos = 0;
ArrayList<ExprNodeDesc> rsValueCols = new ArrayList<ExprNodeDesc>();
List<ExprNodeDesc> rsValueCols = new ArrayList<ExprNodeDesc>();
Map<String, ExprNodeDesc> columnExprMap = new HashMap<String, ExprNodeDesc>();
for (int i = 0; i < aggs.size() - 1; i++) {
ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(key.getTypeInfo(),
gbOutputNames.get(colPos), "", false);
for (int i = 0; i < aggs.size(); i++) {
ExprNodeColumnDesc colExpr =
new ExprNodeColumnDesc(groupbyColInfos.get(i).getType(), gbOutputNames.get(i), "", false);
rsValueCols.add(colExpr);
columnExprMap.put(gbOutputNames.get(colPos), colExpr);
colPos++;
columnExprMap.put(gbOutputNames.get(i), colExpr);
}

// Bloom Filter uses binary
ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(TypeInfoFactory.binaryTypeInfo,
gbOutputNames.get(colPos), "", false);
rsValueCols.add(colExpr);
columnExprMap.put(gbOutputNames.get(colPos), colExpr);
colPos++;

// Create the reduce sink operator
ReduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc(
new ArrayList<ExprNodeDesc>(), rsValueCols, gbOutputNames, false,
Expand All @@ -715,14 +706,14 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
rsOp.getConf().setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.QUICKSTART));

// Create the final Group By Operator
ArrayList<AggregationDesc> aggsFinal = new ArrayList<AggregationDesc>();
List<AggregationDesc> aggsFinal = new ArrayList<AggregationDesc>();
try {
List<ObjectInspector> minFinalFnOIs = new ArrayList<ObjectInspector>();
List<ObjectInspector> maxFinalFnOIs = new ArrayList<ObjectInspector>();
List<ObjectInspector> bloomFilterFinalFnOIs = new ArrayList<ObjectInspector>();
ArrayList<ExprNodeDesc> minFinalParams = new ArrayList<ExprNodeDesc>();
ArrayList<ExprNodeDesc> maxFinalParams = new ArrayList<ExprNodeDesc>();
ArrayList<ExprNodeDesc> bloomFilterFinalParams = new ArrayList<ExprNodeDesc>();
List<ExprNodeDesc> minFinalParams = new ArrayList<ExprNodeDesc>();
List<ExprNodeDesc> maxFinalParams = new ArrayList<ExprNodeDesc>();
List<ExprNodeDesc> bloomFilterFinalParams = new ArrayList<ExprNodeDesc>();
// Use the expressions from Reduce Sink.
minFinalFnOIs.add(rsValueCols.get(0).getWritableObjectInspector());
maxFinalFnOIs.add(rsValueCols.get(1).getWritableObjectInspector());
Expand Down Expand Up @@ -795,32 +786,29 @@ private void createFinalRsForSemiJoinOp(
ParseContext parseContext, TableScanOperator ts, GroupByOperator gb,
ExprNodeDesc key, String keyBaseAlias, ExprNodeDesc colExpr,
boolean isHint) throws SemanticException {
ArrayList<String> gbOutputNames = new ArrayList<>();
// One each for min, max and bloom filter
gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(0));
gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(1));
gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(2));

int colPos = 0;
ArrayList<ExprNodeDesc> rsValueCols = new ArrayList<ExprNodeDesc>();
for (int i = 0; i < gbOutputNames.size() - 1; i++) {
ExprNodeColumnDesc expr = new ExprNodeColumnDesc(key.getTypeInfo(),
gbOutputNames.get(colPos++), "", false);
rsValueCols.add(expr);
List<String> gbOutputNames = new ArrayList<>();
List<ExprNodeDesc> rsValueCols = new ArrayList<ExprNodeDesc>();
Map<String, ExprNodeDesc> columnExprMap = new HashMap<String, ExprNodeDesc>();
List<ColumnInfo> rsColInfos = new ArrayList<>();
for (ColumnInfo gbyColInfo : gb.getSchema().getSignature()) {
String gbyColName = gbyColInfo.getInternalName();
gbOutputNames.add(gbyColName);

TypeInfo typInfo = gbyColInfo.getType();
ExprNodeColumnDesc rsValExpr = new ExprNodeColumnDesc(typInfo, gbyColName, "", false);
rsValueCols.add(rsValExpr);

String rsOutputColName = Utilities.ReduceField.VALUE + "." + gbyColName;
columnExprMap.put(rsOutputColName, rsValExpr);
rsColInfos.add(new ColumnInfo(rsOutputColName, typInfo, "", false));
}

// Bloom Filter uses binary
ExprNodeColumnDesc colBFExpr = new ExprNodeColumnDesc(TypeInfoFactory.binaryTypeInfo,
gbOutputNames.get(colPos++), "", false);
rsValueCols.add(colBFExpr);

// Create the final Reduce Sink Operator
ReduceSinkDesc rsDescFinal = PlanUtils.getReduceSinkDesc(
new ArrayList<ExprNodeDesc>(), rsValueCols, gbOutputNames, false,
-1, 0, 1, Operation.NOT_ACID, NullOrdering.defaultNullOrder(parseContext.getConf()));
ReduceSinkOperator rsOpFinal = (ReduceSinkOperator)OperatorFactory.getAndMakeChild(
rsDescFinal, new RowSchema(gb.getSchema()), gb);
Map<String, ExprNodeDesc> columnExprMap = new HashMap<>();
rsDescFinal, new RowSchema(rsColInfos), gb);
rsOpFinal.setColumnExprMap(columnExprMap);

LOG.debug("DynamicSemiJoinPushdown: Saving RS to TS mapping: " + rsOpFinal + ": " + ts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hadoop.hive.ql.optimizer.graph.OperatorGraph;
import org.apache.hadoop.hive.ql.optimizer.graph.OperatorGraph.Cluster;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
Expand Down Expand Up @@ -155,6 +156,11 @@ private void removeSJEdges() throws SemanticException {
rs.getChildOperators().clear();
ts.getParentOperators().remove(rs);
rs2sj.put((ReduceSinkOperator) rs, sji);

if (pctx.getRsToRuntimeValuesInfoMap().containsKey(e.getKey())) {
RuntimeValuesInfo rvi = pctx.getRsToRuntimeValuesInfoMap().remove(e.getKey());
pctx.getRsToRuntimeValuesInfoMap().put((ReduceSinkOperator) rs, rvi);
}
}
pctx.setRsToSemiJoinBranchInfo(rs2sj);
}
Expand Down Expand Up @@ -325,6 +331,12 @@ public static Optional<Set<String>> colMappingInverseKeys(ReduceSinkOperator rs)
Map<String, String> ret = new HashMap<String, String>();
Map<String, ExprNodeDesc> exprMap = rs.getColumnExprMap();
Set<String> neededColumns = new HashSet<String>();

if (!rs.getSchema().getColumnNames().stream().allMatch(exprMap::containsKey)) {
// Cannot invert RS because exprMap does not contain all of RS's input columns.
return Optional.empty();
}

try {
for (Entry<String, ExprNodeDesc> e : exprMap.entrySet()) {
String columnName = extractColumnName(e.getValue());
Expand All @@ -344,6 +356,6 @@ public static Optional<Set<String>> colMappingInverseKeys(ReduceSinkOperator rs)
} catch (SemanticException e) {
return Optional.empty();
}

}
}

46 changes: 46 additions & 0 deletions ql/src/test/queries/clientpositive/sharedwork_semi_2.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
-- sharedwork_semi_2.q
-- Plans and runs a join between a three-way UNION ALL over the same table
-- (x1_store_sales) and a filtered dimension table (x1_date_dim), with the
-- shared-work optimizer options below switched on.

-- Optimizer switches: auto map-join conversion plus every shared-work option
-- used by this test (dppunion, dppunion event-op merging, extended mode,
-- parallel edge support).
set hive.auto.convert.join=true;
set hive.optimize.shared.work.dppunion.merge.eventops=true;
set hive.optimize.shared.work.dppunion=true;
set hive.optimize.shared.work.extended=true;
set hive.optimize.shared.work.parallel.edge.support=true;
set hive.optimize.shared.work=true;
-- Concurrency support and DbTxnManager are enabled; the tables created below
-- are declared transactional.
set hive.support.concurrency=true;
-- NOTE(review): presumably lowers the big-table size threshold so that
-- semijoin reduction is considered for these tiny tables -- confirm against
-- the HiveConf description of this property.
set hive.tez.bigtable.minsize.semijoin.reduction=1;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

-- Start from a clean slate so the test can be re-run.
drop table if exists x1_store_sales;
drop table if exists x1_date_dim;

-- Fact-like and dimension-like tables, both transactional ORC.
create table x1_store_sales (ss_sold_date_sk int, ss_item_sk int) stored as orc tblproperties('transactional'='true', 'transactional_properties'='default');
create table x1_date_dim (d_date_sk int, d_month_seq int, d_year int, d_moy int) stored as orc tblproperties('transactional'='true', 'transactional_properties'='default');

-- Only the first x1_date_dim row has d_moy=1 (d_date_sk=1), and x1_store_sales
-- contains ss_sold_date_sk=1 exactly once.
insert into x1_date_dim values (1,1,2000,1), (2,2,2001,2), (3,2,2001,3), (4,2,2001,4), (5,2,2001,5), (6,2,2001,6), (7,2,2001,7), (8,2,2001,8);
insert into x1_store_sales values (1,1),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10),(11,11);

-- Override the recorded statistics with hand-picked values that do not match
-- the rows actually inserted.
-- NOTE(review): presumably done so the planner sees x1_store_sales as much
-- larger than x1_date_dim -- confirm the intent with the test author.
alter table x1_store_sales update statistics set('numRows'='123456', 'rawDataSize'='1234567');
alter table x1_date_dim update statistics set('numRows'='28', 'rawDataSize'='81449');

-- Show the plan for the union-all/join query.
explain
select ss_item_sk
from (
select ss_item_sk, ss_sold_date_sk from x1_store_sales
union all
select ss_item_sk, ss_sold_date_sk from x1_store_sales
union all
select ss_item_sk, ss_sold_date_sk from x1_store_sales
) as tmp,
x1_date_dim
where ss_sold_date_sk = d_date_sk and d_moy=1;

-- Execute the same query as above so its result rows are checked as well.
select ss_item_sk
from (
select ss_item_sk, ss_sold_date_sk from x1_store_sales
union all
select ss_item_sk, ss_sold_date_sk from x1_store_sales
union all
select ss_item_sk, ss_sold_date_sk from x1_store_sales
) as tmp,
x1_date_dim
where ss_sold_date_sk = d_date_sk and d_moy=1;


Original file line number Diff line number Diff line change
Expand Up @@ -1712,11 +1712,11 @@ STAGE PLANS:
minReductionHashAggr: 0.99
mode: hash
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE
Statistics: Num rows: 1 Data size: 152 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
null sort order:
sort order:
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE
Statistics: Num rows: 1 Data size: 152 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
Execution mode: vectorized, llap
LLAP IO: all inputs
Expand Down Expand Up @@ -1793,11 +1793,11 @@ STAGE PLANS:
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, 1, expectedEntries=1000000)
mode: final
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE
Statistics: Num rows: 1 Data size: 152 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
null sort order:
sort order:
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE
Statistics: Num rows: 1 Data size: 152 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)

Stage: Stage-0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,11 @@ STAGE PLANS:
minReductionHashAggr: 0.99
mode: hash
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE
Statistics: Num rows: 1 Data size: 164 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
null sort order:
sort order:
Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE
Statistics: Num rows: 1 Data size: 164 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
Execution mode: vectorized, llap
Map 8
Expand Down Expand Up @@ -308,11 +308,11 @@ STAGE PLANS:
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, 1, expectedEntries=1187500)
mode: final
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE
Statistics: Num rows: 1 Data size: 164 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
null sort order:
sort order:
Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE
Statistics: Num rows: 1 Data size: 164 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)

Stage: Stage-0
Expand Down
Loading