-
Notifications
You must be signed in to change notification settings - Fork 476
/
StateTransferTest2.java
191 lines (159 loc) · 6.75 KB
/
StateTransferTest2.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package org.jgroups.tests;
import org.jgroups.*;
import org.jgroups.protocols.pbcast.*;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.Util;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.*;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* Tests state transfer API (including exception handling)
* @author Bela Ban
*/
@Test(groups=Global.STACK_DEPENDENT,sequential=true)
public class StateTransferTest2 extends ChannelTestBase {
/*@DataProvider(name="createChannels")
protected Iterator<JChannel[]> createChannels() {
return new MyIterator(new Class[]{STATE_TRANSFER.class, STREAMING_STATE_TRANSFER.class,
STREAMING_STATE_TRANSFER_SOCKET.class});
}*/
@DataProvider(name="createChannels")
protected Iterator<JChannel[]> createChannels() {
return new MyIterator(new Class[]{STATE_TRANSFER.class});
}
@Test(dataProvider="createChannels")
public void testSuccessfulStateTransfer(final JChannel c1, final JChannel c2) throws ChannelException {
try {
StateHandler sh1=new StateHandler("Bela", false, false), sh2=new StateHandler(null, false, false);
c1.setReceiver(sh1);
c2.setReceiver(sh2);
boolean rc=c2.getState(null, 0);
assert rc;
Object state=sh2.getReceivedState();
System.out.println("state = " + state);
assert state != null && state.equals("Bela");
}
finally {
Util.close(c2, c1);
}
}
@Test(dataProvider="createChannels")
public void testUnsuccessfulStateTransferFailureAtStateProvider(final JChannel c1, final JChannel c2) throws ChannelException {
try {
StateHandler sh1=new StateHandler("Bela", true, false), sh2=new StateHandler(null, false, false);
c1.setReceiver(sh1);
c2.setReceiver(sh2);
try {
c2.getState(null, 0);
assert false : "we shouldn't get here; getState() should have thrown an exception";
}
catch(ChannelException ex) {
System.out.println("getState() threw an exception - as expected: " + ex);
}
Object state=sh2.getReceivedState();
System.out.println("state = " + state);
assert state == null;
}
finally {
Util.close(c2, c1);
}
}
@Test(dataProvider="createChannels")
public void testUnsuccessfulStateTransferFailureAtStateRequester(final JChannel c1, final JChannel c2) throws ChannelException {
StateHandler sh1=new StateHandler("Bela", false, false), sh2=new StateHandler(null, false, true);
c1.setReceiver(sh1);
c2.setReceiver(sh2);
try {
c2.getState(null, 0);
assert false : "we shouldn't get here; getState() should have thrown an exception";
}
catch(StateTransferException ste) {
System.out.println("getState() threw an exception - as expected: " + ste);
}
finally {
Util.close(c2, c1);
}
}
protected class MyIterator implements Iterator<JChannel[]> {
protected final Class[] stream_transfer_prots;
protected int index=0;
public MyIterator(Class[] stream_transfer_prots) {
this.stream_transfer_prots=stream_transfer_prots;
}
public boolean hasNext() {
return index < stream_transfer_prots.length;
}
public JChannel[] next() {
try {
Class next_class=stream_transfer_prots[index++];
System.out.println("State transfer protocol used: " + next_class);
return createStateProviderAndRequesterChannels(next_class);
}
catch(Exception e) {
throw new RuntimeException("failed creating a new channel", e);
}
}
public void remove() {
}
protected JChannel[] createStateProviderAndRequesterChannels(Class state_transfer_class) throws Exception {
JChannel[] retval=new JChannel[2];
retval[0]=createChannel(true, 2, "Provider");
replaceStateTransferProtocolWith(retval[0], state_transfer_class);
retval[1]=createChannel(retval[0], "Requester");
for(JChannel ch: retval)
ch.connect("StateTransferTest2");
return retval;
}
protected void replaceStateTransferProtocolWith(JChannel ch, Class state_transfer_class) throws Exception {
ProtocolStack stack=ch.getProtocolStack();
if(stack.findProtocol(state_transfer_class) != null)
return; // protocol of the right class is already in stack
Protocol prot=stack.findProtocol(STATE_TRANSFER.class, StreamingStateTransfer.class);
Protocol new_state_transfer_protcol=(Protocol)state_transfer_class.newInstance();
if(prot != null) {
stack.replaceProtocol(prot, new_state_transfer_protcol);
}
else { // no state transfer protocol found in stack
Protocol flush=stack.findProtocol(FLUSH.class);
if(flush != null)
stack.insertProtocol(new_state_transfer_protcol, ProtocolStack.BELOW, FLUSH.class);
else
stack.insertProtocolAtTop(new_state_transfer_protcol);
}
}
}
protected static class StateHandler extends ReceiverAdapter {
protected final boolean get_error;
protected final boolean set_error;
protected final Object state_to_send;
protected Object received_state=null;
public StateHandler(Object state_to_send, boolean get_error, boolean set_error) {
this.state_to_send=state_to_send;
this.get_error=get_error;
this.set_error=set_error;
}
public Object getReceivedState() {
return received_state;
}
public void getState(OutputStream ostream) throws Exception {
if(get_error)
throw new RuntimeException("[dummy failure] state could not be serialized");
DataOutputStream out=new DataOutputStream(new BufferedOutputStream(ostream, 200));
Util.objectToStream(state_to_send, out);
out.flush();
}
public void setState(InputStream istream) throws Exception {
if(set_error)
throw new RuntimeException("[dummy failure] state could not be set");
DataInputStream in=new DataInputStream(istream);
try {
received_state=Util.objectFromStream(in);
}
finally {
Util.close(in);
}
}
}
}