Permalink
Browse files

PIG-2652: Skew join and order by don't trigger reducer estimation (dvryaboy)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1331637 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent 8291804 commit d49f903e0d041ce7d9e8e2d0a9066cd666f99da7 Dmitriy V. Ryaboy committed Apr 27, 2012
View
2 CHANGES.txt
@@ -112,6 +112,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-2652: Skew join and order by don't trigger reducer estimation (dvryaboy)
+
PIG-2616: JobControlCompiler.getInputSizeFromLoader must handle exceptions from LoadFunc.getStatistics (billgraham)
PIG-2644: Piggybank's HadoopJobHistoryLoader throws NPE when reading broken history file (herberts via daijy)
View
10 src/org/apache/pig/PigServer.java
@@ -27,7 +27,6 @@
import java.io.PrintStream;
import java.io.StringReader;
import java.io.StringWriter;
-import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
@@ -74,16 +73,13 @@
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.ScalarExpression;
import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
-import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
-import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LOStore;
@@ -93,7 +89,6 @@
import org.apache.pig.newplan.logical.visitor.CastLineageSetter;
import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor;
import org.apache.pig.newplan.logical.visitor.ScalarVariableValidator;
-import org.apache.pig.newplan.logical.visitor.ProjStarInUdfExpander;
import org.apache.pig.newplan.logical.visitor.ScalarVisitor;
import org.apache.pig.newplan.logical.visitor.SchemaAliasVisitor;
import org.apache.pig.newplan.logical.visitor.TypeCheckingRelVisitor;
@@ -109,7 +104,6 @@
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
@@ -1201,10 +1195,10 @@ public void shutdown() {
return exgen.getExamples();
} catch (ExecException e) {
e.printStackTrace(System.out);
- throw new IOException("ExecException : " + e.getMessage());
+ throw new IOException("ExecException" , e);
} catch (Exception e) {
e.printStackTrace(System.out);
- throw new IOException("Exception : " + e.getMessage());
+ throw new IOException("Exception ", e);
}
}
View
87 src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
@@ -58,6 +58,7 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -261,7 +262,7 @@ public JobControl compile(MROperPlan plan, String grpName) throws JobCreationExc
if(mro instanceof NativeMapReduceOper) {
return null;
}
- Job job = getJob(mro, conf, pigContext);
+ Job job = getJob(plan, mro, conf, pigContext);
jobMroMap.put(job, mro);
jobCtrl.addJob(job);
}
@@ -327,7 +328,7 @@ public int updateMROpPlan(List<Job> completeFailedJobs)
* @throws JobCreationException
*/
@SuppressWarnings({ "unchecked", "deprecation" })
- private Job getJob(MapReduceOper mro, Configuration config, PigContext pigContext) throws JobCreationException{
+ private Job getJob(MROperPlan plan, MapReduceOper mro, Configuration config, PigContext pigContext) throws JobCreationException{
org.apache.hadoop.mapreduce.Job nwJob = null;
try{
@@ -376,6 +377,8 @@ private Job getJob(MapReduceOper mro, Configuration config, PigContext pigContex
}
try{
+ adjustNumReducers(plan, mro, conf, nwJob);
+
//Process the POLoads
List<POLoad> lds = PlanHelper.getLoads(mro.mapPlan);
@@ -594,14 +597,6 @@ else if (mapStores.size() + reduceStores.size() > 0) { // multi store case
nwJob.setMapperClass(PigMapReduce.Map.class);
nwJob.setReducerClass(PigMapReduce.Reduce.class);
- // first check the PARALLE in query, then check the defaultParallel in PigContext, and last do estimation
- if (mro.requestedParallelism > 0)
- nwJob.setNumReduceTasks(mro.requestedParallelism);
- else if (pigContext.defaultParallel > 0)
- conf.set("mapred.reduce.tasks", ""+pigContext.defaultParallel);
- else
- estimateNumberOfReducers(conf, lds, nwJob);
-
if (mro.customPartitioner != null)
nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
@@ -743,9 +738,42 @@ else if ((tmp = pigContext.getProperties().getProperty("pig.maxCombinedSplitSize
}
}
+ public void adjustNumReducers(MROperPlan plan, MapReduceOper mro, Configuration conf,
+ org.apache.hadoop.mapreduce.Job nwJob) throws IOException {
+ List<PhysicalOperator> loads = mro.mapPlan.getRoots();
+ List<POLoad> lds = new ArrayList<POLoad>();
+ for (PhysicalOperator ld : loads) {
+ lds.add((POLoad)ld);
+ }
+ int jobParallelism = -1;
+ int estimatedParallelism = estimateNumberOfReducers(conf, lds, nwJob);
+ if (mro.requestedParallelism > 0) {
+ jobParallelism = mro.requestedParallelism;
+ } else if (pigContext.defaultParallel > 0) {
+ jobParallelism = pigContext.defaultParallel;
+ } else {
+ jobParallelism = estimatedParallelism;
+ }
+ // Special case: Skewed Join and Order set parallelism to 1 even when no parallelism is specified.
+ if ((mro.isSkewedJoin() || mro.isGlobalSort()) && jobParallelism == 1) {
+ jobParallelism = estimatedParallelism;
+ }
+ if (mro.isSampler() && jobParallelism == 1) {
+ // Note: this is suboptimal, as the number of reducers communicated to the
+ // sampler is only based on the sampler inputs, meaning, the left side in the case
+ // of a skewed join. Ideally, we'd take into account the right side, as well.
+ ParallelConstantVisitor visitor =
+ new ParallelConstantVisitor(mro.reducePlan, estimatedParallelism);
+ visitor.visit();
+ }
+ log.info("Setting Parallelism to " + jobParallelism);
+ mro.requestedParallelism = jobParallelism;
+ conf.setInt("mapred.reduce.tasks", jobParallelism);
+ }
+
/**
* Looks up the estimator from REDUCER_ESTIMATOR_KEY and invokes it to find the number of
- * reducers to use. If REDUCER_ESTIMATOR_KEY isn't set, defaults to InputSizeReducerEstimator
+ * reducers to use. If REDUCER_ESTIMATOR_KEY isn't set, defaults to InputSizeReducerEstimator.
* @param conf
* @param lds
* @throws IOException
@@ -759,10 +787,6 @@ public static int estimateNumberOfReducers(Configuration conf, List<POLoad> lds,
log.info("Using reducer estimator: " + estimator.getClass().getName());
int numberOfReducers = estimator.estimateNumberOfReducers(conf, lds, job);
- conf.setInt("mapred.reduce.tasks", numberOfReducers);
-
- log.info("Neither PARALLEL nor default parallelism is set for this job. Setting number of "
- + "reducers to " + numberOfReducers);
return numberOfReducers;
}
@@ -1398,4 +1422,37 @@ public void visitUserFunc(POUserFunc func) throws VisitorException {
}
}
}
+
+ private static class ParallelConstantVisitor extends PhyPlanVisitor {
+
+ private int rp;
+
+ private boolean replaced = false;
+
+ public ParallelConstantVisitor(PhysicalPlan plan, int rp) {
+ super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+ plan));
+ this.rp = rp;
+}
+
+ @Override
+ public void visitConstant(ConstantExpression cnst) throws VisitorException {
+ if (cnst.getRequestedParallelism() == -1) {
+ Object obj = cnst.getValue();
+ if (obj instanceof Integer) {
+ if (replaced) {
+ // sample job should have only one ConstantExpression
+ throw new VisitorException("Invalid reduce plan: more " +
+ "than one ConstantExpression found in sampling job");
+ }
+ cnst.setValue(rp);
+ cnst.setRequestedParallelism(rp);
+ replaced = true;
+ }
+ }
+ }
+
+ boolean isReplaced() { return replaced; }
+ }
+
}
View
15 src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java
@@ -23,19 +23,14 @@
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
@@ -64,9 +59,13 @@ public LimitAdjuster(MROperPlan plan, PigContext pigContext) {
@Override
public void visitMROp(MapReduceOper mr) throws VisitorException {
// Look for map reduce operators which contains limit operator.
- // If so and the requestedParallelism > 1, add one additional map-reduce
- // operator with 1 reducer into the original plan
- if ((mr.limit!=-1 || mr.limitPlan!=null) && mr.requestedParallelism!=1)
+ // If so, add one additional map-reduce
+ // operator with 1 reducer into the original plan.
+
+ // TODO: This new MR job can be skipped if at runtime we discover that
+ // its parent only has a single reducer (mr.requestedParallelism!=1).
+ // This check MUST happen at runtime since that's when reducer estimation happens.
+ if ((mr.limit!=-1 || mr.limitPlan!=null) )
{
opsToAdjust.add(mr);
}
View
8 src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
@@ -38,7 +38,6 @@
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
@@ -65,7 +64,6 @@
import org.apache.pig.impl.util.ConfigurationValidator;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.ScriptState;
@@ -516,7 +514,6 @@ public MROperPlan compile(
pc.getProperties().getProperty(
"last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);
- //String prop = System.getProperty("pig.exec.nocombiner");
String prop = pc.getProperties().getProperty("pig.exec.nocombiner");
if (!pc.inIllustrator && !("true".equals(prop))) {
boolean doMapAgg =
@@ -532,10 +529,12 @@ public MROperPlan compile(
SampleOptimizer so = new SampleOptimizer(plan, pc);
so.visit();
+ // We must ensure that there is only 1 reducer for a limit. Add a single-reducer job.
+ if (!pc.inIllustrator) {
LimitAdjuster la = new LimitAdjuster(plan, pc);
la.visit();
la.adjust();
-
+ }
// Optimize to use secondary sort key if possible
prop = pc.getProperties().getProperty("pig.exec.nosecondarykey");
if (!pc.inIllustrator && !("true".equals(prop))) {
@@ -621,6 +620,7 @@ private void createSuccessFile(Job job, POStore store) throws IOException {
*/
class JobControlThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
+ @Override
public void uncaughtException(Thread thread, Throwable throwable) {
jobControlExceptionStackTrace = getStackStraceStr(throwable);
try {
View
58 src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
@@ -126,33 +126,6 @@ public void visitMROp(MapReduceOper mr) throws VisitorException {
return;
}
MapReduceOper succ = succs.get(0);
-
- // set/estimate the parallelism
- if (succ.requestedParallelism == 1) {
- List<PhysicalOperator> loads = pred.mapPlan.getRoots();
- List<POLoad> lds = new ArrayList<POLoad>();
- for (PhysicalOperator ld : loads) {
- lds.add((POLoad)ld);
- }
- Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
- int rp = 1;
- try {
- rp = JobControlCompiler.estimateNumberOfReducers(
- conf, lds, new org.apache.hadoop.mapreduce.Job(conf));
- } catch (IOException e) {
- log.warn("Failed to estimate number of reducers", e);
- }
-
- if (rp > 1) {
- ParallelConstantVisitor visitor = new ParallelConstantVisitor(mr.reducePlan, rp);
- visitor.visit();
- if (visitor.isReplaced()) {
- succ.requestedParallelism = rp;
- log.info(" Setting number of reducers for order by to " + rp);
- }
- }
- }
-
if (pred.mapPlan == null || pred.mapPlan.size() != 2) {
log.debug("Predecessor has more than just load+store in the map");
return;
@@ -273,35 +246,4 @@ private void scan(MapReduceOper mr, PhysicalOperator op, String fileName) {
}
}
}
-
- private static class ParallelConstantVisitor extends PhyPlanVisitor {
-
- private int rp;
-
- private boolean replaced = false;
-
- public ParallelConstantVisitor(PhysicalPlan plan, int rp) {
- super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
- plan));
- this.rp = rp;
- }
-
- public void visitConstant(ConstantExpression cnst) throws VisitorException {
- if (cnst.getRequestedParallelism() == -1) {
- Object obj = cnst.getValue();
- if (obj instanceof Integer) {
- if (replaced) {
- // sample job should have only one ConstantExpression
- throw new VisitorException("Invalid reduce plan: more " +
- "than one ConstantExpression found in sampling job");
- }
- cnst.setValue(rp);
- cnst.setRequestedParallelism(rp);
- replaced = true;
- }
- }
- }
-
- boolean isReplaced() { return replaced; }
- }
}
View
9 test/org/apache/pig/test/TestEvalPipeline2.java
@@ -252,7 +252,7 @@ public void testBinStorageByteArrayCastsComplexBag() throws IOException {
Assert.assertTrue(tup.get(0) instanceof DataBag);
DataBag db = (DataBag)tup.get(0);
Assert.assertTrue(db.iterator().hasNext());
- Tuple innerTuple = (Tuple)db.iterator().next();
+ Tuple innerTuple = db.iterator().next();
Assert.assertTrue(innerTuple.get(0)==null);
//tuple 5
@@ -1487,6 +1487,7 @@ public void testDereferenceUidBug() throws Exception{
}
static public class UDFWithNonStandardType extends EvalFunc<Tuple>{
+ @Override
public Tuple exec(Tuple input) throws IOException {
Tuple t = TupleFactory.getInstance().newTuple();
t.append(new ArrayList<Integer>());
@@ -1655,9 +1656,9 @@ public void testLimitAutoReducer() throws Exception{
Util.createInputFile(cluster, "table_testLimitAutoReducer", input);
- pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "9");
- pigServer.registerQuery("A = load 'table_testLimitAutoReducer' as (a0, a1);");
- pigServer.registerQuery("B = order A by a0;");
+ pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "16");
+ pigServer.registerQuery("A = load 'table_testLimitAutoReducer';");
+ pigServer.registerQuery("B = order A by $0;");
pigServer.registerQuery("C = limit B 2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
View
26 test/org/apache/pig/test/TestJobSubmission.java
@@ -17,7 +17,8 @@
*/
package org.apache.pig.test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.Iterator;
@@ -40,6 +41,7 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.data.DataType;
@@ -618,9 +620,19 @@ public void testReducerNumEstimationForOrderBy() throws Exception{
PhysicalPlan pp = Util.buildPp(ps, query);
MROperPlan mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
+ Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
+ JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+ JobControl jobControl = jcc.compile(mrPlan, query);
+
assertEquals(2, mrPlan.size());
+ // Simulate the first job having run so estimation kicks in.
MapReduceOper sort = mrPlan.getLeaves().get(0);
+ jcc.updateMROpPlan(jobControl.getReadyJobs());
+ FileLocalizer.create(sort.getQuantFile(), pc);
+ jcc.compile(mrPlan, query);
+
+ sort = mrPlan.getLeaves().get(0);
long reducer=Math.min((long)Math.ceil(new File("test/org/apache/pig/test/data/passwd").length()/100.0), 10);
assertEquals(reducer, sort.getRequestedParallelism());
@@ -630,6 +642,7 @@ public void testReducerNumEstimationForOrderBy() throws Exception{
pp = Util.buildPp(ps, query);
mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
+
assertEquals(2, mrPlan.size());
sort = mrPlan.getLeaves().get(0);
@@ -658,6 +671,17 @@ public void testReducerNumEstimationForOrderBy() throws Exception{
mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
assertEquals(3, mrPlan.size());
+ // Simulate the first 2 jobs having run so estimation kicks in.
+ sort = mrPlan.getLeaves().get(0);
+ FileLocalizer.create(sort.getQuantFile(), pc);
+
+ jobControl = jcc.compile(mrPlan, query);
+ Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", ((POLoad) sort.mapPlan.getRoots().get(0)).getLFile().getFileName());
+ jcc.updateMROpPlan(jobControl.getReadyJobs());
+ jobControl = jcc.compile(mrPlan, query);
+ jcc.updateMROpPlan(jobControl.getReadyJobs());
+
+ jobControl = jcc.compile(mrPlan, query);
sort = mrPlan.getLeaves().get(0);
assertEquals(reducer, sort.getRequestedParallelism());
}
View
2 test/org/apache/pig/test/TestPigRunner.java
@@ -217,7 +217,7 @@ public void orderByTest() throws Exception {
try {
PigStats stats = PigRunner.run(args, new TestNotificationListener());
assertTrue(stats.isSuccessful());
- assertTrue(stats.getJobGraph().size() == 3);
+ assertTrue(stats.getJobGraph().size() == 4);
assertTrue(stats.getJobGraph().getSinks().size() == 1);
assertTrue(stats.getJobGraph().getSources().size() == 1);
JobStats js = (JobStats) stats.getJobGraph().getSinks().get(0);
View
3 test/org/apache/pig/test/TestPigStats.java
@@ -161,8 +161,7 @@ public void testPigStatsAlias() throws Exception {
PhysicalPlan pp = pig.getPigContext().getExecutionEngine().compile(lp,
null);
MROperPlan mp = getMRPlan(pp, pig.getPigContext());
-
- assertEquals(3, mp.getKeys().size());
+ assertEquals(4, mp.getKeys().size());
MapReduceOper mro = mp.getRoots().get(0);
assertEquals("A,B,C", getAlias(mro));

0 comments on commit d49f903

Please sign in to comment.