Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
def registerBean(bean: MetricMBean) {
if (!server.isRegistered(bean.objectName)) {
debug("Registering MBean for %s." format bean.objectName)
server.registerMBean(bean, bean.objectName);
server.registerMBean(bean, bean.objectName)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,65 +19,49 @@

package org.apache.samza.metrics.reporter

import org.junit.Assert._
import org.junit.AfterClass
import org.junit.BeforeClass
import org.junit.Test
import org.apache.samza.task.TaskContext
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.config.MapConfig
import org.apache.samza.metrics.JvmMetrics

import java.lang.management.ManagementFactory
import java.rmi.registry.LocateRegistry

import javax.management.MBeanServer
import javax.management.ObjectName
import javax.management.remote.JMXServiceURL
import javax.management.remote.JMXConnectorServerFactory
import javax.management.remote.JMXConnectorServer
import javax.management.remote.JMXConnectorFactory
import junit.framework.Assert.assertEquals
import org.apache.samza.metrics.JmxUtil
import org.apache.samza.metrics.MetricsRegistryMap
import org.junit.Test
import org.mockito.Matchers
import org.mockito.Mockito.mock
import org.mockito.Mockito.times
import org.mockito.Mockito.verify
import org.mockito.Mockito.when
import org.mockito.Mockito.reset

import scala.collection.JavaConverters._
class TestJmxReporter {

object TestJmxReporter {
val port = 4500
val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:%d/jmxapitestrmi" format port)
var cs: JMXConnectorServer = null
val REPORTER_SOURCE = "test"

@BeforeClass
def beforeSetupServers {
LocateRegistry.createRegistry(port)
val mbs = ManagementFactory.getPlatformMBeanServer()
cs = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs)
cs.start
}
@Test
def testJmxReporter {
val metricGroup = "org.apache.samza.metrics.JvmMetrics"
val metricName = "mem-non-heap-used-mb"
val objectName: ObjectName = JmxUtil.getObjectName(metricGroup, metricName, REPORTER_SOURCE)

@AfterClass
def afterCleanLogDirs {
if (cs != null) {
cs.stop
}
}
}
val registry: MetricsRegistryMap = new MetricsRegistryMap
val mBeanServerMock: MBeanServer = mock(classOf[MBeanServer])

class TestJmxReporter {
import TestJmxReporter.url
// Create dummy test metric.
registry.newCounter(metricGroup, metricName)

// TODO: Fix in SAMZA-1196
//@Test
def testJmxReporter {
val registry = new MetricsRegistryMap
val jvm = new JvmMetrics(registry)
val reporter = new JmxReporterFactory().getMetricsReporter("", "", new MapConfig(Map[String, String]().asJava))
when(mBeanServerMock.isRegistered(objectName)).thenReturn(false)

reporter.register("test", registry)
val reporter = new JmxReporter(mBeanServerMock)
reporter.register(REPORTER_SOURCE, registry)
reporter.start
jvm.run

val mbserver = JMXConnectorFactory.connect(url).getMBeanServerConnection
val stateViaJMX = mbserver.getAttribute(new ObjectName("org.apache.samza.metrics.JvmMetrics:type=test,name=mem-non-heap-used-mb"), "Value").asInstanceOf[Float]
verify(mBeanServerMock, times(1)).registerMBean(Matchers.anyObject(), Matchers.eq(objectName))

reset(mBeanServerMock)
// Create dummy counter to test metrics reporting through listener.
registry.newCounter(metricGroup, metricName)

assertTrue(stateViaJMX > 0)
assertEquals(1, registry.listeners.size)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary, this is registry internal state.

verify(mBeanServerMock, times(1)).registerMBean(Matchers.anyObject(), Matchers.eq(objectName))

reporter.stop
}
Expand Down