Skip to content

Commit

Permalink
[FLINK-21625] Fix compilation errors due to changes related to schedu…
Browse files Browse the repository at this point in the history
…ler benchmarks

The related changes are located at FLINK-21580 and FLINK-21347.
  • Loading branch information
Thesharing authored and pnowojski committed Mar 5, 2021
1 parent cf3f5f1 commit d0ec358
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 14 deletions.
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.benchmark;

import org.apache.flink.benchmark.functions.LongSource;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
Expand All @@ -38,7 +38,7 @@ public static StreamGraph buildGraphForBatchJob(StreamExecutionEnvironment env,
StreamGraph streamGraph = env.getStreamGraph();
streamGraph.setChaining(false);
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
streamGraph.setJobType(JobType.BATCH);

return streamGraph;
}
Expand Down
Expand Up @@ -21,38 +21,38 @@
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.JobType;

/**
* {@link JobConfiguration} contains the configuration of a STREAMING/BATCH job.
* It concludes {@link DistributionPattern}, {@link ResultPartitionType}, {@link ScheduleMode}, {@link ExecutionMode}.
* It concludes {@link DistributionPattern}, {@link ResultPartitionType}, {@link JobType}, {@link ExecutionMode}.
*/
public enum JobConfiguration {

STREAMING(DistributionPattern.ALL_TO_ALL,
ResultPartitionType.PIPELINED,
ScheduleMode.EAGER,
JobType.STREAMING,
ExecutionMode.PIPELINED),

BATCH(DistributionPattern.ALL_TO_ALL,
ResultPartitionType.BLOCKING,
ScheduleMode.LAZY_FROM_SOURCES,
JobType.BATCH,
ExecutionMode.BATCH);

private final static int PARALLELISM = 4000;
private final DistributionPattern distributionPattern;
private final ResultPartitionType resultPartitionType;
private final ScheduleMode scheduleMode;
private final JobType jobType;
private final ExecutionMode executionMode;

JobConfiguration(
DistributionPattern distributionPattern,
ResultPartitionType resultPartitionType,
ScheduleMode scheduleMode,
JobType jobType,
ExecutionMode executionMode) {
this.distributionPattern = distributionPattern;
this.resultPartitionType = resultPartitionType;
this.scheduleMode = scheduleMode;
this.jobType = jobType;
this.executionMode = executionMode;
}

Expand All @@ -68,8 +68,8 @@ public ResultPartitionType getResultPartitionType() {
return resultPartitionType;
}

public ScheduleMode getScheduleMode() {
return scheduleMode;
public JobType getJobType() {
return jobType;
}

public ExecutionMode getExecutionMode() {
Expand Down
Expand Up @@ -82,7 +82,7 @@ public static JobGraph createJobGraph(

final JobGraph jobGraph = new JobGraph(jobVertices.toArray(new JobVertex[0]));

jobGraph.setScheduleMode(jobConfiguration.getScheduleMode());
jobGraph.setJobType(jobConfiguration.getJobType());

final ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setExecutionMode(jobConfiguration.getExecutionMode());
Expand Down
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.scheduler.benchmark.topology;

import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.scheduler.benchmark.JobConfiguration;
Expand Down Expand Up @@ -58,7 +58,7 @@ public static void main(String[] args) throws RunnerException {
public void setup() throws Exception {
jobVertices = createDefaultJobVertices(jobConfiguration);
final JobGraph jobGraph = createJobGraph(jobConfiguration);
executionGraph = TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build();
executionGraph = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build();
}

@Benchmark
Expand Down

0 comments on commit d0ec358

Please sign in to comment.