forked from voldemort/voldemort
/
StealerBasedRebalanceTask.java
118 lines (98 loc) · 4.75 KB
/
StealerBasedRebalanceTask.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package voldemort.client.rebalance.task;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.server.rebalance.AlreadyRebalancingException;
import voldemort.store.UnreachableStoreException;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.metadata.MetadataStore.VoldemortState;
import voldemort.utils.RebalanceUtils;
import com.google.common.collect.Lists;
/**
 * Executes a single {@link RebalancePartitionsInfo} on the rebalance client
 * side, driven from the stealer node's perspective.
 *
 * <p>When run, the task acquires {@code donorPermit} (logged as the permit for
 * the plan's donor node), asks the stealer node to start rebalancing the plan,
 * and then blocks until the resulting async operation on the stealer finishes
 * or the timeout expires.
 *
 * <p>The fields declared in this class are final; the outcome of
 * {@link #run()} is recorded in the inherited {@code exception} and
 * {@code isComplete} state.
 */
public class StealerBasedRebalanceTask extends RebalanceTask {

    private static final Logger logger = Logger.getLogger(StealerBasedRebalanceTask.class);

    /** Id of the stealer node on which rebalancing is started and awaited. */
    private final int stealerNodeId;

    /**
     * Maximum number of attempts to start rebalancing on the stealer node. A
     * new attempt is made only when the stealer answers with
     * {@link AlreadyRebalancingException}; between attempts the task waits for
     * the stealer's server state to return to NORMAL_SERVER. With a value
     * {@code <= 0} no attempt is made and {@link #run()} records a failure.
     */
    private final int maxTries;

    /**
     * @param taskId id of this task (passed to the base class and used when logging)
     * @param stealInfo the single partition-movement plan executed by this task
     * @param timeoutSeconds timeout, in seconds, applied to each wait on the stealer node
     * @param maxTries maximum number of attempts to start rebalancing on the stealer node
     * @param donorPermit semaphore acquired before starting and released when done
     * @param adminClient admin client used to talk to the stealer node
     */
    public StealerBasedRebalanceTask(final int taskId,
                                     final RebalancePartitionsInfo stealInfo,
                                     final long timeoutSeconds,
                                     final int maxTries,
                                     final Semaphore donorPermit,
                                     final AdminClient adminClient) {
        super(taskId, Lists.newArrayList(stealInfo), timeoutSeconds, donorPermit, adminClient);
        this.maxTries = maxTries;
        this.stealerNodeId = stealInfo.getStealerId();
    }

    /**
     * Asks the stealer node to start executing this task's plan, retrying
     * while the node reports that it is already rebalancing.
     *
     * @return the async operation id returned by the stealer node
     * @throws VoldemortException if rebalancing could not be started within
     *         {@code maxTries} attempts; the last
     *         {@link AlreadyRebalancingException}, if any, is the cause
     */
    private int startNodeRebalancing() {
        final RebalancePartitionsInfo stealInfo = stealInfos.get(0);
        AlreadyRebalancingException rebalanceException = null;
        int nTries = 0;
        while(nTries < maxTries) {
            nTries++;
            try {
                RebalanceUtils.printLog(taskId, logger, "Starting on node " + stealerNodeId
                                                        + " rebalancing task " + stealInfo);
                return adminClient.rebalanceOps.rebalanceNode(stealInfo);
            } catch(AlreadyRebalancingException e) {
                rebalanceException = e;
                if(nTries >= maxTries) {
                    // No attempts left: waiting for the node would only delay the failure below.
                    break;
                }
                RebalanceUtils.printLog(taskId,
                                        logger,
                                        "Node "
                                                + stealerNodeId
                                                + " is currently rebalancing. Waiting till completion");
                adminClient.rpcOps.waitForCompletion(stealerNodeId,
                                                     MetadataStore.SERVER_STATE_KEY,
                                                     VoldemortState.NORMAL_SERVER.toString(),
                                                     timeoutSeconds,
                                                     TimeUnit.SECONDS);
            }
        }
        throw new VoldemortException("Failed to start rebalancing with plan: " + getStealInfos(),
                                     rebalanceException);
    }

    /**
     * Runs the task: acquires the donor permit, starts rebalancing on the
     * stealer node and waits for the resulting async operation to finish.
     *
     * <p>Exceptions are not propagated; a failure is stored in
     * {@code exception}. On exit {@code isComplete} is always set to true, and
     * the donor permit is released only if it was actually acquired.
     */
    public void run() {
        // Releasing a permit that was never acquired would inflate the semaphore
        // and allow more concurrent tasks than intended, so track acquisition.
        boolean permitAcquired = false;
        try {
            RebalanceUtils.printLog(taskId, logger, "Acquiring donor permit for node "
                                                    + stealInfos.get(0).getDonorId() + " for "
                                                    + stealInfos);
            donorPermit.acquire();
            permitAcquired = true;

            final int rebalanceAsyncId = startNodeRebalancing();

            // Wait for the task to get over
            adminClient.rpcOps.waitForCompletion(stealerNodeId,
                                                 rebalanceAsyncId,
                                                 timeoutSeconds,
                                                 TimeUnit.SECONDS);
            RebalanceUtils.printLog(taskId,
                                    logger,
                                    "Succesfully finished rebalance for async operation id "
                                            + rebalanceAsyncId);
        } catch(UnreachableStoreException e) {
            exception = e;
            logger.error("Stealer node " + stealerNodeId
                                 + " is unreachable, please make sure it is up and running : "
                                 + e.getMessage(),
                         e);
        } catch(InterruptedException e) {
            exception = e;
            // Restore the interrupt flag so the thread's owner can still observe it.
            Thread.currentThread().interrupt();
            logger.error("Rebalance failed : " + e.getMessage(), e);
        } catch(Exception e) {
            exception = e;
            logger.error("Rebalance failed : " + e.getMessage(), e);
        } finally {
            if(permitAcquired) {
                donorPermit.release();
            }
            isComplete.set(true);
        }
    }

    @Override
    public String toString() {
        return "Stealer based rebalance task on stealer node " + stealerNodeId + " : "
               + getStealInfos();
    }
}