HIVE-25370: Improve SharedWorkOptimizer performance (#2536) (Zoltan Haindrich reviewed by Krisztian Kasa)
kgyrtkirk committed Jul 28, 2021
1 parent e6d98b8 commit 69c97c2
Showing 2 changed files with 130 additions and 47 deletions.
SharedWorkOptimizer.java
@@ -25,6 +25,7 @@
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -130,6 +131,7 @@ public class SharedWorkOptimizer extends Transform {
 
   @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
+    LOG.info("SharedWorkOptimizer start");
 
     final Map<String, TableScanOperator> topOps = pctx.getTopOps();
     if (topOps.size() < 2) {
@@ -220,14 +222,8 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
       // reduce sink operator.
       final Multimap<Operator<?>, MapJoinOperator> parentToMapJoinOperators =
           ArrayListMultimap.create();
-      final Set<Operator<?>> visitedOperators = new HashSet<>();
-      for (Entry<Operator<?>, Collection<Operator<?>>> e :
-          optimizerCache.operatorToWorkOperators.asMap().entrySet()) {
-        if (visitedOperators.contains(e.getKey())) {
-          // Already visited this work, we move on
-          continue;
-        }
-        for (Operator<?> op : e.getValue()) {
+      for (Set<Operator<?>> workOperators : optimizerCache.getWorkGroups()) {
+        for (Operator<?> op : workOperators) {
           if (op instanceof MapJoinOperator) {
             MapJoinOperator mapJoinOp = (MapJoinOperator) op;
             // Only allowed for mapjoin operator
@@ -237,7 +233,6 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
                 obtainBroadcastInput(mapJoinOp).getParentOperators().get(0), mapJoinOp);
           }
         }
-        visitedOperators.add(op);
       }
     }
     // For each group, set the cache key accordingly if there is more than one operator
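Context on this hunk: the deleted visitedOperators set existed because a HashMultimap keeps one entry per operator, so each work was reachable once per member and had to be skipped after the first visit. In the new cache, every member of a work aliases the same Set instance, so de-duplicating those instances by reference yields each work exactly once. A minimal standalone sketch of that idea (Op is a hypothetical stand-in for Hive's Operator, not the real API):

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;

class WorkGroupIterationSketch {
  static final class Op { }

  public static void main(String[] args) {
    // every member of a work maps to the *same* Set instance
    Map<Op, Set<Op>> opToWork = new IdentityHashMap<>();
    Set<Op> work = Collections.newSetFromMap(new IdentityHashMap<>());
    Op a = new Op();
    Op b = new Op();
    work.add(a);
    work.add(b);
    opToWork.put(a, work);
    opToWork.put(b, work);

    // de-duplicating the values by reference yields each work exactly once,
    // which is why the caller's visited-set bookkeeping can go away
    Set<Set<Op>> groups = Collections.newSetFromMap(new IdentityHashMap<>());
    groups.addAll(opToWork.values());
    System.out.println(groups.size()); // 1, even though the map has 2 entries
  }
}

This is the same shape as the getWorkGroups() method added further down in this diff.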
@@ -288,6 +283,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
       }
     }
 
+    LOG.info("SharedWorkOptimizer end");
     return pctx;
   }

@@ -388,6 +384,10 @@ public boolean sharedWorkOptimization(ParseContext pctx, SharedWorkOptimizerCache
           continue;
         }
         for (TableScanOperator retainableTsOp : retainedScans) {
+          if (optimizerCache.getWorkGroup(discardableTsOp).contains(retainableTsOp)) {
+            LOG.trace("No need to check further: {} and {} are in the same group", discardableTsOp, retainableTsOp);
+            continue;
+          }
           if (removedOps.contains(retainableTsOp)) {
             LOG.debug("Skip {} as it has already been removed", retainableTsOp);
             continue;
@@ -2002,14 +2002,12 @@ private static Set<Operator<?>> findDescendantWorkOperators(ParseContext pctx,
   // Stores result in cache
   private static Set<Operator<?>> findWorkOperators(
       SharedWorkOptimizerCache optimizerCache, Operator<?> start) {
-    Set<Operator<?>> c = optimizerCache.operatorToWorkOperators.get(start);
+    Set<Operator<?>> c = optimizerCache.getWorkGroup(start);
     if (!c.isEmpty()) {
       return c;
     }
     c = findWorkOperators(start, new HashSet<Operator<?>>());
-    for (Operator<?> op : c) {
-      optimizerCache.operatorToWorkOperators.putAll(op, c);
-    }
+    optimizerCache.addWorkGroup(c);
    return c;
  }
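findWorkOperators thus becomes plain memoization: compute the work once, register it via addWorkGroup, and every later lookup for any member is a single map hit rather than a traversal followed by a per-member putAll into the multimap. A compact sketch of the pattern under the same assumptions (hypothetical Op type; a simplified child-only walk stands in for Hive's actual work traversal):

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class MemoizedWorkLookup {
  static final class Op {
    final List<Op> children;
    Op(Op... kids) { children = List.of(kids); }
  }

  private final Map<Op, Set<Op>> opToWork = new IdentityHashMap<>();

  Set<Op> findWorkOperators(Op start) {
    Set<Op> cached = opToWork.get(start);
    if (cached != null) {
      return cached; // any member of a known work takes this fast path
    }
    // expensive part: walk the plan once to collect the work's operators
    Set<Op> work = Collections.newSetFromMap(new IdentityHashMap<>());
    Deque<Op> todo = new ArrayDeque<>();
    todo.push(start);
    while (!todo.isEmpty()) {
      Op op = todo.pop();
      if (work.add(op)) {
        todo.addAll(op.children);
      }
    }
    // one registration: all members share the same set from now on
    for (Op op : work) {
      opToWork.put(op, work);
    }
    return work;
  }
}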

@@ -2223,25 +2221,44 @@ public String toString() {
     }
 
   /** Cache to accelerate optimization */
-  private static class SharedWorkOptimizerCache {
+  static class SharedWorkOptimizerCache {
     // Operators that belong to each work
-    final HashMultimap<Operator<?>, Operator<?>> operatorToWorkOperators =
-        HashMultimap.<Operator<?>, Operator<?>>create();
+    private final Map<Operator<?>, Set<Operator<?>>> operatorToWorkOperators = new IdentityHashMap<>();
     // Table scan operators to DPP sources
     final Multimap<TableScanOperator, Operator<?>> tableScanToDPPSource =
         HashMultimap.<TableScanOperator, Operator<?>>create();
     private Set<Operator<?>> knownFilterOperators = new HashSet<>();
 
     // Add new operator to cache work group of existing operator (if group exists)
     void putIfWorkExists(Operator<?> opToAdd, Operator<?> existingOp) {
-      List<Operator<?>> c = ImmutableList.copyOf(operatorToWorkOperators.get(existingOp));
-      if (!c.isEmpty()) {
-        for (Operator<?> op : c) {
-          operatorToWorkOperators.get(op).add(opToAdd);
-        }
-        operatorToWorkOperators.putAll(opToAdd, c);
-        operatorToWorkOperators.put(opToAdd, opToAdd);
+      Set<Operator<?>> group = operatorToWorkOperators.get(existingOp);
+      if (group == null) {
+        return;
       }
+      group.add(opToAdd);
+      operatorToWorkOperators.put(opToAdd, group);
     }
 
+    public void addWorkGroup(Collection<Operator<?>> c) {
+      Set<Operator<?>> group = Sets.newIdentityHashSet();
+      group.addAll(c);
+      for (Operator<?> op : c) {
+        operatorToWorkOperators.put(op, group);
+      }
+    }
+
+    public Set<Operator<?>> getWorkGroup(Operator<?> start) {
+      Set<Operator<?>> set = operatorToWorkOperators.get(start);
+      if (set == null) {
+        return Collections.emptySet();
+      }
+      return set;
+    }
+
+    public Set<Set<Operator<?>>> getWorkGroups() {
+      Set<Set<Operator<?>>> ret = Sets.newIdentityHashSet();
+      ret.addAll(operatorToWorkOperators.values());
+      return ret;
+    }
+
     public boolean isKnownFilteringOperator(Operator<? extends OperatorDesc> op) {
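This class is the heart of the change: the HashMultimap, which stored a private copy of the member list under every operator and had to keep all the copies in sync, is replaced by an IdentityHashMap whose values are shared, mutable identity sets. Keying by reference avoids hashing operators altogether, and a mutation through one member is immediately visible through all of them, which is what lets putIfWorkExists shrink to one set add plus one map put. A small runnable demonstration of that aliasing (again with a hypothetical Op type, not Hive's Operator):

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;

class SharedGroupAliasingDemo {
  static final class Op { }

  public static void main(String[] args) {
    Map<Op, Set<Op>> opToWork = new IdentityHashMap<>();
    Op a = new Op();
    Op b = new Op();
    Op c = new Op();

    // register {a, b} as one work: both keys alias a single set instance
    Set<Op> group = Collections.newSetFromMap(new IdentityHashMap<>());
    group.add(a);
    group.add(b);
    opToWork.put(a, group);
    opToWork.put(b, group);

    // putIfWorkExists(c, a) boils down to one add and one put; the old
    // multimap version had to update every member's private copy instead
    Set<Op> existing = opToWork.get(a);
    existing.add(c);
    opToWork.put(c, existing);

    System.out.println(opToWork.get(b).contains(c)); // true: b sees c too
  }
}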
@@ -2254,32 +2271,32 @@ public void setKnownFilteringOperator(Operator<?> filterOp) {
 
     // Remove operator
     void removeOp(Operator<?> opToRemove) {
-      Set<Operator<?>> s = operatorToWorkOperators.get(opToRemove);
-      s.remove(opToRemove);
-      List<Operator<?>> c1 = ImmutableList.copyOf(s);
-      if (!c1.isEmpty()) {
-        for (Operator<?> op1 : c1) {
-          operatorToWorkOperators.remove(op1, opToRemove); // Remove operator
-        }
-        operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator
+      Set<Operator<?>> group = operatorToWorkOperators.get(opToRemove);
+      if (group == null) {
+        return;
       }
+      group.remove(opToRemove);
+      operatorToWorkOperators.remove(opToRemove);
     }
 
     // Remove operator and combine
     void removeOpAndCombineWork(Operator<?> opToRemove, Operator<?> replacementOp) {
-      Set<Operator<?>> s = operatorToWorkOperators.get(opToRemove);
-      s.remove(opToRemove);
-      List<Operator<?>> c1 = ImmutableList.copyOf(s);
-      List<Operator<?>> c2 = ImmutableList.copyOf(operatorToWorkOperators.get(replacementOp));
-      if (!c1.isEmpty() && !c2.isEmpty()) {
-        for (Operator<?> op1 : c1) {
-          operatorToWorkOperators.remove(op1, opToRemove); // Remove operator
-          operatorToWorkOperators.putAll(op1, c2); // Add ops of new collection
-        }
-        operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator
-        for (Operator<?> op2 : c2) {
-          operatorToWorkOperators.putAll(op2, c1); // Add ops to existing collection
-        }
-      }
+      Set<Operator<?>> group1 = operatorToWorkOperators.get(opToRemove);
+      Set<Operator<?>> group2 = operatorToWorkOperators.get(replacementOp);
+
+      group1.remove(opToRemove);
+      operatorToWorkOperators.remove(opToRemove);
+
+      if (group1.size() > group2.size()) {
+        Set<Operator<?>> t = group2;
+        group2 = group1;
+        group1 = t;
+      }
+
+      group2.addAll(group1);
+
+      for (Operator<?> o : group1) {
+        operatorToWorkOperators.put(o, group2);
+      }
     }
 
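The rewritten removeOpAndCombineWork above also swaps the two groups so the smaller one is always folded into the larger one, the classic union-by-size trick from union-find structures: each operator is re-pointed to a new set at most O(log n) times across a whole chain of merges, whereas the old multimap version copied every member of both works on every merge. A standalone sketch of just the merge step (hypothetical Op type; it mirrors the diff's logic but is not the Hive code):

import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;

class CombineWorkSketch {
  static final class Op { }

  private final Map<Op, Set<Op>> opToWork = new IdentityHashMap<>();

  // merge the work of opToRemove into the work of replacementOp, dropping
  // opToRemove itself; assumes both operators belong to known works, as the
  // diff's version also assumes
  void removeOpAndCombineWork(Op opToRemove, Op replacementOp) {
    Set<Op> group1 = opToWork.get(opToRemove);
    Set<Op> group2 = opToWork.get(replacementOp);

    group1.remove(opToRemove);
    opToWork.remove(opToRemove);

    if (group1.size() > group2.size()) { // make group1 the smaller side
      Set<Op> t = group2;
      group2 = group1;
      group1 = t;
    }

    group2.addAll(group1);
    for (Op o : group1) {
      opToWork.put(o, group2); // only the smaller side is re-pointed
    }
  }
}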
TestSharedWorkOptimizer.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -34,14 +35,17 @@
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizer.SharedWorkOptimizerCache;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
 
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
 
 import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
 import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.FIXED;
 import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
@@ -200,4 +204,66 @@ private TableScanOperator addFilter(TableScanOperator ts, int i) {
     return ts;
   }
 
+  private Operator<? extends OperatorDesc> getFilterOp(int constVal) {
+    ExprNodeDesc pred = new ExprNodeConstantDesc(constVal);
+    FilterDesc fd = new FilterDesc(pred, true);
+    Operator<? extends OperatorDesc> op = OperatorFactory.get(cCtx, fd);
+    return op;
+  }
+
+  @Test
+  public void testSharedWorkOptimizerCache() {
+
+    List<Operator<?>> ops = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      ops.add(getFilterOp(i));
+    }
+
+    SharedWorkOptimizerCache c = new SharedWorkOptimizerCache();
+
+    for (int i = 1; i < 10; i++) {
+      int u = 10 * i;
+      c.addWorkGroup(ops.subList(u, u + 10));
+    }
+
+    // unknowns
+    for (int i = 0; i < 10; i++) {
+      assertTrue(c.getWorkGroup(ops.get(i)).isEmpty());
+    }
+
+    // equiv group
+    for (int i = 40; i < 50; i++) {
+      assertEquals(c.getWorkGroup(ops.get(40)), c.getWorkGroup(ops.get(i)));
+    }
+
+    // non equiv
+    for (int i = 10; i < 100; i += 10) {
+      for (int j = i + 10; j < 100; j += 10) {
+        assertNotEquals(c.getWorkGroup(ops.get(i)), c.getWorkGroup(ops.get(j)));
+      }
+    }
+
+    c.removeOpAndCombineWork(ops.get(10), ops.get(20));
+    assertTrue(c.getWorkGroup(ops.get(10)).isEmpty());
+    assertEquals(19, c.getWorkGroup(ops.get(11)).size());
+    for (int i = 11; i < 20; i++) {
+      assertTrue(c.getWorkGroup(ops.get(11)).contains(ops.get(i)));
+    }
+
+    c.putIfWorkExists(ops.get(0), ops.get(1));
+    assertTrue(c.getWorkGroup(ops.get(0)).isEmpty());
+    assertTrue(c.getWorkGroup(ops.get(1)).isEmpty());
+
+    c.putIfWorkExists(ops.get(0), ops.get(30));
+    assertFalse(c.getWorkGroup(ops.get(0)).isEmpty());
+    assertTrue(c.getWorkGroup(ops.get(31)).contains(ops.get(0)));
+
+    c.removeOp(ops.get(1));
+
+    c.removeOp(ops.get(50));
+    assertTrue(c.getWorkGroup(ops.get(50)).isEmpty());
+    assertFalse(c.getWorkGroup(ops.get(51)).contains(ops.get(50)));
+
+  }
+
 }
