[FLINK-1425] [streaming] Add scheduling of all tasks at once #330

Closed
wants to merge 1 commit
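
The change adds a ScheduleMode to JobGraph/ExecutionGraph so that a job can request eager deployment of all tasks instead of the default lazy, source-driven scheduling. A minimal sketch of how a client could opt in (not part of the diff; the vertex setup mirrors the new test case below, and the invokable classes are omitted):

AbstractJobVertex sender = new AbstractJobVertex("Sender");
AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
receiver.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL);

JobGraph jobGraph = new JobGraph("Eagerly scheduled job", sender, receiver);

// New in this PR: deploy every task up front instead of lazily from the sources.
// The default remains ScheduleMode.FROM_SOURCES.
jobGraph.setScheduleMode(ScheduleMode.ALL);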
@@ -29,6 +29,7 @@
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -110,6 +111,8 @@ public class ExecutionGraph implements Serializable {

private boolean allowQueuedScheduling = true;

private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;

public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig) {
this(jobId, jobName, jobConfig, new ArrayList<BlobKey>());
}
@@ -305,6 +308,14 @@ public void setQueuedSchedulingAllowed(boolean allowed) {
this.allowQueuedScheduling = allowed;
}

public void setScheduleMode(ScheduleMode scheduleMode) {
this.scheduleMode = scheduleMode;
}

public ScheduleMode getScheduleMode() {
return scheduleMode;
}

// --------------------------------------------------------------------------------------------
// Actions
// --------------------------------------------------------------------------------------------
@@ -320,14 +331,30 @@ public void scheduleForExecution(Scheduler scheduler) throws JobException {

if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
this.scheduler = scheduler;

// initially, we simply take the ones without inputs.
// next, we implement the logic to go back from vertices that need computation
// to the ones we need to start running
for (ExecutionJobVertex ejv : this.tasks.values()) {
if (ejv.getJobVertex().isInputVertex()) {
ejv.scheduleAll(scheduler, allowQueuedScheduling);
}

switch (scheduleMode) {

case FROM_SOURCES:
// initially, we simply take the ones without inputs.
// next, we implement the logic to go back from vertices that need computation
// to the ones we need to start running
for (ExecutionJobVertex ejv : this.tasks.values()) {
if (ejv.getJobVertex().isInputVertex()) {
ejv.scheduleAll(scheduler, allowQueuedScheduling);
}
}

break;

case ALL:
for (ExecutionJobVertex ejv : getVerticesTopologically()) {
ejv.scheduleAll(scheduler, allowQueuedScheduling);
}

break;

case BACKTRACKING:
throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
}
}
else {
@@ -72,6 +72,8 @@ public class JobGraph implements Serializable {

/** flag to enable queued scheduling */
private boolean allowQueuedScheduling;

private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;

// --------------------------------------------------------------------------------------------

@@ -198,6 +200,14 @@ public boolean getAllowQueuedScheduling() {
return allowQueuedScheduling;
}

public void setScheduleMode(ScheduleMode scheduleMode) {
this.scheduleMode = scheduleMode;
}

public ScheduleMode getScheduleMode() {
return scheduleMode;
}

/**
* Adds a new task vertex to the job graph if it is not already included.
*
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobgraph;

public enum ScheduleMode {

/**
* Schedule tasks from sources to sinks with lazy deployment of receiving tasks.
*/
FROM_SOURCES,

/**
 * Schedule tasks by backtracking from the tasks that need their input data to the
 * producing tasks (currently not supported).
 */
BACKTRACKING,

/**
* Schedule tasks all at once instead of lazy deployment of receiving tasks.
*/
ALL

}
@@ -203,6 +203,7 @@ class JobManager(val configuration: Configuration)
jobGraph.getJobID, jobGraph.getName)
}

executionGraph.setScheduleMode(jobGraph.getScheduleMode)
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)

// get notified about job status changes
@@ -18,19 +18,19 @@

package org.apache.flink.runtime.jobmanager

import Tasks._
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph,
AbstractJobVertex}
import Tasks._
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
import org.apache.flink.runtime.testingUtils.{TestingUtils}
import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph, ScheduleMode}
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{AllVerticesRunning, NotifyWhenJobRemoved, WaitForAllVerticesToBeRunningOrFinished}
import org.apache.flink.runtime.testingUtils.TestingUtils
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{Matchers, WordSpecLike, BeforeAndAfterAll}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import scheduler.{NoResourceAvailableException, SlotSharingGroup}

import scala.concurrent.duration._

@RunWith(classOf[JUnitRunner])
@@ -273,6 +273,51 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
}
}

"support scheduling all at once" in {
val num_tasks = 16
val sender = new AbstractJobVertex("Sender")
val forwarder = new AbstractJobVertex("Forwarder")
val receiver = new AbstractJobVertex("Receiver")

sender.setInvokableClass(classOf[Sender])
forwarder.setInvokableClass(classOf[Forwarder])
receiver.setInvokableClass(classOf[AgnosticReceiver])

sender.setParallelism(num_tasks)
forwarder.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)

val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID)
sender.setSlotSharingGroup(sharingGroup)
forwarder.setSlotSharingGroup(sharingGroup)
receiver.setSlotSharingGroup(sharingGroup)

forwarder.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL)
receiver.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL)

val jobGraph = new JobGraph("Forwarding Job", sender, forwarder, receiver)

jobGraph.setScheduleMode(ScheduleMode.ALL)

val cluster = TestingUtils.startTestingCluster(num_tasks, 1)
val jm = cluster.getJobManager

try {
within(TestingUtils.TESTING_DURATION) {
jm ! SubmitJob(jobGraph)

expectMsg(SubmissionSuccess(jobGraph.getJobID))

expectMsgType[JobResultSuccess]

jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
expectMsg(true)
}
} finally {
cluster.stop()
}
}

"handle job with a failing sender vertex" in {
val num_tasks = 100
val sender = new AbstractJobVertex("Sender")
@@ -68,6 +68,33 @@ object Tasks {
}
}

class Forwarder extends AbstractInvokable {
var reader: RecordReader[IntegerRecord] = _
var writer: RecordWriter[IntegerRecord] = _
override def registerInputOutput(): Unit = {
reader = new RecordReader[IntegerRecord](getEnvironment.getReader(0), classOf[IntegerRecord])
writer = new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
}

override def invoke(): Unit = {
try {
while (true) {
val record = reader.next()

if (record == null) {
// flush buffered records before the forwarder finishes
writer.flush()
return
}

writer.emit(record)
}
} finally {
writer.clearBuffers()
}
}
}

class Receiver extends AbstractInvokable {
var reader: RecordReader[IntegerRecord] = _

@@ -24,13 +24,11 @@ import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.jobgraph.JobID
import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
import org.apache.flink.runtime.messages.ExecutionGraphMessages.{JobStatusChanged,
ExecutionStateChanged}
import org.apache.flink.runtime.messages.ExecutionGraphMessages.ExecutionStateChanged
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenTaskRemoved

import scala.collection.convert.WrapAsScala
import scala.concurrent.{Await, Future}
import scala.concurrent.Future


trait TestingJobManager extends ActorLogMessages with WrapAsScala {