Skip to content

Commit

Permalink
[FLINK-1415] [runtime] Akka cleanups
Browse files Browse the repository at this point in the history
Replace akka.jobmanager.url by non exposed mechanism. Add heuristics to calculate different timeouts based on a single value.

Harmonize scala coding style: Remove redundant braces and parentheses, remove meaningless code statements, standardize access patterns, name boolean parameters, unnecessary semicolons, unnecessary braces in import section

Adds death watch test cases: Test if JobManager detects failing TaskManager. Test if the TaskManager detects failing JobManager and tries to reconnect to the JobManager.

Refactors notifyExecutionStateChange method to avoid access of the TaskManagers internal state from outside

This closes #319.
  • Loading branch information
tillrohrmann committed Feb 5, 2015
1 parent 6372063 commit 4046819
Show file tree
Hide file tree
Showing 105 changed files with 1,526 additions and 1,098 deletions.
Expand Up @@ -33,7 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
Expand Down Expand Up @@ -796,8 +795,8 @@ protected ActorRef getJobManager(CommandLine line) throws IOException {
}

return JobManager.getJobManager(RemoteExecutor.getInetFromHostport(jobManagerAddressStr),
ActorSystem.create("CliFrontendActorSystem", AkkaUtils
.getDefaultActorSystemConfig()),getAkkaTimeout());
ActorSystem.create("CliFrontendActorSystem",
AkkaUtils.getDefaultAkkaConfig()),getAkkaTimeout());
}


Expand Down Expand Up @@ -867,8 +866,7 @@ protected Configuration getGlobalConfiguration() {
protected FiniteDuration getAkkaTimeout(){
Configuration config = getGlobalConfiguration();

return new FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
return AkkaUtils.getTimeout(config);
}

public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) {
Expand Down
Expand Up @@ -24,8 +24,8 @@
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
Expand Down Expand Up @@ -305,14 +305,14 @@ public JobExecutionResult run(OptimizedPlan compiledPlan, List<File> libraries,
}

public JobExecutionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
Tuple2<ActorSystem, ActorRef> pair = JobClient.startActorSystemAndActor(configuration);
Tuple2<ActorSystem, ActorRef> pair = JobClient.startActorSystemAndActor(configuration,
false);

ActorRef client = pair._2();

String hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);

