This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

Inital checkin of major refactor of the reporting infrastructure.

-One reporter can have many registries
-Move from complex constructors to factories and builders
-Polling threadpool moved into reporter
  • Loading branch information...
gorzell committed Mar 28, 2012
1 parent 182a07d commit 035d233eea2d1b241925052c4c7dcd5a024c00ed
@@ -31,7 +31,7 @@ public MetricsRegistry(String name) {
public MetricsRegistry(String name, Clock clock) {
this.clock = clock;
this.metrics = newMetricsMap();
- this.threadPools = new ThreadPools();
+ this.threadPools = ThreadPools.getInstance();
@codahale

codahale Mar 28, 2012

This means all registries would share a single set of thread pools. When a registry gets shut down, how do we know when to turn out the lights on the threads?

@gorzell

gorzell Mar 28, 2012

Owner

I'll need to go back and look at this, but since they are named pools, it should be easy to do.

@codahale

codahale Mar 28, 2012

How?

I have two registries: A and B, and each have their own instances of FooReporter (aR and bR) each of which share a thread pool named bar. When A shuts down, how do we know not to shut down the shared thread pool? When B shuts down, how do we know to shut down the thread pool?

this.listeners = new CopyOnWriteArrayList<MetricsRegistryListener>();
this.name = name;
}
@@ -387,6 +387,7 @@ public void shutdown() {
* @param name the name of the pool
* @return a new {@link ScheduledExecutorService}
*/
+ @Deprecated
public ScheduledExecutorService newScheduledThreadPool(int poolSize, String name) {
return threadPools.newScheduledThreadPool(poolSize, name);
}
@@ -6,7 +6,7 @@
/**
* A manager class for a set of named thread pools.
*/
-class ThreadPools {
+public class ThreadPools {
/**
* A simple named thread factory.
*/
@@ -42,6 +42,13 @@ public Thread newThread(Runnable r) {
private final ConcurrentMap<String, ScheduledExecutorService> threadPools =
new ConcurrentHashMap<String, ScheduledExecutorService>(100);
+ private static ThreadPools ourInstance = new ThreadPools();
+
+ public static ThreadPools getInstance() {
+ return ourInstance;
+ }
+
+ private ThreadPools(){}
/**
* Creates a new scheduled thread pool of a given size with the given name, or returns an
* existing thread pool if one was already created with the same name.
@@ -50,7 +57,7 @@ public Thread newThread(Runnable r) {
* @param name the name of the pool
* @return a new {@link ScheduledExecutorService}
*/
- ScheduledExecutorService newScheduledThreadPool(int poolSize, String name) {
+ public ScheduledExecutorService newScheduledThreadPool(int poolSize, String name) {
final ScheduledExecutorService existing = threadPools.get(name);
if (isValidExecutor(existing)) {
return existing;
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2012. Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.yammer.metrics.reporting;
+
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.MetricsRegistryListener;
+
+import java.util.Set;
+
+/**
+ * User: gorzell
+ * Date: 3/23/12
+ * Time: 12:23 PM
+ */
+public abstract class AbstractDynamicReporter extends AbstractReporter implements MetricsRegistryListener {
+
+
+ protected AbstractDynamicReporter(MetricsRegistry registry){
+ super(registry);
+ }
+
+ protected AbstractDynamicReporter(Set<MetricsRegistry> registries){
+ super(registries);
+ }
+
+ /**
+ * Start the reporter by adding it as a listener on the registry
+ */
+ public void start() {
+ for (MetricsRegistry metricsRegistry : metricsRegistries) {
+ metricsRegistry.addListener(this);
+ }
+ }
+
+ /**
+ * Stops the reporter and closes any internal resources.
+ */
+ public void shutdown() {
+ for (MetricsRegistry metricsRegistry : metricsRegistries) {
+ metricsRegistry.removeListener(this);
+ }
+ }
+}
@@ -1,7 +1,10 @@
package com.yammer.metrics.reporting;
import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.ThreadPools;
+import java.util.Set;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -10,28 +13,52 @@
* metrics (e.g., to send the data to another service).
*/
public abstract class AbstractPollingReporter extends AbstractReporter implements Runnable {
- private final ScheduledExecutorService executor;
+ static final long DEFAULT_PERIOD = 1;
+ static final TimeUnit DEFAULT_TIMEUNIT = TimeUnit.MINUTES;
+
+ protected ScheduledExecutorService executor;
+ protected long period;
+ protected TimeUnit unit;
/**
* Creates a new {@link AbstractPollingReporter} instance.
*
* @param registry the {@link MetricsRegistry} containing the metrics this reporter will
* report
- * @param name the reporter's name
* @see AbstractReporter#AbstractReporter(MetricsRegistry)
*/
- protected AbstractPollingReporter(MetricsRegistry registry, String name) {
+ protected AbstractPollingReporter(MetricsRegistry registry) {
super(registry);
- this.executor = registry.newScheduledThreadPool(1, name);
+ }
+
+ /**
+ * Creates a new {@link AbstractPollingReporter} instance.
+ *
+ * @param registries the {@link MetricsRegistry} containing the metrics this reporter will
+ * report
+ * @see AbstractReporter#AbstractReporter(MetricsRegistry)
+ */
+ protected AbstractPollingReporter(Set<MetricsRegistry> registries) {
+ super(registries);
+ }
+
+ void setName(String name){
+ this.executor = ThreadPools.getInstance().newScheduledThreadPool(1, name);
@codahale

codahale Mar 28, 2012

What happens when this doesn't get called?

@gorzell

gorzell Mar 28, 2012

Owner

NPE, this is something that i need to revisit.

+ }
+
+ public void setPeriod(long period) {
+ this.period = period;
+ }
+
+ public void setUnit(TimeUnit unit) {
+ this.unit = unit;
}
/**
* Starts the reporter polling at the given period.
*
- * @param period the amount of time between polls
- * @param unit the unit for {@code period}
*/
- public void start(long period, TimeUnit unit) {
+ public void start() {
executor.scheduleWithFixedDelay(this, period, period, unit);
}
@@ -51,7 +78,6 @@ public void shutdown(long timeout, TimeUnit unit) throws InterruptedException {
@Override
public void shutdown() {
executor.shutdown();
- super.shutdown();
}
/**
@@ -3,11 +3,15 @@
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.core.MetricsRegistryListener;
+import java.util.HashSet;
+import java.util.Set;
+
/**
* The base class for all metric reporters.
*/
-public abstract class AbstractReporter {
- private final MetricsRegistry metricsRegistry;
+public abstract class AbstractReporter implements Reporter{
+ //TODO Should probably make this a map =(
+ protected final Set<MetricsRegistry> metricsRegistries;
@codahale

codahale Mar 28, 2012

I think each MetricsRegistry should have a name.

@gorzell

gorzell Mar 28, 2012

Owner

Each registry does have a name, it's just inside of it. I am on the fence if you need provide access to the registries in a given reporter specifically by name. I am open to changing it to a map, just wondering what the use case is. Do people often add things to and remove them from reporters dynamically?

/**
* Creates a new {@link AbstractReporter} instance.
@@ -16,30 +20,29 @@
* report
*/
protected AbstractReporter(MetricsRegistry registry) {
- this.metricsRegistry = registry;
+ metricsRegistries = new HashSet<MetricsRegistry>(1);
+ metricsRegistries.add(registry);
}
- /**
- * Start the reporter by adding it as a listener on the registry
- */
- /*
- public void start() {
- metricsRegistry.addListener(this);
- }*/
+ protected AbstractReporter(Set<MetricsRegistry> registries) {
+ this.metricsRegistries = registries;
+ }
- /**
- * Stops the reporter and closes any internal resources.
- */
- public void shutdown() {
- //metricsRegistry.removeListener(this);
+ public Set<MetricsRegistry> getMetricsRegistries(){
+ return metricsRegistries;
}
/**
- * Returns the reporter's {@link MetricsRegistry}.
- *
- * @return the reporter's {@link MetricsRegistry}
+ * This is for backwards compatability, and should not generally be used.
+ * @return
*/
- protected MetricsRegistry getMetricsRegistry() {
- return metricsRegistry;
+ @Deprecated
+ public MetricsRegistry getMetricsRegistry(){
+ for (MetricsRegistry r : metricsRegistries){
+ return r;
+ }
+
+ //Should be impossible ot get here.
+ return null;
}
}
Oops, something went wrong.

1 comment on commit 035d233

I like it! The big points are the shared thread pools for all registries and the builder stuff.

I really appreciate the work you're putting into this.

Please sign in to comment.