/
ScheduledReporter.java
294 lines (264 loc) · 11.4 KB
/
ScheduledReporter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
package io.dropwizard.metrics5;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.Collections;
import java.util.Locale;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * The abstract base class for all scheduled reporters (i.e., reporters which process a registry's
 * metrics periodically).
 * <p>
 * Subclasses implement {@link #report(SortedMap, SortedMap, SortedMap, SortedMap, SortedMap)};
 * this class schedules that call on a {@link ScheduledExecutorService} and handles stopping it.
 *
 * @see ConsoleReporter
 * @see CsvReporter
 * @see Slf4jReporter
 */
public abstract class ScheduledReporter implements Closeable, Reporter {

    private static final Logger LOG = LoggerFactory.getLogger(ScheduledReporter.class);

    /**
     * A simple named thread factory. Threads are daemon threads at normal priority and are named
     * {@code metrics-<name>-thread-<n>}.
     */
    @SuppressWarnings("NullableProblems")
    private static class NamedThreadFactory implements ThreadFactory {

        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        private NamedThreadFactory(String name) {
            final SecurityManager s = System.getSecurityManager();
            // Use the security manager's thread group when one is installed, else the caller's.
            this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            this.namePrefix = "metrics-" + name + "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            t.setDaemon(true);
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }

    /** Distinguishes the default executors (and their thread names) of different reporter instances. */
    private static final AtomicInteger FACTORY_ID = new AtomicInteger();

    private final MetricRegistry registry;
    private final ScheduledExecutorService executor;
    private final boolean shutdownExecutorOnStop;
    private final Set<MetricAttribute> disabledMetricAttributes;
    /** The scheduled reporting task; {@code null} until the reporter is started. Guarded by {@code this}. */
    private ScheduledFuture<?> scheduledFuture;
    private final MetricFilter filter;
    private final long durationFactor;
    private final String durationUnit;
    private final long rateFactor;
    private final String rateUnit;

    /**
     * Creates a new {@link ScheduledReporter} instance that reports on its own single-threaded
     * executor.
     *
     * @param registry     the {@link io.dropwizard.metrics5.MetricRegistry} containing the metrics this
     *                     reporter will report
     * @param name         the reporter's name
     * @param filter       the filter for which metrics to report
     * @param rateUnit     a unit of time
     * @param durationUnit a unit of time
     */
    protected ScheduledReporter(MetricRegistry registry,
                                String name,
                                MetricFilter filter,
                                TimeUnit rateUnit,
                                TimeUnit durationUnit) {
        this(registry, name, filter, rateUnit, durationUnit, createDefaultExecutor(name));
    }

    /**
     * Creates a new {@link ScheduledReporter} instance. The executor is shut down when the
     * reporter is stopped.
     *
     * @param registry     the {@link io.dropwizard.metrics5.MetricRegistry} containing the metrics this
     *                     reporter will report
     * @param name         the reporter's name
     * @param filter       the filter for which metrics to report
     * @param rateUnit     a unit of time
     * @param durationUnit a unit of time
     * @param executor     the executor to use while scheduling reporting of metrics.
     */
    protected ScheduledReporter(MetricRegistry registry,
                                String name,
                                MetricFilter filter,
                                TimeUnit rateUnit,
                                TimeUnit durationUnit,
                                ScheduledExecutorService executor) {
        this(registry, name, filter, rateUnit, durationUnit, executor, true);
    }

    /**
     * Creates a new {@link ScheduledReporter} instance with all metric attributes enabled.
     *
     * @param registry               the {@link io.dropwizard.metrics5.MetricRegistry} containing the metrics this
     *                               reporter will report
     * @param name                   the reporter's name
     * @param filter                 the filter for which metrics to report
     * @param rateUnit               a unit of time
     * @param durationUnit           a unit of time
     * @param executor               the executor to use while scheduling reporting of metrics.
     * @param shutdownExecutorOnStop if true, then executor will be stopped in same time with this reporter
     */
    protected ScheduledReporter(MetricRegistry registry,
                                String name,
                                MetricFilter filter,
                                TimeUnit rateUnit,
                                TimeUnit durationUnit,
                                ScheduledExecutorService executor,
                                boolean shutdownExecutorOnStop) {
        this(registry, name, filter, rateUnit, durationUnit, executor, shutdownExecutorOnStop, Collections.emptySet());
    }

    /**
     * Creates a new {@link ScheduledReporter} instance.
     *
     * @param registry                 the {@link io.dropwizard.metrics5.MetricRegistry} containing the metrics this
     *                                 reporter will report
     * @param name                     the reporter's name; only used to name the default executor's threads
     * @param filter                   the filter for which metrics to report
     * @param rateUnit                 a unit of time
     * @param durationUnit             a unit of time
     * @param executor                 the executor to use while scheduling reporting of metrics; if
     *                                 {@code null}, a default single-threaded executor is created
     * @param shutdownExecutorOnStop   if true, then executor will be stopped in same time with this reporter
     * @param disabledMetricAttributes the metric attributes that are disabled; {@code null} is treated as
     *                                 an empty set
     */
    protected ScheduledReporter(MetricRegistry registry,
                                String name,
                                MetricFilter filter,
                                TimeUnit rateUnit,
                                TimeUnit durationUnit,
                                ScheduledExecutorService executor,
                                boolean shutdownExecutorOnStop,
                                Set<MetricAttribute> disabledMetricAttributes) {
        this.registry = registry;
        this.filter = filter;
        this.executor = executor == null ? createDefaultExecutor(name) : executor;
        this.shutdownExecutorOnStop = shutdownExecutorOnStop;
        this.rateFactor = rateUnit.toSeconds(1);
        this.rateUnit = calculateRateUnit(rateUnit);
        this.durationFactor = durationUnit.toNanos(1);
        this.durationUnit = durationUnit.toString().toLowerCase(Locale.US);
        this.disabledMetricAttributes = disabledMetricAttributes != null ? disabledMetricAttributes :
                Collections.emptySet();
    }

    /**
     * Starts the reporter polling at the given period. The first report is made after one full
     * period has elapsed.
     *
     * @param period the amount of time between polls
     * @param unit   the unit for {@code period}
     */
    public void start(long period, TimeUnit unit) {
        start(period, period, unit);
    }

    /**
     * Starts the reporter polling at the given period with the specific runnable action.
     * Visible only for testing.
     *
     * @param initialDelay the time to delay the first execution
     * @param period       the amount of time between polls
     * @param unit         the unit for {@code period} and {@code initialDelay}
     * @param runnable     the action to run on every poll
     * @throws IllegalArgumentException if the reporter has already been started
     */
    synchronized void start(long initialDelay, long period, TimeUnit unit, Runnable runnable) {
        if (this.scheduledFuture != null) {
            throw new IllegalArgumentException("Reporter already started");
        }

        this.scheduledFuture = executor.scheduleAtFixedRate(runnable, initialDelay, period, unit);
    }

    /**
     * Starts the reporter polling at the given period.
     *
     * @param initialDelay the time to delay the first execution
     * @param period       the amount of time between polls
     * @param unit         the unit for {@code period} and {@code initialDelay}
     * @throws IllegalArgumentException if the reporter has already been started
     */
    public synchronized void start(long initialDelay, long period, TimeUnit unit) {
        start(initialDelay, period, unit, () -> {
            try {
                report();
            } catch (Throwable ex) {
                // An exception escaping a fixed-rate task suppresses all later executions,
                // so every failure is logged and swallowed to keep the reporter running.
                LOG.error("Exception thrown from {}#report. Exception was suppressed.", ScheduledReporter.this.getClass().getSimpleName(), ex);
            }
        });
    }

    /**
     * Stops the reporter and if shutdownExecutorOnStop is true then shuts down its thread of execution.
     * Otherwise only the scheduled reporting task is cancelled and the executor is left running.
     * <p>
     * Uses the shutdown pattern from http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
     */
    public void stop() {
        if (shutdownExecutorOnStop) {
            executor.shutdown(); // Disable new tasks from being submitted
            try {
                // Wait a while for existing tasks to terminate
                if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                    executor.shutdownNow(); // Cancel currently executing tasks
                    // Wait a while for tasks to respond to being cancelled
                    if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                        // Report through the class logger instead of writing to stderr directly.
                        LOG.warn("{}: ScheduledExecutorService did not terminate", getClass().getSimpleName());
                    }
                }
            } catch (InterruptedException ie) {
                // (Re-)Cancel if current thread also interrupted
                executor.shutdownNow();
                // Preserve interrupt status
                Thread.currentThread().interrupt();
            }
        } else {
            // The external manager(like JEE container) responsible for lifecycle of executor
            synchronized (this) {
                if (this.scheduledFuture == null) {
                    // was never started
                    return;
                }
                if (this.scheduledFuture.isCancelled()) {
                    // already cancelled
                    return;
                }
                // just cancel the scheduledFuture and exit
                this.scheduledFuture.cancel(false);
            }
        }
    }

    /**
     * Stops the reporter; equivalent to calling {@link #stop()}.
     */
    @Override
    public void close() {
        stop();
    }

    /**
     * Report the current values of all metrics in the registry that match the filter.
     * Reports are serialized on this reporter's monitor.
     */
    public void report() {
        synchronized (this) {
            report(registry.getGauges(filter),
                    registry.getCounters(filter),
                    registry.getHistograms(filter),
                    registry.getMeters(filter),
                    registry.getTimers(filter));
        }
    }

    /**
     * Called periodically by the polling thread. Subclasses should report all the given metrics.
     *
     * @param gauges     all of the gauges in the registry
     * @param counters   all of the counters in the registry
     * @param histograms all of the histograms in the registry
     * @param meters     all of the meters in the registry
     * @param timers     all of the timers in the registry
     */
    @SuppressWarnings("rawtypes")
    public abstract void report(SortedMap<MetricName, Gauge> gauges,
                                SortedMap<MetricName, Counter> counters,
                                SortedMap<MetricName, Histogram> histograms,
                                SortedMap<MetricName, Meter> meters,
                                SortedMap<MetricName, Timer> timers);

    /**
     * Returns the lower-case, singular name of the rate unit (e.g. {@code "second"}).
     *
     * @return the rate unit name
     */
    public String getRateUnit() {
        return rateUnit;
    }

    /**
     * Returns the lower-case name of the duration unit (e.g. {@code "milliseconds"}).
     *
     * @return the duration unit name
     */
    public String getDurationUnit() {
        return durationUnit;
    }

    /**
     * Converts a duration to this reporter's duration unit by dividing it by the number of
     * nanoseconds in one such unit.
     *
     * @param duration the duration to convert; assumed to be expressed in nanoseconds
     * @return the duration expressed in the reporter's duration unit
     */
    public double convertDuration(double duration) {
        return duration / durationFactor;
    }

    /**
     * Converts a rate to this reporter's rate unit by multiplying it by the number of whole
     * seconds in one such unit.
     * <p>
     * NOTE(review): the factor comes from {@link TimeUnit#toSeconds(long)}, which truncates, so
     * it is 0 for rate units shorter than a second.
     *
     * @param rate the rate to convert; assumed to be expressed in events per second
     * @return the rate expressed in events per rate unit
     */
    public double convertRate(double rate) {
        return rate * rateFactor;
    }

    /**
     * Tells whether {@link #stop()} also shuts down the executor.
     *
     * @return {@code true} if the executor is shut down together with this reporter
     */
    public boolean isShutdownExecutorOnStop() {
        return shutdownExecutorOnStop;
    }

    /**
     * Returns the metric attributes that were disabled when the reporter was created.
     *
     * @return the set of disabled attributes, never {@code null}
     */
    public Set<MetricAttribute> getDisabledMetricAttributes() {
        return disabledMetricAttributes;
    }

    /**
     * Derives the singular rate unit name by lower-casing the unit name and dropping its last
     * character (e.g. {@code SECONDS} becomes {@code "second"}).
     */
    private static String calculateRateUnit(TimeUnit unit) {
        final String s = unit.toString().toLowerCase(Locale.US);
        return s.substring(0, s.length() - 1);
    }

    /**
     * Creates the single-threaded scheduled executor used when no executor is supplied. Its
     * thread-name prefix combines {@code name} with a process-wide counter.
     */
    private static ScheduledExecutorService createDefaultExecutor(String name) {
        return Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(name + '-' + FACTORY_ID.incrementAndGet()));
    }
}