[FLINK-1425] [streaming] Add scheduling of all tasks at once
This closes #330.
uce committed Jan 26, 2015
1 parent 5bc93e8 commit ad31f61
Showing 7 changed files with 157 additions and 14 deletions.
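In short, this commit adds a ScheduleMode to JobGraph and ExecutionGraph: FROM_SOURCES keeps the existing lazy deployment that starts from the input vertices, while ALL deploys every task of the topology up front (BACKTRACKING is declared but not yet supported). A minimal usage sketch of the new setter, assuming the job graph API shown in the diffs below; the vertex setup is illustrative rather than part of this commit:

import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.ScheduleMode;

// Illustrative vertices; invokable classes and parallelism are omitted for brevity.
AbstractJobVertex sender = new AbstractJobVertex("Sender");
AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
receiver.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL);

JobGraph jobGraph = new JobGraph("Forwarding Job", sender, receiver);
// New in this commit: schedule all tasks at once instead of lazily from the sources.
jobGraph.setScheduleMode(ScheduleMode.ALL);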
@@ -29,6 +29,7 @@
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -113,6 +114,8 @@ public class ExecutionGraph implements Serializable {

private boolean allowQueuedScheduling = true;

private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;

public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) {
this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>());
}
@@ -311,6 +314,14 @@ public void setQueuedSchedulingAllowed(boolean allowed) {
this.allowQueuedScheduling = allowed;
}

public void setScheduleMode(ScheduleMode scheduleMode) {
this.scheduleMode = scheduleMode;
}

public ScheduleMode getScheduleMode() {
return scheduleMode;
}

// --------------------------------------------------------------------------------------------
// Actions
// --------------------------------------------------------------------------------------------
@@ -326,14 +337,30 @@ public void scheduleForExecution(Scheduler scheduler) throws JobException {

if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
this.scheduler = scheduler;

// initially, we simply take the ones without inputs.
// next, we implement the logic to go back from vertices that need computation
// to the ones we need to start running
for (ExecutionJobVertex ejv : this.tasks.values()) {
if (ejv.getJobVertex().isInputVertex()) {
ejv.scheduleAll(scheduler, allowQueuedScheduling);
}

switch (scheduleMode) {

case FROM_SOURCES:
// initially, we simply take the ones without inputs.
// next, we implement the logic to go back from vertices that need computation
// to the ones we need to start running
for (ExecutionJobVertex ejv : this.tasks.values()) {
if (ejv.getJobVertex().isInputVertex()) {
ejv.scheduleAll(scheduler, allowQueuedScheduling);
}
}

break;

case ALL:
for (ExecutionJobVertex ejv : getVerticesTopologically()) {
ejv.scheduleAll(scheduler, allowQueuedScheduling);
}

break;

case BACKTRACKING:
throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
}
}
else {
@@ -72,6 +72,8 @@ public class JobGraph implements Serializable {

/** flag to enable queued scheduling */
private boolean allowQueuedScheduling;

private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;

// --------------------------------------------------------------------------------------------

@@ -198,6 +200,14 @@ public boolean getAllowQueuedScheduling() {
return allowQueuedScheduling;
}

public void setScheduleMode(ScheduleMode scheduleMode) {
this.scheduleMode = scheduleMode;
}

public ScheduleMode getScheduleMode() {
return scheduleMode;
}

/**
* Adds a new task vertex to the job graph if it is not already included.
*
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobgraph;

public enum ScheduleMode {

/**
* Schedule tasks from sources to sinks with lazy deployment of receiving tasks.
*/
FROM_SOURCES,

/**
* Schedule tasks by going back from vertices that need computation to the ones
* that have to run first (currently not supported; the ExecutionGraph rejects this mode).
*/
BACKTRACKING,

/**
* Schedule tasks all at once instead of lazy deployment of receiving tasks.
*/
ALL

}
@@ -201,6 +201,7 @@ class JobManager(val configuration: Configuration)
jobGraph.getJobID, jobGraph.getName)
}

executionGraph.setScheduleMode(jobGraph.getScheduleMode)
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)

// get notified about job status changes
@@ -22,14 +22,14 @@ import Tasks._
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph}
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException
import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph, ScheduleMode}
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
import org.apache.flink.runtime.testingUtils.TestingUtils
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import scheduler.{NoResourceAvailableException, SlotSharingGroup}

import scala.concurrent.duration._

Expand Down Expand Up @@ -277,6 +277,51 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
}
}

"support scheduling all at once" in {
val num_tasks = 16
val sender = new AbstractJobVertex("Sender")
val forwarder = new AbstractJobVertex("Forwarder")
val receiver = new AbstractJobVertex("Receiver")

sender.setInvokableClass(classOf[Sender])
forwarder.setInvokableClass(classOf[Forwarder])
receiver.setInvokableClass(classOf[AgnosticReceiver])

sender.setParallelism(num_tasks)
forwarder.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)

val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID)
sender.setSlotSharingGroup(sharingGroup)
forwarder.setSlotSharingGroup(sharingGroup)
receiver.setSlotSharingGroup(sharingGroup)

forwarder.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL)
receiver.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL)

val jobGraph = new JobGraph("Forwarding Job", sender, forwarder, receiver)

jobGraph.setScheduleMode(ScheduleMode.ALL);

val cluster = TestingUtils.startTestingCluster(num_tasks, 1)
val jm = cluster.getJobManager

try {
within(TestingUtils.TESTING_DURATION) {
jm ! SubmitJob(jobGraph)

expectMsg(SubmissionSuccess(jobGraph.getJobID))

expectMsgType[JobResultSuccess]

jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
expectMsg(true)
}
} finally {
cluster.stop()
}
}

"handle job with a failing sender vertex" in {
val num_tasks = 100
val sender = new AbstractJobVertex("Sender")
@@ -68,6 +68,33 @@ object Tasks {
}
}

class Forwarder extends AbstractInvokable {
var reader: RecordReader[IntegerRecord] = _
var writer: RecordWriter[IntegerRecord] = _

override def registerInputOutput(): Unit = {
reader = new RecordReader[IntegerRecord](getEnvironment.getReader(0), classOf[IntegerRecord])
writer = new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
}

override def invoke(): Unit = {
try {
while (true) {
val record = reader.next();

if (record == null) {
return;
}

writer.emit(record);
}

writer.flush()
} finally {
writer.clearBuffers()
}
}
}

class Receiver extends AbstractInvokable {
var reader: RecordReader[IntegerRecord] = _

@@ -24,13 +24,11 @@ import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.jobgraph.JobID
import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
import org.apache.flink.runtime.messages.ExecutionGraphMessages.{JobStatusChanged,
ExecutionStateChanged}
import org.apache.flink.runtime.messages.ExecutionGraphMessages.ExecutionStateChanged
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenTaskRemoved

import scala.collection.convert.WrapAsScala
import scala.concurrent.{Await, Future}
import scala.concurrent.Future


trait TestingJobManager extends ActorLogMessages with WrapAsScala {
