forked from voldemort/voldemort
-
Notifications
You must be signed in to change notification settings - Fork 0
/
RebalanceMetadataConsistencyTest.java
206 lines (165 loc) · 7.78 KB
/
RebalanceMetadataConsistencyTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package voldemort.client.rebalance;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import voldemort.ServerTestUtils;
import voldemort.client.RoutingTier;
import voldemort.cluster.Cluster;
import voldemort.routing.RoutingStrategyType;
import voldemort.serialization.SerializerDefinition;
import voldemort.server.rebalance.Rebalancer;
import voldemort.store.Store;
import voldemort.store.StoreDefinition;
import voldemort.store.StoreDefinitionBuilder;
import voldemort.store.bdb.BdbStorageConfiguration;
import voldemort.store.memory.InMemoryStorageEngine;
import voldemort.store.metadata.MetadataStore;
import voldemort.versioning.Versioned;
import voldemort.xml.ClusterMapper;
import voldemort.xml.StoreDefinitionsMapper;
import com.google.common.collect.Lists;
/**
 * This test checks whether writes to the cluster metadata and to the store
 * metadata can be interleaved. We spawn a writer and a reader to see whether
 * an inconsistent state of the system is visible at any point.
 */
public class RebalanceMetadataConsistencyTest {

    /** Number of writer/reader thread pairs started by {@link #testThreading()}. */
    private static final int ITERATIONS = 3000;

    private MetadataStore metadataStore;
    private Cluster currentCluster;
    private Cluster targetCluster;

    protected static String testStoreNameRW = "test";
    protected static String testStoreNameRW2 = "test2";

    private StoreDefinition rwStoreDefWithReplication;
    private StoreDefinition rwStoreDefWithReplication2;

    private Rebalancer rebalancer;

    /**
     * Most recent snapshot published by any reader thread. The assertions use
     * thread-local copies instead, because these fields are overwritten
     * concurrently by every reader.
     */
    volatile Cluster checkCluster;
    volatile List<StoreDefinition> checkstores;

    /**
     * Failures raised on writer/reader threads. JUnit only sees exceptions
     * thrown on the thread running the test method, so worker threads record
     * their failures here and {@link #testThreading()} re-raises them.
     */
    private final List<Throwable> failures = Collections.synchronizedList(new ArrayList<Throwable>());

    /**
     * Convenience method to execute private methods of other classes. The
     * first declared method whose name matches is invoked; parameter types are
     * not used for the lookup.
     *
     * @param test Instance of the class we want to test
     * @param methodName Name of the method we want to test
     * @param params Arguments we want to pass to the method
     * @return Object with the result of the executed method
     * @throws NoSuchMethodException if the class of {@code test} declares no
     *         method named {@code methodName}
     * @throws Exception if the reflective invocation fails
     */
    public static Object invokePrivateMethod(Object test, String methodName, Object[] params)
            throws Exception {
        for(Method method: test.getClass().getDeclaredMethods()) {
            if(method.getName().equals(methodName)) {
                method.setAccessible(true);
                return method.invoke(test, params);
            }
        }
        // Fail loudly: silently returning null would turn a renamed or
        // removed target method into a no-op.
        throw new NoSuchMethodException(test.getClass().getName() + "." + methodName);
    }

    /**
     * Builds the two clusters and the two store definitions the writers
     * alternate between, and seeds the metadata store with the current cluster
     * and the first store definition.
     */
    @Before
    public void setUp() {
        currentCluster = ServerTestUtils.getLocalCluster(3, new int[][] { { 0 }, { 1, 3 }, { 2 } });
        // NOTE(review): four partition lists are passed for a 3-node cluster;
        // confirm against ServerTestUtils.getLocalCluster whether the node
        // count should be 4.
        targetCluster = ServerTestUtils.getLocalCluster(3,
                                                        new int[][] { { 0 }, { 1 }, { 2 }, { 3 } });

        rwStoreDefWithReplication = buildStoreDefinition(testStoreNameRW);
        rwStoreDefWithReplication2 = buildStoreDefinition(testStoreNameRW2);

        Store<String, String, String> innerStore = new InMemoryStorageEngine<String, String, String>("inner-store");
        innerStore.put(MetadataStore.CLUSTER_KEY,
                       new Versioned<String>(new ClusterMapper().writeCluster(currentCluster)),
                       null);
        innerStore.put(MetadataStore.STORES_KEY,
                       new Versioned<String>(new StoreDefinitionsMapper().writeStoreList(Lists.newArrayList(rwStoreDefWithReplication))),
                       null);

        metadataStore = new MetadataStore(innerStore, 0);
        rebalancer = new Rebalancer(null, metadataStore, null, null);
    }

