Permalink
Browse files

PIG-2659: add source location of the aliases in the physical plan

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1337558 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent 8977c70 commit 898813a9538c6727774436487df64f074aaaa9dc @julienledem julienledem committed May 12, 2012
Showing with 225 additions and 64 deletions.
  1. +2 −0 CHANGES.txt
  2. +2 −2 src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
  3. +3 −3 src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
  4. +2 −0 src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
  5. +1 −0 src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
  6. +2 −0 src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
  7. +1 −0 src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
  8. +47 −1 src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
  9. +1 −1 src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
  10. +1 −1 src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
  11. +1 −1 ...apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
  12. +1 −1 src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
  13. +38 −36 src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
  14. +10 −4 src/org/apache/pig/tools/pigstats/JobStats.java
  15. +60 −12 src/org/apache/pig/tools/pigstats/ScriptState.java
  16. +3 −0 src/org/apache/pig/tools/pigstats/SimplePigStats.java
  17. +48 −0 test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
  18. +1 −1 test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
  19. +1 −1 test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
View
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2659: add source location of the aliases in the physical plan (julien)
+
PIG-2547: Easier UDFs: Convenient EvalFunc super-classes (billgraham, dvryaboy)
PIG-2639: Utils.getSchemaFromString should automatically give name to all types, but fails on boolean (jcoveney)
@@ -338,7 +338,7 @@ private POPartialAgg createPartialAgg(POForEach combineFE)
String scope = combineFE.getOperatorKey().scope;
POPartialAgg poAgg = new POPartialAgg(new OperatorKey(scope,
NodeIdGenerator.getGenerator().getNextNodeId(scope)));
- poAgg.setAlias(combineFE.getAlias());
+ poAgg.addOriginalLocation(combineFE.getAlias(), combineFE.getOriginalLocations());
poAgg.setResultType(combineFE.getResultType());
//first plan in combine foreach is the group key
@@ -484,7 +484,7 @@ private POUserFunc getAlgebraicSuccessor(POProject proj, PhysicalPlan pplan) {
private POForEach createForEachWithGrpProj(POForEach foreach, byte keyType) {
String scope = foreach.getOperatorKey().scope;
POForEach newFE = new POForEach(createOperatorKey(scope), new ArrayList<PhysicalPlan>());
- newFE.setAlias(foreach.getAlias());
+ newFE.addOriginalLocation(foreach.getAlias(), foreach.getOriginalLocations());
newFE.setResultType(foreach.getResultType());
//create plan that projects the group column
PhysicalPlan grpProjPlan = new PhysicalPlan();
@@ -2130,7 +2130,7 @@ private MapReduceOper getSortJob(
keyType);
lr.setPlans(eps1);
lr.setResultType(DataType.TUPLE);
- lr.setAlias(sort.getAlias());
+ lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
mro.mapPlan.addAsLeaf(lr);
mro.setMapDone(true);
@@ -2220,7 +2220,7 @@ private MapReduceOper getSortJob(
POSort sort = new POSort(inpSort.getOperatorKey(), inpSort
.getRequestedParallelism(), null, inpSort.getSortPlans(),
inpSort.getMAscCols(), inpSort.getMSortFunc());
- sort.setAlias(inpSort.getAlias());
+ sort.addOriginalLocation(inpSort.getAlias(), inpSort.getOriginalLocations());
// Turn the asc/desc array into an array of strings so that we can pass it
// to the FindQuantiles function.
@@ -2425,7 +2425,7 @@ private MapReduceOper getSortJob(
lr.setKeyType(DataType.CHARARRAY);
lr.setPlans(eps);
lr.setResultType(DataType.TUPLE);
- lr.setAlias(sort.getAlias());
+ lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
mro.mapPlan.add(lr);
mro.mapPlan.connect(nfe1, lr);
@@ -285,6 +285,8 @@ public void run() {
if (mro != null) {
String alias = ScriptState.get().getAlias(mro);
log.info("Processing aliases " + alias);
+ String aliasLocation = ScriptState.get().getAliasLocation(mro);
+ log.info("detailed locations: " + aliasLocation);
}
@@ -115,6 +115,7 @@ protected void setup(Context context) throws IOException, InterruptedException {
String msg = "Problem while configuring combiner's reduce plan.";
throw new RuntimeException(msg, ioe);
}
+ log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
}
/**
@@ -208,6 +208,7 @@ public void setup(Context context) throws IOException, InterruptedException {
PigStatusReporter.setContext(context);
+ log.info("Aliases being processed per job phase (AliasName[line,offset]): " + job.get("pig.alias.location"));
}
/**
@@ -246,6 +247,7 @@ protected void map(Text key, Tuple inpTuple, Context context) throws IOException
pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
PhysicalOperator.setPigLogger(pigHadoopLogger);
+
}
if (mp.isEmpty()) {
@@ -348,6 +348,7 @@ protected void setup(Context context) throws IOException, InterruptedException {
String msg = "Problem while configuring reduce plan.";
throw new RuntimeException(msg, ioe);
}
+ log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
}
/**
@@ -17,6 +17,9 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -34,6 +37,7 @@
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.parser.SourceLocation;
import org.apache.pig.pen.util.LineageTracer;
import org.apache.pig.pen.Illustrator;
import org.apache.pig.pen.Illustrable;
@@ -137,6 +141,8 @@
private boolean accum;
private transient boolean accumStart;
+ private List<OriginalLocation> originalLocations = new ArrayList<OriginalLocation>();
+
public PhysicalOperator(OperatorKey k) {
this(k, -1, null);
}
@@ -185,8 +191,18 @@ protected String getAliasString() {
return (alias == null) ? "" : (alias + ": ");
}
- public void setAlias(String alias) {
+ public void addOriginalLocation(String alias, SourceLocation sourceLocation) {
this.alias = alias;
+ this.originalLocations.add(new OriginalLocation(alias, sourceLocation.line(), sourceLocation.offset()));
+ }
+
+ public void addOriginalLocation(String alias, List<OriginalLocation> originalLocations) {
+ this.alias = alias;
+ this.originalLocations.addAll(originalLocations);
+ }
+
+ public List<OriginalLocation> getOriginalLocations() {
+ return Collections.unmodifiableList(originalLocations);
}
public void setAccumulative() {
@@ -441,6 +457,7 @@ public PhysicalOperator clone() throws CloneNotSupportedException {
protected void cloneHelper(PhysicalOperator op) {
resultType = op.resultType;
+ originalLocations.addAll(op.originalLocations);
}
/**
@@ -462,4 +479,33 @@ public static PigLogger getPigLogger() {
return pigLogger;
}
+ public static class OriginalLocation implements Serializable {
+ private String alias;
+ private int line;
+ private int offset;
+
+ public OriginalLocation(String alias, int line, int offset) {
+ super();
+ this.alias = alias;
+ this.line = line;
+ this.offset = offset;
+}
+
+ public String getAlias() {
+ return alias;
+ }
+
+ public int getLine() {
+ return line;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ @Override
+ public String toString() {
+ return alias+"["+line+","+offset+"]";
+ }
+ }
}
@@ -593,7 +593,7 @@ public POForEach clone() throws CloneNotSupportedException {
requestedParallelism, plans, flattens);
clone.setOpsToBeReset(ops);
clone.setResultType(getResultType());
- clone.setAlias(alias);
+ clone.addOriginalLocation(alias, getOriginalLocations());
return clone;
}
@@ -162,7 +162,7 @@ public POLimit clone() throws CloneNotSupportedException {
this.requestedParallelism, this.inputs);
newLimit.mLimit = this.mLimit;
newLimit.expressionPlan = this.expressionPlan.clone();
- newLimit.setAlias(alias);
+ newLimit.addOriginalLocation(alias, getOriginalLocations());
return newLimit;
}
@@ -687,7 +687,7 @@ public POLocalRearrange clone() throws CloneNotSupportedException {
// Needs to be called as setDistinct so that the fake index tuple gets
// created.
clone.setDistinct(mIsDistinct);
- clone.setAlias(alias);
+ clone.addOriginalLocation(alias, getOriginalLocations());
return clone;
}
@@ -287,7 +287,7 @@ public void visit( MapLookupExpression op ) throws FrontendException {
nodeGen.getNextNodeId(DEFAULT_SCOPE)));
((POMapLookUp)physOp).setLookUpKey(op.getLookupKey() );
physOp.setResultType(op.getType());
- physOp.setAlias(op.getFieldSchema().alias);
+ physOp.addOriginalLocation(op.getFieldSchema().alias, op.getLocation());
currentPlan.add(physOp);
logToPhyMap.put(op, physOp);
Oops, something went wrong.

0 comments on commit 898813a

Please sign in to comment.