From 4ad69909b71db6b8f28b8879bfc508e86b124af8 Mon Sep 17 00:00:00 2001 From: Dustin Cote Date: Tue, 6 Sep 2016 15:15:53 -0400 Subject: [PATCH 1/2] log ConnectException at WARN --- .../main/java/org/apache/kafka/common/network/Selector.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 524471010123..5fa13003f22a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -13,6 +13,7 @@ package org.apache.kafka.common.network; import java.io.IOException; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.CancelledKeyException; @@ -361,7 +362,9 @@ private void pollSelectionKeys(Iterable selectionKeys, } catch (Exception e) { String desc = channel.socketDescription(); - if (e instanceof IOException) + if (e instanceof ConnectException) + log.warn("Cannot connect to {}", desc, e); + else if (e instanceof IOException) log.debug("Connection with {} disconnected", desc, e); else log.warn("Unexpected error from {}; closing connection", desc, e); From 2f87d842375e92cfd7854fa41cc0dbf0730af040 Mon Sep 17 00:00:00 2001 From: Dustin Cote Date: Mon, 12 Dec 2016 10:54:54 -0500 Subject: [PATCH 2/2] adding a log rate limiter utility --- .../apache/kafka/common/network/Selector.java | 13 ++++- .../kafka/common/utils/LogRateLimiter.java | 49 +++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/utils/LogRateLimiter.java diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 5fa13003f22a..2e459693ee8d 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -43,6 +43,7 @@ import org.apache.kafka.common.metrics.stats.Count; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.utils.LogRateLimiter; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,6 +99,8 @@ public class Selector implements Selectable { private final int maxReceiveSize; private final boolean metricsPerConnection; private final IdleExpiryManager idleExpiryManager; + private long loggerCount = 0; + private final long loggerCountMax = 1000; /** * Create a new nioSelector @@ -362,8 +365,13 @@ private void pollSelectionKeys(Iterable selectionKeys, } catch (Exception e) { String desc = channel.socketDescription(); - if (e instanceof ConnectException) - log.warn("Cannot connect to {}", desc, e); + if (e instanceof ConnectException) { + if (LogRateLimiter.warn(log, "Cannot connect to {}", desc, e, + loggerCount, loggerCountMax)) + loggerCount++; + else + loggerCount = 0; + } else if (e instanceof IOException) log.debug("Connection with {} disconnected", desc, e); else @@ -371,6 +379,7 @@ else if (e instanceof IOException) close(channel); this.disconnected.add(channel.id()); } + loggerCount = 0; } } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/LogRateLimiter.java b/clients/src/main/java/org/apache/kafka/common/utils/LogRateLimiter.java new file mode 100644 index 000000000000..4bf4cd62643a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/utils/LogRateLimiter.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.utils; + +import org.slf4j.Logger; + +/** + * Wrapper methods to log spammy messages only occasionally + * Relies on the caller to provide basically everything and just does comparisons + * to keep calling code a bit cleaner + */ +public class LogRateLimiter { + + /** + * Log a warn level message with + * org.slf4j.Logger#warn(java.lang.String, java.lang.Object, java.lang.Object) + * @param logger logger to use + * @param format see ref class + * @param arg1 see ref class + * @param arg2 see ref class + * @param count number of times we've tried to log this message + * @param max maximum number of times we try to log the message before suggesting a count reset + * @return boolean indicating if the count should be reset or not + */ + public static boolean warn(Logger logger, String format, Object arg1, Object arg2, + long count, long max){ + if (count == 0) + logger.warn(format, arg1, arg2); + if (count > max) { + return true; + } + return false; + } +}