Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-3417: Wrap reporter calls in try/catch blocks #3635

Merged
merged 1 commit on Apr 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 21 additions & 6 deletions clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
Expand Up @@ -524,8 +524,13 @@ public void addMetric(MetricName metricName, MetricValueProvider<?> metricValueP
/**
 * Removes the metric registered under the given name, notifying every
 * configured reporter of the removal.
 *
 * @param metricName the name of the metric to remove
 * @return the removed metric, or null if no metric was registered under that name
 */
public synchronized KafkaMetric removeMetric(MetricName metricName) {
    KafkaMetric metric = this.metrics.remove(metricName);
    if (metric != null) {
        // Notify each reporter exactly once, isolating failures so that one
        // misbehaving reporter cannot prevent the others from being notified.
        for (MetricsReporter reporter : reporters) {
            try {
                reporter.metricRemoval(metric);
            } catch (Exception e) {
                log.error("Error when removing metric from " + reporter.getClass().getName(), e);
            }
        }
    }
    return metric;
}
Expand All @@ -552,8 +557,13 @@ synchronized void registerMetric(KafkaMetric metric) {
// Reject duplicate metric names, store the metric, then notify each reporter
// exactly once. Reporter callbacks are wrapped in try/catch so one faulty
// reporter cannot break registration or starve the remaining reporters.
if (this.metrics.containsKey(metricName))
    throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
this.metrics.put(metricName, metric);
for (MetricsReporter reporter : reporters) {
    try {
        reporter.metricChange(metric);
    } catch (Exception e) {
        log.error("Error when registering metric on " + reporter.getClass().getName(), e);
    }
}
}

/**
Expand Down Expand Up @@ -634,8 +644,13 @@ public void close() {
}
}

// Close each reporter exactly once, defensively: an exception from one
// reporter's close() must not prevent the remaining reporters from closing.
for (MetricsReporter reporter : reporters) {
    try {
        reporter.close();
    } catch (Exception e) {
        log.error("Error when closing " + reporter.getClass().getName(), e);
    }
}
}

}
12 changes: 12 additions & 0 deletions core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
Expand Up @@ -154,6 +154,18 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
skipResponseHeader(response)
}

/**
 * Sends the request produced by `requestBuilder` over `socket` and returns the
 * parsed response payload (the response header has already been stripped).
 */
def requestResponse(socket: Socket, clientId: String, correlationId: Int, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Struct = {
  val request = requestBuilder.build()
  val requestApiKey = requestBuilder.apiKey
  val serialized = request.serialize(new RequestHeader(requestApiKey, request.version, clientId, correlationId)).array
  val responseBody = skipResponseHeader(requestAndReceive(socket, serialized))
  requestApiKey.parseResponse(request.version, responseBody)
}

/**
* Serializes and sends the requestStruct to the given api.
* A ByteBuffer containing the response (without the response header) is returned.
Expand Down
@@ -0,0 +1,116 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package kafka.server

import java.net.Socket
import java.util.Properties

import kafka.utils.TestUtils
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.requests.{ListGroupsRequest,ListGroupsResponse}
import org.apache.kafka.common.metrics.MetricsReporter
import org.apache.kafka.common.metrics.KafkaMetric
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.protocol.Errors

import org.junit.Assert._
import org.junit.{Before, Test}
import org.junit.After
import java.util.concurrent.atomic.AtomicInteger

/*
* this test checks that a reporter that throws an exception will not affect other reporters
* and will not affect the broker's message handling
*/
/*
 * Verifies that a metrics reporter which throws from its callbacks neither
 * disturbs the other configured reporters nor breaks the broker's request
 * handling.
 */
class KafkaMetricReporterExceptionHandlingTest extends BaseRequestTest {

  override def numBrokers: Int = 1

  override def propertyOverrides(properties: Properties): Unit = {
    // Configure the throwing reporter ahead of the well-behaved one so a
    // failure in the former would be observable in the latter.
    val reporterClasses = Seq(
      classOf[KafkaMetricReporterExceptionHandlingTest.BadReporter],
      classOf[KafkaMetricReporterExceptionHandlingTest.GoodReporter]
    )
    properties.put(KafkaConfig.MetricReporterClassesProp, reporterClasses.map(_.getName).mkString(","))
  }

  @Before
  override def setUp() {
    super.setUp()

    // A quota property is needed so that "throttle-time" metrics get
    // registered after the server has started.
    val defaultClientQuota = new Properties()
    defaultClientQuota.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.1")
    adminZkClient.changeClientIdConfig("<default>", defaultClientQuota)
  }

  @After
  override def tearDown() {
    // Reset the shared counters so later tests in the same JVM start from zero.
    KafkaMetricReporterExceptionHandlingTest.goodReporterRegistered.set(0)
    KafkaMetricReporterExceptionHandlingTest.badReporterRegistered.set(0)

    super.tearDown()
  }

  @Test
  def testBothReportersAreInvoked() {
    val listener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
    val socket = new Socket("localhost", anySocketServer.boundPort(listener))
    socket.setSoTimeout(10000)

    try {
      TestUtils.retry(10000) {
        // The broker must keep answering requests even though BadReporter throws.
        val response = new ListGroupsResponse(requestResponse(socket, "clientId", 0, new ListGroupsRequest.Builder()))
        assertEquals(Errors.NONE, response.error())
        // Both reporters saw the same metrics, and at least one metric was seen.
        val goodCount = KafkaMetricReporterExceptionHandlingTest.goodReporterRegistered.get
        val badCount = KafkaMetricReporterExceptionHandlingTest.badReporterRegistered.get
        assertEquals(goodCount, badCount)
        assertTrue(goodCount > 0)
      }
    } finally {
      socket.close()
    }
  }
}

object KafkaMetricReporterExceptionHandlingTest {
  // Counters shared with the broker-side reporter instances, tracking how many
  // "Request"-group metric registrations each reporter observed.
  // `val` rather than `var`: the AtomicInteger instances are mutated in place
  // (incrementAndGet/set) and never reassigned.
  val goodReporterRegistered = new AtomicInteger
  val badReporterRegistered = new AtomicInteger

  /** Well-behaved reporter: counts "Request"-group metric registrations and never throws. */
  class GoodReporter extends MetricsReporter {

    def configure(configs: java.util.Map[String, _]): Unit = {
    }

    def init(metrics: java.util.List[KafkaMetric]): Unit = {
    }

    def metricChange(metric: KafkaMetric): Unit = {
      if (metric.metricName.group == "Request") {
        goodReporterRegistered.incrementAndGet()
      }
    }

    def metricRemoval(metric: KafkaMetric): Unit = {
    }

    def close(): Unit = {
    }
  }

  /**
   * Misbehaving reporter: counts the same metrics as GoodReporter but then
   * throws, exercising the broker's isolation of reporter failures.
   */
  class BadReporter extends GoodReporter {

    override def metricChange(metric: KafkaMetric): Unit = {
      if (metric.metricName.group == "Request") {
        badReporterRegistered.incrementAndGet()
        throw new RuntimeException(metric.metricName.toString)
      }
    }
  }
}
10 changes: 0 additions & 10 deletions core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
Expand Up @@ -14,7 +14,6 @@

package kafka.server

import java.net.Socket
import java.nio.ByteBuffer
import java.util.{Collections, LinkedHashMap, Properties}
import java.util.concurrent.{Executors, Future, TimeUnit}
Expand Down Expand Up @@ -331,15 +330,6 @@ class RequestQuotaTest extends BaseRequestTest {
}
}

/**
 * Builds, serializes and sends the request over `socket`, then returns the
 * parsed response struct (response header already consumed).
 */
private def requestResponse(socket: Socket, clientId: String, correlationId: Int, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Struct = {
  val builtRequest = requestBuilder.build()
  val key = requestBuilder.apiKey
  val header = new RequestHeader(key, builtRequest.version, clientId, correlationId)
  val rawResponse = requestAndReceive(socket, builtRequest.serialize(header).array)
  key.parseResponse(builtRequest.version, skipResponseHeader(rawResponse))
}

case class Client(clientId: String, apiKey: ApiKeys) {
var correlationId: Int = 0
val builder = requestBuilder(apiKey)
Expand Down