-
Notifications
You must be signed in to change notification settings - Fork 470
/
ABP.java
207 lines (179 loc) · 6.51 KB
/
ABP.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package org.jgroups.protocols;
import org.jgroups.*;
import org.jgroups.annotations.Experimental;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.Property;
import org.jgroups.annotations.Unsupported;
import org.jgroups.stack.Protocol;
import org.jgroups.util.ConcurrentLinkedBlockingQueue;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;
import java.io.DataInput;
import java.io.DataOutput;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
* Alternating Bit Protocol. Use without any UNICASTX protocol. Place it somewhere below NAKACKX. Provides reliable
* p2p unicasting. Design is in ./doc/design/ABP.txt
* @author Bela Ban
* @since 3.6.3
*/
@Experimental @Unsupported
@MBean(description="Alternating Bit Protocol, for reliable p2p unicasts")
public class ABP extends Protocol {
/** Resend interval in milliseconds (default 1000); annotated as a configurable {@code @Property}. */
@Property(description="Interval (in ms) at which a sent msg is resent")
protected long resend_interval=1000;
/**
 * Per-peer {@link Entry} tables keyed by address. {@code send_map} is looked up with the destination of
 * outgoing unicasts and with the sender of incoming acks; {@code recv_map} is looked up with the sender of
 * incoming data messages. Both are pruned to the current membership on a view change.
 */
protected final ConcurrentHashMap<Address,Entry> send_map=new ConcurrentHashMap<>(),
recv_map=new ConcurrentHashMap<>();
/** Timer obtained from the transport in {@link #init()}. */
protected TimeScheduler timer;
/** Address assigned when a {@code SET_LOCAL_ADDRESS} event travels down the stack; used in trace output. */
protected Address local_addr;
/**
 * Runs the superclass initialisation and then caches the transport's timer in {@code timer}.
 *
 * @throws Exception if the superclass initialisation fails
 */
public void init() throws Exception {
    super.init();
    this.timer=this.getTransport().getTimer();
}
/**
 * Intercepts events travelling down the stack before forwarding them unchanged.
 * <ul>
 *   <li>{@code VIEW_CHANGE}: drops the send and receive entries of every address that is not in the new view.</li>
 *   <li>{@code SET_LOCAL_ADDRESS}: records the local address.</li>
 * </ul>
 *
 * @param evt the event passed down by the protocol above
 * @return whatever the protocol below returns for {@code evt}
 */
public Object down(Event evt) {
    final int type=evt.getType();
    if(type == Event.VIEW_CHANGE) {
        final View new_view=evt.getArg();
        // keep only the peers that are still members
        send_map.keySet().retainAll(new_view.getMembers());
        recv_map.keySet().retainAll(new_view.getMembers());
    }
    else if(type == Event.SET_LOCAL_ADDRESS) {
        local_addr=evt.getArg();
    }
    return down_prot.down(evt);
}
/**
 * Handles an outgoing message. Messages without a destination (multicasts) are passed straight down;
 * unicasts are handed to the send entry of their destination.
 *
 * @param msg the message to send
 * @return the result of the protocol below for multicasts, {@code null} for unicasts
 */
public Object down(Message msg) {
    final Address target=msg.dest();
    if(target != null) {
        getEntry(send_map, target).send(msg);
        return null;
    }
    // no destination: not ours to handle
    return down_prot.down(msg);
}
/**
 * Handles an incoming message. Multicasts and messages without an ABP header are passed up untouched.
 * A data message is passed up only if the sender's receive entry accepts its bit; an ack is handed to
 * the sender's send entry and consumed.
 *
 * @param msg the received message
 * @return the result of the protocol above when the message is passed up, otherwise {@code null}
 */
public Object up(Message msg) {
    final Address sender=msg.src();
    if(msg.dest() == null) // multicast: not handled here
        return up_prot.up(msg);

    final ABPHeader hdr=msg.getHeader(id);
    if(hdr == null)
        return up_prot.up(msg);

    switch(hdr.type) {
        case data: {
            final Entry recv_entry=getEntry(recv_map, sender);
            log.trace("%s: <-- %s.msg(%d)", local_addr, sender, hdr.bit);
            // deliver only when the entry reports the message as accepted
            return recv_entry.handleMessage(sender, hdr.bit)? up_prot.up(msg) : null;
        }
        case ack: {
            log.trace("%s: <-- %s.ack(%d)", local_addr, sender, hdr.bit);
            final Entry send_entry=getEntry(send_map, sender);
            send_entry.handleAck(hdr.bit);
            return null;
        }
        default:
            return null;
    }
}
/**
 * Returns the entry registered for {@code dest} in {@code map}, creating and registering a new one if
 * none exists. If another thread registers an entry first, that entry is returned instead.
 *
 * @param map  the table to look in (send or receive side)
 * @param dest the peer address used as key
 * @return the entry associated with {@code dest}; never {@code null}
 */
protected Entry getEntry(ConcurrentMap<Address,Entry> map, Address dest) {
    final Entry found=map.get(dest);
    if(found != null)
        return found;

    final Entry created=new Entry();
    final Entry raced=map.putIfAbsent(dest, created);
    return raced != null? raced : created;
}
/**
 * Kind of ABP message. The header writes {@code ordinal()} to the wire, so the order of the constants
 * must not change.
 */
protected enum Type {
    data,
    ack
}
/**
 * Per-peer alternating-bit state. An instance stored in {@code send_map} plays the sender role
 * ({@link #send}, {@link #handleAck}, {@link #run}); an instance stored in {@code recv_map} plays the
 * receiver role ({@link #handleMessage}).
 * <p>
 * The sender is stop-and-wait: the head of {@code send_queue} is transmitted with the current bit and
 * retransmitted every {@code resend_interval} ms until a matching ack arrives. Only then is it removed,
 * the bit flipped, and the next message sent.
 * <p>
 * {@code bit}, {@code xmit_task} and {@code in_flight} are guarded by this entry's monitor. The monitor is
 * never held while blocked or while calling {@code down_prot} from the transmit thread.
 */
protected class Entry implements Runnable {
    /** Sender role: bit stamped on the message currently being sent. Receiver role: bit expected next. */
    protected byte bit=0;
    protected final BlockingQueue<Message> send_queue=new ConcurrentLinkedBlockingQueue<>(500);
    protected Thread xmit_task;
    /** True once the head of {@code send_queue} has been transmitted with the current bit and is awaiting its ack. */
    protected boolean in_flight;

