Skip to content

Commit

Permalink
Merge fix to omit input/output registering on JobManager
Browse files Browse the repository at this point in the history
Rework Invokable Task Hierarchy
  • Loading branch information
StephanEwen committed Jun 22, 2014
1 parent 2692643 commit 8c1d82a
Show file tree
Hide file tree
Showing 76 changed files with 1,548 additions and 1,322 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import eu.stratosphere.api.common.InvalidProgramException;
import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.java.LocalEnvironment;
import eu.stratosphere.client.LocalExecutor;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.PactCompiler;
import eu.stratosphere.compiler.costs.CostEstimator;
Expand Down Expand Up @@ -134,16 +131,16 @@ public void shouldThrowException() throws Exception
verify(this.jobClientMock).submitJob();
}


/**
 * Constructs a {@code Client} from the mocked configuration and then calls
 * {@code LocalExecutor.execute} on the mocked plan. The test passes only if an
 * {@code InvalidProgramException} is thrown somewhere in the method body.
 *
 * NOTE(review): which of the two statements is expected to raise the exception is not
 * visible from here — presumably the local execution call; confirm.
 */
@Test(expected=InvalidProgramException.class)
public void tryLocalExecution() throws Exception {
new Client(configMock);
LocalExecutor.execute(planMock);
}

/**
 * Constructs a {@code Client} from the mocked configuration and then instantiates a
 * {@code LocalEnvironment}. The test passes only if an {@code InvalidProgramException}
 * is thrown somewhere in the method body.
 *
 * NOTE(review): which of the two constructor calls is expected to raise the exception is
 * not visible from here — presumably the {@code LocalEnvironment} constructor; confirm.
 */
@Test(expected=InvalidProgramException.class)
public void tryLocalEnvironmentExecution() throws Exception {
new Client(configMock);
new LocalEnvironment();
}
}
//
// @Test(expected=InvalidProgramException.class)
// public void tryLocalExecution() throws Exception {
// new Client(configMock);
// LocalExecutor.execute(planMock);
// }
//
// @Test(expected=InvalidProgramException.class)
// public void tryLocalEnvironmentExecution() throws Exception {
// new Client(configMock);
// new LocalEnvironment();
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import eu.stratosphere.api.common.io.InputFormat;
import eu.stratosphere.api.common.io.OutputFormat;
import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.nephele.template.AbstractInputTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map.Entry;

import eu.stratosphere.api.common.aggregators.AggregatorRegistry;
import eu.stratosphere.api.common.aggregators.AggregatorWithName;
Expand Down Expand Up @@ -66,7 +59,6 @@
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.pact.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
import eu.stratosphere.pact.runtime.iterative.io.FakeOutputTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationHeadPactTask;
Expand Down Expand Up @@ -760,7 +752,7 @@ private JobTaskVertex createSingleInputVertex(SingleInputPlanNode node) throws C
} else {
// create task vertex
vertex = new JobTaskVertex(taskName, this.jobGraph);
vertex.setTaskClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);

config = new TaskConfig(vertex.getConfiguration());
config.setDriver(ds.getDriverClass());
Expand All @@ -786,7 +778,7 @@ private JobTaskVertex createDualInputVertex(DualInputPlanNode node) throws Compi
final DriverStrategy ds = node.getDriverStrategy();
final JobTaskVertex vertex = new JobTaskVertex(taskName, this.jobGraph);
final TaskConfig config = new TaskConfig(vertex.getConfiguration());
vertex.setTaskClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
vertex.setInvokableClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);

// set user code
config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
Expand All @@ -812,31 +804,29 @@ private JobTaskVertex createDualInputVertex(DualInputPlanNode node) throws Compi

/**
 * Builds the {@code JobInputVertex} for a data source plan node. The vertex is constructed
 * with the node's name and {@code this.jobGraph}, is given {@code DataSourceTask} as its
 * task class, and receives the source's user code wrapper, parameters and serializer.
 *
 * NOTE(review): this span appears to come from a rendered diff without +/- markers, so
 * statements of two revisions are interleaved: the same settings are applied once through
 * setters on the vertex and once through a {@code TaskConfig} over the vertex configuration.
 * Presumably only one of the two variants exists in any real revision — confirm against
 * the actual file before relying on this listing.
 *
 * @param node The source plan node whose name, contract and serializer are read.
 * @return The newly created input vertex.
 * @throws CompilerException Declared by the signature; no throw site is visible in this span.
 */
private JobInputVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException {
final JobInputVertex vertex = new JobInputVertex(node.getNodeName(), this.jobGraph);
// view over the vertex's own configuration; the config.set* calls below write through it
final TaskConfig config = new TaskConfig(vertex.getConfiguration());

// set task class
// variant 1: double cast through Class<?> to pass DataSourceTask.class as Class<AbstractInputTask<?>>
@SuppressWarnings("unchecked")
final Class<AbstractInputTask<?>> clazz = (Class<AbstractInputTask<?>>) (Class<?>) DataSourceTask
.class;
vertex.setInputClass(clazz);
// variant 2: the class literal is passed directly, no cast needed
vertex.setInvokableClass(DataSourceTask.class);

// set user code
// variant 1: input format wrapper, parameters and serializer are set on the vertex itself
vertex.setInputFormat((UserCodeWrapper<? extends InputFormat<?, InputSplit>>)node.getPactContract()
.getUserCodeWrapper());
vertex.setInputFormatParameters(node.getPactContract().getParameters());
vertex.setOutputSerializer(node.getSerializer());
// variant 2: the same wrapper and parameters are stored via the TaskConfig
config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
config.setStubParameters(node.getPactContract().getParameters());

config.setOutputSerializer(node.getSerializer());
return vertex;
}

/**
 * Builds the {@code JobOutputVertex} for a data sink plan node. The vertex is constructed
 * with the node's name and {@code this.jobGraph}, is given {@code DataSinkTask} as its task
 * class, records the node's degree of parallelism in its configuration, and receives the
 * sink's user code wrapper and parameters.
 *
 * NOTE(review): this span appears to come from a rendered diff without +/- markers, so
 * statements of two revisions are interleaved: the task class and user code are set once
 * through setters on the vertex and once through {@code setInvokableClass} / a
 * {@code TaskConfig}. Presumably only one of the two variants exists in any real revision —
 * confirm against the actual file before relying on this listing.
 *
 * @param node The sink plan node whose name, contract and degree of parallelism are read.
 * @return The newly created output vertex.
 * @throws CompilerException Declared by the signature; no throw site is visible in this span.
 */
private AbstractJobOutputVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException {
final JobOutputVertex vertex = new JobOutputVertex(node.getNodeName(), this.jobGraph);
// view over the vertex's own configuration; the config.set* calls below write through it
final TaskConfig config = new TaskConfig(vertex.getConfiguration());

// set task class (two variants of the same setting, see the note in the method comment)
vertex.setOutputClass(DataSinkTask.class);
vertex.setInvokableClass(DataSinkTask.class);
// store the node's degree of parallelism under the key defined by DataSinkTask
vertex.getConfiguration().setInteger(DataSinkTask.DEGREE_OF_PARALLELISM_KEY, node.getDegreeOfParallelism());

// set user code
// variant 1: output format wrapper and parameters are set on the vertex itself
vertex.setOutputFormat((UserCodeWrapper<? extends OutputFormat<?>>)node.getPactContract().getUserCodeWrapper());
vertex.setOutputFormatParameters(node.getPactContract().getParameters());
// variant 2: the same wrapper and parameters are stored via the TaskConfig
config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
config.setStubParameters(node.getPactContract().getParameters());

return vertex;
}

Expand Down Expand Up @@ -884,15 +874,15 @@ private JobTaskVertex createBulkIterationHead(BulkPartialSolutionPlanNode pspn)
}

// reset the vertex type to iteration head
headVertex.setTaskClass(IterationHeadPactTask.class);
headVertex.setInvokableClass(IterationHeadPactTask.class);
headConfig = new TaskConfig(headVertex.getConfiguration());
toReturn = null;
} else {
// instantiate the head vertex and give it a no-op driver as the driver strategy.
// everything else happens in the post visit, after the input (the initial partial solution)
// is connected.
headVertex = new JobTaskVertex("PartialSolution ("+iteration.getNodeName()+")", this.jobGraph);
headVertex.setTaskClass(IterationHeadPactTask.class);
headVertex.setInvokableClass(IterationHeadPactTask.class);
headConfig = new TaskConfig(headVertex.getConfiguration());
headConfig.setDriver(NoOpDriver.class);
toReturn = headVertex;
Expand Down Expand Up @@ -952,15 +942,15 @@ private JobTaskVertex createWorksetIterationHead(WorksetPlanNode wspn) {
}

// reset the vertex type to iteration head
headVertex.setTaskClass(IterationHeadPactTask.class);
headVertex.setInvokableClass(IterationHeadPactTask.class);
headConfig = new TaskConfig(headVertex.getConfiguration());
toReturn = null;
} else {
// instantiate the head vertex and give it a no-op driver as the driver strategy.
// everything else happens in the post visit, after the input (the initial partial solution)
// is connected.
headVertex = new JobTaskVertex("IterationHead("+iteration.getNodeName()+")", this.jobGraph);
headVertex.setTaskClass(IterationHeadPactTask.class);
headVertex.setInvokableClass(IterationHeadPactTask.class);
headConfig = new TaskConfig(headVertex.getConfiguration());
headConfig.setDriver(NoOpDriver.class);
toReturn = headVertex;
Expand Down Expand Up @@ -1144,7 +1134,7 @@ private void finalizeBulkIteration(IterationDescriptor descr) {
// --------------------------- create the sync task ---------------------------
final JobOutputVertex sync = new JobOutputVertex("Sync(" +
bulkNode.getNodeName() + ")", this.jobGraph);
sync.setOutputClass(IterationSynchronizationSinkTask.class);
sync.setInvokableClass(IterationSynchronizationSinkTask.class);
sync.setNumberOfSubtasks(1);
this.auxVertices.add(sync);

Expand Down Expand Up @@ -1192,14 +1182,14 @@ private void finalizeBulkIteration(IterationDescriptor descr) {
// No following termination criterion
if(rootOfStepFunction.getOutgoingChannels().isEmpty()) {

rootOfStepFunctionVertex.setTaskClass(IterationTailPactTask.class);
rootOfStepFunctionVertex.setInvokableClass(IterationTailPactTask.class);

tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);

// create the fake output task
JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
fakeTail.setOutputClass(FakeOutputTask.class);
fakeTail.setInvokableClass(FakeOutputTask.class);
fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
this.auxVertices.add(fakeTail);

Expand Down Expand Up @@ -1234,14 +1224,14 @@ private void finalizeBulkIteration(IterationDescriptor descr) {
tailConfigOfTerminationCriterion = new TaskConfig(rootOfTerminationCriterionVertex.getConfiguration());
}

rootOfTerminationCriterionVertex.setTaskClass(IterationTailPactTask.class);
rootOfTerminationCriterionVertex.setInvokableClass(IterationTailPactTask.class);
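// Hack: flag the termination criterion's tail config as a solution set update (NOTE(review): rationale not visible here — confirm)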
tailConfigOfTerminationCriterion.setIsSolutionSetUpdate();
tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
tailConfigOfTerminationCriterion.addOutputShipStrategy(ShipStrategyType.FORWARD);

JobOutputVertex fakeTailTerminationCriterion = new JobOutputVertex("Fake Tail for Termination Criterion", this.jobGraph);
fakeTailTerminationCriterion.setOutputClass(FakeOutputTask.class);
fakeTailTerminationCriterion.setInvokableClass(FakeOutputTask.class);
fakeTailTerminationCriterion.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
this.auxVertices.add(fakeTailTerminationCriterion);

Expand Down Expand Up @@ -1309,7 +1299,7 @@ private void finalizeWorksetIteration(IterationDescriptor descr) {
{
final JobOutputVertex sync = new JobOutputVertex("Sync (" +
iterNode.getNodeName() + ")", this.jobGraph);
sync.setOutputClass(IterationSynchronizationSinkTask.class);
sync.setInvokableClass(IterationSynchronizationSinkTask.class);
sync.setNumberOfSubtasks(1);
this.auxVertices.add(sync);

Expand Down Expand Up @@ -1367,14 +1357,14 @@ private void finalizeWorksetIteration(IterationDescriptor descr) {
worksetTailConfig.setIsWorksetUpdate();

if (hasWorksetTail) {
nextWorksetVertex.setTaskClass(IterationTailPactTask.class);
nextWorksetVertex.setInvokableClass(IterationTailPactTask.class);

worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer());
worksetTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);

// create the fake output task
JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
fakeTail.setOutputClass(FakeOutputTask.class);
fakeTail.setInvokableClass(FakeOutputTask.class);
fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
this.auxVertices.add(fakeTail);

Expand Down Expand Up @@ -1405,14 +1395,14 @@ private void finalizeWorksetIteration(IterationDescriptor descr) {
solutionDeltaConfig.setIsSolutionSetUpdate();

if (hasSolutionSetTail) {
solutionDeltaVertex.setTaskClass(IterationTailPactTask.class);
solutionDeltaVertex.setInvokableClass(IterationTailPactTask.class);

solutionDeltaConfig.setOutputSerializer(iterNode.getSolutionSetSerializer());
solutionDeltaConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);

// create the fake output task
JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
fakeTail.setOutputClass(FakeOutputTask.class);
fakeTail.setInvokableClass(FakeOutputTask.class);
fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
this.auxVertices.add(fakeTail);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public abstract class PlanExecutor {
* Creates an executor that runs the plan locally in a multi-threaded environment.
*
* @return A local executor.
* @see eu.stratosphere.client.LocalExecutor
*/
public static PlanExecutor createLocalExecutor() {
Class<? extends PlanExecutor> leClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS);
Expand All @@ -75,7 +74,6 @@ public static PlanExecutor createLocalExecutor() {
* @param jarFiles A list of jar files that contain the user-defined function (UDF) classes and all classes used
* from within the UDFs.
* @return A remote executor.
* @see eu.stratosphere.client.RemoteExecutor
*/
public static PlanExecutor createRemoteExecutor(String hostname, int port, String... jarFiles) {
if (hostname == null) {
Expand Down

0 comments on commit 8c1d82a

Please sign in to comment.