    /**
     * Builds a store definition that differs from its sibling only by name.
     *
     * @param storeName Name of the store
     * @return The store definition used by this test
     */
    private static StoreDefinition buildStoreDefinition(String storeName) {
        return new StoreDefinitionBuilder().setName(storeName)
                                           .setType(BdbStorageConfiguration.TYPE_NAME)
                                           .setKeySerializer(new SerializerDefinition("string"))
                                           .setValueSerializer(new SerializerDefinition("string"))
                                           .setRoutingPolicy(RoutingTier.CLIENT)
                                           .setRoutingStrategyType(RoutingStrategyType.CONSISTENT_STRATEGY)
                                           .setReplicationFactor(2)
                                           .setPreferredReads(1)
                                           .setRequiredReads(1)
                                           .setPreferredWrites(1)
                                           .setRequiredWrites(1)
                                           .build();
    }

    /**
     * Starts {@link #ITERATIONS} writer/reader pairs. Writers alternate
     * between (currentCluster, first store definition) and (targetCluster,
     * second store definition). The test waits for every thread and fails if
     * any of them recorded a failure.
     */
    @Test
    public void testThreading() {
        List<Thread> workers = new ArrayList<Thread>(2 * ITERATIONS);

        for(int i = 0; i < ITERATIONS; i++) {
            boolean useCurrent = (i % 2) == 0;
            Cluster cluster = useCurrent ? currentCluster : targetCluster;
            StoreDefinition storeDef = useCurrent ? rwStoreDefWithReplication
                                                  : rwStoreDefWithReplication2;

            Thread writer = new Thread(new ThreadWriter(cluster, storeDef));
            writer.start();
            workers.add(writer);

            Thread reader = new Thread(new ThreadReader());
            reader.start();
            workers.add(reader);
        }

        // Without joining, the test method would return (and pass) while the
        // workers are still running.
        try {
            for(Thread worker: workers) {
                worker.join();
            }
        } catch(InterruptedException e) {
            Thread.currentThread().interrupt();
            Assert.fail("Interrupted while waiting for writer/reader threads to finish");
        }

        if(!failures.isEmpty()) {
            Throwable first = failures.get(0);
            AssertionError error = new AssertionError(failures.size()
                                                      + " writer/reader thread(s) failed; first failure: "
                                                      + first);
            error.initCause(first);
            throw error;
        }
    }

    /**
     * Writer task: updates cluster and store metadata through the private
     * {@code changeClusterAndStores} method of the rebalancer.
     */
    class ThreadWriter implements Runnable {

        final Cluster cluster;
        final StoreDefinition storeDef;

        ThreadWriter(Cluster cluster, StoreDefinition storeDef) {
            this.cluster = cluster;
            this.storeDef = storeDef;
        }

        /**
         * Performs one metadata update; see {@link #test()}.
         */
        @Override
        public void run() {
            test();
        }

        /**
         * Reflectively invokes {@code changeClusterAndStores} with this
         * writer's cluster and store definition. Any exception is recorded so
         * that the test method can report it.
         */
        public void test() {
            Object[] params = { MetadataStore.CLUSTER_KEY, this.cluster, MetadataStore.STORES_KEY,
                    Lists.newArrayList(this.storeDef) };
            try {
                invokePrivateMethod(rebalancer, "changeClusterAndStores", params);
            } catch(Exception e) {
                failures.add(e);
            }
        }
    }

    /**
     * Reader task: takes one snapshot of cluster and store definitions while
     * holding the metadata store's read lock and checks that the two belong
     * together.
     */
    class ThreadReader implements Runnable {

        /**
         * Reads the cluster and the store definitions, then asserts that the
         * first store definition matches the observed cluster. Any failure is
         * recorded so that the test method can report it.
         */
        @Override
        public void run() {
            try {
                Cluster observedCluster;
                List<StoreDefinition> observedStores;

                metadataStore.readLock.lock();
                try {
                    observedCluster = metadataStore.getCluster();
                    observedStores = metadataStore.getStoreDefList();
                } finally {
                    metadataStore.readLock.unlock();
                }

                // Publish the snapshot, but assert on the locals only: the
                // shared fields may already have been overwritten by another
                // reader.
                checkCluster = observedCluster;
                checkstores = observedStores;

                if(observedCluster.equals(currentCluster)) {
                    Assert.assertEquals(rwStoreDefWithReplication, observedStores.get(0));
                }
                if(observedCluster.equals(targetCluster)) {
                    Assert.assertEquals(rwStoreDefWithReplication2, observedStores.get(0));
                }
            } catch(Throwable t) {
                // Thread boundary: AssertionError is an Error, so catch
                // Throwable to make sure assertion failures are recorded.
                failures.add(t);
            }
        }
    }
}