forked from voldemort/voldemort
-
Notifications
You must be signed in to change notification settings - Fork 1
/
AsyncPutSynchronizer.java
131 lines (118 loc) · 4.39 KB
/
AsyncPutSynchronizer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package voldemort.store.routed.action;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import voldemort.cluster.Node;
import voldemort.store.routed.Response;
import voldemort.utils.ByteArray;
/**
 * Synchronizes operations inside the PerformParallelPut action. More
 * specifically, it coordinates the exception handling and hinted handoff
 * responsibility between the master thread and the async put threads.
 * <p>
 * Every public method is {@code synchronized} on this instance, so the two
 * flags and the response queue are always read and written under the same
 * monitor. That monitor is also the one used for {@code wait}/{@code notifyAll}
 * between {@link #responseQueuePoll(long, TimeUnit)} and
 * {@link #tryDelegateResponseHandling(Response)}.
 */
public class AsyncPutSynchronizer {

    private static final Logger logger = Logger.getLogger(AsyncPutSynchronizer.class);

    // Once true, async callbacks must send hints themselves; the master no
    // longer accepts delegated slops.
    private boolean asyncCallbackShouldSendHint;
    // Once true, the master no longer accepts responses from async callbacks.
    private boolean responseHandlingCutoff;
    // Used as a concurrent set: only the keys matter, the value is not used.
    private final ConcurrentMap<Node, Boolean> slopDestinations;
    // Guarded by this instance's monitor.
    private final Queue<Response<ByteArray, Object>> responseQueue;

    public AsyncPutSynchronizer() {
        asyncCallbackShouldSendHint = false;
        responseHandlingCutoff = false;
        slopDestinations = new ConcurrentHashMap<Node, Boolean>();
        responseQueue = new LinkedList<Response<ByteArray, Object>>();
    }

    /**
     * Get the set of nodes to register slop for.
     *
     * @return an unmodifiable view of the nodes whose slop responsibility has
     *         been delegated via {@link #tryDelegateSlop(Node)}; the view is
     *         backed by the underlying concurrent map rather than being a copy
     */
    public synchronized Set<Node> getDelegatedSlopDestinations() {
        return Collections.unmodifiableSet(slopDestinations.keySet());
    }

    /**
     * Stop accepting delegated slop responsibility by the master. After this
     * call {@link #tryDelegateSlop(Node)} always returns false.
     */
    public synchronized void disallowDelegateSlop() {
        asyncCallbackShouldSendHint = true;
    }

    /**
     * Try to delegate the responsibility of sending slops to the master.
     *
     * @param node The node that slop should eventually be pushed to
     * @return true if the master accepts the responsibility (the node is
     *         recorded as a slop destination); false if the master does not
     *         accept
     */
    public synchronized boolean tryDelegateSlop(Node node) {
        if(asyncCallbackShouldSendHint) {
            return false;
        }
        slopDestinations.put(node, true);
        return true;
    }

    /**
     * Master stops accepting new responses (from async callbacks). After this
     * call {@link #tryDelegateResponseHandling(Response)} always returns
     * false.
     */
    public synchronized void cutoffHandling() {
        responseHandlingCutoff = true;
    }

    /**
     * Try to delegate the master to handle the response.
     *
     * @param response the response to hand over to the master
     * @return true if the master accepted the response (it was queued and
     *         waiting pollers were notified); false if the master didn't
     *         accept
     */
    public synchronized boolean tryDelegateResponseHandling(Response<ByteArray, Object> response) {
        if(responseHandlingCutoff) {
            return false;
        }
        responseQueue.offer(response);
        // Wake up any thread blocked in responseQueuePoll.
        this.notifyAll();
        return true;
    }

    /**
     * Poll the response queue, waiting up to the given timeout for a response
     * to become available.
     * <p>
     * Elapsed time is measured with {@link System#nanoTime()} so that
     * wall-clock adjustments cannot shorten or lengthen the wait, and very
     * large timeouts cannot overflow the deadline computation.
     *
     * @param timeout maximum time to wait; zero or negative means do not wait
     * @param timeUnit unit of {@code timeout}
     * @return the head of the response queue, or null if the timeout elapsed
     *         before a response became available
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting
     */
    public synchronized Response<ByteArray, Object> responseQueuePoll(long timeout,
                                                                      TimeUnit timeUnit)
            throws InterruptedException {
        // Kept only for the debug messages below.
        final long timeoutMs = timeUnit.toMillis(timeout);
        final long timeoutNs = timeUnit.toNanos(timeout);
        final long startNs = System.nanoTime();

        while(responseQueue.isEmpty()) {
            // Compare elapsed time against the timeout instead of computing an
            // absolute deadline: this cannot overflow, even when timeoutNs is
            // Long.MAX_VALUE or negative.
            final long elapsedNs = System.nanoTime() - startNs;
            if(elapsedNs >= timeoutNs) {
                break;
            }
            final long remainingNs = timeoutNs - elapsedNs;

            if(logger.isDebugEnabled()) {
                logger.debug("Start waiting for response queue with timeoutMs: " + timeoutMs);
            }
            // remainingNs is strictly positive here. timedWait never issues a
            // wait(0), which would block indefinitely instead of timing out.
            TimeUnit.NANOSECONDS.timedWait(this, remainingNs);
            if(logger.isDebugEnabled()) {
                logger.debug("End waiting for response queue with timeoutMs: " + timeoutMs);
            }
        }
        return responseQueue.poll();
    }

    /**
     * Check whether the response queue is empty.
     *
     * @return true if the response queue is empty; false if not empty.
     */
    public synchronized boolean responseQueueIsEmpty() {
        return responseQueue.isEmpty();
    }
}