
HIVE-1830 Mappers in group by followed by map-join may die OOM

(Liyin Tang via namit)



git-svn-id: https://svn.apache.org/repos/asf/hive/trunk@1043843 13f79535-47bb-0310-9956-ffa450edef68
Namit Jain committed Dec 9, 2010
1 parent e9e6908 commit 6cef02eb0b7cea3dbe2b1fe03bf989d921e537e5
Showing with 740 additions and 278 deletions.
  1. +3 −0 CHANGES.txt
  2. +4 −1 common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
  3. +18 −1 conf/hive-default.xml
  4. +18 −2 ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
  5. +2 −4 ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
  6. +0 −1 ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
  7. +0 −8 ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
  8. +98 −67 ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
  9. +99 −125 ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
  10. +16 −10 ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
  11. +23 −3 ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
  12. +9 −0 ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
  13. +0 −2 ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
  14. +0 −1 ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
  15. +10 −0 ql/src/test/queries/clientpositive/auto_join26.q
  16. +315 −0 ql/src/test/results/clientpositive/auto_join26.q.out
  17. +25 −13 ql/src/test/results/compiler/plan/groupby1.q.xml
  18. +20 −8 ql/src/test/results/compiler/plan/groupby2.q.xml
  19. +20 −8 ql/src/test/results/compiler/plan/groupby3.q.xml
  20. +20 −8 ql/src/test/results/compiler/plan/groupby4.q.xml
  21. +20 −8 ql/src/test/results/compiler/plan/groupby5.q.xml
  22. +20 −8 ql/src/test/results/compiler/plan/groupby6.q.xml
CHANGES.txt
@@ -590,6 +590,9 @@ Trunk - Unreleased
HIVE-1508 Add cleanup method for HiveHistory
(Edward Capriolo via namit)
+ HIVE-1830 Mappers in group by followed by map-join may die OOM
+ (Liyin Tang via namit)
+
TESTS
HIVE-1464. improve test query performance
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -202,6 +202,8 @@
HIVEMAPJOINCACHEROWS("hive.mapjoin.cache.numrows", 25000),
HIVEGROUPBYMAPINTERVAL("hive.groupby.mapaggr.checkinterval", 100000),
HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float) 0.5),
+ HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY("hive.mapjoin.followby.map.aggr.hash.percentmemory", (float) 0.3),
+ HIVEMAPAGGRMEMORYTHRESHOLD("hive.map.aggr.hash.force.flush.memory.threshold", (float) 0.9),
HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float) 0.5),
// for hive udtf operator
@@ -256,6 +258,7 @@
HIVEMAXMAPJOINSIZE("hive.mapjoin.maxsize", 100000),
HIVEHASHTABLETHRESHOLD("hive.hashtable.initialCapacity", 100000),
HIVEHASHTABLELOADFACTOR("hive.hashtable.loadfactor", (float) 0.75),
+ HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE("hive.mapjoin.followby.gby.localtask.max.memory.usage", (float) 0.55),
HIVEHASHTABLEMAXMEMORYUSAGE("hive.mapjoin.localtask.max.memory.usage", (float) 0.90),
HIVEHASHTABLESCALE("hive.mapjoin.check.memory.rows", (long)100000),
@@ -318,7 +321,7 @@
HIVEVARIABLESUBSTITUTE("hive.variable.substitute", true),
SEMANTIC_ANALYZER_HOOK("hive.semantic.analyzer.hook",null),
-
+
// Print column names in output
HIVE_CLI_PRINT_HEADER("hive.cli.print.header", false);
;
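The new "followby" variants are not read globally at runtime; the patch stamps the chosen fraction into the operator descriptors at plan time. A minimal sketch of that selection, assuming a Hadoop Configuration handle (the helper class and method names are illustrative, not the actual SemanticAnalyzer/LocalMapJoinProcFactory code):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

// Illustrative only: shows how the new "followby" variants shadow the
// existing defaults depending on the plan shape detected at compile time.
public final class MemoryFractions {
  // Hash-table budget for a map-side group by:
  // 0.3 when a map join follows, 0.5 otherwise.
  static float groupByFraction(Configuration conf, boolean followedByMapJoin) {
    return followedByMapJoin
        ? HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY)
        : HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
  }

  // Abort limit for the map-join local task:
  // 0.55 when a group by follows, 0.90 otherwise.
  static float localTaskLimit(Configuration conf, boolean followedByGroupBy) {
    return followedByGroupBy
        ? HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE)
        : HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE);
  }
}
```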
conf/hive-default.xml
@@ -252,6 +252,18 @@
<description>For local mode, memory of the mappers/reducers</description>
</property>
+<property>
+ <name>hive.mapjoin.followby.map.aggr.hash.percentmemory</name>
+ <value>0.3</value>
+  <description>Portion of total memory to be used by the map-side group aggregation hash table when this group by is followed by a map join</description>
+</property>
+
+<property>
+ <name>hive.map.aggr.hash.force.flush.memory.threshold</name>
+ <value>0.9</value>
+  <description>The max memory to be used by the map-side group aggregation hash table. If memory usage is higher than this number, a flush of the in-memory data is forced.</description>
+</property>
+
<property>
<name>hive.map.aggr.hash.percentmemory</name>
<value>0.5</value>
@@ -504,6 +516,12 @@
<description>This number means how much memory the local task can take to hold the key/value pairs in the in-memory hash table. If the local task's memory usage is more than this number, the local task aborts itself, meaning the data of the small table is too large to be held in memory.</description>
</property>
+<property>
+ <name>hive.mapjoin.followby.gby.localtask.max.memory.usage</name>
+ <value>0.55</value>
+  <description>This number means how much memory the local task can take to hold the key/value pairs in the in-memory hash table when this map join is followed by a group by. If the local task's memory usage is more than this number, the local task aborts itself, meaning the data of the small table is too large to be held in memory.</description>
+</property>
+
<property>
<name>hive.mapjoin.check.memory.rows</name>
<value>100000</value>
@@ -649,7 +667,6 @@
<description>Maximum number of HDFS files created by all mappers/reducers in a MapReduce job.</description>
</property>
-
<property>
<name>hive.exec.default.partition.name</name>
<value>__HIVE_DEFAULT_PARTITION__</value>
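To make the new defaults concrete, here is the arithmetic a mapper would see with a hypothetical 1 GiB heap (numbers are illustrative; the actual heap depends on the task JVM settings):

```java
// Budgets implied by the new defaults on an assumed 1 GiB mapper heap.
public class BudgetMath {
  public static void main(String[] args) {
    long maxHeap = 1L << 30;                     // 1 GiB
    long gbyBudget  = (long) (0.3f * maxHeap);   // ~322 MB group-by hash-table budget
    long flushAt    = (long) (0.9f * maxHeap);   // force-flush once used heap passes ~966 MB
    long localLimit = (long) (0.55f * maxHeap);  // local task aborts above ~590 MB
    System.out.printf("gbyBudget=%d flushAt=%d localLimit=%d%n",
        gbyBudget, flushAt, localLimit);
  }
}
```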
ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
@@ -133,6 +135,9 @@
// new Key ObjectInspectors are objectInspectors from the parent
transient StructObjectInspector newKeyObjectInspector;
transient StructObjectInspector currentKeyObjectInspector;
+ public static MemoryMXBean memoryMXBean;
+ private long maxMemory;
+ private float memoryThreshold;
/**
* This is used to store the position and field names for variable length
@@ -373,6 +378,9 @@ protected void initializeOp(Configuration hconf) throws HiveException {
if (hashAggr) {
computeMaxEntriesHashAggr(hconf);
}
+ memoryMXBean = ManagementFactory.getMemoryMXBean();
+ maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
+ memoryThreshold = this.getConf().getMemoryThreshold();
initializeChildren(hconf);
}
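MemoryMXBean is the standard java.lang.management view of the heap; the operator fetches the bean and the heap maximum once in initializeOp() and later polls only the used size. A self-contained probe showing the same JDK calls:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

// Standalone demo of the JDK calls GroupByOperator now relies on.
public class HeapProbe {
  public static void main(String[] args) {
    MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
    MemoryUsage heap = memoryMXBean.getHeapMemoryUsage();
    long maxMemory = heap.getMax();   // cached once at operator init
    long used = heap.getUsed();       // re-read on the hot path
    System.out.printf("used=%d max=%d rate=%.2f%n",
        used, maxMemory, (double) used / maxMemory);
  }
}
```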
@@ -386,8 +394,8 @@ protected void initializeOp(Configuration hconf) throws HiveException {
* aggregation only
**/
private void computeMaxEntriesHashAggr(Configuration hconf) throws HiveException {
- maxHashTblMemory = (long) (HiveConf.getFloatVar(hconf,
- HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY) * Runtime.getRuntime().maxMemory());
+ float memoryPercentage = this.getConf().getGroupByMemoryUsage();
+ maxHashTblMemory = (long) (memoryPercentage * Runtime.getRuntime().maxMemory());
estimateRowSize();
}
@@ -824,10 +832,18 @@ private void processAggr(Object row, ObjectInspector rowInspector,
**/
private boolean shouldBeFlushed(KeyWrapper newKeys) {
int numEntries = hashAggregations.size();
+ long usedMemory;
+ float rate;
// The fixed size for the aggregation class is already known. Get the
// variable portion of the size every NUMROWSESTIMATESIZE rows.
if ((numEntriesHashTable == 0) || ((numEntries % NUMROWSESTIMATESIZE) == 0)) {
+      // check how much heap memory is left
+ usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
+ rate = (float) usedMemory / (float) maxMemory;
+ if(rate > memoryThreshold){
+ return true;
+ }
for (Integer pos : keyPositionsSize) {
Object key = newKeys.getKeyArray()[pos.intValue()];
// Ignore nulls
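Put together, the new early exit in shouldBeFlushed() flushes on raw heap pressure before the per-row size estimate is even consulted. A minimal sketch of that check, with the 0.9 default hard-coded for illustration:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

// Sketch of the heap-pressure early exit added to shouldBeFlushed().
public class FlushCheck {
  private static final MemoryMXBean MEMORY_BEAN = ManagementFactory.getMemoryMXBean();
  private final long maxMemory = MEMORY_BEAN.getHeapMemoryUsage().getMax();
  private final float memoryThreshold = 0.9f; // hive.map.aggr.hash.force.flush.memory.threshold

  boolean heapPressureFlush() {
    long usedMemory = MEMORY_BEAN.getHeapMemoryUsage().getUsed();
    float rate = (float) usedMemory / (float) maxMemory;
    return rate > memoryThreshold; // flush regardless of the row-size estimate
  }
}
```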
ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -118,7 +118,6 @@
private long hashTableScale;
private boolean isAbort = false;
-
public static class HashTableSinkObjectCtx {
ObjectInspector standardOI;
SerDe serde;
@@ -244,14 +243,13 @@ protected void initializeOp(Configuration hconf) throws HiveException {
for (int pos = 0; pos < numAliases; pos++) {
metadataValueTag[pos] = -1;
}
-
mapJoinTables = new HashMap<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>>();
int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD);
float hashTableLoadFactor = HiveConf.getFloatVar(hconf,
HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
- float hashTableMaxMemoryUsage = HiveConf.getFloatVar(hconf,
- HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE);
+ float hashTableMaxMemoryUsage = this.getConf().getHashtableMemoryUsage();
+
hashTableScale = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVEHASHTABLESCALE);
if (hashTableScale <= 0) {
hashTableScale = 1;
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
@@ -161,7 +161,6 @@ public boolean isAbort(long numRows,LogHelper console) {
int size = mHash.size();
long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
double rate = (double) usedMemory / (double) maxMemory;
- long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
console.printInfo(Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t"
+ size + "\tMemory usage:\t" + usedMemory + "\trate:\t" + num.format(rate));
if (rate > (double) maxMemoryUsage) {
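The local-task side applies the same used/max ratio in HashMapWrapper.isAbort(), but compares it against the limit stamped into the plan (0.55 with a following group by, 0.90 otherwise). A reduced sketch of the decision:

```java
// Reduced sketch of the abort check in HashMapWrapper.isAbort(): if building
// the small-table hash table pushes heap usage past the configured limit,
// the local task gives up rather than running the mapper out of memory.
static boolean shouldAbort(long usedMemory, long maxMemory, double maxMemoryUsage) {
  double rate = (double) usedMemory / (double) maxMemory;
  return rate > maxMemoryUsage; // e.g. 0.55 when a group by follows the map join
}
```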
ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
@@ -92,7 +92,6 @@ private ConditionalTask processCurrentTask(MapRedTask currTask, ConditionalTask
JoinDesc joinDesc = joinOp.getConf();
Byte[] order = joinDesc.getTagOrder();
int numAliases = order.length;
-
try {
HashSet<Integer> smallTableOnlySet = MapJoinProcessor.getSmallTableOnlySet(joinDesc
.getConds());
@@ -115,7 +114,6 @@ private ConditionalTask processCurrentTask(MapRedTask currTask, ConditionalTask
if (smallTableOnlySet.contains(i)) {
continue;
}
-
// create map join task and set big table as i
// deep copy a new mapred work from xml
InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
@@ -148,9 +146,7 @@ private ConditionalTask processCurrentTask(MapRedTask currTask, ConditionalTask
aliasToPath.put(bigTableAlias, path);
}
}
-
}
-
} catch (Exception e) {
e.printStackTrace();
throw new SemanticException("Generate Map Join Task Error: " + e.getMessage());
@@ -212,7 +208,6 @@ private void replaceTaskWithConditionalTask(Task<? extends Serializable> currTas
}
}
-
@Override
public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
throws SemanticException {
@@ -243,19 +238,16 @@ public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
return null;
}
-
private JoinOperator getJoinOp(MapRedTask task) throws SemanticException {
if (task.getWork() == null) {
return null;
}
-
Operator<? extends Serializable> reducerOp = task.getWork().getReducer();
if (reducerOp instanceof JoinOperator) {
return (JoinOperator) reducerOp;
} else {
return null;
}
-
}
}
}