-
Notifications
You must be signed in to change notification settings - Fork 521
/
Pretoucher.java
79 lines (67 loc) · 2.79 KB
/
Pretoucher.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package net.openhft.chronicle.queue.impl.single;
import net.openhft.chronicle.bytes.MappedBytes;
import net.openhft.chronicle.bytes.NewChunkListener;
import net.openhft.chronicle.queue.impl.WireStore;
import java.util.function.IntConsumer;
/**
* A class designed to be called from a long-lived thread.
*
* Upon invocation of the {@code execute()} method, this object will pre-touch pages in the supplied queue's underlying store file,
* attempting to keep ahead of any appenders to the queue.
*
* Resources held by this object will be released when the underlying queue is closed.
*
* Alternatively, the {@code shutdown()} method can be called to close the supplied queue and release any other resources.
* Invocation of the {@code execute()} method after {@code shutdown()} has been called will cause an {@code IllegalStateException} to be thrown.
*/
public final class Pretoucher {
    /** Queue whose current-cycle store is pre-touched; closing it releases this object's resources. */
    private final SingleChronicleQueue queue;
    /** Optional listener installed on the mapped bytes of each newly acquired cycle; may be {@code null}. */
    private final NewChunkListener chunkListener;
    /** Notified with the cycle number each time a store for a new cycle has been acquired. */
    private final IntConsumer cycleChangedListener;
    /** Performs the actual pre-touching; it is given a supplier of the current store's write position. */
    private final PretoucherState pretoucherState;
    /** Cycle the held store belongs to; {@code Integer.MIN_VALUE} until a store has been acquired. */
    private int currentCycle = Integer.MIN_VALUE;
    /** Store acquired for {@link #currentCycle}, or {@code null} when nothing is currently held. */
    private WireStore currentCycleWireStore;
    /** Bytes obtained from {@link #currentCycleWireStore}, or {@code null} when nothing is currently held. */
    private MappedBytes currentCycleMappedBytes;

    /**
     * Creates a pretoucher for the given queue with no chunk listener and a no-op cycle-change listener.
     *
     * @param queue the queue whose store files should be pre-touched
     */
    public Pretoucher(final SingleChronicleQueue queue) {
        this(queue, null, c -> {});
    }

    /**
     * Creates a pretoucher and registers it with the queue so that held resources are released
     * when the queue is closed.
     *
     * @param queue                the queue whose store files should be pre-touched
     * @param chunkListener        listener to install on each cycle's mapped bytes, or {@code null} for none
     * @param cycleChangedListener called with the cycle number whenever a new cycle's store is acquired
     */
    // visible for testing
    Pretoucher(final SingleChronicleQueue queue, final NewChunkListener chunkListener,
               final IntConsumer cycleChangedListener) {
        this.queue = queue;
        this.chunkListener = chunkListener;
        this.cycleChangedListener = cycleChangedListener;
        queue.addCloseListener(this, Pretoucher::releaseResources);
        pretoucherState = new PretoucherState(this::getStoreWritePosition);
    }

    /**
     * Switches to the queue's current cycle if it has changed, then pre-touches the mapped bytes
     * of that cycle's store.
     *
     * @throws IllegalStateException if the resources held by this object have been released
     *                               (for example because the queue was closed) and the cycle
     *                               has not changed since
     */
    public void execute() {
        assignCurrentCycle();
        final MappedBytes bytes = currentCycleMappedBytes;
        if (bytes == null) {
            // Resources were released (queue closed) and no new cycle was acquired:
            // fail explicitly instead of handing closed/absent bytes to the pretoucher state.
            throw new IllegalStateException("Pretoucher resources have been released; the queue has been closed");
        }
        pretoucherState.pretouch(bytes);
    }

    /**
     * Closes the supplied queue. This object is registered as a close listener of the queue,
     * which is how its own resources get released.
     */
    public void shutdown() {
        queue.close();
    }

    /**
     * Acquires the store for the queue's current cycle when it differs from the one held,
     * releasing the previously held store first.
     */
    private void assignCurrentCycle() {
        // queue.cycle() changes over time (that is what this method detects), so sample it once:
        // the acquired store, the cached cycle and the listener notification must all agree.
        final int queueCycle = queue.cycle();
        if (queueCycle == currentCycle) {
            return;
        }

        releaseResources();

        final WireStore store = queue.storeForCycle(queueCycle, queue.epoch(), true);
        final MappedBytes bytes = store.bytes();
        currentCycleWireStore = store;
        currentCycleMappedBytes = bytes;
        currentCycle = queueCycle;

        if (chunkListener != null) {
            bytes.setNewChunkListener(chunkListener);
        }
        cycleChangedListener.accept(queueCycle);
    }

    /**
     * Supplies the write position of the currently held store to {@link PretoucherState}.
     *
     * @return the current store's write position
     */
    private long getStoreWritePosition() {
        return currentCycleWireStore.writePosition();
    }

    /**
     * Releases the held store and closes its mapped bytes, if any. The references are cleared
     * before releasing so that a repeated call (cycle change followed by queue close, or a retry
     * after a failed acquisition) never releases or closes the same object twice.
     */
    private void releaseResources() {
        final WireStore store = currentCycleWireStore;
        final MappedBytes bytes = currentCycleMappedBytes;
        currentCycleWireStore = null;
        currentCycleMappedBytes = null;

        try {
            if (store != null) {
                queue.release(store);
            }
        } finally {
            // Close the bytes even if releasing the store failed, so the mapping is not leaked.
            if (bytes != null) {
                bytes.close();
            }
        }
    }
}