From 94d96cb6574ada07296319f19b05283055f3029b Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 19 Jan 2015 16:13:52 +0100 Subject: [PATCH 1/2] [FLINK-1372] [runtime] Fixes logging settings. The logging is now exclusively controlled by the logging properties provided to the system. Removes akka.loglevel config parameter. --- .../flink/configuration/ConfigConstants.java | 7 -- .../apache/flink/runtime/akka/AkkaUtils.scala | 20 ++---- .../flink/runtime/jobmanager/JobManager.scala | 69 ++++++++++--------- .../runtime/taskmanager/TaskManager.scala | 68 +++++++++++------- .../runtime/testingUtils/TestingUtils.scala | 3 +- 5 files changed, 87 insertions(+), 80 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index df96339a5ac59..d482e3cadd943 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -344,11 +344,6 @@ public final class ConfigConstants { */ public static final String AKKA_LOG_LIFECYCLE_EVENTS = "akka.log.lifecycle.events"; - /** - * Log level for akka - */ - public static final String AKKA_LOG_LEVEL = "akka.loglevel"; - /** * Timeout for all blocking calls */ @@ -597,8 +592,6 @@ public final class ConfigConstants { public static String DEFAULT_AKKA_FRAMESIZE = "10485760b"; - public static String DEFAULT_AKKA_LOG_LEVEL = "ERROR"; - public static int DEFAULT_AKKA_ASK_TIMEOUT = 100; diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 07f5395ec8c3b..dd4458742ce66 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -80,15 +80,9 @@ object AkkaUtils { val logLifecycleEvents = if (lifecycleEvents) "on" else "off" - val logLevel = configuration.getString(ConfigConstants.AKKA_LOG_LEVEL, - ConfigConstants.DEFAULT_AKKA_LOG_LEVEL) - val configString = s""" |akka { - | loglevel = $logLevel - | stdout-loglevel = $logLevel - | | log-dead-letters = $logLifecycleEvents | log-dead-letters-during-shutdown = $logLifecycleEvents | @@ -144,15 +138,9 @@ object AkkaUtils { val logLifecycleEvents = if (lifecycleEvents) "on" else "off" - val logLevel = configuration.getString(ConfigConstants.AKKA_LOG_LEVEL, - ConfigConstants.DEFAULT_AKKA_LOG_LEVEL) - val configString = s""" |akka { - | loglevel = $logLevel - | stdout-loglevel = $logLevel - | | log-dead-letters = $logLifecycleEvents | log-dead-letters-during-shutdown = $logLifecycleEvents | @@ -204,12 +192,16 @@ object AkkaUtils { | | loggers = ["akka.event.slf4j.Slf4jLogger"] | logger-startup-timeout = 30s - | loglevel = "WARNING" - | logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" + | loglevel = "DEBUG" | stdout-loglevel = "WARNING" | jvm-exit-on-fatal-error = off | log-config-on-start = off + | | serialize-messages = on + | + | debug { + | lifecycle = on + | } |} """.stripMargin } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 16495cae2f555..8095e9a1d1366 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -18,7 +18,7 @@ package org.apache.flink.runtime.jobmanager -import java.io.File +import java.io.{IOException, File} import java.net.{InetSocketAddress} import java.util.concurrent.TimeUnit @@ -105,7 +105,16 @@ Actor with ActorLogMessages with ActorLogging { instanceManager.shutdown() scheduler.shutdown() - libraryCacheManager.shutdown() + + try { + libraryCacheManager.shutdown() + } catch { + case e: IOException => log.error(e, "Could not properly shutdown the library cache manager.") + } + + if(log.isDebugEnabled) { + log.debug("Job manager {} is completely stopped.", self.path) + } } override def receiveWithLogMessages: Receive = { @@ -134,7 +143,7 @@ Actor with ActorLogMessages with ActorLogging { sender ! akka.actor.Status.Failure(new IllegalArgumentException("JobGraph must not be" + " null.")) } else { - log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}).") + log.info("Received job {} ({}).", jobGraph.getJobID, jobGraph.getName) if (jobGraph.getNumberOfVertices == 0) { sender ! SubmissionFailure(jobGraph.getJobID, new IllegalArgumentException("Job is " + @@ -164,10 +173,8 @@ Actor with ActorLogMessages with ActorLogging { } if (log.isDebugEnabled) { - log.debug(s"Running master initialization of job ${jobGraph.getJobID} (${ - jobGraph - .getName - }}).") + log.debug("Running master initialization of job {} ({}).", + jobGraph.getJobID, jobGraph.getName) } for (vertex <- jobGraph.getVertices.asScala) { @@ -184,17 +191,15 @@ Actor with ActorLogMessages with ActorLogging { val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources if (log.isDebugEnabled) { - log.debug(s"Adding ${sortedTopology.size()} vertices from job graph ${ - jobGraph - .getJobID - } (${jobGraph.getName}).") + log.debug("Adding {} vertices from job graph {} ({}).", + sortedTopology.size(), jobGraph.getJobID, jobGraph.getName) } executionGraph.attachJobGraph(sortedTopology) if (log.isDebugEnabled) { - log.debug(s"Successfully created execution graph from job graph " + - s"${jobGraph.getJobID} (${jobGraph.getName}).") + log.debug("Successfully created execution graph from job graph {} ({}).", + jobGraph.getJobID, jobGraph.getName) } executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling) @@ -210,7 +215,7 @@ Actor with ActorLogMessages with ActorLogging { jobInfo.detach = detach - log.info(s"Scheduling job ${jobGraph.getName}.") + log.info("Scheduling job {}.", jobGraph.getName) executionGraph.scheduleForExecution(scheduler) @@ -245,7 +250,7 @@ Actor with ActorLogMessages with ActorLogging { } case CancelJob(jobID) => { - log.info(s"Trying to cancel job with ID ${jobID}.") + log.info("Trying to cancel job with ID {}.", jobID) currentJobs.get(jobID) match { case Some((executionGraph, _)) => @@ -254,8 +259,8 @@ Actor with ActorLogMessages with ActorLogging { } sender ! CancellationSuccess(jobID) case None => - log.info(s"No job found with ID ${jobID}.") - sender ! CancellationFailure(jobID, new IllegalArgumentException(s"No job found with " + + log.info("No job found with ID {}.", jobID) + sender ! CancellationFailure(jobID, new IllegalArgumentException("No job found with " + s"ID ${jobID}.")) } } @@ -270,8 +275,8 @@ Actor with ActorLogMessages with ActorLogging { Future { originalSender ! executionGraph.updateState(taskExecutionState) } - case None => log.error(s"Cannot find execution graph for ID ${taskExecutionState - .getJobID} to change state to ${taskExecutionState.getExecutionState}.") + case None => log.error("Cannot find execution graph for ID {} to change state to {}.", + taskExecutionState.getJobID, taskExecutionState.getExecutionState) sender ! false } } @@ -283,7 +288,7 @@ Actor with ActorLogMessages with ActorLogging { val execution = executionGraph.getRegisteredExecutions().get(executionAttempt) if(execution == null){ - log.error(s"Can not find Execution for attempt ${executionAttempt}.") + log.error("Can not find Execution for attempt {}.", executionAttempt) null }else{ val slot = execution.getAssignedResource @@ -298,21 +303,21 @@ Actor with ActorLogMessages with ActorLogging { case vertex: ExecutionJobVertex => vertex.getSplitAssigner match { case splitAssigner: InputSplitAssigner => splitAssigner.getNextInputSplit(host) case _ => - log.error(s"No InputSplitAssigner for vertex ID ${vertexID}.") + log.error("No InputSplitAssigner for vertex ID {}.", vertexID) null } case _ => - log.error(s"Cannot find execution vertex for vertex ID ${vertexID}.") + log.error("Cannot find execution vertex for vertex ID {}.", vertexID) null } } case None => - log.error(s"Cannot find execution graph for job ID ${jobID}.") + log.error("Cannot find execution graph for job ID {}.", jobID) null } if(log.isDebugEnabled) { - log.debug(s"Send next input split ${nextInputSplit}.") + log.debug("Send next input split {}.", nextInputSplit) } sender ! NextInputSplit(nextInputSplit) } @@ -320,8 +325,9 @@ Actor with ActorLogMessages with ActorLogging { case JobStatusChanged(jobID, newJobStatus, timeStamp, optionalMessage) => { currentJobs.get(jobID) match { case Some((executionGraph, jobInfo)) => executionGraph.getJobName - log.info(s"Status of job ${jobID} (${executionGraph.getJobName}) changed to " + - s"${newJobStatus}${if(optionalMessage == null) "" else optionalMessage}.") + log.info("Status of job {} ({}) changed to {}{}.", + jobID, executionGraph.getJobName, newJobStatus, + if(optionalMessage == null) "" else optionalMessage) if(newJobStatus.isTerminalState) { jobInfo.end = timeStamp @@ -368,7 +374,7 @@ Actor with ActorLogMessages with ActorLogging { sender ! ConsumerNotificationResult(executionGraph .scheduleOrUpdateConsumers(executionId, partitionIndex)) case None => - log.error(s"Cannot find execution graph for job ID ${jobId}.") + log.error("Cannot find execution graph for job ID {}.", jobId) sender ! ConsumerNotificationResult(false, Some( new IllegalStateException("Cannot find execution graph for job ID " + jobId))) } @@ -422,7 +428,7 @@ Actor with ActorLogMessages with ActorLogging { } case Terminated(taskManager) => { - log.info(s"Task manager ${taskManager.path} terminated.") + log.info("Task manager {} terminated.", taskManager.path) instanceManager.unregisterTaskManager(taskManager) context.unwatch(taskManager) } @@ -442,14 +448,13 @@ Actor with ActorLogMessages with ActorLogging { libraryCacheManager.unregisterJob(jobID) } catch { case t: Throwable => - log.error(t, s"Could not properly unregister job ${jobID} form the library cache.") + log.error(t, "Could not properly unregister job {} form the library cache.", jobID) } } private def checkJavaVersion { - var javaVersion = System.getProperty("java.version") - if (javaVersion.substring(0, 3).toDouble < 1.7) { - JobManager.LOG.warn("Warning: Flink is running with Java 6. " + + if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) { + log.warning("Warning: Flink is running with Java 6. " + "Java 6 is not maintained any more by Oracle or the OpenJDK community. " + "Flink currently supports Java 6, but may not in future releases," + " due to the unavailability of bug fixes security patched.") diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 620a4363cb807..96eecb9da9093 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -86,7 +86,9 @@ import scala.collection.JavaConverters._ implicit val timeout = tmTimeout - log.info(s"Starting task manager at ${self.path}.") + log.info("Starting task manager at {}.", self.path) + log.info("Creating {} task slot(s).", numberOfSlots) + log.info("TaskManager connection information {}.", connectionInfo) val REGISTRATION_DELAY = 0 seconds val REGISTRATION_INTERVAL = 10 seconds @@ -95,7 +97,12 @@ import scala.collection.JavaConverters._ TaskManager.checkTempDirs(tmpDirPaths) val ioManager = new IOManagerAsync(tmpDirPaths) + + log.info("Initializing memory manager with {} MB of memory. Page size is {} bytes.", + memorySize, + pageSize) val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, pageSize) + val bcVarManager = new BroadcastVariableManager(); val hardwareDescription = HardwareDescription.extractFromSystem(memoryManager.getMemorySize) val fileCache = new FileCache() @@ -105,9 +112,12 @@ import scala.collection.JavaConverters._ val waitForRegistration = scala.collection.mutable.Set[ActorRef](); val profiler = profilingInterval match { - case Some(interval) => Some(TaskManager.startProfiler(self.path.toSerializationFormat, - interval)) - case None => None + case Some(interval) => + log.info("Profiling of jobs is enabled.") + Some(TaskManager.startProfiler(self.path.toSerializationFormat, interval)) + case None => + log.info("Profiling of jobs is disabled.") + None } var libraryCacheManager: LibraryCacheManager = null @@ -132,7 +142,7 @@ import scala.collection.JavaConverters._ } override def postStop(): Unit = { - log.info(s"Stopping task manager ${self.path}.") + log.info("Stopping task manager {}.", self.path) cancelAndClearEverything(new Exception("Task Manager is shutting down.")) @@ -161,6 +171,10 @@ import scala.collection.JavaConverters._ case t: Throwable => log.error(t, "LibraryCacheManager did not shutdown properly.") } } + + if(log.isDebugEnabled){ + log.debug("Task manager {} is completely stopped.", self.path) + } } private def tryJobManagerRegistration(): Unit = { @@ -180,8 +194,8 @@ import scala.collection.JavaConverters._ } else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) { - log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " + - s"Attempt") + log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL, + registrationAttempts) val jobManager = context.actorSelection(jobManagerAkkaURL) jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots) @@ -200,9 +214,8 @@ import scala.collection.JavaConverters._ context.watch(currentJobManager) - log.info(s"TaskManager successfully registered at JobManager ${ - currentJobManager.path.toString - }.") + log.info("TaskManager successfully registered at JobManager {}.", + currentJobManager.path.toString) setupNetworkEnvironment() setupLibraryCacheManager(blobPort) @@ -274,8 +287,8 @@ import scala.collection.JavaConverters._ } case Terminated(jobManager) => { - log.info(s"Job manager ${jobManager.path} is no longer reachable. " - + "Cancelling all tasks and trying to reregister.") + log.info("Job manager {} is no longer reachable. Cancelling all tasks and trying to " + + "reregister.", jobManager.path) cancelAndClearEverything(new Throwable("Lost connection to JobManager")) tryJobManagerRegistration() @@ -285,7 +298,7 @@ import scala.collection.JavaConverters._ def notifyExecutionStateChange(jobID: JobID, executionID: ExecutionAttemptID, executionState: ExecutionState, optionalError: Throwable): Unit = { - log.info(s"Update execution state to ${executionState}.") + log.info("Update execution state to {}.", executionState) val futureResponse = (currentJobManager ? UpdateTaskExecutionState(new TaskExecutionState (jobID, executionID, executionState, optionalError)))(timeout) @@ -301,8 +314,8 @@ import scala.collection.JavaConverters._ self ! UnregisterTask(executionID) } case Failure(t) => { - log.warning(s"Execution state change notification failed for task ${executionID} " + - s"of job ${jobID}. Cause ${t.getMessage}.") + log.warning("Execution state change notification failed for task {} of job {}. Cause {}.", + executionID, jobID, t.getMessage) self ! UnregisterTask(executionID) } } @@ -324,9 +337,8 @@ import scala.collection.JavaConverters._ libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles()); if (log.isDebugEnabled) { - log.debug(s"Register task ${executionID} took ${ - (System.currentTimeMillis() - startRegisteringTask) / 1000.0 - }s") + log.debug("Register task {} took {}s", executionID, + (System.currentTimeMillis() - startRegisteringTask) / 1000.0) } val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID) @@ -386,7 +398,7 @@ import scala.collection.JavaConverters._ val message = if (t.isInstanceOf[CancelTaskException]) { "Task was canceled" } else { - log.error(t, s"Could not instantiate task with execution ID ${executionID}.") + log.error(t, "Could not instantiate task with execution ID {}.", executionID) ExceptionUtils.stringifyException(t) } @@ -398,7 +410,7 @@ import scala.collection.JavaConverters._ libraryCacheManager.unregisterTask(jobID, executionID) } catch { - case t: Throwable => log.error("Error during cleanup of task deployment.", t) + case t: Throwable => log.error(t, "Error during cleanup of task deployment.") } sender ! new TaskOperationResult(executionID, false, message) @@ -480,6 +492,9 @@ import scala.collection.JavaConverters._ if (blobPort > 0) { val address = new InetSocketAddress(currentJobManager.path.address.host.getOrElse ("localhost"), blobPort) + + log.info("Determined BLOB server address to be {}.", address) + libraryCacheManager = new BlobLibraryCacheManager(new BlobCache(address), cleanupInterval) } else { libraryCacheManager = new FallbackLibraryCacheManager @@ -500,14 +515,14 @@ import scala.collection.JavaConverters._ } private def unregisterTask(executionID: ExecutionAttemptID): Unit = { - log.info(s"Unregister task with execution ID ${executionID}.") + log.info("Unregister task with execution ID {}.", executionID) runningTasks.remove(executionID) match { case Some(task) => removeAllTaskResources(task) libraryCacheManager.unregisterTask(task.getJobID, executionID) case None => if (log.isDebugEnabled) { - log.debug(s"Cannot find task with ID ${executionID} to unregister.") + log.debug("Cannot find task with ID {} to unregister.", executionID) } } } @@ -537,12 +552,12 @@ import scala.collection.JavaConverters._ } private def logMemoryStats(): Unit = { - if (log.isDebugEnabled) { + if (log.isInfoEnabled) { val memoryMXBean = ManagementFactory.getMemoryMXBean() val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans().asScala - log.debug(TaskManager.getMemoryUsageStatsAsString(memoryMXBean)) - log.debug(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans)) + log.info(TaskManager.getMemoryUsageStatsAsString(memoryMXBean)) + log.info(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans)) } } } @@ -685,6 +700,9 @@ object TaskManager { } else { val fraction = configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION) + + LOG.info("Using {} of the free heap space for managed memory.", fraction) + ((EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag - networkBufferMem) * fraction) .toLong } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 49bb4e0cf3569..a65f82e01e594 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -46,8 +46,7 @@ object TestingUtils { s"""akka.daemonic = on |akka.test.timefactor = 10 |akka.loggers = ["akka.event.slf4j.Slf4jLogger"] - |akka.loglevel = "OFF" - |akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" + |akka.loglevel = "DEBUG" |akka.stdout-loglevel = "OFF" |akka.jvm-exit-on-fata-error = off |akka.log-config-on-start = off From f61cbb33bc31bde4c34ccb0aab50608a985305ce Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 20 Jan 2015 01:04:02 +0100 Subject: [PATCH 2/2] Adopts the logger level for the akka log level --- .../apache/flink/runtime/akka/AkkaUtils.scala | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index dd4458742ce66..79cfeef0b78b0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -25,10 +25,13 @@ import akka.actor.{ActorSelection, ActorRef, ActorSystem} import akka.pattern.{Patterns, ask => akkaAsk} import com.typesafe.config.{Config, ConfigFactory} import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.slf4j.LoggerFactory import scala.concurrent.{ExecutionContext, Future, Await} import scala.concurrent.duration._ object AkkaUtils { + val LOG = LoggerFactory.getLogger(AkkaUtils.getClass) + val DEFAULT_TIMEOUT: FiniteDuration = 1 minute val INF_TIMEOUT = 21474835 seconds @@ -186,13 +189,14 @@ object AkkaUtils { } def getDefaultLocalActorSystemConfigString: String = { - """ + val logLevel = getLogLevel + s""" |akka { | daemonic = on | | loggers = ["akka.event.slf4j.Slf4jLogger"] | logger-startup-timeout = 30s - | loglevel = "DEBUG" + | loglevel = ${logLevel} | stdout-loglevel = "WARNING" | jvm-exit-on-fatal-error = off | log-config-on-start = off @@ -206,6 +210,26 @@ object AkkaUtils { """.stripMargin } + def getLogLevel: String = { + if(LOG.isDebugEnabled) { + "DEBUG" + } else { + if (LOG.isInfoEnabled) { + "INFO" + } else { + if(LOG.isWarnEnabled){ + "WARNING" + } else { + if (LOG.isErrorEnabled) { + "ERROR" + } else { + "OFF" + } + } + } + } + } + def getDefaultActorSystemConfig = { ConfigFactory.parseString(getDefaultActorSystemConfigString) }