    /**
     * Queues {@code msg} for reliable delivery and makes sure the transmit thread is running and awake.
     *
     * @param msg the unicast message to send
     */
    protected void send(Message msg) {
        synchronized(send_queue) { // serializes concurrent producers
            send_queue.add(msg);
        }
        startTask();
    }

    /**
     * Processes a received data message and always answers with an ack.
     *
     * @param sender  the peer that sent the message, also the destination of the ack
     * @param msg_bit the bit carried by the message
     * @return {@code true} if the bit was the expected one (message should be delivered),
     *         {@code false} if it is a duplicate
     */
    protected synchronized boolean handleMessage(Address sender, byte msg_bit) {
        boolean retval=false;
        if(this.bit == msg_bit) {
            this.bit^=1;
            retval=true;
        }
        // after the (possible) flip, bit^1 is the bit of the last accepted message
        byte ack_bit=(byte)(this.bit ^ 1);
        Message ack=new Message(sender).putHeader(id, new ABPHeader(Type.ack, ack_bit));
        log.trace("%s: --> %s.ack(%d)", local_addr, sender, ack_bit);
        down_prot.down(ack);
        return retval;
    }

    /**
     * Processes a received ack. An ack is honoured only if a message is in flight and the ack carries the
     * current bit; duplicate or stale acks are ignored so they can neither flip the bit nor discard a
     * message that has not been sent yet.
     *
     * @param ack_bit the bit carried by the ack
     */
    protected synchronized void handleAck(byte ack_bit) {
        if(!in_flight || this.bit != ack_bit)
            return;
        this.bit^=1;
        in_flight=false;
        // poll() removes the head; remove(0) would bind to remove(Object) with a boxed Integer and match nothing
        send_queue.poll();
        notifyAll(); // wake the transmit thread so it moves on to the next message
    }

