Skip to content

Commit

Permalink
Fix some rebase/merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
uce committed Oct 8, 2015
1 parent e54a86c commit aad3799
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager
import java.io.{File, IOException}
import java.lang.reflect.{Constructor, InvocationTargetException}
import java.net.InetSocketAddress
import java.util.{Collections, UUID}
import java.util.UUID

import akka.actor.Status.Failure
import akka.actor._
Expand All @@ -33,7 +33,7 @@ import org.apache.flink.core.io.InputSplitAssigner
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.blob.BlobServer
import org.apache.flink.runtime.checkpoint.{ZooKeeperCheckpointRecoveryFactory, StandaloneCheckpointRecoveryFactory, CheckpointRecoveryFactory}
import org.apache.flink.runtime.checkpoint.{CheckpointRecoveryFactory, StandaloneCheckpointRecoveryFactory, ZooKeeperCheckpointRecoveryFactory}
import org.apache.flink.runtime.client._
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
Expand All @@ -43,7 +43,7 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
import org.apache.flink.runtime.leaderelection.{StandaloneLeaderElectionService, LeaderContender, LeaderElectionService}
import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService, StandaloneLeaderElectionService}
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
Expand All @@ -60,17 +60,16 @@ import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util._
import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor}
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages, StreamingMode}
import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, SerializedValue}
import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils}

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ForkJoinPool
import scala.language.postfixOps
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global


/**
Expand Down Expand Up @@ -1777,23 +1776,28 @@ object JobManager {
}
}

val leaderElectionService = leaderElectionServiceOption match {
case Some(les) => les
case None => LeaderElectionUtils.createLeaderElectionService(configuration)
}

// Create recovery related components
val (leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory) =
RecoveryMode.fromConfig(configuration) match {
case RecoveryMode.STANDALONE =>
(new StandaloneLeaderElectionService(),
val leaderElectionService = leaderElectionServiceOption match {
case Some(les) => les
case None => new StandaloneLeaderElectionService()
}

(leaderElectionService,
new StandaloneSubmittedJobGraphStore(),
new StandaloneCheckpointRecoveryFactory())

case RecoveryMode.ZOOKEEPER =>
val client = ZooKeeperUtils.startCuratorFramework(configuration)

(ZooKeeperUtils.createLeaderElectionService(client, configuration),
val leaderElectionService = leaderElectionServiceOption match {
case Some(les) => les
case None => ZooKeeperUtils.createLeaderElectionService(client, configuration)
}

(leaderElectionService,
ZooKeeperUtils.createSubmittedJobGraphs(client, configuration),
new ZooKeeperCheckpointRecoveryFactory(client, configuration))
}
Expand Down Expand Up @@ -1840,6 +1844,7 @@ object JobManager {
jobManagerClass,
archiveClass)
}

/**
* Starts the JobManager and job archiver based on the given configuration, in the
* given actor system.
Expand All @@ -1853,30 +1858,34 @@ object JobManager {
* @param streamingMode The mode to run the system in (streaming vs. batch-only)
* @param jobManagerClass The class of the JobManager to be started
* @param archiveClass The class of the MemoryArchivist to be started
*
*
* @return A tuple of references (JobManager Ref, Archiver Ref)
*/
def startJobManagerActors(
configuration: Configuration,
actorSystem: ActorSystem,
jobMangerActorName: Option[String],
archiveActorName: Option[String],
streamingMode: StreamingMode,
jobManagerClass: Class[_ <: JobManager],
archiveClass: Class[_ <: MemoryArchivist])
: (ActorRef, ActorRef) = {
configuration: Configuration,
actorSystem: ActorSystem,
jobMangerActorName: Option[String],
archiveActorName: Option[String],
streamingMode: StreamingMode,
jobManagerClass: Class[_ <: JobManager],
archiveClass: Class[_ <: MemoryArchivist])
: (ActorRef, ActorRef) = {

val (executionContext,
instanceManager,
scheduler,
libraryCacheManager,
executionRetries,
delayBetweenRetries,
timeout,
_,
leaderElectionService,
submittedJobGraphs,
checkpointRecoveryFactory) = createJobManagerComponents(configuration, None)
instanceManager,
scheduler,
libraryCacheManager,
executionRetries,
delayBetweenRetries,
timeout,
archiveCount,
leaderElectionService,
submittedJobGraphs,
checkpointRecoveryFactory) = createJobManagerComponents(
configuration,
None)

val archiveProps = Props(archiveClass, archiveCount)

// start the archiver with the given name, or without (avoid name conflicts)
val archive: ActorRef = archiveActorName match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.util.NetUtils;
import org.junit.Ignore;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.JobManagerMode;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@ package org.apache.flink.runtime.testingUtils

import java.util.concurrent.TimeoutException

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.actor.{ActorRef, Props, ActorSystem}
import akka.testkit.CallingThreadDispatcher
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory
import org.apache.flink.runtime.jobmanager.{StandaloneSubmittedJobGraphStore, JobManager}
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.minicluster.FlinkMiniCluster
import org.apache.flink.runtime.net.NetUtils
import org.apache.flink.util.NetUtils
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testingUtils.TestingMessages.Alive

