/
AbstractSlidingWindowTimeReservoir.java
257 lines (230 loc) · 10.9 KB
/
AbstractSlidingWindowTimeReservoir.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
/*
* Copyright (c) 2015, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright 2010, 2013 Coda Hale and Yammer, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.glassfish.jersey.server.internal.monitoring.core;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.glassfish.jersey.server.internal.monitoring.core.ReservoirConstants.COLLISION_BUFFER;
import static org.glassfish.jersey.server.internal.monitoring.core.ReservoirConstants.COLLISION_BUFFER_POWER;
/**
* An abstract {@link TimeReservoir} implementation backed by a sliding window that stores only the measurements made in the last
* {@code N} seconds (or other startTime unit) and allows an update with data that happened in past (which is what makes it
* different from Dropwizard's Metrics SlidingTimeWindowReservoir.
* <p/>
* The snapshot this reservoir returns has limitations as mentioned in {@link TimeReservoir}.
* <p/>
* This reservoir is capable to store up to 2^{@link ReservoirConstants#COLLISION_BUFFER_POWER}, that is 256, in a granularity of
* nanoseconds. In other words, up to 256 values that occurred at the same nanosecond can be stored in this reservoir. For
* particular nanosecond, if the collision buffer exceeds, newly added values are thrown away.
*
* @param <V> The type of values to store in this sliding window reservoir
* @author Stepan Vavra
* @see <pre><a href="https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics
* /SlidingTimeWindowReservoir.java">Dropwizard's
* Metrics SlidingTimeWindowReservoir</a></pre>
*/
public abstract class AbstractSlidingWindowTimeReservoir<V> implements TimeReservoir<V> {
private final ConcurrentNavigableMap<Long, V> measurements;
private final long window;
private final AtomicLong greatestTick;
private final AtomicLong updateCount;
private final AtomicLong startTick;
private final AtomicInteger trimOff;
private final SlidingWindowTrimmer<V> trimmer;
private final long interval;
private final TimeUnit intervalUnit;
/**
* Creates a new {@link SlidingWindowTimeReservoir} with the start time and window of startTime.
*
* @param window The window of startTime
* @param windowUnit The unit of {@code window}
* @param startTime The start time from which this reservoir calculates measurements
* @param startTimeUnit The start time unit
*/
public AbstractSlidingWindowTimeReservoir(final long window,
final TimeUnit windowUnit,
final long startTime,
final TimeUnit startTimeUnit) {
this(window, windowUnit, startTime, startTimeUnit, null);
}
/**
* Creates a new base sliding time window reservoir with the start time and a specified time window.
*
* @param window The window of startTime.
* @param windowUnit The unit of {@code window}.
* @param startTime The start time from which this reservoir calculates measurements.
* @param startTimeUnit The start time unit.
* @param trimmer The trimmer to use for trimming, if {@code null}, default trimmer is used.
*/
@SuppressWarnings("unchecked")
public AbstractSlidingWindowTimeReservoir(final long window,
final TimeUnit windowUnit,
final long startTime,
final TimeUnit startTimeUnit,
final SlidingWindowTrimmer<V> trimmer) {
this.trimmer = trimmer != null ? trimmer : (SlidingWindowTrimmer<V>) DefaultSlidingWindowTrimmerHolder.INSTANCE;
this.measurements = new ConcurrentSkipListMap<>();
this.interval = window;
this.intervalUnit = windowUnit;
this.window = windowUnit.toNanos(window) << COLLISION_BUFFER_POWER;
this.startTick = new AtomicLong(tick(startTime, startTimeUnit));
this.greatestTick = new AtomicLong(startTick.get());
this.updateCount = new AtomicLong(0);
this.trimOff = new AtomicInteger(0);
this.trimmer.setTimeReservoir(this);
}
@Override
public int size(long time, TimeUnit timeUnit) {
conditionallyUpdateGreatestTick(tick(time, timeUnit));
trim();
return measurements.size();
}
@Override
public void update(V value, long time, TimeUnit timeUnit) {
if (updateCount.incrementAndGet() % ReservoirConstants.TRIM_THRESHOLD == 0) {
trim();
}
long tick = tick(time, timeUnit);
for (int i = 0; i < COLLISION_BUFFER; ++i) {
if (measurements.putIfAbsent(tick, value) == null) {
conditionallyUpdateGreatestTick(tick);
return;
}
// increase the tick, there should be up to COLLISION_BUFFER empty slots
// where to put the value for given 'time'
// if empty slot is not found, throw it away as we're getting inaccurate statistics anyway
tick++;
}
}
@Override
public long interval(final TimeUnit timeUnit) {
return timeUnit.convert(interval, intervalUnit);
}
private long conditionallyUpdateGreatestTick(final long tick) {
while (true) {
final long currentGreatestTick = greatestTick.get();
if (tick <= currentGreatestTick) {
// the tick is too small, return the greatest one
return currentGreatestTick;
}
if (greatestTick.compareAndSet(currentGreatestTick, tick)) {
// successfully updated greatestTick with the tick
return tick;
}
}
}
/**
* Updates the startTick in case that the sliding window was created AFTER the time of a value that updated this window.
*
* @param firstEntry The first entry of the windowed measurments
*/
private void conditionallyUpdateStartTick(final Map.Entry<Long, V> firstEntry) {
final Long firstEntryKey = firstEntry != null ? firstEntry.getKey() : null;
if (firstEntryKey != null && firstEntryKey < startTick.get()) {
while (true) {
final long expectedStartTick = startTick.get();
if (startTick.compareAndSet(expectedStartTick, firstEntryKey)) {
return;
}
}
}
}
/**
* Subclasses are required to instantiate {@link UniformTimeSnapshot} on their own.
*
* @param values The values to create the snapshot from
* @param timeInterval The time interval this snapshot conforms to
* @param timeIntervalUnit The interval unit of the time interval
* @param time The time of the request of the snapshot
* @param timeUnit The unit of the time of the snapshot request
* @return The snapshot
*/
protected abstract UniformTimeSnapshot snapshot(final Collection<V> values,
final long timeInterval,
final TimeUnit timeIntervalUnit,
final long time,
final TimeUnit timeUnit);
@Override
public UniformTimeSnapshot getSnapshot(long time, TimeUnit timeUnit) {
trimOff.incrementAndGet();
final long baselineTick = conditionallyUpdateGreatestTick(tick(time, timeUnit));
try {
// now, with the 'baselineTick' we can be sure that no trim will be performed
// we just cannot guarantee that 'time' will correspond with the 'baselineTick' which is what the API warns about
final ConcurrentNavigableMap<Long, V> windowMap = measurements
.subMap((roundTick(baselineTick)) - window, true, baselineTick, true);
// if the first update came with value lower that the 'startTick' we need to extend the window size so that the
// calculation depending on the actual measured interval is not unnecessary boosted
conditionallyUpdateStartTick(windowMap.firstEntry());
// calculate the actual measured interval
final long measuredTickInterval = Math.min(baselineTick - startTick.get(), window);
return snapshot(windowMap.values(), measuredTickInterval >> COLLISION_BUFFER_POWER,
TimeUnit.NANOSECONDS, time, timeUnit);
} finally {
trimOff.decrementAndGet();
trim(baselineTick);
}
}
private long tick(long time, TimeUnit timeUnit) {
return timeUnit.toNanos(time) << COLLISION_BUFFER_POWER;
}
private void trim() {
trim(greatestTick.get());
}
private void trim(final long baselineTick) {
if (trimEnabled()) {
final long key = roundTick(baselineTick) - window;
trimmer.trim(measurements, key);
}
}
private boolean trimEnabled() {
return trimOff.get() == 0;
}
/**
* The purpose of this method is to deal with the fact that data for the same nanosecond can be distributed in an interval
* [0,256). By rounding the tick, we get the tick to which all the other ticks from the same interval belong.
*
* @param tick The tick
* @return The rounded tick
*/
private long roundTick(final long tick) {
// tick / COLLISION_BUFFER * COLLISION_BUFFER
return tick >> COLLISION_BUFFER_POWER << COLLISION_BUFFER_POWER;
}
/**
* The holder of the lazy loaded instance of the default trimmer.
*/
private static final class DefaultSlidingWindowTrimmerHolder {
/**
* The default instance of sliding window trimmer.
*/
static final SlidingWindowTrimmer<Object> INSTANCE = new SlidingWindowTrimmer<Object>() {
@Override
public void trim(final ConcurrentNavigableMap<Long, Object> map, final long key) {
map.headMap(key).clear();
}
@Override
public void setTimeReservoir(final TimeReservoir<Object> reservoir) {
// not used
}
};
}
}