    /** Starts the transmit thread if none is alive; otherwise wakes it in case it is waiting for work. */
    protected synchronized void startTask() {
        if(xmit_task == null || !xmit_task.isAlive()) {
            xmit_task=new Thread(this, "ABP.XmitTask");
            xmit_task.setDaemon(true);
            xmit_task.start();
        }
        else
            notifyAll();
    }

    /**
     * Transmit loop: waits for a queued message, sends a copy stamped with the current bit, then waits up
     * to {@code resend_interval} ms for the ack before sending again. Returns when the thread is
     * interrupted, leaving the interrupt status set.
     */
    public void run() {
        try {
            while(true) {
                final long interval=Math.max(1L, resend_interval); // guard against a non-positive setting
                Message copy;
                byte sent_bit;

                synchronized(this) {
                    Message head;
                    // timed wait: also picks up messages added to the queue without a notification
                    while((head=send_queue.peek()) == null)
                        wait(interval);
                    sent_bit=bit;
                    in_flight=true;
                    // the queued original stays untouched so it can be copied again for a retransmission
                    copy=head.copy().putHeader(id, new ABPHeader(Type.data, sent_bit));
                }

                log.trace("%s: --> %s.msg(%d). Msg: %s", local_addr, copy.dest(), sent_bit, copy.printHeaders());
                down_prot.down(copy);

                synchronized(this) {
                    long remaining_ns=TimeUnit.MILLISECONDS.toNanos(interval);
                    final long deadline=System.nanoTime() + remaining_ns;
                    // loop on the deadline so a wake-up caused by send() does not trigger an early resend
                    while(in_flight && remaining_ns > 0) {
                        TimeUnit.NANOSECONDS.timedWait(this, remaining_ns);
                        remaining_ns=deadline - System.nanoTime();
                    }
                }
            }
        }
        catch(InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for whoever owns the thread
        }
    }
}
/**
 * Header attached to ABP messages. It carries the message kind and the alternating bit, and is
 * serialized as two bytes: the ordinal of {@code type} followed by {@code bit}.
 */
protected static class ABPHeader extends Header {
    protected Type type;
    protected byte bit; // either 1 or 0

    /** No-arg constructor; fields are filled in by {@link #readFrom(DataInput)}. */
    public ABPHeader() {}

    /**
     * @param type kind of message (data or ack)
     * @param bit  the alternating bit
     */
    public ABPHeader(Type type, byte bit) {
        this.bit=bit;
        this.type=type;
    }

    /** @return the magic id of this header, 87 */
    public short getMagicId() {
        final short magic_id=87;
        return magic_id;
    }

    /** @return a supplier that creates empty headers of this class */
    @Override
    public Supplier<? extends Header> create() {
        return () -> new ABPHeader();
    }

    /** @return the serialized size: one byte for the type plus one byte for the bit */
    @Override
    public int serializedSize() {
        return Global.BYTE_SIZE + Global.BYTE_SIZE;
    }

    /** Writes the ordinal of {@code type} and then {@code bit}, one byte each. */
    @Override
    public void writeTo(DataOutput out) throws Exception {
        final int type_ordinal=type.ordinal();
        out.writeByte(type_ordinal);
        out.writeByte(bit);
    }

    /** Reads the type ordinal and then the bit, mirroring {@link #writeTo(DataOutput)}. */
    @Override
    public void readFrom(DataInput in) throws Exception {
        final Type[] known_types=Type.values();
        final byte type_ordinal=in.readByte();
        type=known_types[type_ordinal];
        bit=in.readByte();
    }

    @Override
    public String toString() {
        return new StringBuilder("ABP (").append(bit).append(")").toString();
    }
}
}