6 changes: 6 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2457,6 +2457,12 @@ public static enum ConfVars {
"The comma-separated list of SerDe classes that are considered when enhancing table-properties \n" +
"during logical optimization."),

HIVE_OPTIMIZE_SCAN_PROBEDECODE("hive.optimize.scan.probedecode", false,
"Whether to find suitable table scan operators that could reduce the number of decoded rows at runtime by probing extra available information. \n"
+ "The probe side for the row-level filtering is generated either statically, in the case of expressions, or dynamically for joins, \n"
+ "e.g., using the cached MapJoin hashtable created on the small table side to filter out rows that are not going "
+ "to be used when reading the large table data. This results in fewer CPU cycles spent decoding unused data."),

// CTE
HIVE_CTE_MATERIALIZE_THRESHOLD("hive.optimize.cte.materialize.threshold", -1,
"If the number of references to a CTE clause exceeds this threshold, Hive will materialize it\n" +
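For context, a minimal sketch of enabling the new flag programmatically through the existing HiveConf API (it can equally be set per session with SET hive.optimize.scan.probedecode=true); the variable name is the ConfVars entry added above:

HiveConf conf = new HiveConf();
// probeDecode is off by default; flip it on for this configuration
conf.setBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_SCAN_PROBEDECODE, true);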
2 changes: 2 additions & 0 deletions itests/src/test/resources/testconfiguration.properties
@@ -542,6 +542,8 @@ minillaplocal.query.files=\
cross_prod_3.q,\
cross_prod_4.q,\
dpp.q,\
probedecode_mapjoin_stats.q,\
probedecode_mapjoin_simple.q,\
dynamic_partition_pruning.q,\
dynamic_partition_join_noncbo.q,\
dynamic_semijoin_reduction.q,\
54 changes: 54 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -21,8 +21,10 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -84,6 +86,50 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
private String schemaEvolutionColumns;
private String schemaEvolutionColumnsTypes;

private ProbeDecodeContext probeDecodeContext;

/**
* Inner wrapper class for TS ProbeDecode optimization
*/
public static class ProbeDecodeContext {

private final String mjSmallTableCacheKey;
private final String mjBigTableKeyColName;
private final byte mjSmallTablePos;
private final double keyRatio;

public ProbeDecodeContext(String mjSmallTableCacheKey, byte mjSmallTablePos, String mjBigTableKeyColName,
double keyRatio) {
this.mjSmallTableCacheKey = mjSmallTableCacheKey;
this.mjSmallTablePos = mjSmallTablePos;
this.mjBigTableKeyColName = mjBigTableKeyColName;
this.keyRatio = keyRatio;
}

public String getMjSmallTableCacheKey() {
return mjSmallTableCacheKey;
}

public byte getMjSmallTablePos() {
return mjSmallTablePos;
}

public String getMjBigTableKeyColName() {
return mjBigTableKeyColName;
}

public double getKeyRatio() {
return keyRatio;
}

@Override
public String toString() {
return "cacheKey:" + mjSmallTableCacheKey + ", bigKeyColName:" + mjBigTableKeyColName +
", smallTablePos:" + mjSmallTablePos + ", keyRatio:" + keyRatio;
}
}


public TableDesc getTableDescSkewJoin() {
return tableDesc;
}
@@ -435,4 +481,12 @@ public VectorizationContext getOutputVectorizationContext() {
return taskVectorizationContext;
}

public ProbeDecodeContext getProbeDecodeContext() {
return probeDecodeContext;
}

public void setProbeDecodeContext(ProbeDecodeContext probeDecodeContext) {
this.probeDecodeContext = probeDecodeContext;
}

}
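For illustration, a hypothetical ProbeDecodeContext as the compiler might build it for a MapJoin whose small table sits at position 1 (the cache key and column name below are made-up values, not taken from this patch):

// big-table join key "item_id", small-table NDV / big-table NDV ratio of 0.001
TableScanOperator.ProbeDecodeContext ctx =
new TableScanOperator.ProbeDecodeContext("mj_cache_key_25", (byte) 1, "item_id", 0.001);
// LLAP IO can later use ctx.getMjSmallTableCacheKey() to look up the small-table hashtable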
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
@@ -487,6 +487,15 @@ private static boolean sharedWorkOptimization(ParseContext pctx, SharedWorkOptim
}
LOG.debug("Input operator removed: {}", op);
}

// A TS op shared across branches cannot have a probeDecode context that relies on single-branch info:
// rows filtered out on behalf of one branch might still be needed by another branch sharing the TS op.
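// E.g., hypothetically: if branch A joins on the probe key while branch B aggregates the
// full scan, probeDecode filtering driven by branch A's MapJoin would silently drop rows
// that branch B still needs.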
if (retainableTsOp.getProbeDecodeContext() != null) {
LOG.debug("Removing probeDecodeCntx for merged TS op {}", retainableTsOp);
retainableTsOp.setProbeDecodeContext(null);
retainableTsOp.getConf().setProbeDecodeContext(null);
}

// Then we merge the operators of the works we are going to merge
optimizerCache.removeOpAndCombineWork(discardableTsOp, retainableTsOp);
removedOps.add(discardableTsOp);
5 changes: 5 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -196,6 +196,11 @@ public MapWork createMapWork(GenTezProcContext context, Operator<?> root,
mapWork.setIncludedBuckets(ts.getConf().getIncludedBuckets());
}

if (ts.getProbeDecodeContext() != null) {
// TODO: some operators, like VectorPTFEvaluator, do not allow the use of the Selected vector; should that be taken into account here?
mapWork.setProbeDecodeContext(ts.getProbeDecodeContext());
}

// add new item to the tez work
tezWork.add(mapWork);

158 changes: 139 additions & 19 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -125,6 +125,7 @@
import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -138,6 +139,8 @@
import org.apache.hadoop.hive.ql.stats.OperatorStats;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -1310,7 +1313,8 @@ private static void runTopNKeyOptimization(OptimizeTezProcContext procCtx)

private boolean findParallelSemiJoinBranch(Operator<?> mapjoin, TableScanOperator bigTableTS,
ParseContext parseContext,
Map<ReduceSinkOperator, TableScanOperator> semijoins) {
Map<ReduceSinkOperator, TableScanOperator> semijoins,
Map<TableScanOperator, List<MapJoinOperator>> probeDecodeMJoins) {

boolean parallelEdges = false;
for (Operator<?> op : mapjoin.getParentOperators()) {
@@ -1377,12 +1381,16 @@

parallelEdges = true;

// Keep track of the MapJoins that could drive probeDecode for this TS
probeDecodeMJoins.computeIfAbsent(ts, tsOp -> new ArrayList<>()).add((MapJoinOperator) mapjoin);

// Skip adding to SJ removal map when created by hint
if (!sjInfo.getIsHint() && sjInfo.getShouldRemove()) {
semijoins.put(rs, ts);
}
}
}
}
@@ -1443,13 +1451,15 @@ private void removeSemiJoinEdgesForUnion(OptimizeTezProcContext procCtx) throws
* The algorithm looks at all the mapjoins in the operator pipeline until
* it hits an RS op, and for each mapjoin it examines whether it has a parallel semijoin
* edge or dynamic partition pruning.
*
* As an extension, the algorithm also looks for suitable table scan operators that
* could reduce the number of rows decoded at runtime using the information provided by
* the MapJoin operators of the branch, when the ProbeDecode feature is enabled.
*/
private void removeSemijoinsParallelToMapJoin(OptimizeTezProcContext procCtx)
throws SemanticException {
if (!procCtx.conf.getBoolVar(ConfVars.HIVECONVERTJOIN)) {
// Not needed without mapjoin conversion
return;
}

@@ -1458,6 +1468,7 @@ private void removeSemijoinsParallelToMapJoin(OptimizeTezProcContext procCtx)
topOps.addAll(procCtx.parseContext.getTopOps().values());

Map<ReduceSinkOperator, TableScanOperator> semijoins = new HashMap<>();
Map<TableScanOperator, List<MapJoinOperator>> probeDecodeMJoins = new HashMap<>();
for (Operator<?> parent : topOps) {
// A TS can have multiple branches due to DPP or semijoin optimization.
// Use DFS to traverse all the branches until an RS is hit.
@@ -1473,7 +1484,7 @@ private void removeSemijoinsParallelToMapJoin(OptimizeTezProcContext procCtx)
if (op instanceof MapJoinOperator) {
// A candidate.
if (!findParallelSemiJoinBranch(op, (TableScanOperator) parent,
procCtx.parseContext, semijoins)) {
procCtx.parseContext, semijoins, probeDecodeMJoins)) {
// No parallel edge was found for the given mapjoin op,
// no need to go down further, skip this TS operator pipeline.
break;
@@ -1482,18 +1493,126 @@ private void removeSemijoinsParallelToMapJoin(OptimizeTezProcContext procCtx)
deque.addAll(op.getChildOperators());
}
}
// Only remove the semijoin branches when dynamic semijoin reduction is not enabled for parallel mapjoins.
if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_FOR_MAPJOIN)) {
if (!semijoins.isEmpty()) {
for (Entry<ReduceSinkOperator, TableScanOperator> semiEntry : semijoins.entrySet()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Semijoin optimization with parallel edge to map join. Removing semijoin " +
OperatorUtils.getOpNamePretty(semiEntry.getKey()) + " - " + OperatorUtils.getOpNamePretty(semiEntry.getValue()));
}
GenTezUtils.removeBranch(semiEntry.getKey());
GenTezUtils.removeSemiJoinOperator(procCtx.parseContext, semiEntry.getKey(), semiEntry.getValue());
}
}
}
if (procCtx.conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_SCAN_PROBEDECODE) && !probeDecodeMJoins.isEmpty()) {
// When a TS has multiple candidate MapJoins, select one based on a policy
for (Map.Entry<TableScanOperator, List<MapJoinOperator>> probeTsMap : probeDecodeMJoins.entrySet()) {
// Currently supporting: LowestRatio policy
// TODO: Add more policies and make the selection a conf property
TableScanOperator.ProbeDecodeContext tsCntx =
selectLowestRatioProbeDecodeMapJoin(probeTsMap.getKey(), probeTsMap.getValue());
if (tsCntx == null) {
// No valid single-key MapJoin was found for this TS; nothing to propagate
continue;
}
LOG.debug("ProbeDecode MJ for TS {} with CacheKey {} MJ Pos {} ColName {} with Ratio {}",
probeTsMap.getKey().getName(), tsCntx.getMjSmallTableCacheKey(), tsCntx.getMjSmallTablePos(),
tsCntx.getMjBigTableKeyColName(), tsCntx.getKeyRatio());
probeTsMap.getKey().setProbeDecodeContext(tsCntx);
probeTsMap.getKey().getConf().setProbeDecodeContext(tsCntx);
}
}
}

private static TableScanOperator.ProbeDecodeContext selectLowestRatioProbeDecodeMapJoin(TableScanOperator tsOp,
List<MapJoinOperator> mjOps) {
MapJoinOperator selectedMJOp = null;
double selectedMJOpRatio = 0;
for (MapJoinOperator currMJOp : mjOps) {
if (!isValidProbeDecodeMapJoin(currMJOp)) {
continue;
}
// At this point we know it is a single Key MapJoin
if (selectedMJOp == null) {
// Set the first valid MJ
selectedMJOp = currMJOp;
selectedMJOpRatio = getProbeDecodeNDVRatio(tsOp, currMJOp);
LOG.debug("ProbeDecode MJ {} with Ratio {}", selectedMJOp, selectedMJOpRatio);
} else {
double currMJRatio = getProbeDecodeNDVRatio(tsOp, currMJOp);
if (currMJRatio < selectedMJOpRatio) {
LOG.debug("ProbeDecode MJ {} Ratio {} is lower than existing MJ {} with Ratio {}",
currMJOp, currMJRatio, selectedMJOp, selectedMJOpRatio);
selectedMJOp = currMJOp;
selectedMJOpRatio = currMJRatio;
}
}
}

TableScanOperator.ProbeDecodeContext tsProbeDecodeCtx = null;
// If there is a valid MJ to be used for TS probeDecode, make sure the MJ cache key is generated, and
// then propagate the new ProbeDecodeContext (to be used by LLAP IO when executing the TS op)
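// Design note: generating the key at compile time lets the MapJoin operator (which
// publishes its built hashtable under this key) and LLAP IO (which probes that hashtable
// while decoding the big table) agree on the lookup key without extra runtime coordination.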
if (selectedMJOp != null) {
String mjCacheKey = selectedMJOp.getConf().getCacheKey();
if (mjCacheKey == null) {
// Generate the cache key if it has not yet been generated
mjCacheKey = MapJoinDesc.generateCacheKey(selectedMJOp.getOperatorId());
// Set in the conf of the map join operator
selectedMJOp.getConf().setCacheKey(mjCacheKey);
}

byte posBigTable = (byte) selectedMJOp.getConf().getPosBigTable();
Byte[] order = selectedMJOp.getConf().getTagOrder();
Byte mjSmallTablePos = (order[0] == posBigTable ? order[1] : order[0]);

List<ExprNodeDesc> keyDesc = selectedMJOp.getConf().getKeys().get(posBigTable);
ExprNodeColumnDesc keyCol = (ExprNodeColumnDesc) keyDesc.get(0);

tsProbeDecodeCtx = new TableScanOperator.ProbeDecodeContext(mjCacheKey, mjSmallTablePos,
keyCol.getColumn(), selectedMJOpRatio);
}
return tsProbeDecodeCtx;
}

// Returns the ratio NDV(MJ small-table join key) / NDV(TS big-table key), falling back to row counts when NDV stats are unavailable
private static double getProbeDecodeNDVRatio(TableScanOperator tsOp, MapJoinOperator mjOp) {
long mjKeyCardinality = mjOp.getStatistics().getNumRows();
long tsKeyCardinality = tsOp.getStatistics().getNumRows();

byte posBigTable = (byte) mjOp.getConf().getPosBigTable();

Byte[] order = mjOp.getConf().getTagOrder();
Byte mjSmallTablePos = (order[0] == posBigTable ? order[1] : order[0]);
Byte mjBigTablePos = (order[0] == posBigTable ? order[0] : order[1]);

// Single Key MJ at this point
List<ExprNodeDesc> tsKeyDesc = mjOp.getConf().getKeys().get(mjBigTablePos);
ExprNodeColumnDesc tsKeyCol = (ExprNodeColumnDesc) tsKeyDesc.get(0);

List<ExprNodeDesc> mjKeyDesc = mjOp.getConf().getKeys().get(mjSmallTablePos);
ExprNodeColumnDesc mjKeyCol = (ExprNodeColumnDesc) mjKeyDesc.get(0);

ColStatistics mjStats = mjOp.getStatistics().getColumnStatisticsFromColName(mjKeyCol.getColumn());
ColStatistics tsStats = tsOp.getStatistics().getColumnStatisticsFromColName(tsKeyCol.getColumn());

if (canUseNDV(mjStats)) {
mjKeyCardinality = mjStats.getCountDistint();
}
if (canUseNDV(tsStats)) {
tsKeyCardinality = tsStats.getCountDistint();
}
return mjKeyCardinality / (double) tsKeyCardinality;
}
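// Worked example (illustrative numbers, not from this patch): a small-table key with
// NDV 1,000 probing a big-table key with NDV 1,000,000 gives 1000 / 1000000.0 = 0.001,
// i.e., at most ~0.1% of the big-table key values can match, making this MapJoin a
// strong candidate under the LowestRatio policy.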

// Valid MapJoin for probeDecode: a single join key that is not of STRING or BYTE type
private static boolean isValidProbeDecodeMapJoin(MapJoinOperator mapJoinOp) {
Map<Byte, List<ExprNodeDesc>> keyExprs = mapJoinOp.getConf().getKeys();
List<ExprNodeDesc> bigTableKeyExprs = keyExprs.get((byte) mapJoinOp.getConf().getPosBigTable());
if (bigTableKeyExprs.size() != 1) {
return false;
}
PrimitiveObjectInspector.PrimitiveCategory keyCategory =
((PrimitiveTypeInfo) bigTableKeyExprs.get(0).getTypeInfo()).getPrimitiveCategory();
return keyCategory != PrimitiveObjectInspector.PrimitiveCategory.STRING
&& keyCategory != PrimitiveObjectInspector.PrimitiveCategory.BYTE;
}

private static boolean canUseNDV(ColStatistics colStats) {
@@ -1717,6 +1836,7 @@ private static void sortSemijoinFilters(OptimizeTezProcContext procCtx,

private void removeSemijoinOptimizationByBenefit(OptimizeTezProcContext procCtx)
throws SemanticException {

Map<ReduceSinkOperator, SemiJoinBranchInfo> map = procCtx.parseContext.getRsToSemiJoinBranchInfo();
if (map.isEmpty()) {
// Nothing to do
11 changes: 11 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.plan;

import org.apache.hadoop.hive.common.StringInternUtils;
import org.apache.hadoop.hive.ql.exec.TableScanOperator.ProbeDecodeContext;
import org.apache.hadoop.hive.ql.exec.Utilities;

import java.util.ArrayList;
@@ -175,6 +176,8 @@ public enum LlapIODescriptor {

private boolean isMergeFromResolver;

private ProbeDecodeContext probeDecodeContext = null;

public MapWork() {}

public MapWork(String name) {
@@ -846,6 +849,14 @@ public List<String> getVectorizationEnabledConditionsNotMet() {
return vectorizationEnabledConditionsNotMet;
}

public ProbeDecodeContext getProbeDecodeContext() {
return probeDecodeContext;
}

public void setProbeDecodeContext(ProbeDecodeContext probeDecodeContext) {
this.probeDecodeContext = probeDecodeContext;
}

public class MapExplainVectorization extends BaseExplainVectorization {

private final MapWork mapWork;