From 5cb09754be2fe4b3971cb52f5c5621f3a9792819 Mon Sep 17 00:00:00 2001 From: tonycox Date: Mon, 6 Feb 2017 19:46:31 +0400 Subject: [PATCH 1/2] [FLINK-5431] Add configurable timePattern for client akka status --- docs/setup/config.md | 2 ++ .../flink/configuration/AkkaOptions.java | 7 ++++ .../client/JobAttachmentClientActor.java | 5 +++ .../flink/runtime/client/JobClientActor.java | 33 ++++++++++++++----- .../client/JobSubmissionClientActor.java | 5 +++ .../messages/ExecutionGraphMessages.scala | 33 ++++++++----------- 6 files changed, 57 insertions(+), 28 deletions(-) diff --git a/docs/setup/config.md b/docs/setup/config.md index b21c647621669..35595b4c7b2f1 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -152,6 +152,8 @@ For more information on how Flink security internally setups Kerberos authentica ### Other +- `client.logging.time-pattern`: (DEFAULT: `yyyy-MM-dd'T'HH:mm:ss`) The date time pattern applied to client akctor logger. + - `taskmanager.tmp.dirs`: The directory for temporary files, or a list of directories separated by the system's directory delimiter (for example ':' (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round-robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system's tmp dir). - `taskmanager.log.path`: The config parameter defining the taskmanager log file location diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java index 7e4c2b71e7908..90fd73ad5db5d 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java @@ -48,4 +48,11 @@ public class AkkaOptions { public static final ConfigOption AKKA_WATCH_HEARTBEAT_PAUSE = ConfigOptions .key("akka.watch.heartbeat.pause") .defaultValue("60 s"); + + /** + * Defines time pattern of akka status logger + */ + public static final ConfigOption CLIENT_TIME_PATTERN = ConfigOptions + .key("client.logging.time-pattern") + .defaultValue("yyyy-MM-dd'T'HH:mm:ss"); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java index ffab9cc77636a..e2d7d5a65b471 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java @@ -23,6 +23,7 @@ import akka.actor.Status; import akka.dispatch.Futures; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -63,6 +64,10 @@ protected Class getClientMessageClass() { return AttachToJobAndWait.class; } + protected String getClientTimePattern() { + return AkkaOptions.CLIENT_TIME_PATTERN.defaultValue(); + } + @Override public void handleCustomMessage(Object message) { if (message instanceof AttachToJobAndWait) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java index 1380e76d41fe7..bf79e8f2b1dc7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java @@ -36,6 +36,8 @@ import org.apache.flink.util.Preconditions; import scala.concurrent.duration.FiniteDuration; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.UUID; @@ -111,6 +113,10 @@ public void postStop() { */ protected abstract Class getClientMessageClass(); + /** + * Hook to retrieve time-pattern from Flink configuration. + */ + protected abstract String getClientTimePattern(); @Override protected void handleMessage(Object message) { @@ -151,7 +157,7 @@ public void onSuccess(ActorRef result) throws Throwable { // Resolved JobManager ActorRef JobManagerActorRef msg = (JobManagerActorRef) message; connectToJobManager(msg.jobManager()); - + // TODO prints without time - is it okay? logAndPrintMessage("Connected to JobManager at " + msg.jobManager()); connectedToJobManager(); @@ -248,7 +254,6 @@ else if (!isJobManagerConnected() && getClientMessageClass().equals(message.getC } } - @Override protected UUID getLeaderSessionID() { return leaderSessionID; @@ -262,29 +267,41 @@ protected void logAndPrintMessage(String message) { } private void logAndPrintMessage(ExecutionGraphMessages.ExecutionStateChanged message) { - LOG.info(message.toString()); + String completeMess = buildMessage(message); + LOG.info(completeMess); if (sysoutUpdates) { - System.out.println(message.toString()); + System.out.println(completeMess); } } private void logAndPrintMessage(ExecutionGraphMessages.JobStatusChanged message) { // by default, this only prints the status, and not any exception. // in state FAILING, we report the exception in addition + String completeMess = buildMessage(message); if (message.newJobStatus() != JobStatus.FAILING || message.error() == null) { - LOG.info(message.toString()); + LOG.info(completeMess); if (sysoutUpdates) { - System.out.println(message.toString()); + System.out.println(completeMess); } } else { - LOG.info(message.toString(), message.error()); + LOG.info(completeMess, message.error()); if (sysoutUpdates) { - System.out.println(message.toString()); + System.out.println(completeMess); message.error().printStackTrace(System.out); } } } + private String buildMessage(ExecutionGraphMessages.Timestamped message) { + return String.format("%1$s\t%2$s", + timestampToString(message.timestamp(), getClientTimePattern()), + message.toString()); + } + + private String timestampToString(Long timestamp, String pattern) { + return new SimpleDateFormat(pattern).format(new Date(timestamp)); + } + @Override public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) { getSelf().tell( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java index a3fee21525dfa..159ad51638187 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java @@ -22,6 +22,7 @@ import akka.actor.Props; import akka.actor.Status; import akka.dispatch.Futures; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.ListeningBehaviour; @@ -74,6 +75,10 @@ protected Class getClientMessageClass() { return SubmitJobAndWait.class; } + protected String getClientTimePattern() { + return clientConfig.getString(AkkaOptions.CLIENT_TIME_PATTERN); + } + @Override public void handleCustomMessage(Object message) { // submit a job to the JobManager diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala index 2369d3cb63580..469c03cd95ca0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala @@ -18,9 +18,6 @@ package org.apache.flink.runtime.messages -import java.text.SimpleDateFormat -import java.util.{UUID, Date} - import org.apache.flink.api.common.JobID import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.executiongraph.ExecutionAttemptID @@ -59,7 +56,8 @@ object ExecutionGraphMessages { newExecutionState: ExecutionState, timestamp: Long, optionalMessage: String) - extends RequiresLeaderSessionID { + extends RequiresLeaderSessionID + with Timestamped { override def toString: String = { val oMsg = if (optionalMessage != null) { @@ -67,9 +65,8 @@ object ExecutionGraphMessages { } else { "" } - - s"${timestampToString(timestamp)}\t$taskName(${subtaskIndex + - 1}/$totalNumberOfSubTasks) switched to $newExecutionState $oMsg" + s"$taskName(${subtaskIndex + + 1}/$totalNumberOfSubTasks) switched to $newExecutionState $oMsg" } } @@ -86,22 +83,18 @@ object ExecutionGraphMessages { newJobStatus: JobStatus, timestamp: Long, error: Throwable) - extends RequiresLeaderSessionID { - + extends RequiresLeaderSessionID + with Timestamped { + override def toString: String = { - s"${timestampToString(timestamp)}\tJob execution switched to status $newJobStatus." + s"Job execution switched to status $newJobStatus." } } - // -------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------- - - private val DATE_FORMATTER: SimpleDateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss") - - private def timestampToString(timestamp: Long): String = { - DATE_FORMATTER.synchronized { - DATE_FORMATTER.format(new Date(timestamp)) - } + /** + * To retrieve a timestamp from messages + */ + trait Timestamped { + def timestamp: Long } } From 0a538c26f888cc061308cbe475f1eef511ff6914 Mon Sep 17 00:00:00 2001 From: tonycox Date: Tue, 21 Feb 2017 17:23:39 +0400 Subject: [PATCH 2/2] orthography fix --- docs/setup/config.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/setup/config.md b/docs/setup/config.md index 35595b4c7b2f1..ba8ac0a5e4204 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -152,7 +152,7 @@ For more information on how Flink security internally setups Kerberos authentica ### Other -- `client.logging.time-pattern`: (DEFAULT: `yyyy-MM-dd'T'HH:mm:ss`) The date time pattern applied to client akctor logger. +- `client.logging.time-pattern`: (DEFAULT: `yyyy-MM-dd'T'HH:mm:ss`) The date time pattern applied to client actor logger. - `taskmanager.tmp.dirs`: The directory for temporary files, or a list of directories separated by the system's directory delimiter (for example ':' (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round-robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system's tmp dir).