FiniteDuration timeout = new FiniteDuration(configuration.getInteger(ConfigConstants
.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
FiniteDuration timeout = AkkaUtils.getTimeout(configuration);

if(hostname == null){
throw new ProgramInvocationException("Could not find hostname of job manager.");
Expand Down
Expand Up @@ -23,7 +23,6 @@
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
Expand Down Expand Up @@ -59,9 +58,8 @@ public class JobsInfoServlet extends HttpServlet {
public JobsInfoServlet(Configuration flinkConfig) {
this.config = flinkConfig;
system = ActorSystem.create("JobsInfoServletActorSystem",
AkkaUtils.getDefaultActorSystemConfig());
this.timeout = new FiniteDuration(flinkConfig.getInteger(ConfigConstants
.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
AkkaUtils.getDefaultAkkaConfig());
this.timeout = AkkaUtils.getTimeout(flinkConfig);
}

@Override
Expand Down
Expand Up @@ -84,6 +84,7 @@ public void setUp() throws Exception {

when(configMock.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)).thenReturn("localhost");
when(configMock.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)).thenReturn(6123);
when(configMock.getString(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)).thenReturn(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);

when(planMock.getJobName()).thenReturn("MockPlan");
// when(mockJarFile.getAbsolutePath()).thenReturn("mockFilePath");
Expand All @@ -99,7 +100,7 @@ public void setUp() throws Exception {

Whitebox.setInternalState(JobClient$.class, mockJobClient);

when(mockJobClient.startActorSystemAndActor(configMock)).thenReturn(new Tuple2<ActorSystem,
when(mockJobClient.startActorSystemAndActor(configMock, false)).thenReturn(new Tuple2<ActorSystem,
ActorRef>(mockSystem, mockJobClientActor));
}

Expand Down
Expand Up @@ -65,11 +65,6 @@ public final class ConfigConstants {
* for communication with the job manager.
*/
public static final String JOB_MANAGER_IPC_PORT_KEY = "jobmanager.rpc.port";

/**
* The config parameter defining the akka url of the job manager
*/
public static final String JOB_MANAGER_AKKA_URL = "jobmanager.akka.url";

/**
* The config parameter defining the number of handler threads for the jobmanager RPC service.
Expand Down Expand Up @@ -594,29 +589,21 @@ public final class ConfigConstants {

// ------------------------------ Akka Values ------------------------------

public static String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";

public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";

public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s";

public static double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0;

public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "5000 ms";

public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "1 m";

public static double DEFAULT_AKKA_WATCH_THRESHOLD = 12;

public static String DEFAULT_AKKA_TCP_TIMEOUT = "100 s";

public static int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15;

public static boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false;

public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";

public static int DEFAULT_AKKA_ASK_TIMEOUT = 100;
public static String DEFAULT_AKKA_ASK_TIMEOUT = "100 s";


// ----------------------------- LocalExecution ----------------------------
Expand Down
Expand Up @@ -112,10 +112,13 @@ object KMeans {
centersPath = programArguments(1)
outputPath = programArguments(2)
numIterations = Integer.parseInt(programArguments(3))

true
}
else {
System.err.println("Usage: KMeans <points path> <centers path> <result path> <num " +
"iterations>")

false
}
}
Expand All @@ -128,8 +131,9 @@ object KMeans {
"program.")
System.out.println(" Usage: KMeans <points path> <centers path> <result path> <num " +
"iterations>")

true
}
true
}

private def getPointDataSet(env: ExecutionEnvironment): DataSet[Point] = {
Expand Down
Expand Up @@ -113,18 +113,22 @@ object ConnectedComponents {
edgesPath = args(1)
outputPath = args(2)
maxIterations = args(3).toInt

true
} else {
System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path>" +
" <max number of iterations>")

false
}
} else {
System.out.println("Executing Connected Components example with built-in default data.")
System.out.println(" Provide parameters to read input data from a file.")
System.out.println(" Usage: ConnectedComponents <vertices path> <edges path> <result path>" +
" <max number of iterations>")

true
}
true
}

private def getVerticesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
Expand Down
Expand Up @@ -148,17 +148,21 @@ object EnumTrianglesBasic {
if (args.length == 2) {
edgePath = args(0)
outputPath = args(1)

true
} else {
System.err.println("Usage: EnumTriangleBasic <edge path> <result path>")

false
}
} else {
System.out.println("Executing Enum Triangles Basic example with built-in default data.")
System.out.println(" Provide parameters to read input data from files.")
System.out.println(" See the documentation for the correct format of input files.")
System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>")

true
}
true
}

private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
Expand Down
Expand Up @@ -215,17 +215,21 @@ object EnumTrianglesOpt {
if (args.length == 2) {
edgePath = args(0)
outputPath = args(1)

true
} else {
System.err.println("Usage: EnumTriangleOpt <edge path> <result path>")

false
}
} else {
System.out.println("Executing Enum Triangles Optimized example with built-in default data.")
System.out.println(" Provide parameters to read input data from files.")
System.out.println(" See the documentation for the correct format of input files.")
System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>")

true
}
true
}

private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
Expand Down
Expand Up @@ -159,9 +159,12 @@ object PageRankBasic {
outputPath = args(2)
numPages = args(3).toLong
maxIterations = args(4).toInt

true
} else {
System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " +
"pages> <num iterations>")

false
}
} else {
Expand All @@ -173,8 +176,9 @@ object PageRankBasic {
"pages> <num iterations>")

numPages = PageRankData.getNumberOfPages

true
}
true
}

private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
Expand Down
Expand Up @@ -150,9 +150,12 @@ object LinearRegression {
dataPath = programArguments(0)
outputPath = programArguments(1)
numIterations = programArguments(2).toInt

true
}
else {
System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>")

false
}
}
Expand All @@ -164,8 +167,9 @@ object LinearRegression {
System.out.println(" We provide a data generator to create synthetic input files for this " +
"program.")
System.out.println(" Usage: LinearRegression <data path> <result path> <num iterations>")

true
}
true
}

private def getDataSet(env: ExecutionEnvironment): DataSet[Data] = {
Expand Down
Expand Up @@ -70,6 +70,7 @@ object WordCount {
if (args.length == 2) {
textPath = args(0)
outputPath = args(1)
true
} else {
System.err.println("Usage: WordCount <text path> <result path>")
false
Expand All @@ -78,8 +79,8 @@ object WordCount {
System.out.println("Executing WordCount example with built-in default data.")
System.out.println(" Provide parameters to read input data from a file.")
System.out.println(" Usage: WordCount <text path> <result path>")
true
}
true
}

private def getTextDataSet(env: ExecutionEnvironment): DataSet[String] = {
Expand Down
Expand Up @@ -22,7 +22,6 @@
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;

import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.deployment.PartitionInfo;
Expand Down
Expand Up @@ -35,7 +35,7 @@ public NoResourceAvailableException(ScheduledUnit unit) {
super("No resource available to schedule unit " + unit
+ ". You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.");
}

public NoResourceAvailableException(int numInstances, int numSlotsTotal, int availableSlots) {
super(String.format("%s Resources available to scheduler: Number of instances=%d, total number of slots=%d, available slots=%d",
BASE_MESSAGE, numInstances, numSlotsTotal, availableSlots));
Expand Down
Expand Up @@ -33,8 +33,6 @@

import org.apache.flink.util.StringUtils;

import com.google.common.base.Preconditions;

public class LogfileInfoServlet extends HttpServlet {

private static final long serialVersionUID = 1L;
Expand All @@ -48,7 +46,9 @@ public class LogfileInfoServlet extends HttpServlet {


public LogfileInfoServlet(File[] logDirs) {
Preconditions.checkNotNull(logDirs, "The given log files are null.");
if(logDirs == null){
throw new NullPointerException("The given log files are null.");
}
this.logDirs = logDirs;
}

Expand Down

0 comments on commit 4046819

Please sign in to comment.