Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Support for pulling data from hdfs for swapping.

  • Loading branch information...
commit a0140b698b94327034aaa7ac3edb93c59bea2629 1 parent e7e3779
@jkreps jkreps authored
View
6 src/java/voldemort/server/http/gui/ReadOnlyStoreManagementServlet.java
@@ -25,6 +25,8 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.log4j.Logger;
+
import voldemort.VoldemortException;
import voldemort.server.VoldemortServer;
import voldemort.server.http.VoldemortServletContextListener;
@@ -38,6 +40,7 @@
public class ReadOnlyStoreManagementServlet extends HttpServlet {
private static final long serialVersionUID = 1;
+ private static final Logger logger = Logger.getLogger(ReadOnlyStoreManagementServlet.class);
private Map<String, RandomAccessFileStore> stores;
private VelocityEngine velocityEngine;
@@ -113,8 +116,11 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp)
indexFile = new File(indexUrl);
dataFile = new File(dataUrl);
} else {
+ logger.info("Executing fetch of " + indexUrl);
indexFile = fileFetcher.fetchFile(indexUrl);
+ logger.info("Executing fetch of " + dataUrl);
dataFile = fileFetcher.fetchFile(dataUrl);
+ logger.info("Fetch complete.");
}
resp.getWriter().write(indexFile.getAbsolutePath());
resp.getWriter().write("\n");
View
30 src/java/voldemort/server/http/gui/templates/read-only-mgmt.vm
@@ -36,6 +36,35 @@
</head>
<body>
<div class="content">
+ <h2>Fetch Data Files</h2>
+ <form method="post">
+ <input type="hidden" name="operation" value="fetch"/>
+ <table align="center">
+ <tr>
+ <td>Store</td>
+ <td>
+ <select name="store">
+ #foreach($store in $stores.keySet())
+ <option value="$store">$store</option>
+ #end
+ </select>
+ </td>
+ </tr>
+ <tr>
+ <td>Index File</td>
+ <td><input type="text" name="index"/></td>
+ </tr>
+ <tr>
+ <td>Data File</td>
+ <td><input type="text" name="data"/></td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <input type="submit" value="Submit"/>
+ </td>
+ </tr>
+ </table>
+ </form>
<h2>Swap Data Files</h2>
<form method="post">
<input type="hidden" name="operation" value="swap"/>
@@ -65,6 +94,7 @@
</tr>
</table>
</form>
+
</div>
</body>
</html>
View
152 src/java/voldemort/store/readonly/StoreSwapper.java
@@ -0,0 +1,152 @@
+package voldemort.store.readonly;
+
+import java.io.File;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.log4j.Logger;
+
+import voldemort.VoldemortException;
+import voldemort.cluster.Cluster;
+import voldemort.cluster.Node;
+import voldemort.utils.Utils;
+import voldemort.xml.ClusterMapper;
+
+public class StoreSwapper {
+
+ private static final Logger logger = Logger.getLogger(StoreSwapper.class);
+
+ private final Cluster cluster;
+ private final ExecutorService executor;
+ private final HttpClient httpClient;
+ private final String readOnlyMgmtPath;
+ private final String basePath;
+
+ private StoreSwapper(Cluster cluster,
+ ExecutorService executor,
+ HttpClient httpClient,
+ String readOnlyMgmtPath,
+ String basePath) {
+ super();
+ this.cluster = cluster;
+ this.executor = executor;
+ this.httpClient = httpClient;
+ this.readOnlyMgmtPath = readOnlyMgmtPath;
+ this.basePath = basePath;
+ }
+
+ public void swapStoreData(String storeName) {
+ List<String[]> fetched = invokeFetch(storeName);
+ invokeSwap(storeName, fetched);
+ }
+
+ private List<String[]> invokeFetch(final String storeName) {
+ // do fetch
+ Map<Integer, Future<String[]>> fetchFiles = new HashMap<Integer, Future<String[]>>();
+ for(final Node node: cluster.getNodes()) {
+ fetchFiles.put(node.getId(), executor.submit(new Callable<String[]>() {
+
+ public String[] call() throws Exception {
+ String url = node.getHttpUrl() + "/" + readOnlyMgmtPath;
+ PostMethod post = new PostMethod(url);
+ post.addParameter("operation", "fetch");
+ post.addParameter("index", basePath + "/" + storeName + "/" + node.getId()
+ + ".index");
+ post.addParameter("data", basePath + "/" + storeName + "/" + node.getId()
+ + ".data");
+ int responseCode = httpClient.executeMethod(post);
+ String response = post.getResponseBodyAsString(10000);
+ if(responseCode != 200)
+ throw new VoldemortException("Swap request on node " + node.getId()
+ + " failed: " + post.getStatusText());
+ String[] files = response.split("\n");
+ if(files.length != 2)
+ throw new VoldemortException("Expected two files, but found "
+ + files.length + " in '" + response + "'.");
+ return files;
+ }
+ }));
+ }
+
+ // wait for all operations to complete successfully
+ List<String[]> results = new ArrayList<String[]>();
+ for(int nodeId = 0; nodeId < cluster.getNumberOfNodes(); nodeId++) {
+ Future<String[]> val = fetchFiles.get(nodeId);
+ try {
+ results.add(val.get());
+ } catch(ExecutionException e) {
+ throw new VoldemortException(e.getCause());
+ } catch(InterruptedException e) {
+ throw new VoldemortException(e);
+ }
+ }
+
+ return results;
+ }
+
+ private void invokeSwap(String storeName, List<String[]> fetchFiles) {
+ // do swap
+ int successes = 0;
+ Exception exception = null;
+ for(int nodeId = 0; nodeId < cluster.getNumberOfNodes(); nodeId++) {
+ try {
+ Node node = cluster.getNodeById(nodeId);
+ String url = node.getHttpUrl() + "/" + readOnlyMgmtPath;
+ PostMethod post = new PostMethod(url);
+ post.addParameter("operation", "swap");
+ String indexFile = fetchFiles.get(nodeId)[0];
+ String dataFile = fetchFiles.get(nodeId)[1];
+ logger.info("Swapping for node " + nodeId + " index = " + indexFile + ", data = "
+ + dataFile);
+ post.addParameter("index", indexFile);
+ post.addParameter("data", dataFile);
+ post.addParameter("store", storeName);
+ int responseCode = httpClient.executeMethod(post);
+ String response = post.getStatusText();
+ if(responseCode == 200)
+ successes++;
+ else
+ throw new VoldemortException(response);
+ } catch(Exception e) {
+ exception = e;
+ logger.error("Exception thrown during swap operation on node " + nodeId + ": ", e);
+ }
+ }
+
+ if(exception != null)
+ throw new VoldemortException(exception);
+ }
+
+ public static void main(String[] args) throws Exception {
+ if(args.length != 4)
+ Utils.croak("USAGE: cluster.xml store_name mgmtpath file_path");
+ String clusterXml = args[0];
+ String storeName = args[1];
+ String mgmtPath = args[2];
+ String filePath = args[3];
+
+ String clusterStr = Utils.readString(new File(clusterXml));
+ Cluster cluster = new ClusterMapper().readCluster(new StringReader(clusterStr));
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+ HttpConnectionManager manager = new MultiThreadedHttpConnectionManager();
+ HttpClient client = new HttpClient(manager);
+ StoreSwapper swapper = new StoreSwapper(cluster, executor, client, mgmtPath, filePath);
+ swapper.swapStoreData(storeName);
+ executor.shutdownNow();
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ System.exit(0);
+ }
+}
View
20 src/java/voldemort/utils/Utils.java
@@ -16,7 +16,10 @@
package voldemort.utils;
+import java.io.BufferedReader;
import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -407,4 +410,21 @@ private static RuntimeException getCause(InvocationTargetException e) {
else
throw new IllegalArgumentException(e.getCause());
}
+
+ /**
+ * Read the contents of the file as a string
+ *
+ * @param f The file to read
+ * @return The string read
+ * @throws IOException If the file doesn't exists or there are errors while
+ * reading it
+ */
+ public static String readString(File f) throws IOException {
+ BufferedReader reader = new BufferedReader(new FileReader(f));
+ StringBuilder builder = new StringBuilder();
+ for(int curr = reader.read(); curr >= 0; curr = reader.read())
+ builder.append((char) curr);
+ reader.close();
+ return builder.toString();
+ }
}
Please sign in to comment.
Something went wrong with that request. Please try again.