9 changes: 9 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1768,6 +1768,15 @@ public static enum ConfVars {
"use BloomFilter in Hybrid grace hash join to minimize unnecessary spilling."),
HIVEMAPJOINFULLOUTER("hive.mapjoin.full.outer", true,
"Whether to use MapJoin for FULL OUTER JOINs."),
HIVE_MAPJOIN_PROBEDECODE_ENABLED("hive.mapjoin.probedecode.enabled", false,
"Use cached MapJoin hashtable created on the small table side to filter out row columns that are not going\n "+
"to be used when reading the large table data. This will result less CPU cycles spent for decoding unused data. "),
HIVE_MAPJOIN_PROBEDECODE_BF_ENABLED("hive.mapjoin.probedecode.bloomfilter.enabled", false,
"Use CuckooSet for probing or not"),
HIVE_MAPJOIN_PROBEDECODE_FILTER_PERC("hive.mapjoin.probedecode.filterperc", (float) 0.6,
"The percentage of matching rows per VectorColumnBatch BELOW which the filter is going to be applied."
+ "Applying a filter when all or the most rows match (0.9 - 1.0) can be more time consuming than just reading the whole batch"
+ "Filterperc value acts as an upper limit (as percentage) for the matching rows under which filtering applies."),
HIVE_TEST_MAPJOINFULLOUTER_OVERRIDE(
"hive.test.mapjoin.full.outer.override",
"none", new StringSet("none", "enable", "disable"),
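For context, a minimal sketch of how the new properties could be exercised programmatically; the ConfVars names are the ones added in this hunk, while the class name, the 0.5 threshold, and the shouldApplyFilter helper are purely illustrative of the filterperc description above:

import org.apache.hadoop.hive.conf.HiveConf;

public class ProbeDecodeConfigSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Enable probe decode and lower the matching-row threshold (illustrative value).
    conf.setBoolVar(HiveConf.ConfVars.HIVE_MAPJOIN_PROBEDECODE_ENABLED, true);
    conf.setFloatVar(HiveConf.ConfVars.HIVE_MAPJOIN_PROBEDECODE_FILTER_PERC, 0.5f);
    float threshold = conf.getFloatVar(HiveConf.ConfVars.HIVE_MAPJOIN_PROBEDECODE_FILTER_PERC);
    // 200 of 1024 rows match (~0.2), which is below 0.5, so the filter would be applied.
    System.out.println(shouldApplyFilter(200, 1024, threshold));
  }

  // Hypothetical helper expressing the rule in the filterperc description:
  // apply the filter only when the fraction of matching rows is below the threshold.
  static boolean shouldApplyFilter(int matchingRows, int batchSize, float filterPerc) {
    return batchSize > 0 && ((float) matchingRows / batchSize) < filterPerc;
  }
}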
1 change: 1 addition & 0 deletions itests/src/test/resources/testconfiguration.properties
@@ -558,6 +558,7 @@ minillaplocal.query.files=\
dynpart_sort_optimization.q,\
dynpart_sort_optimization_acid.q,\
dynpart_sort_opt_bucketing.q,\
probedecode_mapjoin.q,\
enforce_constraint_notnull.q,\
escape1.q,\
escape2.q,\
@@ -25,6 +25,7 @@ public enum LlapIOCounters {
NUM_VECTOR_BATCHES(true),
NUM_DECODED_BATCHES(true),
SELECTED_ROWGROUPS(true),
NUM_DECODED_ROWS(true),
NUM_ERRORS(true),
ROWS_EMITTED(true),
METADATA_CACHE_HIT(true),
@@ -35,6 +36,7 @@
ALLOCATED_USED_BYTES(true),
TOTAL_IO_TIME_NS(false),
DECODE_TIME_NS(false),
PROBE_DECODE_TIME_NS(false),
HDFS_TIME_NS(false),
CONSUMER_TIME_NS(false),
IO_CPU_NS(false),
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.io.IOException;
import java.util.List;
import java.util.Stack;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -36,13 +37,18 @@
import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorBatchWrapper;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.SchemaEvolutionFactory;
import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinCommonOperator;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
@@ -53,7 +59,10 @@
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -72,7 +81,7 @@
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatchWrapper> {

private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
private static final Object DONE_OBJECT = new Object();
@@ -89,13 +98,13 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
private final AtomicReference<Throwable> pendingError = new AtomicReference<>(null);

/** Vector that is currently being processed by our user. */
private ColumnVectorBatch lastCvb = null;
private ColumnVectorBatchWrapper lastCvb = null;
private boolean isFirst = true;
private int maxQueueSize = 0;

private volatile boolean isClosed = false;
private volatile boolean isInterrupted = false;
private final ConsumerFeedback<ColumnVectorBatch> feedback;
private final ConsumerFeedback<ColumnVectorBatchWrapper> feedback;
private final QueryFragmentCounters counters;
private long firstReturnTime;

@@ -104,6 +113,7 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
private final ExecutorService executor;
private final boolean isAcidScan;
private final boolean isAcidFormat;
private final boolean probeDecodeEnabled;

/**
* Creates the record reader and checks the input-specific compatibility.
@@ -195,11 +205,63 @@ private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split,
this.includes = new IncludesImpl(tableIncludedCols, isAcidFormat, rbCtx,
schema, job, isAcidScan && acidReader.includeAcidColumns());

this.probeDecodeEnabled = HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_MAPJOIN_PROBEDECODE_ENABLED);
if (this.probeDecodeEnabled) {
findProbeDecodeSettings(mapWork, includes);
}
LOG.info("LlapRecordReader ProbeDecode enabled is {}", this.probeDecodeEnabled);

// Create the consumer of encoded data; it will coordinate decoding to CVBs.
feedback = rp = cvp.createReadPipeline(this, split, includes, sarg, counters, includes,
sourceInputFormat, sourceSerDe, reporter, job, mapWork.getPathToPartitionInfo());
}

private static void findProbeDecodeSettings(MapWork mapWork, IncludesImpl includes) {
Stack<Operator<?>> opStack = new Stack<>();
// DFS over the operator tree: children are pushed onto a stack
opStack.addAll(mapWork.getWorks());
VectorMapJoinCommonOperator vop = null;
String mjCacheKey = null;
int colId = 0;
String colName = null;
while (!opStack.empty()) {
Operator<?> op = opStack.pop();
if (op instanceof VectorMapJoinCommonOperator) {
vop = (VectorMapJoinCommonOperator) op;
if (!vop.getConf().isDynamicPartitionHashJoin() && !vop.getConf().isBucketMapJoin() && validProbeDecodeMapJoin(op)) {
// Following MapJoinOperator cache key definition
mjCacheKey = vop.getCacheKey() != null ? vop.getCacheKey() : MapJoinDesc.generateCacheKey(op.getOperatorId());
colId = ((VectorMapJoinDesc) vop.getVectorDesc()).getVectorMapJoinInfo().getBigTableKeyColumnMap()[0];
colName = vop.getInputVectorizationContext().getInitialColumnNames().get(colId);
// ColId to ColIdx mapping
includes.setProbeDecodeColIdx(includes.getReaderLogicalColumnIds().indexOf(colId));
includes.setPosSingleVectorMapJoinSmallTable(vop.posSingleVectorMapJoinSmallTable);
includes.setProbeDecodeColName(colName);
}
}
if (op.getChildOperators() != null) {
opStack.addAll(op.getChildOperators());
}
}
if (mjCacheKey != null) {
LOG.info("ProbeDecode found MapJoin op {} with CacheKey {} MapJoin ColID {} and ColName {} and Pos {}", vop.getName(), mjCacheKey,
colId, colName, vop.posSingleVectorMapJoinSmallTable);
}
includes.setProbeDecodeCacheKey(mjCacheKey);
}

// Is this a single-key MapJoin with a numeric (long/int/short) key type?
private static boolean validProbeDecodeMapJoin(Operator mapJoinOp) {
if (mapJoinOp instanceof VectorMapJoinCommonOperator) {
VectorMapJoinDesc vectorMapJoinDesc = (VectorMapJoinDesc) ((VectorMapJoinCommonOperator) mapJoinOp).getVectorDesc();
return (vectorMapJoinDesc.getVectorMapJoinInfo().getBigTableKeyColumnMap().length == 1) &&
(vectorMapJoinDesc.getHashTableKeyType() == VectorMapJoinDesc.HashTableKeyType.LONG ||
vectorMapJoinDesc.getHashTableKeyType() == VectorMapJoinDesc.HashTableKeyType.SHORT ||
vectorMapJoinDesc.getHashTableKeyType() == VectorMapJoinDesc.HashTableKeyType.INT);
}
return false;
}

private static int getQueueVar(ConfVars var, JobConf jobConf, Configuration daemonConf) {
// Check job config for overrides, otherwise use the default server value.
int jobVal = jobConf.getInt(var.varname, -1);
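A note on validProbeDecodeMapJoin above: it only accepts single-column keys of an integer family. A self-contained sketch of that key-type rule, using only the HashTableKeyType constants referenced in the patch (the class and method names here are hypothetical):

import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;

public class ProbeDecodeKeyTypeSketch {
  // Probe decode currently targets single-column, integer-family join keys.
  static boolean isSupportedKeyType(HashTableKeyType keyType) {
    switch (keyType) {
      case LONG:
      case INT:
      case SHORT:
        return true;
      default:
        return false;
    }
  }

  public static void main(String[] args) {
    // Print the decision for every available key type.
    for (HashTableKeyType kt : HashTableKeyType.values()) {
      System.out.println(kt + " supported: " + isSupportedKeyType(kt));
    }
  }
}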
@@ -352,7 +414,7 @@ public boolean next(NullWritable key, VectorizedRowBatch vrb) throws IOException
}
isFirst = false;
}
ColumnVectorBatch cvb;
ColumnVectorBatchWrapper cvb;
try {
cvb = nextCvb();
} catch (InterruptedException e) {
@@ -386,34 +448,46 @@ public boolean next(NullWritable key, VectorizedRowBatch vrb) throws IOException
//so +1 is the OrcRecordUpdater.ROW?
acidColCount + 1 + vrb.getDataColumnCount());
// By assumption, ACID columns are currently always in the beginning of the arrays.
System.arraycopy(cvb.cols, 0, inputVrb.cols, 0, acidColCount);
for (int ixInReadSet = acidColCount; ixInReadSet < cvb.cols.length; ++ixInReadSet) {
System.arraycopy(cvb.getCvb().cols, 0, inputVrb.cols, 0, acidColCount);
for (int ixInReadSet = acidColCount; ixInReadSet < cvb.getCvb().cols.length; ++ixInReadSet) {
int ixInVrb = includes.getPhysicalColumnIds().get(ixInReadSet) -
(acidReader.includeAcidColumns() ? 0 : OrcRecordUpdater.ROW);
// TODO: should we create the batch from vrbctx, and reuse the vectors, like below? Future work.
inputVrb.cols[ixInVrb] = cvb.cols[ixInReadSet];
inputVrb.cols[ixInVrb] = cvb.getCvb().cols[ixInReadSet];
}
if (cvb.getFilterContext().isSelectedInUse()) {
inputVrb.selectedInUse = true;
inputVrb.size = cvb.getFilterContext().getSelectedSize();
inputVrb.selected = cvb.getFilterContext().getSelected();
} else {
inputVrb.size = cvb.getCvb().size;
}
inputVrb.size = cvb.size;
acidReader.setBaseAndInnerReader(new AcidWrapper(inputVrb));
acidReader.next(NullWritable.get(), vrb);
} else {
// TODO: WTF? The old code seems to just drop the ball here.
throw new AssertionError("Unsupported mode");
}
} else {
if (includes.getPhysicalColumnIds().size() != cvb.cols.length) {
if (includes.getPhysicalColumnIds().size() != cvb.getCvb().cols.length) {
throw new RuntimeException("Unexpected number of columns, VRB has "
+ includes.getPhysicalColumnIds().size() + " included, but the reader returned "
+ cvb.cols.length);
+ cvb.getCvb().cols.length);
}
// VRB was created from VrbCtx, so we already have pre-allocated column vectors.
// Return old CVs (if any) to caller. We assume these things all have the same schema.
for (int ixInReadSet = 0; ixInReadSet < cvb.cols.length; ++ixInReadSet) {
for (int ixInReadSet = 0; ixInReadSet < cvb.getCvb().cols.length; ++ixInReadSet) {
int ixInVrb = includes.getPhysicalColumnIds().get(ixInReadSet);
cvb.swapColumnVector(ixInReadSet, vrb.cols, ixInVrb);
cvb.getCvb().swapColumnVector(ixInReadSet, vrb.cols, ixInVrb);
}
if (cvb.getFilterContext().isSelectedInUse()) {
vrb.selectedInUse = true;
vrb.size = cvb.getFilterContext().getSelectedSize();
vrb.selected = cvb.getFilterContext().getSelected();
} else {
vrb.selectedInUse = false;
vrb.size = cvb.getCvb().size;
}
vrb.selectedInUse = false;//why?
vrb.size = cvb.size;
}

if (wasFirst) {
@@ -477,7 +551,7 @@ public void uncaughtException(final Thread t, final Throwable e) {
}
}

ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
ColumnVectorBatchWrapper nextCvb() throws InterruptedException, IOException {
boolean isFirst = (lastCvb == null);
if (!isFirst) {
feedback.returnData(lastCvb);
@@ -509,7 +583,7 @@ ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
rethrowErrorIfAny((Throwable) next);
throw new AssertionError("Unreachable");
}
lastCvb = (ColumnVectorBatch) next;
lastCvb = (ColumnVectorBatchWrapper) next;
if (LlapIoImpl.LOG.isTraceEnabled()) {
LlapIoImpl.LOG.trace("Processing will receive vector {}", lastCvb);
}
@@ -566,7 +640,7 @@ public void setDone() throws InterruptedException {
}

@Override
public void consumeData(ColumnVectorBatch data) throws InterruptedException {
public void consumeData(ColumnVectorBatchWrapper data) throws InterruptedException {
if (LlapIoImpl.LOG.isTraceEnabled()) {
LlapIoImpl.LOG.trace("consume called; closed {}, interrupted {}, err {}, pending {}",
isClosed, isInterrupted, pendingError.get(), queue.size());
@@ -627,6 +701,12 @@ private static class IncludesImpl implements SchemaEvolutionFactory, Includes {
private TypeDescription readerSchema;
private JobConf jobConf;

// ProbeDecode settings
private String probeDecodeOpCacheKey;
private int probeDecodeColIdx = -1;
private byte posSingleVectorMapJoinSmallTable;
private String probeDecodeColName = null;

public IncludesImpl(List<Integer> tableIncludedCols, boolean isAcidScan,
VectorizedRowBatchCtx rbCtx, TypeDescription readerSchema,
JobConf jobConf, boolean includeAcidColumns) {
@@ -644,7 +724,6 @@ public IncludesImpl(List<Integer> tableIncludedCols, boolean isAcidScan,
tableIncludedCols.add(i);
}
}
LOG.debug("Logical table includes: {}", tableIncludedCols);
this.readerLogicalColumnIds = tableIncludedCols;
// Note: schema evolution currently does not support column index changes.
// So, the indices should line up... to be fixed in SE v2?
@@ -683,6 +762,47 @@ public IncludesImpl(List<Integer> tableIncludedCols, boolean isAcidScan,
this.includeAcidColumns = includeAcidColumns;
}

public void setProbeDecodeCacheKey(String probeDecodeOpCacheKey) {
this.probeDecodeOpCacheKey = probeDecodeOpCacheKey;
}

@Override
public String getProbeDecodeCacheKey() {
return this.probeDecodeOpCacheKey;
}

public void setProbeDecodeColIdx(int probeDecodeColIdx) {
this.probeDecodeColIdx = probeDecodeColIdx;
}

public void setPosSingleVectorMapJoinSmallTable(byte posSingleVectorMapJoinSmallTable) {
this.posSingleVectorMapJoinSmallTable = posSingleVectorMapJoinSmallTable;
}

@Override
public byte getPosSingleVectorMapJoinSmallTable() {
return posSingleVectorMapJoinSmallTable;
}

@Override
public int getProbeDecodeColIdx() {
return probeDecodeColIdx;
}

public void setProbeDecodeColName(String probeDecodeColName) {
this.probeDecodeColName = probeDecodeColName;
}

@Override
public String getProbeDecodeColName() {
return probeDecodeColName;
}

@Override
public JobConf getJobConf() {
return jobConf;
}

@Override
public String toString() {
return "logical columns " + readerLogicalColumnIds
@@ -0,0 +1,22 @@
package org.apache.hadoop.hive.llap.io.decode;

import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.orc.impl.TreeReaderFactory;

/**
 * Pairs a decoded ColumnVectorBatch with the mutable filter context that
 * probe decode uses to mark which rows of the batch are selected.
 */
public class ColumnVectorBatchWrapper {
private ColumnVectorBatch cvb;
private TreeReaderFactory.MutableFilterContext fcnx;

ColumnVectorBatchWrapper(int numCols) {
this.cvb = new ColumnVectorBatch(numCols);
this.fcnx = new TreeReaderFactory.MutableFilterContext();
}

public ColumnVectorBatch getCvb() {
return cvb;
}

public TreeReaderFactory.MutableFilterContext getFilterContext() {
return fcnx;
}
}
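How this wrapper is meant to be consumed: a minimal sketch (not part of the patch) that propagates the probe-decode selection onto a VectorizedRowBatch, mirroring the non-ACID branch of LlapRecordReader.next() above; the class and method names are hypothetical:

import org.apache.hadoop.hive.llap.io.decode.ColumnVectorBatchWrapper;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class FilterContextUsageSketch {
  // Copy the selected-row information (if any) from the wrapper's filter
  // context onto the target batch; otherwise keep the whole batch.
  static void applySelection(ColumnVectorBatchWrapper wrapper, VectorizedRowBatch vrb) {
    if (wrapper.getFilterContext().isSelectedInUse()) {
      vrb.selectedInUse = true;
      vrb.size = wrapper.getFilterContext().getSelectedSize();
      vrb.selected = wrapper.getFilterContext().getSelected();
    } else {
      vrb.selectedInUse = false;
      vrb.size = wrapper.getCvb().size;
    }
  }
}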
@@ -49,9 +49,14 @@ public interface Includes {
List<Integer> getPhysicalColumnIds();
List<Integer> getReaderLogicalColumnIds();
TypeDescription[] getBatchReaderTypes(TypeDescription fileSchema);
String getProbeDecodeCacheKey();
byte getPosSingleVectorMapJoinSmallTable();
int getProbeDecodeColIdx();
String getProbeDecodeColName();
JobConf getJobConf();
}

ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split,
ReadPipeline createReadPipeline(Consumer<ColumnVectorBatchWrapper> consumer, FileSplit split,
Includes includes, SearchArgument sarg, QueryFragmentCounters counters,
SchemaEvolutionFactory sef, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe,
Reporter reporter, JobConf job, Map<Path, PartitionDesc> parts) throws IOException;