Skip to content
Browse files

refactoring to split into multiple patches.

  • Loading branch information...
1 parent 3d43dcb commit 0b02f897f541cd2e1cbb715e4165d022a8d7bea7 @bbansal bbansal committed Mar 31, 2009
View
207 src/java/voldemort/client/admin/AdminClient.java
@@ -40,7 +40,6 @@
import voldemort.store.socket.SocketPool;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
-import voldemort.utils.ClusterUtils;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;
import voldemort.xml.ClusterMapper;
@@ -73,6 +72,14 @@ public void close() throws VoldemortException {
// don't close the socket pool, it is shared
}
+ public Node getConnectedNode() {
+ return currentNode;
+ }
+
+ public MetadataStore getMetaDataStore() {
+ return metadataStore;
+ }
+
public void updateClusterMetaData(int nodeId, Cluster cluster, String cluster_key)
throws VoldemortException {
Node node = metadataStore.getCluster().getNodeById(nodeId);
@@ -237,159 +244,6 @@ public void requestPutEntriesAsStream(int nodeId,
}
}
- /**
- * Rebalances the cluster by stealing partitions from current Cluster
- * configuration. <strong> Steps </strong>
- * <ul>
- * <li>Get Current Cluster configuration from {@link MetadataStore}</li>
- * <li>update current config as {@link MetadataStore#OLD_CLUSTER_KEY}</li>
- * <li>Set Current Server state as {@link SERVER_STATE#REBALANCING_STATE}</li>
- * <li>create a new cluster config by stealing partitions from all nodes</li>
- * <li>For All nodes do
- * <ul>
- * <li>identify steal list for this node and make a temp. cluster Config</li>
- * <li>Update ALL servers with temp. cluster Config</li>
- * <li>steal partitions</li>
- * </ul>
- * </li>
- * <li>Set Current Server state as {@link SERVER_STATE#NORMAL_STATE}</li>
- * </ul>
- * <p>
- * TODO: HIGH Failure Scenarios
- * <ul>
- * <li>StealerNode dies</li>
- * <li>DonorNode dies</li>
- * </ul>
- *
- * @throws IOException
- */
- public void stealPartitionsFromCluster(int stealerNodeId, String storeName) throws IOException {
- logger.info("Node(" + currentNode.getId() + ") Starting Steal Parttion Process");
- Cluster currentCluster = metadataStore.getCluster();
- updateClusterMetaData(stealerNodeId, currentCluster, MetadataStore.OLD_CLUSTER_KEY);
-
- logger.info("Node(" + currentNode.getId() + ") State changed to REBALANCING MODE");
- setRebalancingStateAndRestart(stealerNodeId);
-
- Node stealerNode = currentCluster.getNodeById(stealerNodeId);
- if(stealerNode == null) {
- throw new VoldemortException("stealerNode id:" + stealerNodeId
- + " should be present in initial cluster");
- }
-
- Cluster updatedCluster = ClusterUtils.updateClusterStealPartitions(currentCluster,
- stealerNode);
- try {
- for(Node donorNode: currentCluster.getNodes()) {
- if(donorNode.getId() != stealerNodeId) {
- List<Integer> stealList = getStealList(currentCluster,
- updatedCluster,
- donorNode.getId(),
- stealerNodeId);
- logger.info("Node(" + currentNode.getId() + ") Stealing from node:"
- + donorNode.getId() + " stealList:" + stealList);
-
- if(stealList.size() > 0) {
- Cluster tempCluster = getTempCluster(currentCluster,
- donorNode,
- stealerNode,
- stealList);
-
- logger.info("tempCluster:" + ClusterUtils.GetClusterAsString(tempCluster));
-
- // set tempCluster on Donor node and stream partitions
- updateClusterMetaData(donorNode.getId(),
- tempCluster,
- MetadataStore.CLUSTER_KEY);
- pipeGetAndPutStreams(donorNode.getId(), stealerNodeId, storeName, stealList);
- }
- }
- }
-
- for(Node node: currentCluster.getNodes()) {
- updateClusterMetaData(node.getId(), updatedCluster, MetadataStore.CLUSTER_KEY);
- }
- setNormalStateAndRestart(stealerNode.getId());
- logger.info("Node(" + currentNode.getId() + ") State changed back to NORMAL MODE");
-
- logger.info("Node(" + currentNode.getId() + ") Steal process completed.");
- } catch(Exception e) {
- // undo all changes
- for(Node node: currentCluster.getNodes()) {
- updateClusterMetaData(node.getId(), updatedCluster, MetadataStore.OLD_CLUSTER_KEY);
- }
- throw new VoldemortException("Steal Partitions for " + stealerNodeId + " failed", e);
- }
- }
-
- /**
- * Rebalances the cluster by deleting current node and returning partitions
- * to other nodes in cluster. <strong> Steps </strong>
- * <ul>
- * <li>Get Current Cluster configuration from {@link MetadataStore}</li>
- * <li>Create new Cluster config by identifying partitions to return</li>
- * <li>For All nodes do
- * <ul>
- * <li>identify steal list for this node 'K'</li>
- * <li>update current config as {@link MetadataStore#OLD_CLUSTER_KEY} on
- * remote node 'K'</li>
- * <li>create a temp cluster config</li>
- * <li>Update ALL servers with temp cluster Config</li>
- * <li>Set remote node 'K' state as {@link SERVER_STATE#REBALANCING_STATE}</li>
- * <li>return partitions</li>
- * <li>Set remote node 'K' state as {@link SERVER_STATE#NORMAL_STATE}</li>
- * </ul>
- * </li>
- * </ul>
- *
- * @throws IOException
- */
- public void donatePartitionsToCluster(int donorNodeId,
- String storeName,
- int numPartitions,
- boolean deleteNode) throws IOException {
- logger.info("Node(" + currentNode.getId() + ") Starting Donate Partition Process");
-
- Cluster currentCluster = metadataStore.getCluster();
- Cluster updatedCluster = ClusterUtils.updateClusterDonatePartitions(currentCluster,
- donorNodeId,
- numPartitions,
- deleteNode);
- Node donorNode = updatedCluster.getNodeById(donorNodeId);
-
- logger.info("originalCluster:" + ClusterUtils.GetClusterAsString(currentCluster));
- logger.info("updatedCluster:" + ClusterUtils.GetClusterAsString(updatedCluster));
-
- for(Node node: updatedCluster.getNodes()) {
- if(node.getId() != donorNode.getId()) {
- logger.info("Node(" + donorNodeId + ") Donating to node:" + node.getId());
-
- updateClusterMetaData(node.getId(), currentCluster, MetadataStore.OLD_CLUSTER_KEY);
-
- List<Integer> stealList = getStealList(currentCluster,
- updatedCluster,
- donorNode.getId(),
- node.getId());
- if(stealList.size() == 0) {
- continue;
- }
- Cluster tempCluster = getTempCluster(currentCluster, donorNode, node, stealList);
- logger.info("tempCluster:" + ClusterUtils.GetClusterAsString(tempCluster));
-
- for(Node tempNode: tempCluster.getNodes()) {
- updateClusterMetaData(tempNode.getId(), tempCluster, MetadataStore.CLUSTER_KEY);
- }
-
- setRebalancingStateAndRestart(node.getId());
-
- pipeGetAndPutStreams(donorNode.getId(), node.getId(), storeName, stealList);
-
- setNormalStateAndRestart(node.getId());
- }
- }
- logger.info("Node(" + currentNode.getId() + ") Donate process completed ..");
- }
-
public void restartServices(int nodeId) {
Cluster currentCluster = metadataStore.getCluster();
Node node = currentCluster.getNodeById(nodeId);
@@ -411,39 +265,16 @@ public void restartServices(int nodeId) {
}
}
- public void setNormalStateAndRestart(int nodeId) {
- Cluster currentCluster = metadataStore.getCluster();
- Node node = currentCluster.getNodeById(nodeId);
- SocketDestination destination = new SocketDestination(node.getHost(), node.getAdminPort());
-
- SocketAndStreams sands = pool.checkout(destination);
- try {
- DataOutputStream outputStream = sands.getOutputStream();
- outputStream.writeByte(VoldemortOpCode.NORMAL_SERVER_MODE_OP_CODE);
- outputStream.flush();
-
- DataInputStream inputStream = sands.getInputStream();
- checkException(inputStream);
- } catch(IOException e) {
- close(sands.getSocket());
- throw new VoldemortException(e);
- } finally {
- pool.checkin(destination, sands);
- }
-
- // restart current node
- restartServices(nodeId);
- }
-
- public void setRebalancingStateAndRestart(int nodeId) {
+ public void changeStateAndRestart(int nodeId, SERVER_STATE state) {
Cluster currentCluster = metadataStore.getCluster();
Node node = currentCluster.getNodeById(nodeId);
SocketDestination destination = new SocketDestination(node.getHost(), node.getAdminPort());
SocketAndStreams sands = pool.checkout(destination);
try {
DataOutputStream outputStream = sands.getOutputStream();
- outputStream.writeByte(VoldemortOpCode.REBALANCING_SERVER_MODE_OP_CODE);
+ outputStream.writeByte(VoldemortOpCode.SERVER_STATE_CHANGE_OP_CODE);
+ outputStream.writeUTF(state.toString());
outputStream.flush();
DataInputStream inputStream = sands.getInputStream();
@@ -493,10 +324,10 @@ public void setRebalancingStateAndRestart(int nodeId) {
}
}
- private Cluster getTempCluster(Cluster currentCluster,
- Node fromNode,
- Node toNode,
- List<Integer> stealList) {
+ public Cluster getTempCluster(Cluster currentCluster,
+ Node fromNode,
+ Node toNode,
+ List<Integer> stealList) {
ArrayList<Node> nodes = new ArrayList<Node>();
for(Node node: currentCluster.getNodes()) {
if(fromNode.getId() == node.getId()) {
@@ -525,16 +356,16 @@ private Cluster getTempCluster(Cluster currentCluster,
return new Cluster(currentCluster.getName(), nodes);
}
- public void pipeGetAndPutStreams(int getNodeId,
- int putNodeId,
+ public void pipeGetAndPutStreams(int donorNodeId,
+ int stealerNodeId,
String storeName,
List<Integer> stealList) throws IOException {
- requestPutEntriesAsStream(putNodeId, storeName, requestGetPartitionsAsStream(getNodeId,
+ requestPutEntriesAsStream(stealerNodeId, storeName, requestGetPartitionsAsStream(donorNodeId,
storeName,
stealList));
}
- private List<Integer> getStealList(Cluster old, Cluster updated, int fromNode, int toNode) {
+ public List<Integer> getStealList(Cluster old, Cluster updated, int fromNode, int toNode) {
ArrayList<Integer> stealList = new ArrayList<Integer>();
List<Integer> oldPartitions = old.getNodeById(fromNode).getPartitionIds();
List<Integer> updatedPartitions = updated.getNodeById(toNode).getPartitionIds();
View
185 src/java/voldemort/client/admin/PartitionRebalanceClient.java
@@ -0,0 +1,185 @@
+package voldemort.client.admin;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import voldemort.VoldemortException;
+import voldemort.cluster.Cluster;
+import voldemort.cluster.Node;
+import voldemort.server.VoldemortServer.SERVER_STATE;
+import voldemort.store.ErrorCodeMapper;
+import voldemort.store.metadata.MetadataStore;
+import voldemort.store.socket.SocketPool;
+import voldemort.utils.ClusterUtils;
+
+public class PartitionRebalanceClient extends AdminClient {
+
+ private static final Logger logger = Logger.getLogger(PartitionRebalanceClient.class);
+ private final ErrorCodeMapper errorCodeMapper = new ErrorCodeMapper();
+
+ public PartitionRebalanceClient(Node currentNode,
+ MetadataStore metadataStore,
+ SocketPool socketPool) {
+ super(currentNode, metadataStore, socketPool);
+ }
+
+ /**
+ * Rebalances the cluster by stealing partitions from current Cluster
+ * configuration. <strong> Steps </strong>
+ * <ul>
+ * <li>Get Current Cluster configuration from {@link MetadataStore}</li>
+ * <li>update current config as {@link MetadataStore#OLD_CLUSTER_KEY}</li>
+ * <li>Set Current Server state as
+ * {@link SERVER_STATE#REBALANCING_STEALER_STATE}</li>
+ * <li>create a new cluster config by stealing partitions from all nodes</li>
+ * <li>For All nodes do
+ * <ul>
+ * <li>identify steal list for this node and make a temp. cluster Config</li>
+ * <li>Update ALL servers with temp. cluster Config</li>
+ * <li>steal partitions</li>
+ * </ul>
+ * </li>
+ * <li>Set Current Server state as {@link SERVER_STATE#NORMAL_STATE}</li>
+ * </ul>
+ * <p>
+ * TODO: HIGH Failure Scenarios
+ * <ul>
+ * <li>StealerNode dies</li>
+ * <li>DonorNode dies</li>
+ * </ul>
+ *
+ * @throws IOException
+ */
+ public void stealPartitionsFromCluster(int stealerNodeId, String storeName) throws IOException {
+ logger.info("Node(" + getConnectedNode().getId() + ") Starting Steal Parttion Process");
+ Cluster currentCluster = getMetaDataStore().getCluster();
+ updateClusterMetaData(stealerNodeId, currentCluster, MetadataStore.OLD_CLUSTER_KEY);
+
+ logger.info("Node(" + getConnectedNode().getId() + ") State changed to REBALANCING MODE");
+ changeStateAndRestart(stealerNodeId, SERVER_STATE.REBALANCING_STEALER_STATE);
+
+ Node stealerNode = currentCluster.getNodeById(stealerNodeId);
+ if(stealerNode == null) {
+ throw new VoldemortException("stealerNode id:" + stealerNodeId
+ + " should be present in initial cluster");
+ }
+
+ Cluster updatedCluster = ClusterUtils.updateClusterStealPartitions(currentCluster,
+ stealerNode);
+ try {
+ for(Node donorNode: currentCluster.getNodes()) {
+ if(donorNode.getId() != stealerNodeId) {
+ List<Integer> stealList = getStealList(currentCluster,
+ updatedCluster,
+ donorNode.getId(),
+ stealerNodeId);
+ logger.info("Node(" + getConnectedNode().getId() + ") Stealing from node:"
+ + donorNode.getId() + " stealList:" + stealList);
+
+ if(stealList.size() > 0) {
+ Cluster tempCluster = getTempCluster(currentCluster,
+ donorNode,
+ stealerNode,
+ stealList);
+
+ logger.info("tempCluster:" + ClusterUtils.GetClusterAsString(tempCluster));
+
+ // set tempCluster on Donor node and stream partitions
+ updateClusterMetaData(donorNode.getId(),
+ tempCluster,
+ MetadataStore.CLUSTER_KEY);
+
+ changeStateAndRestart(donorNode.getId(),
+ SERVER_STATE.REBALANCING_DONOR_STATE);
+ pipeGetAndPutStreams(donorNode.getId(), stealerNodeId, storeName, stealList);
+ changeStateAndRestart(donorNode.getId(), SERVER_STATE.NORMAL_STATE);
+ }
+ }
+ }
+
+ for(Node node: currentCluster.getNodes()) {
+ updateClusterMetaData(node.getId(), updatedCluster, MetadataStore.CLUSTER_KEY);
+ }
+ changeStateAndRestart(stealerNode.getId(), SERVER_STATE.NORMAL_STATE);
+ logger.info("Node(" + getConnectedNode().getId()
+ + ") State changed back to NORMAL MODE");
+
+ logger.info("Node(" + getConnectedNode().getId() + ") Steal process completed.");
+ } catch(Exception e) {
+ // undo all changes
+ for(Node node: currentCluster.getNodes()) {
+ updateClusterMetaData(node.getId(), updatedCluster, MetadataStore.OLD_CLUSTER_KEY);
+ }
+ throw new VoldemortException("Steal Partitions for " + stealerNodeId + " failed", e);
+ }
+ }
+
+ /**
+ * Rebalances the cluster by deleting current node and returning partitions
+ * to other nodes in cluster. <strong> Steps </strong>
+ * <ul>
+ * <li>Get Current Cluster configuration from {@link MetadataStore}</li>
+ * <li>Create new Cluster config by identifying partitions to return</li>
+ * <li>For All nodes do
+ * <ul>
+ * <li>identify steal list for this node 'K'</li>
+ * <li>update current config as {@link MetadataStore#OLD_CLUSTER_KEY} on
+ * remote node 'K'</li>
+ * <li>create a temp cluster config</li>
+ * <li>Update ALL servers with temp cluster Config</li>
+ * <li>Set remote node 'K' state as
+ * {@link SERVER_STATE#REBALANCING_STEALER_STATE}</li>
+ * <li>return partitions</li>
+ * <li>Set remote node 'K' state as {@link SERVER_STATE#NORMAL_STATE}</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * @throws IOException
+ */
+ public void donatePartitionsToCluster(int donorNodeId,
+ String storeName,
+ int numPartitions,
+ boolean deleteNode) throws IOException {
+ logger.info("Node(" + getConnectedNode().getId() + ") Starting Donate Partition Process");
+
+ Cluster currentCluster = getMetaDataStore().getCluster();
+ Cluster updatedCluster = ClusterUtils.updateClusterDonatePartitions(currentCluster,
+ donorNodeId,
+ numPartitions,
+ deleteNode);
+ Node donorNode = updatedCluster.getNodeById(donorNodeId);
+ changeStateAndRestart(donorNodeId, SERVER_STATE.REBALANCING_DONOR_STATE);
+ logger.info("originalCluster:" + ClusterUtils.GetClusterAsString(currentCluster));
+ logger.info("updatedCluster:" + ClusterUtils.GetClusterAsString(updatedCluster));
+
+ for(Node node: updatedCluster.getNodes()) {
+ if(node.getId() != donorNode.getId()) {
+ logger.info("Node(" + donorNodeId + ") Donating to node:" + node.getId());
+
+ updateClusterMetaData(node.getId(), currentCluster, MetadataStore.OLD_CLUSTER_KEY);
+
+ List<Integer> stealList = getStealList(currentCluster,
+ updatedCluster,
+ donorNode.getId(),
+ node.getId());
+ if(stealList.size() == 0) {
+ continue;
+ }
+ Cluster tempCluster = getTempCluster(currentCluster, donorNode, node, stealList);
+ logger.info("tempCluster:" + ClusterUtils.GetClusterAsString(tempCluster));
+
+ for(Node tempNode: tempCluster.getNodes()) {
+ updateClusterMetaData(tempNode.getId(), tempCluster, MetadataStore.CLUSTER_KEY);
+ }
+
+ pipeGetAndPutStreams(donorNode.getId(), node.getId(), storeName, stealList);
+
+ changeStateAndRestart(node.getId(), SERVER_STATE.NORMAL_STATE);
+ }
+ }
+ logger.info("Node(" + getConnectedNode().getId() + ") Donate process completed ..");
+ }
+}
View
5 src/java/voldemort/serialization/VoldemortOpCode.java
@@ -26,7 +26,6 @@
public static final byte UPDATE_CLUSTER_METADATA_OP_CODE = 6;
public static final byte UPDATE_STORES_METADATA_OP_CODE = 7;
public static final byte RESTART_SERVICES_OP_CODE = 8;
- public static final byte REBALANCING_SERVER_MODE_OP_CODE = 9;
- public static final byte NORMAL_SERVER_MODE_OP_CODE = 10;
- public static final byte REDIRECT_GET_OP_CODE = 11;
+ public static final byte SERVER_STATE_CHANGE_OP_CODE = 9;
+ public static final byte REDIRECT_GET_OP_CODE = 10;
}
View
3 src/java/voldemort/server/VoldemortServer.java
@@ -71,7 +71,8 @@
public static enum SERVER_STATE {
NORMAL_STATE,
- REBALANCING_STATE,
+ REBALANCING_STEALER_STATE,
+ REBALANCING_DONOR_STATE
}
public VoldemortServer(VoldemortConfig config) {
View
41 src/java/voldemort/server/admin/AdminServiceRequestHandler.java
@@ -30,8 +30,8 @@
import voldemort.routing.RoutingStrategy;
import voldemort.serialization.VoldemortOpCode;
import voldemort.server.UnableUpdateMetadataException;
-import voldemort.server.VoldemortServer;
import voldemort.server.VoldemortService;
+import voldemort.server.VoldemortServer.SERVER_STATE;
import voldemort.store.ErrorCodeMapper;
import voldemort.store.StorageEngine;
import voldemort.store.StoreDefinition;
@@ -99,11 +99,8 @@ public void handleRequest() throws IOException {
case VoldemortOpCode.RESTART_SERVICES_OP_CODE:
handleRestartServicesRequest();
break;
- case VoldemortOpCode.REBALANCING_SERVER_MODE_OP_CODE:
- handleRebalancingServerModeRequest();
- break;
- case VoldemortOpCode.NORMAL_SERVER_MODE_OP_CODE:
- handleNormalServerModeRequest();
+ case VoldemortOpCode.SERVER_STATE_CHANGE_OP_CODE:
+ handleServerStateChangeRequest();
break;
case VoldemortOpCode.REDIRECT_GET_OP_CODE:
engine = readStorageEngine();
@@ -353,11 +350,11 @@ private void handleRestartServicesRequest() throws IOException {
}
}
- private void handleRebalancingServerModeRequest() throws IOException {
+ private void handleServerStateChangeRequest() throws IOException {
try {
List<Versioned<byte[]>> serverState = metadataStore.get(ByteUtils.getBytes(MetadataStore.SERVER_STATE_KEY,
"UTF-8"));
-
+ SERVER_STATE newState = SERVER_STATE.valueOf(inputStream.readUTF());
// update version
VectorClock updatedVersion = new VectorClock();
if(serverState.size() > 0) {
@@ -368,8 +365,7 @@ private void handleRebalancingServerModeRequest() throws IOException {
metadataStore.put(new ByteArray(ByteUtils.getBytes(MetadataStore.SERVER_STATE_KEY,
"UTF-8")),
- new Versioned<byte[]>(ByteUtils.getBytes(VoldemortServer.SERVER_STATE.REBALANCING_STATE.toString(),
- "UTF-8"),
+ new Versioned<byte[]>(ByteUtils.getBytes(newState.toString(), "UTF-8"),
updatedVersion));
outputStream.writeShort(0);
@@ -381,31 +377,6 @@ private void handleRebalancingServerModeRequest() throws IOException {
}
- private void handleNormalServerModeRequest() throws IOException {
- try {
- List<Versioned<byte[]>> serverState = metadataStore.get(ByteUtils.getBytes(MetadataStore.SERVER_STATE_KEY,
- "UTF-8"));
-
- // update version
- VectorClock updatedVersion = new VectorClock();
- if(serverState.size() > 0) {
- updatedVersion = ((VectorClock) serverState.get(0).getVersion());
- }
- updatedVersion.incrementVersion(nodeId, System.currentTimeMillis());
-
- metadataStore.put(new ByteArray(ByteUtils.getBytes(MetadataStore.SERVER_STATE_KEY,
- "UTF-8")),
- new Versioned<byte[]>(ByteUtils.getBytes(VoldemortServer.SERVER_STATE.NORMAL_STATE.toString(),
- "UTF-8"),
- updatedVersion));
- outputStream.writeShort(0);
- } catch(VoldemortException e) {
- e.printStackTrace();
- writeException(outputStream, e);
- return;
- }
- }
-
private void handleRedirectGetRequest(StorageEngine<ByteArray, byte[]> engine, byte[] key)
throws IOException {
List<Versioned<byte[]>> results = null;
View
2 src/java/voldemort/server/http/StoreServlet.java
@@ -90,7 +90,7 @@ public void init() throws ServletException {
}
/**
- * TODO support {@link VoldemortServer#REBALANCING_STATE} Get should
+ * TODO support {@link VoldemortServer#REBALANCING_STEALER_STATE} Get should
* Redirect depending on old cluster configuration see {@link SocketServer}
* implementation for reference
*/
View
42 src/java/voldemort/server/http/gui/AdminServlet.java
@@ -91,13 +91,15 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp)
private void handleStealRequest(HttpServletRequest req) throws ServletException, IOException {
String storeName = getParam(req, "storeName");
- if(storeName.equals("all")) {
- for(String storeKey: server.getStoreMap().keySet()) {
- client.stealPartitionsFromCluster(server.getIdentityNode().getId(), storeKey);
- }
- } else {
- client.stealPartitionsFromCluster(server.getIdentityNode().getId(), storeName);
- }
+ // if(storeName.equals("all")) {
+ // for(String storeKey: server.getStoreMap().keySet()) {
+ // client.stealPartitionsFromCluster(server.getIdentityNode().getId(),
+ // storeKey);
+ // }
+ // } else {
+ // client.stealPartitionsFromCluster(server.getIdentityNode().getId(),
+ // storeName);
+ // }
}
private void handleDonateRequest(HttpServletRequest req) throws ServletException, IOException {
@@ -110,19 +112,19 @@ private void handleDonateRequest(HttpServletRequest req) throws ServletException
numPartitions = server.getIdentityNode().getNumberOfPartitions();
}
- if(storeName.equals("all")) {
- for(String storeKey: server.getStoreMap().keySet()) {
- client.donatePartitionsToCluster(server.getIdentityNode().getId(),
- storeName,
- numPartitions,
- deleteNode);
- }
- } else {
- client.donatePartitionsToCluster(server.getIdentityNode().getId(),
- storeName,
- numPartitions,
- deleteNode);
- }
+ // if(storeName.equals("all")) {
+ // for(String storeKey: server.getStoreMap().keySet()) {
+ // client.donatePartitionsToCluster(server.getIdentityNode().getId(),
+ // storeName,
+ // numPartitions,
+ // deleteNode);
+ // }
+ // } else {
+ // client.donatePartitionsToCluster(server.getIdentityNode().getId(),
+ // storeName,
+ // numPartitions,
+ // deleteNode);
+ // }
}
public boolean hasParam(HttpServletRequest request, String param) {
View
4 src/java/voldemort/server/socket/StreamStoreRequestHandler.java
@@ -97,7 +97,7 @@ public StreamStoreRequestHandler(ConcurrentMap<String, ? extends Store<ByteArray
storeDefs = null;
}
- if(SERVER_STATE.REBALANCING_STATE.equals(state)) {
+ if(SERVER_STATE.REBALANCING_STEALER_STATE.equals(state)) {
List<Versioned<byte[]>> oldClusterInfo = metadataStore.get(ByteUtils.getBytes(MetadataStore.OLD_CLUSTER_KEY,
"UTF-8"));
if(oldClusterInfo.size() != 1) {
@@ -179,7 +179,7 @@ private void handleGet(Store<ByteArray, byte[]> store, ByteArray key) throws IOE
private List<Versioned<byte[]>> doGet(Store<ByteArray, byte[]> store, ByteArray key)
throws IOException {
- if(VoldemortServer.SERVER_STATE.REBALANCING_STATE.equals(state)) {
+ if(VoldemortServer.SERVER_STATE.REBALANCING_STEALER_STATE.equals(state)) {
return doGetRebalancingState(store, key);
}
return doGetNormalState(store, key);
View
17 test/common/voldemort/config/stores.xml
@@ -33,7 +33,22 @@
</value-serializer>
</store>
<store>
- <name>test-replication-1</name>
+ <name>test-replication-memory</name>
+ <persistence>memory</persistence>
+ <routing>client</routing>
+ <replication-factor>1</replication-factor>
+ <required-reads>1</required-reads>
+ <required-writes>1</required-writes>
+ <key-serializer>
+ <type>string</type>
+ <schema-info>UTF-8</schema-info>
+ </key-serializer>
+ <value-serializer>
+ <type>java-serialization</type>
+ </value-serializer>
+ </store>
+ <store>
+ <name>test-replication-persistent</name>
<persistence>bdb</persistence>
<routing>client</routing>
<replication-factor>1</replication-factor>
View
19 test/unit/voldemort/server/AdminServiceTest.java
@@ -58,6 +58,8 @@
public class AdminServiceTest extends TestCase {
private static String TEMP_DIR = "test/unit/temp-output";
+ private static String storeName = "test-replication-memory";
+
VoldemortConfig config;
VoldemortServer server;
Cluster cluster;
@@ -246,18 +248,19 @@ public void testStateTransitions() {
AdminClient client = new AdminClient(server.getIdentityNode(),
server.getMetaDataStore(),
new SocketPool(100, 100, 2000, 10000));
- client.setRebalancingStateAndRestart(server.getIdentityNode().getId());
+ client.changeStateAndRestart(server.getIdentityNode().getId(),
+ SERVER_STATE.REBALANCING_STEALER_STATE);
List<Versioned<byte[]>> values = server.getMetaDataStore()
.get(ByteUtils.getBytes(MetadataStore.SERVER_STATE_KEY,
"UTF-8"));
SERVER_STATE state = SERVER_STATE.valueOf(new String(values.get(0).getValue()));
assertEquals("State should be changed correctly to rebalancing state",
- SERVER_STATE.REBALANCING_STATE,
+ SERVER_STATE.REBALANCING_STEALER_STATE,
state);
// change back to NORMAL state
- client.setNormalStateAndRestart(server.getIdentityNode().getId());
+ client.changeStateAndRestart(server.getIdentityNode().getId(), SERVER_STATE.NORMAL_STATE);
values = server.getMetaDataStore().get(ByteUtils.getBytes(MetadataStore.SERVER_STATE_KEY,
"UTF-8"));
@@ -268,19 +271,19 @@ public void testStateTransitions() {
// lets revert back to REBALANCING STATE AND CHECK (last time I promise
// :) )
- client.setRebalancingStateAndRestart(server.getIdentityNode().getId());
+ client.changeStateAndRestart(server.getIdentityNode().getId(),
+ SERVER_STATE.REBALANCING_DONOR_STATE);
values = server.getMetaDataStore().get(ByteUtils.getBytes(MetadataStore.SERVER_STATE_KEY,
"UTF-8"));
state = SERVER_STATE.valueOf(new String(values.get(0).getValue()));
assertEquals("State should be changed correctly to rebalancing state",
- SERVER_STATE.REBALANCING_STATE,
+ SERVER_STATE.REBALANCING_DONOR_STATE,
state);
}
public void testGetPartitionsAsStream() throws IOException {
// user store should be present
- String storeName = "test-replication-1";
Store<ByteArray, byte[]> store = server.getStoreMap().get(storeName);
assertNotSame("Store '" + storeName + "' should not be null", null, store);
@@ -336,7 +339,6 @@ public void testGetPartitionsAsStream() throws IOException {
}
public void testPutEntriesAsStream() throws IOException {
- String storeName = "test-replication-1";
Store<ByteArray, byte[]> store = server.getStoreMap().get(storeName);
assertNotSame("Store '" + storeName + "' should not be null", null, store);
@@ -370,9 +372,6 @@ public void testPutEntriesAsStream() throws IOException {
}
public void testPipeGetAndPutStreams() throws IOException {
- // store should be present
- // user store should be present
- String storeName = "test-replication-1";
Store<ByteArray, byte[]> store = server.getStoreMap().get(storeName);
assertNotSame("Store '" + storeName + "' should not be null", null, store);
View
22 test/unit/voldemort/server/RebalancingTest.java
@@ -28,7 +28,7 @@
import org.apache.commons.io.FileUtils;
import voldemort.ServerTestUtils;
-import voldemort.client.admin.AdminClient;
+import voldemort.client.admin.PartitionRebalanceClient;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.routing.ConsistentRoutingStrategy;
@@ -48,7 +48,7 @@
public class RebalancingTest extends TestCase {
private static String TEMP_DIR = "test/unit/temp-output";
- private static String storeName = "test-replication-1";
+ private static String storeName = "test-replication-persistent";
VoldemortServer server1;
VoldemortServer server2;
@@ -132,9 +132,12 @@ public void testStealPartitions() throws IOException {
server3.start();
// do stealPartitions
- AdminClient client = new AdminClient(server3.getIdentityNode(),
- server3.getMetaDataStore(),
- new SocketPool(100, 100, 2000, 10000));
+ PartitionRebalanceClient client = new PartitionRebalanceClient(server3.getIdentityNode(),
+ server3.getMetaDataStore(),
+ new SocketPool(100,
+ 100,
+ 2000,
+ 10000));
// persist updated Cluster to metadata here
client.updateClusterMetaData(2, updatedCluster, MetadataStore.CLUSTER_KEY);
@@ -189,9 +192,12 @@ public void testDonatePartitions() throws IOException {
VoldemortServer server3 = new VoldemortServer(config, updatedCluster);
server3.start();
- AdminClient client = new AdminClient(server1.getIdentityNode(),
- server1.getMetaDataStore(),
- new SocketPool(100, 100, 2000, 10000));
+ PartitionRebalanceClient client = new PartitionRebalanceClient(server1.getIdentityNode(),
+ server1.getMetaDataStore(),
+ new SocketPool(100,
+ 100,
+ 2000,
+ 10000));
// persist updated Cluster to metadata for node 1
client.updateClusterMetaData(0, updatedCluster, MetadataStore.CLUSTER_KEY);

0 comments on commit 0b02f89

Please sign in to comment.
Something went wrong with that request. Please try again.