From 6a21e85fc7d99ba687753b4989bb25eb19aaa3af Mon Sep 17 00:00:00 2001
From: Ufuk Celebi
Date: Tue, 29 Nov 2016 17:15:27 +0100
Subject: [PATCH 01/10] [FLINK-5194] [logging] Log heartbeats on TRACE level

---
 .../org/apache/flink/runtime/instance/InstanceManager.java | 4 +---
 .../org/apache/flink/runtime/jobmanager/JobManager.scala   | 2 +-
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 0d0d4c7ff6923..3fe92a50c0ca3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -124,9 +124,7 @@ public boolean reportHeartBeat(InstanceID instanceId, byte[] lastMetricsReport)
 			host.reportHeartBeat();
 			host.setMetricsReport(lastMetricsReport);
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Received heartbeat from TaskManager " + host);
-			}
+			LOG.trace("Received heartbeat from TaskManager {}", host);
 
 			return true;
 		}

diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4a4968f237564..8b830eb1af124 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -928,7 +928,7 @@ class JobManager(
         )
 
       case Heartbeat(instanceID, metricsReport, accumulators) =>
-        log.debug(s"Received heartbeat message from $instanceID.")
+        log.trace(s"Received heartbeat message from $instanceID.")
 
         updateAccumulators(accumulators)

From 20ceaef71e8d10dbaf0cb01b8548fc3870d5ebac Mon Sep 17 00:00:00 2001
From: Ufuk Celebi
Date: Tue, 29 Nov 2016 16:00:02 +0100
Subject: [PATCH 02/10] [FLINK-5201] [logging] Log loaded config properties on
 INFO level

---
 .../org/apache/flink/configuration/GlobalConfiguration.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 7e50486511a09..14a6ae8a71b51 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -253,7 +253,7 @@ private void loadYAMLResource(File file) {
 						continue;
 					}
 
-					LOG.debug("Loading configuration property: {}, {}", key, value);
+					LOG.info("Loading configuration property: {}, {}", key, value);
 					this.config.setString(key, value);
 				}
 
@@ -372,7 +372,7 @@ private void loadXMLResource(File file) {
 
 				if (key != null && value != null) {
 					// Put key, value pair into the map
-					LOG.debug("Loading configuration property: {}, {}", key, value);
+					LOG.info("Loading configuration property: {}, {}", key, value);
 					this.config.setString(key, value);
 				} else {
 					LOG.warn("Error while reading configuration: Cannot read property " + propNumber);

From bb837cece0f0945ab05e71eda5c3a728a64b390b Mon Sep 17 00:00:00 2001
From: Ufuk Celebi
Date: Tue, 29 Nov 2016 16:04:48 +0100
Subject: [PATCH 03/10] [FLINK-5196] [logging] Don't log
 InputChannelDeploymentDescriptor

---
 .../InputChannelDeploymentDescriptor.java    |  3 ---
 .../partition/consumer/SingleInputGate.java  | 18 +++++++++++++++---
 2 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 6b87e69e4be1b..24b95ea29f880 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.util.Arrays;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -135,8 +134,6 @@ else if (allowLazyDeployment) {
 					consumedPartitionId, partitionLocation);
 		}
 
-		LOG.debug("Created {} from edges {}.", Arrays.toString(icdd), Arrays.toString(edges));
-
 		return icdd;
 	}
 }

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 8f44fbc68a145..1550b0d9b8732 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -46,7 +46,6 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
@@ -573,8 +572,11 @@ public static SingleInputGate create(
 		// Create the input channels. There is one input channel for each consumed partition.
 		final InputChannel[] inputChannels = new InputChannel[icdd.length];
 
-		for (int i = 0; i < inputChannels.length; i++) {
+		int numLocalChannels = 0;
+		int numRemoteChannels = 0;
+		int numUnknownChannels = 0;
 
+		for (int i = 0; i < inputChannels.length; i++) {
 			final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId();
 			final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation();
 
@@ -585,6 +587,8 @@ public static SingleInputGate create(
 					networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
 					metrics
 				);
+
+				numLocalChannels++;
 			}
 			else if (partitionLocation.isRemote()) {
 				inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
@@ -593,6 +597,8 @@ else if (partitionLocation.isRemote()) {
 					networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
 					metrics
 				);
+
+				numRemoteChannels++;
 			}
 			else if (partitionLocation.isUnknown()) {
 				inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId,
@@ -602,6 +608,8 @@ else if (partitionLocation.isUnknown()) {
 					networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
 					metrics
 				);
+
+				numUnknownChannels++;
 			}
 			else {
 				throw new IllegalStateException("Unexpected partition location.");
@@ -610,7 +618,11 @@ else if (partitionLocation.isUnknown()) {
 
 			inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]);
 		}
 
-		LOG.debug("Created input channels {} from {}.", Arrays.toString(inputChannels), igdd);
+		LOG.debug("Created {} input channels (local: {}, remote: {}, unknown: {}).",
+			inputChannels.length,
+			numLocalChannels,
+			numRemoteChannels,
+			numUnknownChannels);
 
 		return inputGate;
 	}
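The common thread of the first three patches is choosing the right level and letting SLF4J do the formatting: a placeholder message is only rendered when the target level is enabled, so cheap arguments need neither an explicit guard nor string concatenation. A minimal, self-contained sketch of that pattern (illustration only, not part of the series; the class name and payload are made up):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeartbeatLoggingSketch {

	private static final Logger LOG = LoggerFactory.getLogger(HeartbeatLoggingSketch.class);

	public void reportHeartBeat(Object taskManager) {
		// With the {} placeholder, the message string is only built when
		// TRACE is enabled for this logger, so the old isDebugEnabled()
		// guard and the "..." + host concatenation are both unnecessary.
		LOG.trace("Received heartbeat from TaskManager {}", taskManager);
	}
}

The rule of thumb applied above: frequent, low-value messages (heartbeats, per-channel creation details) move down to TRACE or are dropped, while rare, high-value messages (loaded configuration properties) move up to INFO.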
From 4a91b78e52c440d5c736789cea0f8d80968339f8 Mon Sep 17 00:00:00 2001
From: Ufuk Celebi
Date: Tue, 29 Nov 2016 16:15:30 +0100
Subject: [PATCH 04/10] [FLINK-5198] [logging] Improve TaskState toString

---
 .../checkpoint/CheckpointCoordinator.java    |  8 ++++++--
 .../flink/runtime/checkpoint/TaskState.java  | 17 +++++++++++++++++
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0cf944c5809a5..24cc3cb05cb8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -693,9 +693,13 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E
 				if (LOG.isDebugEnabled()) {
 					StringBuilder builder = new StringBuilder();
-					for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) {
-						builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}");
+					builder.append("Checkpoint state: ");
+					for (TaskState state : completed.getTaskStates().values()) {
+						builder.append(state);
+						builder.append(", ");
 					}
+					// Remove last two chars ", "
+					builder.delete(builder.length() - 2, builder.length());
 
 					LOG.debug(builder.toString());
 				}

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index ac4503d51cadf..1c46812cdb87e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -168,4 +168,21 @@ public boolean equals(Object obj) {
 	public int hashCode() {
 		return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, kvStates);
 	}
+
+	@Override
+	public String toString() {
+		long stateSize = 0L;
+		for (SubtaskState subtaskState : subtaskStates.values()) {
+			stateSize += subtaskState.getStateSize();
+		}
+
+		// KvStates are always null in 1.1. Don't print this as it might
+		// confuse users that don't care about how we store it internally.
+		return "TaskState(" +
+			"jobVertexID: " + jobVertexID +
+			", parallelism: " + parallelism +
+			", sub task states: " + subtaskStates.size() +
+			", total size (bytes): " + stateSize +
+			')';
+	}
 }
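When building the message itself is expensive, as with the checkpoint state summary in this patch, the explicit isDebugEnabled() guard still pays off. A hedged sketch of the same StringBuilder pattern (names are illustrative, not Flink API; the isEmpty() check is an extra assumption added here to keep the trailing-separator trim safe for empty input):

import java.util.Collection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CheckpointSummarySketch {

	private static final Logger LOG = LoggerFactory.getLogger(CheckpointSummarySketch.class);

	public void logTaskStates(Collection<?> taskStates) {
		// Guard first: the loop and the StringBuilder are only worth
		// paying for when DEBUG output is actually written.
		if (LOG.isDebugEnabled() && !taskStates.isEmpty()) {
			StringBuilder builder = new StringBuilder("Checkpoint state: ");
			for (Object state : taskStates) {
				builder.append(state).append(", ");
			}
			// Remove the last two chars ", ", as the patch does.
			builder.delete(builder.length() - 2, builder.length());
			LOG.debug(builder.toString());
		}
	}
}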
From 5a5a78a15c2c3a0b3d82446b98c0c73c26cb4a4d Mon Sep 17 00:00:00 2001
From: Ufuk Celebi
Date: Tue, 29 Nov 2016 16:35:14 +0100
Subject: [PATCH 05/10] [FLINK-5199] [logging] Improve logging in
 ZooKeeperSubmittedJobGraphStore

---
 .../runtime/jobmanager/SubmittedJobGraph.java |  2 +-
 .../ZooKeeperSubmittedJobGraphStore.java      | 87 ++++++++++++-------
 2 files changed, 57 insertions(+), 32 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
index faacc933952f5..e868da7080893 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
@@ -72,6 +72,6 @@ public JobInfo getJobInfo() throws Exception {
 
 	@Override
 	public String toString() {
-		return String.format("SubmittedJobGraph(%s, %s)", jobGraph, jobInfo);
+		return String.format("SubmittedJobGraph(%s, %s)", jobGraph.getJobID(), jobInfo);
 	}
 }

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index a1dd14b001e79..7324c07dcef07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -82,6 +82,9 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	 */
 	private final PathChildrenCache pathCache;
 
+	/** The full configured base path including the namespace. */
+	private final String zooKeeperFullBasePath;
+
 	/** The external listener to be notified on races. */
 	private SubmittedJobGraphListener jobGraphListener;
 
@@ -117,6 +120,7 @@ public ZooKeeperSubmittedJobGraphStore(
 		// All operations will have the path as root
 		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
 
+		this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
 		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
 
 		this.pathCache = new PathChildrenCache(facade, "/", false);
@@ -156,6 +160,8 @@ public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
 		synchronized (cacheLock) {
 			verifyIsRunning();
 
+			LOG.debug("Recovering all job graphs from ZooKeeper at {}.", zooKeeperFullBasePath);
+
 			List<Tuple2<StateHandle<SubmittedJobGraph>, String>> submitted;
 
 			while (true) {
@@ -168,6 +174,8 @@ public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
 				}
 			}
 
+			LOG.info("Found {} job graphs.", submitted.size());
+
 			if (submitted.size() != 0) {
 				List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
@@ -195,6 +203,8 @@ public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
 		checkNotNull(jobId, "Job ID");
 		String path = getPathForJob(jobId);
 
+		LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
 		synchronized (cacheLock) {
 			verifyIsRunning();
@@ -221,6 +231,8 @@ public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
 		checkNotNull(jobGraph, "Job graph");
 		String path = getPathForJob(jobGraph.getJobId());
 
+		LOG.debug("Adding job graph {} to {}{}.", jobGraph.getJobId(), zooKeeperFullBasePath, path);
+
 		boolean success = false;
 
 		while (!success) {
@@ -235,8 +247,6 @@ public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
 
 					addedJobGraphs.add(jobGraph.getJobId());
 
-					LOG.info("Added {} to ZooKeeper.", jobGraph);
-
 					success = true;
 				}
 				catch (KeeperException.NodeExistsException ignored) {
@@ -258,6 +268,8 @@ else if (addedJobGraphs.contains(jobGraph.getJobId())) {
 				}
 			}
 		}
+
+		LOG.info("Added {} to ZooKeeper.", jobGraph);
 	}
 
 	@Override
@@ -265,14 +277,17 @@ public void removeJobGraph(JobID jobId) throws Exception {
 		checkNotNull(jobId, "Job ID");
 		String path = getPathForJob(jobId);
 
+		LOG.debug("Removing job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
 		synchronized (cacheLock) {
 			if (addedJobGraphs.contains(jobId)) {
 				jobGraphsInZooKeeper.removeAndDiscardState(path);
 
 				addedJobGraphs.remove(jobId);
-				LOG.info("Removed job graph {} from ZooKeeper.", jobId);
 			}
 		}
+
+		LOG.info("Removed job graph {} from ZooKeeper.", jobId);
 	}
 
 	/**
@@ -297,70 +312,80 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
 			}
 
 			switch (event.getType()) {
-				case CHILD_ADDED:
+				case CHILD_ADDED: {
+					JobID jobId = fromEvent(event);
+
+					LOG.debug("Received CHILD_ADDED event notification for job {}", jobId);
+
 					synchronized (cacheLock) {
 						try {
-							JobID jobId = fromEvent(event);
 							if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) {
 								try {
 									// Whoa! This has been added by someone else. Or we were fast
 									// to remove it (false positive).
 									jobGraphListener.onAddedJobGraph(jobId);
-								}
-								catch (Throwable t) {
+								} catch (Throwable t) {
 									LOG.error("Error in callback", t);
 								}
 							}
-						}
-						catch (Exception e) {
+						} catch (Exception e) {
 							LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
 						}
 					}
+				}
+				break;
 
-					break;
-
-				case CHILD_UPDATED:
+				case CHILD_UPDATED: {
 					// Nothing to do
-					break;
+				}
+				break;
+
+				case CHILD_REMOVED: {
+					JobID jobId = fromEvent(event);
+
+					LOG.debug("Received CHILD_REMOVED event notification for job {}", jobId);
 
-				case CHILD_REMOVED:
 					synchronized (cacheLock) {
 						try {
-							JobID jobId = fromEvent(event);
 							if (jobGraphListener != null && addedJobGraphs.contains(jobId)) {
 								try {
 									// Oh oh. Someone else removed one of our job graphs. Mean!
 									jobGraphListener.onRemovedJobGraph(jobId);
-								}
-								catch (Throwable t) {
+								} catch (Throwable t) {
 									LOG.error("Error in callback", t);
 								}
 							}
 
 							break;
-						}
-						catch (Exception e) {
+						} catch (Exception e) {
 							LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
 						}
 					}
-					break;
+				}
+				break;
 
-				case CONNECTION_SUSPENDED:
+				case CONNECTION_SUSPENDED: {
 					LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " +
-							"graphs are not monitored (temporarily).");
-					break;
-				case CONNECTION_LOST:
+						"graphs are not monitored (temporarily).");
+				}
+				break;
+
+				case CONNECTION_LOST: {
 					LOG.warn("ZooKeeper connection LOST. Changes to the submitted job " +
-							"graphs are not monitored (permanently).");
-					break;
+						"graphs are not monitored (permanently).");
+				}
+				break;
 
-				case CONNECTION_RECONNECTED:
+				case CONNECTION_RECONNECTED: {
 					LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job " +
-							"graphs are monitored again.");
-					break;
-				case INITIALIZED:
+						"graphs are monitored again.");
+				}
+				break;
+
+				case INITIALIZED: {
 					LOG.info("SubmittedJobGraphsPathCacheListener initialized");
-					break;
+				}
+				break;
 			}
 		}
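For readers unfamiliar with the Curator recipe this store is built on: a PathChildrenCache watches the children of a ZooKeeper path and delivers typed events to a listener, which is where the new CHILD_ADDED/CHILD_REMOVED debug messages are emitted. A stripped-down sketch of the wiring, under the assumption of a plain Curator client (error handling, namespacing, and the job-graph bookkeeping are omitted):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobGraphPathWatcherSketch {

	private static final Logger LOG = LoggerFactory.getLogger(JobGraphPathWatcherSketch.class);

	public void watch(CuratorFramework client) throws Exception {
		// Watch all children of the client's root; the third argument
		// disables caching of node payloads, as in the store above.
		PathChildrenCache cache = new PathChildrenCache(client, "/", false);

		cache.getListenable().addListener(new PathChildrenCacheListener() {
			@Override
			public void childEvent(CuratorFramework c, PathChildrenCacheEvent event) {
				switch (event.getType()) {
					case CHILD_ADDED:
						LOG.debug("Received CHILD_ADDED for {}", event.getData().getPath());
						break;
					case CHILD_REMOVED:
						LOG.debug("Received CHILD_REMOVED for {}", event.getData().getPath());
						break;
					default:
						// Connection state changes are handled separately.
						break;
				}
			}
		});

		cache.start();
	}
}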
From 83ef198008e4dd84a4876b7a8cefc1f870ebe0a2 Mon Sep 17 00:00:00 2001
From: Ufuk Celebi
Date: Tue, 29 Nov 2016 17:08:53 +0100
Subject: [PATCH 06/10] [FLINK-5207] [logging] Decrease HadoopFileSystem
 logging

---
 .../runtime/fs/hdfs/HadoopFileSystem.java | 37 ++++++++-----------
 1 file changed, 16 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 4e05ebe0f08dd..008ecf80f221d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -17,25 +17,23 @@
  */
 package org.apache.flink.runtime.fs.hdfs;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.net.UnknownHostException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.core.fs.HadoopFileSystemWrapper;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.HadoopFileSystemWrapper;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.InstantiationUtil;
-
 import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.UnknownHostException;
 
 /**
  * Concrete implementation of the {@link FileSystem} base class for the Hadoop File System. The
@@ -186,14 +184,14 @@ public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
 		if (hdfsDefaultPath != null) {
 			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
 		} else {
-			LOG.debug("Cannot find hdfs-default configuration file");
+			LOG.trace("{} configuration key for hdfs-default configuration file not set", ConfigConstants.HDFS_DEFAULT_CONFIG);
 		}
 
 		final String hdfsSitePath = GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
 		if (hdfsSitePath != null) {
 			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
 		} else {
-			LOG.debug("Cannot find hdfs-site configuration file");
+			LOG.trace("{} configuration key for hdfs-site configuration file not set", ConfigConstants.HDFS_SITE_CONFIG);
 		}
 
 		// 2. Approach environment variables
@@ -211,17 +209,14 @@ public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
 			if (new File(possibleHadoopConfPath).exists()) {
 				if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
 					retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
-
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
-					}
+				} else {
+					LOG.debug("File " + possibleHadoopConfPath + "/core-site.xml not found.");
 				}
+
 				if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
 					retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
-
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
-					}
+				} else {
+					LOG.debug("File " + possibleHadoopConfPath + "/hdfs-site.xml not found.");
 				}
 			}
 		}
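The method changed here resolves Hadoop configuration files from several locations in turn, and the patch demotes the "file added" chatter in favor of logging only what is missing. A condensed sketch of one step of that lookup (the environment variable and file names mirror the code above; the helper method itself is illustrative):

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopConfLookupSketch {

	private static final Logger LOG = LoggerFactory.getLogger(HadoopConfLookupSketch.class);

	public Configuration load() {
		Configuration conf = new Configuration();

		// One step of the cascade: check HADOOP_CONF_DIR and add the
		// site files when present; at DEBUG, report only missing files.
		String confDir = System.getenv("HADOOP_CONF_DIR");
		if (confDir != null && new File(confDir).exists()) {
			addIfExists(conf, confDir + "/core-site.xml");
			addIfExists(conf, confDir + "/hdfs-site.xml");
		}
		return conf;
	}

	private void addIfExists(Configuration conf, String path) {
		if (new File(path).exists()) {
			conf.addResource(new org.apache.hadoop.fs.Path(path));
		} else {
			LOG.debug("File {} not found.", path);
		}
	}
}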
From 98c63af63036178e9c8423b99360f42d18b2e388 Mon Sep 17 00:00:00 2001
From: Ufuk Celebi
Date: Tue, 29 Nov 2016 17:14:23 +0100
Subject: [PATCH 07/10] [FLINK-5192] [logging] Improve log config templates

---
 .../src/main/flink-bin/conf/log4j.properties | 14 +++++++++++++-
 .../src/main/flink-bin/conf/logback.xml      | 28 ++++++++++++++++++++++++++--
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/flink-dist/src/main/flink-bin/conf/log4j.properties b/flink-dist/src/main/flink-bin/conf/log4j.properties
index 97ec653dbd7bb..8e00ce369d514 100644
--- a/flink-dist/src/main/flink-bin/conf/log4j.properties
+++ b/flink-dist/src/main/flink-bin/conf/log4j.properties
@@ -16,8 +16,20 @@
 # limitations under the License.
 ################################################################################
 
+# This affects logging for both user code and Flink
 log4j.rootLogger=INFO, file
 
+# Uncomment this if you want to _only_ change Flink's logging
+#log4j.logger.org.apache.flink=INFO
+
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+log4j.logger.akka=INFO
+log4j.logger.org.apache.kafka=INFO
+log4j.logger.org.apache.hadoop=INFO
+log4j.logger.org.apache.zookeeper=INFO
+
 # Log all infos in the given file
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.file=${log.file}
@@ -25,5 +37,5 @@ log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
-# suppress the irrelevant (wrong) warnings from the netty channel handler
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

diff --git a/flink-dist/src/main/flink-bin/conf/logback.xml b/flink-dist/src/main/flink-bin/conf/logback.xml
index 1147a70c71cff..f3c433105a7f9 100644
--- a/flink-dist/src/main/flink-bin/conf/logback.xml
+++ b/flink-dist/src/main/flink-bin/conf/logback.xml
@@ -25,8 +25,34 @@
         </encoder>
     </appender>
 
+    <!-- This affects logging for both user code and Flink -->
     <root level="INFO">
         <appender-ref ref="file"/>
     </root>
 
+    <!-- Uncomment this if you want to _only_ change Flink's logging -->
+    <!--<logger name="org.apache.flink" level="INFO">-->
+        <!--<appender-ref ref="file"/>-->
+    <!--</logger>-->
+
+    <!-- The following lines keep the log level of common libraries/connectors on
+         log level INFO. The root logger does not override this. You have to manually
+         change the log levels here. -->
+    <logger name="akka" level="INFO">
+        <appender-ref ref="file"/>
+    </logger>
+    <logger name="org.apache.kafka" level="INFO">
+        <appender-ref ref="file"/>
+    </logger>
+    <logger name="org.apache.hadoop" level="INFO">
+        <appender-ref ref="file"/>
+    </logger>
+    <logger name="org.apache.zookeeper" level="INFO">
+        <appender-ref ref="file"/>
+    </logger>
+
-    <!-- suppress the irrelevant (wrong) warnings from the netty channel handler -->
+    <!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler -->
     <logger name="org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR">
         <appender-ref ref="file"/>
     </logger>
 </configuration>
From a80538d0f978e82a81746e9af806691dce1c52eb Mon Sep 17 00:00:00 2001
From: Ufuk Celebi
Date: Wed, 30 Nov 2016 16:09:54 +0100
Subject: [PATCH 08/10] Log FlinkUntypedActor at TRACE

---
 .../flink/runtime/akka/FlinkUntypedActor.java | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index 5100d172f1b1f..3255778fe95d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -19,10 +19,8 @@
 package org.apache.flink.runtime.akka;
 
 import akka.actor.UntypedActor;
-
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
 import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +38,7 @@
  * a leader session ID option which is returned by getLeaderSessionID.
 */
 public abstract class FlinkUntypedActor extends UntypedActor {
-	
+
 	protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
 	/**
@@ -56,16 +54,16 @@ public abstract class FlinkUntypedActor extends UntypedActor {
 	 */
 	@Override
	public final void onReceive(Object message) throws Exception {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("Received message {} at {} from {}.", message, getSelf().path(), getSender());
+		if(LOG.isTraceEnabled()) {
+			LOG.trace("Received message {} at {} from {}.", message, getSelf().path(), getSender());
 
 			long start = System.nanoTime();
 
 			handleLeaderSessionID(message);
 
-			long duration = (System.nanoTime() - start)/ 1000000;
+			long duration = (System.nanoTime() - start)/ 1_000_000;
 
-			LOG.debug("Handled message {} in {} ms from {}.", message, duration, getSender());
+			LOG.trace("Handled message {} in {} ms from {}.", message, duration, getSender());
 		} else {
 			handleLeaderSessionID(message);
 		}
@@ -81,7 +79,7 @@ public final void onReceive(Object message) throws Exception {
 	 * @throws Exception
 	 */
 	private void handleLeaderSessionID(Object message) throws Exception {
-		if(message instanceof LeaderSessionMessage) {
+		if (message instanceof LeaderSessionMessage) {
 			LeaderSessionMessage msg = (LeaderSessionMessage) message;
 			UUID expectedID = getLeaderSessionID();
 			UUID actualID = msg.leaderSessionID();
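The guard in onReceive does double duty: when TRACE is off, it skips both the message rendering and the System.nanoTime() bookkeeping around the handler call. The shape of that pattern, reduced to its essentials (a sketch with a hypothetical handler, not the actor class itself):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class TimedHandlerSketch {

	protected final Logger LOG = LoggerFactory.getLogger(getClass());

	public final void onMessage(Object message) throws Exception {
		if (LOG.isTraceEnabled()) {
			// Only pay for the clock reads and the formatting when
			// someone actually reads TRACE output.
			long start = System.nanoTime();

			handle(message);

			long durationMs = (System.nanoTime() - start) / 1_000_000;
			LOG.trace("Handled message {} in {} ms.", message, durationMs);
		} else {
			handle(message);
		}
	}

	protected abstract void handle(Object message) throws Exception;
}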
From ced8b90f510eb6c0020d80bc8fea39800959a4b6 Mon Sep 17 00:00:00 2001
From: Ufuk Celebi
Date: Wed, 30 Nov 2016 16:12:52 +0100
Subject: [PATCH 09/10] Use placeholders in HadoopFileSystem logs

---
 .../runtime/fs/hdfs/HadoopFileSystem.java | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 008ecf80f221d..f747112dfa688 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -97,7 +97,7 @@ private Class<? extends org.apache.hadoop.fs.FileSystem> getDefaultHDFSClass() t
 				fsClass = ((Class<?>) fsHandle).asSubclass(org.apache.hadoop.fs.FileSystem.class);
 
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Loaded '" + fsClass.getName() + "' as HDFS class.");
+					LOG.debug("Loaded '{}' as HDFS class.", fsClass.getName());
 				}
 			}
 			else {
@@ -112,8 +112,8 @@ private Class<? extends org.apache.hadoop.fs.FileSystem> getDefaultHDFSClass() t
 		{
 			// first of all, check for a user-defined hdfs class
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Falling back to loading HDFS class old Hadoop style. Looking for HDFS class configuration entry '"
-						+ HDFS_IMPLEMENTATION_KEY + "'.");
+				LOG.debug("Falling back to loading HDFS class old Hadoop style. Looking for HDFS class configuration entry '{}'.",
+					HDFS_IMPLEMENTATION_KEY);
 			}
 
 			Class<?> classFromConfig = conf.getClass(HDFS_IMPLEMENTATION_KEY, null);
@@ -124,12 +124,12 @@ private Class<? extends org.apache.hadoop.fs.FileSystem> getDefaultHDFSClass() t
 				fsClass = classFromConfig.asSubclass(org.apache.hadoop.fs.FileSystem.class);
 
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Loaded HDFS class '" + fsClass.getName() + "' as specified in configuration.");
+					LOG.debug("Loaded HDFS class '{}' as specified in configuration.", fsClass.getName());
 				}
 			}
 			else {
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY + " is of wrong type.");
+					LOG.debug("HDFS class specified by {} is of wrong type.", HDFS_IMPLEMENTATION_KEY);
 				}
 
 				throw new IOException("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY +
@@ -139,7 +139,7 @@ private Class<? extends org.apache.hadoop.fs.FileSystem> getDefaultHDFSClass() t
 			else {
 				// load the default HDFS class
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Trying to load default HDFS implementation " + DEFAULT_HDFS_CLASS);
+					LOG.debug("Trying to load default HDFS implementation {}.", DEFAULT_HDFS_CLASS);
 				}
 
 				try {
@@ -210,13 +210,13 @@ public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
 				if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
 					retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
 				} else {
-					LOG.debug("File " + possibleHadoopConfPath + "/core-site.xml not found.");
+					LOG.debug("File {}/core-site.xml not found.", possibleHadoopConfPath);
 				}
 
 				if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
 					retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
 				} else {
-					LOG.debug("File " + possibleHadoopConfPath + "/hdfs-site.xml not found.");
+					LOG.debug("File {}/hdfs-site.xml not found.", possibleHadoopConfPath);
 				}
 			}
 		}
@@ -280,7 +280,7 @@ public void initialize(URI path) throws IOException {
 		}
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("fs.defaultFS is set to " + configEntry);
+			LOG.debug("fs.defaultFS is set to {}", configEntry);
 		}
 
 		if (configEntry == null) {
@@ -455,7 +455,7 @@ public Class<?> getHadoopWrapperClassNameForFileSystem(String scheme) {
 		clazz = hadoopConf.getClass("fs." + scheme + ".impl", null, org.apache.hadoop.fs.FileSystem.class);
 
 		if(clazz != null && LOG.isDebugEnabled()) {
-			LOG.debug("Flink supports "+scheme+" with the Hadoop file system wrapper, impl "+clazz);
+			LOG.debug("Flink supports {} with the Hadoop file system wrapper, impl {}", scheme, clazz);
 		}
 		return clazz;
 	}
From 2b7bcf136ba6f7e50627832fd55d08e6e9455a1e Mon Sep 17 00:00:00 2001
From: Ufuk Celebi
Date: Wed, 30 Nov 2016 16:13:28 +0100
Subject: [PATCH 10/10] Use getStateSize in TaskState

---
 .../org/apache/flink/runtime/checkpoint/TaskState.java | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index 1c46812cdb87e..14f8caab3c847 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -171,18 +171,13 @@ public int hashCode() {
 
 	@Override
 	public String toString() {
-		long stateSize = 0L;
-		for (SubtaskState subtaskState : subtaskStates.values()) {
-			stateSize += subtaskState.getStateSize();
-		}
-
 		// KvStates are always null in 1.1. Don't print this as it might
 		// confuse users that don't care about how we store it internally.
 		return "TaskState(" +
 			"jobVertexID: " + jobVertexID +
 			", parallelism: " + parallelism +
 			", sub task states: " + subtaskStates.size() +
-			", total size (bytes): " + stateSize +
+			", total size (bytes): " + getStateSize() +
 			')';
 	}
 }
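The final patch replaces the hand-rolled summation in toString with the class's existing getStateSize() accessor, so the printed total can never drift from the canonical one. In sketch form (field and accessor names are illustrative, not the real TaskState):

import java.util.HashMap;
import java.util.Map;

public class TaskStateSketch {

	private final Map<Integer, Long> subtaskStateSizes = new HashMap<>();

	public long getStateSize() {
		long size = 0L;
		for (long subtaskSize : subtaskStateSizes.values()) {
			size += subtaskSize;
		}
		return size;
	}

	@Override
	public String toString() {
		// Reuse the accessor instead of duplicating the summation loop.
		return "TaskState(sub task states: " + subtaskStateSizes.size() +
			", total size (bytes): " + getStateSize() + ')';
	}
}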