Expand All @@ -44,28 +43,28 @@ import scala.concurrent.{Await, Future}
* otherwise false
*/
class TestingCluster(
userConfiguration: Configuration,
singleActorSystem: Boolean,
synchronousDispatcher: Boolean,
streamingMode: StreamingMode)
userConfiguration: Configuration,
singleActorSystem: Boolean,
synchronousDispatcher: Boolean,
streamingMode: StreamingMode)
extends FlinkMiniCluster(
userConfiguration,
singleActorSystem,
streamingMode) {


def this(userConfiguration: Configuration,
singleActorSystem: Boolean,
synchronousDispatcher: Boolean)
= this(userConfiguration, singleActorSystem, synchronousDispatcher, StreamingMode.BATCH_ONLY)
= this(userConfiguration, singleActorSystem, synchronousDispatcher, StreamingMode.BATCH_ONLY)

def this(userConfiguration: Configuration, singleActorSystem: Boolean)
= this(userConfiguration, singleActorSystem, false)
= this(userConfiguration, singleActorSystem, false)

def this(userConfiguration: Configuration) = this(userConfiguration, true, false)

// --------------------------------------------------------------------------

override def generateConfiguration(userConfig: Configuration): Configuration = {
val cfg = new Configuration()
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
Expand Down Expand Up @@ -101,18 +100,18 @@ class TestingCluster(
}

val (executionContext,
instanceManager,
scheduler,
libraryCacheManager,
executionRetries,
delayBetweenRetries,
timeout,
archiveCount,
leaderElectionService,
submittedJobGraphs,
checkpointRecoveryFactory) = JobManager.createJobManagerComponents(
config,
createLeaderElectionService())
instanceManager,
scheduler,
libraryCacheManager,
executionRetries,
delayBetweenRetries,
timeout,
archiveCount,
leaderElectionService,
submittedJobsGraphs,
checkpointRecoveryFactory) = JobManager.createJobManagerComponents(
config,
createLeaderElectionService())

val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
val archive = actorSystem.actorOf(testArchiveProps, archiveName)
Expand All @@ -130,7 +129,7 @@ class TestingCluster(
timeout,
streamingMode,
leaderElectionService,
submittedJobGraphs,
submittedJobsGraphs,
checkpointRecoveryFactory))

val dispatcherJobManagerProps = if (synchronousDispatcher) {
Expand Down Expand Up @@ -196,4 +195,4 @@ class TestingCluster(

Await.ready(combinedFuture, timeout)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,18 @@

package org.apache.flink.runtime.testingUtils

import akka.actor.{Cancellable, Terminated, ActorRef}
import akka.pattern.pipe
import akka.pattern.ask
import org.apache.flink.api.common.JobID
import akka.actor.ActorRef
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.instance.InstanceManager
import org.apache.flink.runtime.jobgraph.JobStatus
import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphs, JobManager}
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
import org.apache.flink.runtime.testingUtils.TestingMessages.{CheckIfJobRemoved, Alive,
DisableDisconnect}
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
import org.apache.flink.runtime.leaderelection.LeaderElectionService

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

import scala.language.postfixOps

/** JobManager implementation extended by testing messages
Expand Down Expand Up @@ -71,7 +57,7 @@ class TestingJobManager(
timeout: FiniteDuration,
mode: StreamingMode,
leaderElectionService: LeaderElectionService,
submittedJobGraphs : SubmittedJobGraphs,
submittedJobGraphs : SubmittedJobGraphStore,
checkpointRecoveryFactory : CheckpointRecoveryFactory)
extends JobManager(
flinkConfiguration,
Expand All @@ -85,6 +71,6 @@ class TestingJobManager(
timeout,
mode,
leaderElectionService,
submittedJobGraphs,
checkpointRecoveryFactory)
submittedJobGraphs,
checkpointRecoveryFactory)
with TestingJobManagerLike {}
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,20 @@ package org.apache.flink.test.util

import java.util.concurrent.TimeoutException

import akka.pattern.ask
import akka.actor.{Props, ActorRef, ActorSystem}
import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.Patterns._
import akka.pattern.ask
import org.apache.curator.test.TestingCluster
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.jobmanager.{RecoveryMode, JobManager}
import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode}
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages
.NotifyWhenRegisteredAtJobManager
import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingTaskManager,
TestingJobManager, TestingMemoryArchivist}
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingMemoryArchivist, TestingTaskManager, TestingUtils}

import scala.concurrent.{Future, Await}
import scala.concurrent.{Await, Future}

/**
* A forkable mini cluster is a special case of the mini cluster, used for parallel test execution
Expand All @@ -47,20 +45,20 @@ import scala.concurrent.{Future, Await}
* same [[ActorSystem]], otherwise false.
*/
class ForkableFlinkMiniCluster(
userConfiguration: Configuration,
singleActorSystem: Boolean,
streamingMode: StreamingMode)
userConfiguration: Configuration,
singleActorSystem: Boolean,
streamingMode: StreamingMode)
extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {

def this(userConfiguration: Configuration, singleActorSystem: Boolean)
= this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
def this(userConfiguration: Configuration, singleActorSystem: Boolean)
= this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)

def this(userConfiguration: Configuration) = this(userConfiguration, true)

// --------------------------------------------------------------------------

var zookeeperCluster: Option[TestingCluster] = None

override def generateConfiguration(userConfiguration: Configuration): Configuration = {
val forNumberString = System.getProperty("forkNumber")

Expand Down Expand Up @@ -264,10 +262,10 @@ object ForkableFlinkMiniCluster {
import org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT

def startCluster(
numSlots: Int,
numTaskManagers: Int,
timeout: String = DEFAULT_AKKA_ASK_TIMEOUT)
: ForkableFlinkMiniCluster = {
numSlots: Int,
numTaskManagers: Int,
timeout: String = DEFAULT_AKKA_ASK_TIMEOUT)
: ForkableFlinkMiniCluster = {

val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
Expand Down

0 comments on commit aad3799

Please sign in to comment.