From c9da7ec0484bd1a644a79e4c4ea20499ee8c5cd6 Mon Sep 17 00:00:00 2001 From: Erik Kringen Date: Wed, 26 Jul 2017 12:18:52 -0500 Subject: [PATCH] KAFKA-5656: Support bulk attributes request on KafkaMbean where some attributes do not exist --- .../kafka/common/metrics/JmxReporter.java | 14 +- .../kafka/common/metrics/KafkaMbeanTest.java | 170 ++++++++++++++++++ 2 files changed, 177 insertions(+), 7 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java index 67dfaa80ddb5e..001d6fe1ae144 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java @@ -191,15 +191,15 @@ public Object getAttribute(String name) throws AttributeNotFoundException, MBean @Override public AttributeList getAttributes(String[] names) { - try { - AttributeList list = new AttributeList(); - for (String name : names) + AttributeList list = new AttributeList(); + for (String name : names) { + try { list.add(new Attribute(name, getAttribute(name))); - return list; - } catch (Exception e) { - log.error("Error getting JMX attribute: ", e); - return new AttributeList(); + } catch (Exception e) { + log.warn("Error getting JMX attribute: ", e); + } } + return list; } public KafkaMetric removeAttribute(String name) { diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java new file mode 100644 index 0000000000000..2fa5a05e3df24 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.stats.Count; +import org.apache.kafka.common.metrics.stats.Sum; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.AttributeNotFoundException; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.RuntimeMBeanException; +import java.lang.management.ManagementFactory; +import java.util.List; + +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +public class KafkaMbeanTest { + + private MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + private Sensor sensor; + private MetricName countMetricName; + private MetricName sumMetricName; + private Metrics metrics; + + @Before + public void setup() throws Exception { + metrics = new Metrics(); + metrics.addReporter(new JmxReporter()); + sensor = metrics.sensor("kafka.requests"); + countMetricName = metrics.metricName("pack.bean1.count", "grp1"); + sensor.add(countMetricName, new Count()); + sumMetricName = metrics.metricName("pack.bean1.sum", "grp1"); + sensor.add(sumMetricName, new Sum()); + } + + @After + public void tearDown() { + metrics.close(); + } + + @Test + public void testGetAttribute() throws Exception { + sensor.record(2.5); + Object counterAttribute = getAttribute(countMetricName); + assertEquals(1.0, counterAttribute); + Object sumAttribute = getAttribute(sumMetricName); + assertEquals(2.5, sumAttribute); + } + + @Test + public void testGetAttributeUnknown() throws Exception { + sensor.record(2.5); + Object counterAttribute = getAttribute(countMetricName); + assertEquals(1.0, counterAttribute); + Object sumAttribute = getAttribute(sumMetricName); + assertEquals(2.5, sumAttribute); + try { + getAttribute(sumMetricName, "name"); + fail("Should have gotten attribute not found"); + } catch (AttributeNotFoundException e) { + // Expected + } + } + + @Test + public void testGetAttributes() throws Exception { + sensor.record(3.5); + sensor.record(4.0); + AttributeList attributeList = getAttributes(countMetricName, countMetricName.name(), sumMetricName.name()); + List attributes = attributeList.asList(); + assertEquals(2, attributes.size()); + for (Attribute attribute : attributes) { + if (countMetricName.name().equals(attribute.getName())) { + assertEquals(2.0, attribute.getValue()); + } else if (sumMetricName.name().equals(attribute.getName())) { + assertEquals(7.5, attribute.getValue()); + } else { + fail("Unexpected attribute returned: " + attribute.getName()); + } + } + } + + @Test + public void testGetAttributesWithUnknown() throws Exception { + sensor.record(3.5); + sensor.record(4.0); + AttributeList attributeList = getAttributes(countMetricName, countMetricName.name(), sumMetricName.name(), "name"); + List attributes = attributeList.asList(); + assertEquals(2, attributes.size()); + for (Attribute attribute : attributes) { + if (countMetricName.name().equals(attribute.getName())) { + assertEquals(2.0, attribute.getValue()); + } else if (sumMetricName.name().equals(attribute.getName())) { + assertEquals(7.5, attribute.getValue()); + } else { + fail("Unexpected attribute returned: " + attribute.getName()); + } + } + } + + @Test + public void testInvoke() throws Exception { + try { + mBeanServer.invoke(getObjectName(countMetricName), "something", null, null); + fail("invoke should have failed"); + } catch (RuntimeMBeanException e) { + assertThat(e.getCause(), instanceOf(UnsupportedOperationException.class)); + } + } + + @Test + public void testSetAttribute() throws Exception { + try { + mBeanServer.setAttribute(getObjectName(countMetricName), new Attribute("anything", 1)); + fail("setAttribute should have failed"); + } catch (RuntimeMBeanException e) { + assertThat(e.getCause(), instanceOf(UnsupportedOperationException.class)); + } + } + + @Test + public void testSetAttributes() throws Exception { + try { + mBeanServer.setAttributes(getObjectName(countMetricName), new AttributeList(1)); + fail("setAttributes should have failed"); + } catch (Exception e) { + assertThat(e.getCause(), instanceOf(UnsupportedOperationException.class)); + } + } + + private ObjectName getObjectName(MetricName metricName) throws Exception { + return new ObjectName(JmxReporter.getMBeanName("", metricName)); + } + + private Object getAttribute(MetricName metricName, String attribute) throws Exception { + return mBeanServer.getAttribute(getObjectName(metricName), attribute); + } + + private Object getAttribute(MetricName metricName) throws Exception { + return getAttribute(metricName, metricName.name()); + } + + private AttributeList getAttributes(MetricName metricName, String... attributes) throws Exception { + return mBeanServer.getAttributes(getObjectName(metricName), attributes); + } + +}