@@ -18,12 +18,17 @@
package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -40,15 +45,15 @@
public class MultiQueryOptimizerSpark extends SparkOpPlanVisitor {
private String scope;
private NodeIdGenerator nig;

private static final Log LOG = LogFactory.getLog(MultiQueryOptimizerSpark.class);

public MultiQueryOptimizerSpark(SparkOperPlan plan) {
super(plan, new ReverseDependencyOrderWalker<SparkOperator, SparkOperPlan>(plan));
nig = NodeIdGenerator.getGenerator();
List<SparkOperator> roots = plan.getRoots();
scope = roots.get(0).getOperatorKey().getScope();
}

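/**
 * Merges the physical plan of each splittee into its splitter so the
 * splitter's output is computed once per job instead of being written
 * to a tmp file and re-read by every successor.
 */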
@Override
public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
try {
@@ -57,7 +62,6 @@ public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
}

List<SparkOperator> splittees = getPlan().getSuccessors(sparkOp);

if (splittees == null) {
return;
}
@@ -75,16 +79,20 @@ public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
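// Single-splittee case: fold the splittee's plan straight into the splitter
// and drop the tmp store/load pair that connected the two jobs.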
SparkOperator singleSplitee = splittees.get(0);
POStore poStore = null;
PhysicalOperator firstNodeLeaf = sparkOp.physicalPlan.getLeaves().get(0);

if (firstNodeLeaf instanceof POStore) {
poStore = (POStore) firstNodeLeaf;
}

PhysicalOperator firstNodeLeafPred = sparkOp.physicalPlan.getPredecessors(firstNodeLeaf).get(0);
sparkOp.physicalPlan.remove(poStore); // remove unnecessary store
List<PhysicalOperator> firstNodeRoots = singleSplitee.physicalPlan.getRoots();
sparkOp.physicalPlan.merge(singleSplitee.physicalPlan);

for (int j = 0; j < firstNodeRoots.size(); j++) {
PhysicalOperator firstNodeRoot = firstNodeRoots.get(j);
POLoad poLoad = null;

if (firstNodeRoot instanceof POLoad && poStore != null) {
poLoad = (POLoad) firstNodeRoot;
if (poLoad.getLFile().getFileName().equals(poStore.getSFile().getFileName())) {
@@ -94,6 +102,7 @@ public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
}
}
}

addSubPlanPropertiesToParent(sparkOp, singleSplitee);
removeSplittee(getPlan(), sparkOp, singleSplitee);
} else {
@@ -102,19 +111,32 @@ public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
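// Multiple splittees: gather every splittee's plan under one POSplit so
// the splitter's output feeds all of them within a single Spark job.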
List<PhysicalOperator> firstNodeLeaves = sparkOp.physicalPlan.getLeaves();
PhysicalOperator firstNodeLeaf = firstNodeLeaves.size() > 0 ? firstNodeLeaves.get(0) : null;
POStore poStore = null;

if (firstNodeLeaf != null && firstNodeLeaf instanceof POStore) {
poStore = (POStore) firstNodeLeaf;

// Make sure the store for the same path happens only once
if (!poStore.isTmpStore()) {
sparkOp.physicalPlan.remove(poStore); // remove unnecessary store
}

POSplit split = getSplit();
ArrayList<SparkOperator> spliteesCopy = new ArrayList<SparkOperator>(splittees);

for (SparkOperator splitee : spliteesCopy) {
List<PhysicalOperator> firstNodeRoots = splitee.physicalPlan.getRoots();

for (int i = 0; i < firstNodeRoots.size(); i++) {

if (firstNodeRoots.get(i) instanceof POLoad) {
POLoad poLoad = (POLoad) firstNodeRoots.get(i);
if (poLoad.getLFile().getFileName().equals(poStore.getSFile().getFileName())) {

// Keeping the tmp load in place fixes the FRJoin issue
if (!poLoad.isTmpLoad()) {
splitee.physicalPlan.remove(poLoad); // remove unnecessary load
}

split.addPlan(splitee.physicalPlan);
addSubPlanPropertiesToParent(sparkOp, splitee);
removeSplittee(getPlan(), sparkOp, splitee);
@@ -137,11 +159,13 @@ private void removeSplittee(SparkOperPlan plan, SparkOperator splitter,
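// Reconnect the splittee's successors directly to the splitter before
// removing the splittee from the plan.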
List<SparkOperator> succs = new ArrayList<SparkOperator>();
succs.addAll(plan.getSuccessors(splittee));
plan.disconnect(splitter, splittee);

for (SparkOperator succSparkOperator : succs) {
plan.disconnect(splittee, succSparkOperator);
plan.connect(splitter, succSparkOperator);
}
}

getPlan().remove(splittee);
}

@@ -157,11 +181,13 @@ static public void addSubPlanPropertiesToParent(SparkOperator parentOper, SparkO
parentOper.addCrossKey(key);
}
}

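// Copy features and adopt the larger requested parallelism from the merged sub-plan.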
parentOper.copyFeatures(subPlanOper, null);

if (subPlanOper.getRequestedParallelism() > parentOper.getRequestedParallelism()) {
parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
}

subPlanOper.setRequestedParallelismByReference(parentOper);
parentOper.UDFs.addAll(subPlanOper.UDFs);
parentOper.scalars.addAll(subPlanOper.scalars);