/
TOA.java
489 lines (409 loc) · 18.9 KB
/
TOA.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
package org.jgroups.protocols.tom;
import org.jgroups.*;
import org.jgroups.annotations.Experimental;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Util;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
/**
* Total Order Anycast with three communication steps (based on Skeen's Algorithm). Establishes total order for a
* message sent to a subset of the cluster members (an anycast). Example: send a totally ordered message to {D,E}
* out of a membership of {A,B,C,D,E,F}.<p/>
* Skeen's algorithm uses consensus among the anycast target members to find the currently highest
* sequence number (seqno) and delivers the message according to the order established by the seqnos.
*
* @author Pedro Ruivo
* @since 3.1
*/
@Experimental
@MBean(description = "Implementation of Total Order Anycast based on Skeen's Algorithm")
public class TOA extends Protocol implements DeliveryProtocol {
//managers
private DeliveryManagerImpl deliverManager;
private SenderManager senderManager;
// threads
private final DeliveryThread deliverThread = new DeliveryThread(this);
//local address
private Address localAddress;
//sequence numbers, messages ids and lock
private final SequenceNumberManager sequenceNumberManager = new SequenceNumberManager();
private final AtomicLong messageIdCounter = new AtomicLong(0);
//stats: profiling information
private final StatsCollector statsCollector = new StatsCollector();
private volatile View currentView;
public TOA() {
}
@Override
public void start() throws Exception {
deliverManager = new DeliveryManagerImpl();
senderManager = new SenderManager();
deliverThread.start(deliverManager);
statsCollector.setStatsEnabled(statsEnabled());
}
@Override
public void stop() {
deliverThread.interrupt();
}
@Override
public Object down(Event evt) {
switch (evt.getType()) {
case Event.MSG:
handleDownMessage(evt);
return null;
case Event.SET_LOCAL_ADDRESS:
this.localAddress = (Address) evt.getArg();
break;
case Event.VIEW_CHANGE:
handleViewChange((View) evt.getArg());
break;
default:
break;
}
return down_prot.down(evt);
}
@Override
public Object up(Event evt) {
switch (evt.getType()) {
case Event.MSG:
Message message = (Message) evt.getArg();
ToaHeader header = (ToaHeader) message.getHeader(this.id);
if (header == null) {
break;
}
switch (header.getType()) {
case ToaHeader.DATA_MESSAGE:
handleDataMessage(message, header);
break;
case ToaHeader.PROPOSE_MESSAGE:
handleSequenceNumberPropose(message.getSrc(), header);
break;
case ToaHeader.FINAL_MESSAGE:
handleFinalSequenceNumber(header);
break;
case ToaHeader.SINGLE_DESTINATION_MESSAGE:
if (log.isTraceEnabled()) {
log.trace("Received message " + message + " with SINGLE_DESTINATION header. delivering...");
}
deliverManager.deliverSingleDestinationMessage(message);
break;
default:
throw new IllegalStateException("Unknown header type received " + header);
}
return null;
case Event.VIEW_CHANGE:
handleViewChange((View) evt.getArg());
break;
case Event.SET_LOCAL_ADDRESS:
this.localAddress = (Address) evt.getArg();
break;
default:
break;
}
return up_prot.up(evt);
}
@Override
public void deliver(Message message) {
message.setDest(localAddress);
if (log.isDebugEnabled()) {
log.debug("Deliver message " + message + " in total order");
}
up_prot.up(new Event(Event.MSG, message));
statsCollector.incrementMessageDeliver();
}
private void handleViewChange(View view) {
if (log.isTraceEnabled()) {
log.trace("Handle view " + view);
}
View oldView = currentView;
currentView = view;
//basis behavior: drop leavers message (as senders)
List<Address> leavers = Util.leftMembers(oldView, view);
deliverManager.removeLeavers(leavers);
//basis behavior: avoid waiting for the acks
Collection<MessageID> pendingSentMessages = senderManager.getPendingMessageIDs();
for (MessageID messageID : pendingSentMessages) {
long finalSequenceNumber = senderManager.removeLeavers(messageID, leavers);
if (finalSequenceNumber != SenderManager.NOT_READY) {
Message finalMessage = new Message();
finalMessage.setSrc(localAddress);
ToaHeader finalHeader = ToaHeader.createNewHeader(
ToaHeader.FINAL_MESSAGE,messageID);
finalHeader.setSequencerNumber(finalSequenceNumber);
finalMessage.putHeader(this.id, finalHeader);
finalMessage.setFlag(Message.Flag.OOB);
finalMessage.setFlag(Message.Flag.DONT_BUNDLE);
Set<Address> destinations = senderManager.getDestination(messageID);
if (destinations.contains(localAddress)) {
destinations.remove(localAddress);
}
if (log.isTraceEnabled()) {
log.trace("Message " + messageID + " is ready to be deliver. Final sequencer number is " +
finalSequenceNumber);
}
send(destinations,finalMessage, false);
//returns true if we are in destination set
if (senderManager.markSent(messageID)) {
deliverManager.markReadyToDeliver(messageID, finalSequenceNumber);
}
}
}
// TODO: Future work: How to add fault tolerance? (simple and efficient)
}
private void handleDownMessage(Event evt) {
Message message = (Message) evt.getArg();
Address dest = message.getDest();
if (dest != null && dest instanceof AnycastAddress && !message.isFlagSet(Message.Flag.NO_TOTAL_ORDER)) {
//anycast message
sendTotalOrderAnycastMessage(((AnycastAddress)dest).getAddresses(),message);
} else if (dest != null && dest instanceof AnycastAddress) {
//anycast address with NO_TOTAL_ORDER flag (should no be possible, but...)
send(((AnycastAddress)dest).getAddresses(),message, true);
} else {
//normal message
down_prot.down(evt);
}
}
private void sendTotalOrderAnycastMessage(Collection<Address> destinations, Message message) {
boolean trace = log.isTraceEnabled();
boolean warn = log.isWarnEnabled();
long startTime = statsCollector.now();
long duration = -1;
if (trace) {
log.trace("sending total order anycast message");
}
if (destinations.isEmpty()) {
if (warn) {
log.warn("sending an anycast with an empty list");
}
throw new IllegalStateException("AnycastAddress must have at least one element");
}
if (destinations.size() == 1) {
if (log.isDebugEnabled()) {
log.debug("sending an AnycastAddress with 1 element");
}
message.putHeader(id, ToaHeader.createSingleDestinationHeader());
message.setDest(destinations.iterator().next());
down_prot.down(new Event(Event.MSG, message));
return;
}
boolean deliverToMySelf = destinations.contains(localAddress);
try {
MessageID messageID = new MessageID(localAddress, messageIdCounter.getAndIncrement());
long sequenceNumber = sequenceNumberManager.getAndIncrement();
ToaHeader header = ToaHeader.createNewHeader(ToaHeader.DATA_MESSAGE,
messageID);
header.setSequencerNumber(sequenceNumber);
header.addDestinations(destinations);
message.putHeader(this.id, header);
senderManager.addNewMessageToSend(messageID,destinations,sequenceNumber,deliverToMySelf);
if (deliverToMySelf) {
deliverManager.addNewMessageToDeliver(messageID, message, sequenceNumber);
}
if (trace) {
log.trace("Sending message " + messageID + " to " + destinations + " with initial sequence number of " +
sequenceNumber);
}
send(destinations,message, false);
duration = statsCollector.now() - startTime;
} catch (Exception e) {
logException("Exception caught while sending anycast message. Error is " + e.getLocalizedMessage(),
e);
} finally {
statsCollector.addAnycastSentDuration(duration,(destinations.size() - (deliverToMySelf? 1 : 0)));
}
}
private void send(Collection<Address> destinations, Message msg, boolean sendToMyself) {
if (destinations == null) {
down_prot.down(new Event(Event.MSG,msg));
} else {
if (log.isDebugEnabled()) {
log.debug("sending anycast total order message " + msg + " to " + destinations);
}
for (Address address : destinations) {
if (!sendToMyself && address.equals(localAddress)) {
continue;
}
Message cpy = msg.copy();
cpy.setDest(address);
down_prot.down(new Event(Event.MSG,cpy));
}
}
}
private void handleDataMessage(Message message, ToaHeader header) {
long startTime = statsCollector.now();
long duration = -1;
try {
MessageID messageID = header.getMessageID();
//create the sequence number and put it in deliver manager
long myProposeSequenceNumber = sequenceNumberManager.updateAndGet(header.getSequencerNumber());
deliverManager.addNewMessageToDeliver(messageID, message, myProposeSequenceNumber);
if (log.isTraceEnabled()) {
log.trace("Received the message with " + header + ". The proposed sequence number is " +
myProposeSequenceNumber);
}
//create a new message and send it back
Message proposeMessage = new Message();
proposeMessage.setSrc(localAddress);
proposeMessage.setDest(messageID.getAddress());
ToaHeader newHeader = ToaHeader.createNewHeader(
ToaHeader.PROPOSE_MESSAGE,messageID);
newHeader.setSequencerNumber(myProposeSequenceNumber);
proposeMessage.putHeader(this.id, newHeader);
proposeMessage.setFlag(Message.Flag.OOB);
proposeMessage.setFlag(Message.Flag.DONT_BUNDLE);
//multicastSenderThread.addUnicastMessage(proposeMessage);
down_prot.down(new Event(Event.MSG, proposeMessage));
duration = statsCollector.now() - startTime;
} catch (Exception e) {
logException("Exception caught while processing the data message " + header.getMessageID(), e);
} finally {
statsCollector.addDataMessageDuration(duration);
}
}
private void handleSequenceNumberPropose(Address from, ToaHeader header) {
long startTime = statsCollector.now();
long duration = -1;
boolean lastProposeReceived = false;
boolean trace = log.isTraceEnabled();
try {
MessageID messageID = header.getMessageID();
if (trace) {
log.trace("Received the proposed sequence number message with " + header + " from " +
from);
}
sequenceNumberManager.update(header.getSequencerNumber());
long finalSequenceNumber = senderManager.addPropose(messageID, from,
header.getSequencerNumber());
if (finalSequenceNumber != SenderManager.NOT_READY) {
lastProposeReceived = true;
Message finalMessage = new Message();
finalMessage.setSrc(localAddress);
ToaHeader finalHeader = ToaHeader.createNewHeader(
ToaHeader.FINAL_MESSAGE,messageID);
finalHeader.setSequencerNumber(finalSequenceNumber);
finalMessage.putHeader(this.id, finalHeader);
finalMessage.setFlag(Message.Flag.OOB);
finalMessage.setFlag(Message.Flag.DONT_BUNDLE);
Set<Address> destinations = senderManager.getDestination(messageID);
if (destinations.contains(localAddress)) {
destinations.remove(localAddress);
}
if (trace) {
log.trace("Message " + messageID + " is ready to be deliver. Final sequencer number is " +
finalSequenceNumber);
}
send(destinations,finalMessage, false);
//returns true if we are in destination set
if (senderManager.markSent(messageID)) {
deliverManager.markReadyToDeliver(messageID, finalSequenceNumber);
}
}
duration = statsCollector.now() - startTime;
} catch (Exception e) {
logException("Exception caught while processing the propose sequence number for " + header.getMessageID(), e);
} finally {
statsCollector.addProposeSequenceNumberDuration(duration, lastProposeReceived);
}
}
private void handleFinalSequenceNumber(ToaHeader header) {
long startTime = statsCollector.now();
long duration = -1;
try {
MessageID messageID = header.getMessageID();
if (log.isTraceEnabled()) {
log.trace("Received the final sequence number message with " + header);
}
sequenceNumberManager.update(header.getSequencerNumber());
deliverManager.markReadyToDeliver(messageID, header.getSequencerNumber());
duration = statsCollector.now() - startTime;
} catch (Exception e) {
logException("Exception caught while processing the final sequence number for " + header.getMessageID(), e);
} finally {
statsCollector.addFinalSequenceNumberDuration(duration);
}
}
private void logException(String msg, Exception e) {
if (log.isDebugEnabled()) {
log.debug(msg, e);
} else if (log.isWarnEnabled()) {
log.warn(msg + ". Error is " + e.getLocalizedMessage());
}
}
@ManagedOperation
public String getMessageList() {
return deliverManager.getMessageSet().toString();
}
@Override
public void enableStats(boolean flag) {
super.enableStats(flag);
statsCollector.setStatsEnabled(flag);
}
@Override
public void resetStats() {
super.resetStats();
statsCollector.clearStats();
}
@ManagedAttribute(description = "The average duration (in milliseconds) in processing and sending the anycast " +
"message to all the recipients", writable = false)
public double getAvgToaSendDuration() {
return statsCollector.getAvgAnycastSentDuration();
}
@ManagedAttribute(description = "The average duration (in milliseconds) in processing a data message received",
writable = false)
public double getAvgDataMessageReceivedDuration() {
return statsCollector.getAvgDataMessageReceivedDuration();
}
@ManagedAttribute(description = "The average duration (in milliseconds) in processing a propose message received" +
"(not the last one", writable = false)
public double getAvgProposeMessageReceivedDuration() {
return statsCollector.getAvgProposeMesageReceivedDuration();
}
@ManagedAttribute(description = "The average duration (in milliseconds) in processing the last propose message " +
"received. This last propose message will originate the sending of the final message", writable = false)
public double getAvgLastProposeMessageReceivedDuration() {
return statsCollector.getAvgLastProposeMessageReceivedDuration();
}
@ManagedAttribute(description = "The average duration (in milliseconds) in processing a final message received",
writable = false)
public double getAvgFinalMessageReceivedDuration() {
return statsCollector.getAvgFinalMessageReceivedDuration();
}
@ManagedAttribute(description = "The number of anycast messages sent", writable = false)
public int getNumberOfAnycastMessagesSent() {
return statsCollector.getNumberOfAnycastMessagesSent();
}
@ManagedAttribute(description = "The number of final anycast sent", writable = false)
public int getNumberOfFinalAnycastSent() {
return statsCollector.getNumberOfFinalAnycastsSent();
}
@ManagedAttribute(description = "The number of anycast messages delivered", writable = false)
public int getNumberOfAnycastMessagesDelivered() {
return statsCollector.getAnycastDelivered();
}
@ManagedAttribute(description = "The number of propose messages sent", writable = false)
public int getNumberOfProposeMessageSent() {
return statsCollector.getNumberOfProposeMessagesSent();
}
@ManagedAttribute(description = "The number of final messages delivered", writable = false)
public int getNumberOfFinalMessagesDelivered() {
return statsCollector.getNumberOfFinalMessagesDelivered();
}
@ManagedAttribute(description = "The number of data messages delivered", writable = false)
public int getNumberOfDataMessagesDelivered() {
return statsCollector.getNumberOfProposeMessagesSent();
}
@ManagedAttribute(description = "The number of propose messages received", writable = false)
public int getNumberOfProposeMessageReceived() {
return statsCollector.getNumberOfProposeMessagesReceived();
}
@ManagedAttribute(description = "The average number of unicasts messages created per anycast message",
writable = false)
public double getAvgNumberOfUnicastSentPerAnycast() {
return statsCollector.getAvgNumberOfUnicastSentPerAnycast();
}
}