From d0ec35866cf7a22566a86128a05610c644a9a901 Mon Sep 17 00:00:00 2001 From: Thesharing Date: Fri, 5 Mar 2021 15:05:07 +0800 Subject: [PATCH] [FLINK-21625] Fix compilation errors due to changes related to scheduler benchmarks The related changes are located at FLINK-21580 and FLINK-21347. --- .../flink/benchmark/StreamGraphUtils.java | 4 ++-- .../scheduler/benchmark/JobConfiguration.java | 18 +++++++++--------- .../benchmark/SchedulerBenchmarkUtils.java | 2 +- .../topology/BuildExecutionGraphBenchmark.java | 4 ++-- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/apache/flink/benchmark/StreamGraphUtils.java b/src/main/java/org/apache/flink/benchmark/StreamGraphUtils.java index d07ffd7..43d1fba 100644 --- a/src/main/java/org/apache/flink/benchmark/StreamGraphUtils.java +++ b/src/main/java/org/apache/flink/benchmark/StreamGraphUtils.java @@ -19,7 +19,7 @@ package org.apache.flink.benchmark; import org.apache.flink.benchmark.functions.LongSource; -import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; @@ -38,7 +38,7 @@ public static StreamGraph buildGraphForBatchJob(StreamExecutionEnvironment env, StreamGraph streamGraph = env.getStreamGraph(); streamGraph.setChaining(false); streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING); - streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST); + streamGraph.setJobType(JobType.BATCH); return streamGraph; } diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/JobConfiguration.java b/src/main/java/org/apache/flink/scheduler/benchmark/JobConfiguration.java index d1ae801..15220ad 100644 --- a/src/main/java/org/apache/flink/scheduler/benchmark/JobConfiguration.java +++ b/src/main/java/org/apache/flink/scheduler/benchmark/JobConfiguration.java @@ -21,38 +21,38 @@ import org.apache.flink.api.common.ExecutionMode; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.JobType; /** * {@link JobConfiguration} contains the configuration of a STREAMING/BATCH job. - * It concludes {@link DistributionPattern}, {@link ResultPartitionType}, {@link ScheduleMode}, {@link ExecutionMode}. + * It concludes {@link DistributionPattern}, {@link ResultPartitionType}, {@link JobType}, {@link ExecutionMode}. */ public enum JobConfiguration { STREAMING(DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED, - ScheduleMode.EAGER, + JobType.STREAMING, ExecutionMode.PIPELINED), BATCH(DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, - ScheduleMode.LAZY_FROM_SOURCES, + JobType.BATCH, ExecutionMode.BATCH); private final static int PARALLELISM = 4000; private final DistributionPattern distributionPattern; private final ResultPartitionType resultPartitionType; - private final ScheduleMode scheduleMode; + private final JobType jobType; private final ExecutionMode executionMode; JobConfiguration( DistributionPattern distributionPattern, ResultPartitionType resultPartitionType, - ScheduleMode scheduleMode, + JobType jobType, ExecutionMode executionMode) { this.distributionPattern = distributionPattern; this.resultPartitionType = resultPartitionType; - this.scheduleMode = scheduleMode; + this.jobType = jobType; this.executionMode = executionMode; } @@ -68,8 +68,8 @@ public ResultPartitionType getResultPartitionType() { return resultPartitionType; } - public ScheduleMode getScheduleMode() { - return scheduleMode; + public JobType getJobType() { + return jobType; } public ExecutionMode getExecutionMode() { diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/SchedulerBenchmarkUtils.java b/src/main/java/org/apache/flink/scheduler/benchmark/SchedulerBenchmarkUtils.java index bba1616..56fb7d8 100644 --- a/src/main/java/org/apache/flink/scheduler/benchmark/SchedulerBenchmarkUtils.java +++ b/src/main/java/org/apache/flink/scheduler/benchmark/SchedulerBenchmarkUtils.java @@ -82,7 +82,7 @@ public static JobGraph createJobGraph( final JobGraph jobGraph = new JobGraph(jobVertices.toArray(new JobVertex[0])); - jobGraph.setScheduleMode(jobConfiguration.getScheduleMode()); + jobGraph.setJobType(jobConfiguration.getJobType()); final ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.setExecutionMode(jobConfiguration.getExecutionMode()); diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/topology/BuildExecutionGraphBenchmark.java b/src/main/java/org/apache/flink/scheduler/benchmark/topology/BuildExecutionGraphBenchmark.java index 2e41424..acce988 100644 --- a/src/main/java/org/apache/flink/scheduler/benchmark/topology/BuildExecutionGraphBenchmark.java +++ b/src/main/java/org/apache/flink/scheduler/benchmark/topology/BuildExecutionGraphBenchmark.java @@ -19,7 +19,7 @@ package org.apache.flink.scheduler.benchmark.topology; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder; +import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.scheduler.benchmark.JobConfiguration; @@ -58,7 +58,7 @@ public static void main(String[] args) throws RunnerException { public void setup() throws Exception { jobVertices = createDefaultJobVertices(jobConfiguration); final JobGraph jobGraph = createJobGraph(jobConfiguration); - executionGraph = TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build(); + executionGraph = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build(); } @Benchmark