-
Notifications
You must be signed in to change notification settings - Fork 6
/
ListenersCanLoseData.java
executable file
·137 lines (108 loc) · 5.48 KB
/
ListenersCanLoseData.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package com.benstopford.coherence.bootstrap.morecomplex;
import com.benstopford.coherence.bootstrap.structures.framework.cluster.ClusterRunner;
import com.benstopford.coherence.bootstrap.structures.framework.cluster.PersistentPortTracker;
import com.tangosol.net.NamedCache;
import com.tangosol.net.messaging.ConnectionException;
import com.tangosol.util.AbstractMapListener;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static java.lang.System.*;
import static junit.framework.Assert.assertTrue;
import static junit.framework.Assert.fail;
import static org.junit.Assert.assertFalse;
/**
* This test shows how updates can be lost if you use simple listeners. Here update 3 will be lost.
* <p/>
* To solve this problem simply catch the exception and run a query against the cache to see what changed since you were
* last updated. this may mean you need to have a last updated field. If so then use and index to make the query faster
* and make it ordered (as you will be doing a range query)
* BTS, 25-Jan-2008
*/
public final class ListenersCanLoseData extends ClusterRunner {
private final List<Object> valuesSentToClient1 = new ArrayList<Object>();
private final List<Object> valuesSentToClient2 = new ArrayList<Object>();
@Test
public void shouldLoseUpdateWhenConnectionProxyGoesDown() throws Exception {
new PersistentPortTracker().incrementExtendPort("com.benstopford.extend.port2");
//start one data node and two data-disabled proxies listening on different ports
startCoherenceProcess("config/basic-cache.xml");
Process extendProxy1 = startCoherenceProcess("config/basic-extend-enabled-cache-32001.xml", LOCAL_STORAGE_FALSE);
startCoherenceProcess("config/basic-extend-enabled-cache-32002.xml", LOCAL_STORAGE_FALSE);
//connect to each of the extend proxies
NamedCache cacheViaConnection1 = getCacheConnection1();
NamedCache cacheViaConnection2 = getCacheConnection2();
//dynamically add listener to connection 1
cacheViaConnection1.addMapListener(new AbstractMapListener() {
public void entryInserted(com.tangosol.util.MapEvent mapEvent) {
out.println("Entry was inserted " + mapEvent.getNewValue());
valuesSentToClient1.add(mapEvent.getNewValue());
}
public void entryUpdated(com.tangosol.util.MapEvent mapEvent) {
out.println("Entry was udpated to " + mapEvent.getNewValue());
valuesSentToClient1.add(mapEvent.getNewValue());
}
});
cacheViaConnection2.addMapListener(new AbstractMapListener() {
public void entryInserted(com.tangosol.util.MapEvent mapEvent) {
out.println("Entry was inserted " + mapEvent.getNewValue());
valuesSentToClient2.add(mapEvent.getNewValue());
}
public void entryUpdated(com.tangosol.util.MapEvent mapEvent) {
out.println("Entry was udpated to " + mapEvent.getNewValue());
valuesSentToClient2.add(mapEvent.getNewValue());
}
});
cacheViaConnection1.put("Foo", "1");
cacheViaConnection2.put("Foo", "2");
Thread.sleep(1000);
//kill the first extend proxy process
extendProxy1.destroy();
out.println("waiting for socket to timeout");
Thread.sleep(40 * 1000);
//check that connection 1 now throws as exception due to the proxy being dead.
try {
cacheViaConnection1.size();
fail("Connetion should exception should have been thrown as proxy is down");
} catch (ConnectionException expected) {
}
//add entry via the second connection
cacheViaConnection2.put("Foo", "3"); //this one should be dropped
Thread.sleep(1000);
//restart dead extend proxy
extendProxy1 = startCoherenceProcess("config/basic-extend-enabled-cache-32001.xml", "-Dtangosol.coherence.distributed.localstorage=false ");
Thread.sleep(1000);
//add value via restarted extend proxy
cacheViaConnection1.put("Foo", "4");
Thread.sleep(1000);
//check that the listener was called for all but the entry where the connection had been down
//proving that listeners can loose updates if connections are terminated.
assertTrue(valuesSentToClient1.contains("1"));
assertTrue(valuesSentToClient1.contains("2"));
assertFalse(valuesSentToClient1.contains("3")); //*****THIS UPDATE WAS LOST****
assertTrue(valuesSentToClient1.contains("4"));
//check that the connnection that remained did recieve all updates
assertTrue(valuesSentToClient2.contains("1"));
assertTrue(valuesSentToClient2.contains("2"));
assertTrue(valuesSentToClient2.contains("3"));
assertTrue(valuesSentToClient2.contains("4"));
}
private NamedCache getCacheConnection1() {
return cacheViaExtend();
}
private NamedCache getCacheConnection2() {
return getCache("config/extend-client-32002.xml", "foo");
}
@Before
public void setUp() throws Exception {
valuesSentToClient1.clear();
valuesSentToClient2.clear();
super.setUp();
}
@After
public void tearDown() throws Exception {
super.tearDown();
}
}