forked from voldemort/voldemort
/
StealerBasedRebalanceTask.java
147 lines (124 loc) · 5.73 KB
/
StealerBasedRebalanceTask.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/*
* Copyright 2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package voldemort.client.rebalance.task;
import java.util.concurrent.Semaphore;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.rebalance.RebalanceBatchPlanProgressBar;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.server.rebalance.AlreadyRebalancingException;
import voldemort.store.UnreachableStoreException;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.metadata.MetadataStore.VoldemortState;
import com.google.common.collect.Lists;
/**
 * Executes a single {@link RebalancePartitionsInfo} instance on the rebalance
 * client side.
 *
 * This is run from the stealer node's perspective: the task asks the stealer
 * node (through the admin client) to start an asynchronous rebalance operation
 * and then blocks until that operation completes. While the task runs it holds
 * one permit of the donor-permit semaphore supplied to the constructor.
 *
 * Note: instances are not immutable; {@link #run()} records the outcome in the
 * inherited {@code exception} and {@code isComplete} fields.
 */
public class StealerBasedRebalanceTask extends RebalanceTask {

    private static final Logger logger = Logger.getLogger(StealerBasedRebalanceTask.class);

    private final int stealerNodeId;

    // TODO: What is the use of maxTries for stealer-based tasks? Need to
    // validate reason for existence or remove.
    // NOTES FROM VINOTH:
    // I traced the code down and it seems like this is basically used to
    // reissue StealerBasedRebalanceTask when it encounters an
    // AlreadyRebalancingException (which is tied to obtaining a rebalance
    // permit for the donor node). In general, I vote for removing this
    // parameter. I think we should have the controller wait/block with a
    // decent log message if it truly blocked on other tasks to complete. But
    // we need to check how likely this retry is saving us grief today and
    // probably stick to it for some time, as we stabilize the code base with
    // the new planner/controller et al. The right way to do this: the
    // controller simply submits "work" to the server, and servers are mature
    // enough to throttle and process it as fast as they can. Since that means
    // changing all the server execution frameworks, let's stick with this for
    // now.
    // TODO: Decide fate of maxTries argument after some integration tests are
    // done.
    private final int maxTries;

    /**
     * Creates a task that rebalances the partitions described by
     * {@code stealInfo} on its stealer node.
     *
     * @param batchId id of the rebalance batch this task belongs to
     * @param taskId id of this task within the batch
     * @param stealInfo the partition move to execute; its stealer id selects
     *        the node that is asked to rebalance
     * @param maxTries maximum number of attempts to start the rebalance when
     *        the stealer reports {@link AlreadyRebalancingException}
     * @param donorPermit semaphore from which one permit is taken while the
     *        task runs
     * @param adminClient admin client used to talk to the stealer node
     * @param progressBar progress reporter for the batch
     */
    public StealerBasedRebalanceTask(final int batchId,
                                     final int taskId,
                                     final RebalancePartitionsInfo stealInfo,
                                     final int maxTries,
                                     final Semaphore donorPermit,
                                     final AdminClient adminClient,
                                     final RebalanceBatchPlanProgressBar progressBar) {
        super(batchId,
              taskId,
              Lists.newArrayList(stealInfo),
              donorPermit,
              adminClient,
              progressBar,
              logger);
        this.maxTries = maxTries;
        this.stealerNodeId = stealInfo.getStealerId();
        taskLog(toString());
    }

    /**
     * Asks the stealer node to start an asynchronous rebalance for this task's
     * (single) steal info. If the node reports it is already rebalancing and
     * attempts remain, waits for the node's server state to return to
     * NORMAL_SERVER and retries, up to {@code maxTries} attempts in total.
     *
     * @return the async operation id returned by the stealer node
     * @throws VoldemortException if no attempt succeeded; the last
     *         {@link AlreadyRebalancingException} (if any) is the cause
     */
    private int startNodeRebalancing() {
        AlreadyRebalancingException rebalanceException = null;
        for(int attempt = 1; attempt <= maxTries; attempt++) {
            try {
                taskLog("Trying to start async rebalance task on stealer node " + stealerNodeId);
                int asyncOperationId = adminClient.rebalanceOps.rebalanceNode(stealInfos.get(0));
                taskLog("Started async rebalance task on stealer node " + stealerNodeId);
                return asyncOperationId;
            } catch(AlreadyRebalancingException e) {
                rebalanceException = e;
                if(attempt == maxTries) {
                    // No attempts left: waiting for the node to go back to
                    // NORMAL would only delay the failure (and keep holding
                    // the donor permit), so give up right away.
                    taskLog("Node " + stealerNodeId
                            + " is currently rebalancing and no retries remain");
                    break;
                }
                taskLog("Node " + stealerNodeId
                        + " is currently rebalancing. Waiting till completion");
                adminClient.rpcOps.waitForCompletion(stealerNodeId,
                                                     MetadataStore.SERVER_STATE_KEY,
                                                     VoldemortState.NORMAL_SERVER.toString());
            }
        }
        throw new VoldemortException("Failed to start rebalancing with plan: " + getStealInfos(),
                                     rebalanceException);
    }

    /**
     * Acquires a donor permit, starts the rebalance on the stealer node and
     * waits for it to finish. Any failure is recorded in {@code exception}
     * and logged rather than propagated. The task is always marked complete,
     * and the donor permit is released only if it was actually acquired.
     */
    @Override
    public void run() {
        boolean permitAcquired = false;
        try {
            acquirePermit(stealInfos.get(0).getDonorId());
            permitAcquired = true;

            // Start rebalance task and then wait.
            final int rebalanceAsyncId = startNodeRebalancing();
            taskStart(rebalanceAsyncId);
            adminClient.rpcOps.waitForCompletion(stealerNodeId, rebalanceAsyncId);
            taskDone(rebalanceAsyncId);
        } catch(UnreachableStoreException e) {
            exception = e;
            logger.error("Stealer node " + stealerNodeId
                                 + " is unreachable, please make sure it is up and running : "
                                 + e.getMessage(),
                         e);
        } catch(Exception e) {
            exception = e;
            if(e instanceof InterruptedException) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            logger.error("Rebalance failed : " + e.getMessage(), e);
        } finally {
            // Releasing a permit that was never acquired would permanently
            // raise the semaphore's permit count.
            if(permitAcquired) {
                donorPermit.release();
            }
            isComplete.set(true);
        }
    }

    @Override
    public String toString() {
        return "Stealer based rebalance task on stealer node " + stealerNodeId + " : "
               + getStealInfos();
    }
}