Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.client;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
Expand All @@ -28,18 +27,14 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.JobExecutorService;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;

import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -142,66 +137,4 @@ private void setPlanParallelism(final Plan plan, final Configuration executorSer

plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
}

/**
* Creates a JSON representation of the given dataflow's execution plan.
*
* @param plan The dataflow plan.
* @return The dataflow's execution plan, as a JSON string.
*/
@Override
public String getOptimizerPlanAsJSON(Plan plan) {
final int parallelism = plan.getDefaultParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? 1 : plan.getDefaultParallelism();

Optimizer pc = new Optimizer(new DataStatistics(), this.baseConfiguration);
pc.setDefaultParallelism(parallelism);
OptimizedPlan op = pc.compile(plan);

return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
}

// --------------------------------------------------------------------------------------------
// Static variants that internally bring up an instance and shut it down after the execution
// --------------------------------------------------------------------------------------------

/**
* Executes the given dataflow plan.
*
* @param plan The dataflow plan.
* @return The execution result.
*
* @throws Exception Thrown, if either the startup of the local execution context, or the execution
* caused an exception.
*/
public static JobExecutionResult execute(Plan plan) throws Exception {
return new LocalExecutor().executePlan(plan);
}

/**
* Creates a JSON representation of the given dataflow's execution plan.
*
* @param plan The dataflow plan.
* @return The dataflow's execution plan, as a JSON string.
* @throws Exception Thrown, if the optimization process that creates the execution plan failed.
*/
public static String optimizerPlanAsJSON(Plan plan) throws Exception {
final int parallelism = plan.getDefaultParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? 1 : plan.getDefaultParallelism();

Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
pc.setDefaultParallelism(parallelism);
OptimizedPlan op = pc.compile(plan);

return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
}

/**
* Creates a JSON representation of the given dataflow plan.
*
* @param plan The dataflow plan.
* @return The dataflow plan (prior to optimization) as a JSON string.
*/
public static String getPlanAsJSON(Plan plan) {
List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(plan);
return new PlanJSONDumpGenerator().getPactPlanAsJSON(sinks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;

import java.net.InetSocketAddress;
import java.net.URL;
Expand Down Expand Up @@ -127,11 +122,4 @@ private JobExecutionResult executePlanWithJars(JobWithJars program) throws Excep
return client.run(program, defaultParallelism).getJobExecutionResult();
}
}

@Override
public String getOptimizerPlanAsJSON(Plan plan) {
Optimizer opt = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration());
OptimizedPlan optPlan = opt.compile(plan);
return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optPlan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.net.URL;
Expand Down Expand Up @@ -74,15 +72,6 @@ public JobExecutionResult execute(String jobName) throws Exception {
return lastJobExecutionResult;
}

@Override
public String getExecutionPlan() throws Exception {
Plan plan = createProgramPlan("unnamed job");

OptimizedPlan op = ClusterClient.getOptimizedPlan(client.compiler, plan, getParallelism());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that now we can make the client.compiler field private to avoid leaking information about internal state.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And maybe other ClusterClient fields can become private as well.

PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
return gen.getOptimizerPlanAsJSON(op);
}

private void verifyExecuteIsCalledOnceWhenInDetachedMode() {
if (alreadyCalled && detached) {
throw new InvalidProgramException(DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,6 @@ public JobExecutionResult execute(String jobName) throws Exception {
throw new ProgramAbortException();
}

@Override
public String getExecutionPlan() throws Exception {
Plan plan = createProgramPlan(null, false);
this.optimizerPlan = compiler.compile(plan);

// do not go on with anything now!
throw new ProgramAbortException();
}

public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {

// temporarily write syserr and sysout to a byte array.
Expand Down
Loading