diff --git a/src/java/voldemort/coordinator/HttpRequestHandler.java b/src/java/voldemort/coordinator/HttpRequestHandler.java
new file mode 100644
index 0000000000..b456bf021c
--- /dev/null
+++ b/src/java/voldemort/coordinator/HttpRequestHandler.java
@@ -0,0 +1,258 @@
+package voldemort.coordinator;
+
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *
+ * Red Hat licenses this file to you under the Apache License, version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.COOKIE;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.SET_COOKIE;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.http.Cookie;
+import org.jboss.netty.handler.codec.http.CookieDecoder;
+import org.jboss.netty.handler.codec.http.CookieEncoder;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpChunk;
+import org.jboss.netty.handler.codec.http.HttpChunkTrailer;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.jboss.netty.util.CharsetUtil;
+
+import voldemort.client.ZenStoreClient;
+import voldemort.utils.ByteArray;
+import voldemort.versioning.ObsoleteVersionException;
+import voldemort.versioning.VectorClock;
+import voldemort.versioning.Version;
+import voldemort.versioning.Versioned;
+
+/**
+ * @author The Netty Project
+ * @author Andy Taylor (andy.taylor@jboss.org)
+ * @author Trustin Lee
+ *
+ * @version $Rev: 2288 $, $Date: 2010-05-27 21:40:50 +0900 (Thu, 27 May 2010) $
+ */
+public class HttpRequestHandler extends SimpleChannelUpstreamHandler {
+
+ public HttpRequest request;
+ private boolean readingChunks;
+ /** Buffer that stores the response content */
+ private final StringBuilder buf = new StringBuilder();
+ private ZenStoreClient storeClient;
+ public ChannelBuffer responseContent;
+ private final static String STORE_NAME = "store_name";
+
+ public static enum OP_TYPE {
+ GET,
+ PUT
+ };
+
+ public HttpRequestHandler(ZenStoreClient storeClient) {
+ this.storeClient = storeClient;
+ }
+
+ public OP_TYPE getOperationType(String path) {
+ if(path.equals("/put")) {
+ return OP_TYPE.PUT;
+ }
+
+ return OP_TYPE.GET;
+ }
+
+ public ByteArray readKey(ChannelBuffer content) {
+ int keySize = content.readInt();
+ byte[] key = new byte[keySize];
+ content.readBytes(key);
+ return new ByteArray(key);
+ }
+
+ public void writeResults(List> values) throws IOException {
+ responseContent.writeInt(values.size());
+ for(Versioned v: values) {
+ byte[] clock = ((VectorClock) v.getVersion()).toBytes();
+ byte[] value = (byte[]) v.getValue();
+ responseContent.writeInt(clock.length + value.length);
+ responseContent.writeBytes(clock);
+ responseContent.writeBytes(value);
+ }
+ }
+
+ private byte[] readValue(ChannelBuffer content) {
+ int valueSize = content.readInt();
+ byte[] value = new byte[valueSize];
+ content.readBytes(value);
+ return value;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ String storeName = "";
+ List> results = new ArrayList>();
+
+ if(!readingChunks) {
+ HttpRequest request = this.request = (HttpRequest) e.getMessage();
+ QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.getUri());
+
+ // Decode the operation type
+ OP_TYPE operation = getOperationType(queryStringDecoder.getPath());
+
+ Map> params = queryStringDecoder.getParameters();
+ if(params != null && params.containsKey(STORE_NAME)) {
+ storeName = params.get(STORE_NAME).get(0);
+ } else {
+ System.err.println("Store Name missing. Critical error");
+ this.responseContent = ChannelBuffers.copiedBuffer("Store Name missing. Critical error".getBytes());
+ return;
+ // TODO: Return the right error code here
+ }
+
+ if(request.isChunked()) {
+ readingChunks = true;
+ } else {
+
+ ChannelBuffer content = request.getContent();
+ if(!content.readable()) {
+ System.err.println("Contents not readable");
+ this.responseContent = ChannelBuffers.copiedBuffer("Contents not readable".getBytes());
+ return;
+ }
+
+ // TODO: Check for correct number of parameters and Decoding
+
+ switch(operation) {
+ case GET:
+ // System.out.println("GET operation");
+ ByteArray getKey = readKey(content);
+ Versioned responseVersioned = this.storeClient.get(getKey);
+ if(responseVersioned == null) {
+ byte[] nullByteArray = new byte[1];
+ nullByteArray[0] = 0;
+ responseVersioned = new Versioned(nullByteArray);
+ }
+ results.add(responseVersioned);
+ byte[] responseValue = (byte[]) responseVersioned.getValue();
+ this.responseContent = ChannelBuffers.dynamicBuffer(responseValue.length);
+ writeResults(results);
+ break;
+ case PUT:
+ // System.out.println("PUT operation");
+ ByteArray putKey = readKey(content);
+ byte[] putValue = readValue(content);
+ try {
+ Version putVersion = this.storeClient.put(putKey, putValue);
+ } catch(ObsoleteVersionException oe) {
+ // Ideally propagate the exception !
+ }
+ this.responseContent = ChannelBuffers.EMPTY_BUFFER;
+ break;
+ default:
+ System.err.println("Illegal operation.");
+ this.responseContent = ChannelBuffers.copiedBuffer("Illegal operation.".getBytes());
+ return;
+ }
+
+ writeResponse(e);
+ }
+ } else {
+ HttpChunk chunk = (HttpChunk) e.getMessage();
+ if(chunk.isLast()) {
+ readingChunks = false;
+ buf.append("END OF CONTENT\r\n");
+
+ HttpChunkTrailer trailer = (HttpChunkTrailer) chunk;
+ if(!trailer.getHeaderNames().isEmpty()) {
+ buf.append("\r\n");
+ for(String name: trailer.getHeaderNames()) {
+ for(String value: trailer.getHeaders(name)) {
+ buf.append("TRAILING HEADER: " + name + " = " + value + "\r\n");
+ }
+ }
+ buf.append("\r\n");
+ }
+
+ writeResponse(e);
+ } else {
+ buf.append("CHUNK: " + chunk.getContent().toString(CharsetUtil.UTF_8) + "\r\n");
+ }
+
+ }
+ }
+
+ public void writeResponse(MessageEvent e) {
+ // Decide whether to close the connection or not.
+ boolean keepAlive = isKeepAlive(request);
+
+ // Build the response object.
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ response.setContent(this.responseContent);
+ // response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ response.setHeader(CONTENT_TYPE, "application/pdf");
+ // response.setChunked(true);
+
+ if(keepAlive) {
+ // Add 'Content-Length' header only for a keep-alive connection.
+ response.setHeader(CONTENT_LENGTH, response.getContent().readableBytes());
+ }
+
+ // Encode the cookie.
+ String cookieString = request.getHeader(COOKIE);
+ if(cookieString != null) {
+ CookieDecoder cookieDecoder = new CookieDecoder();
+ Set cookies = cookieDecoder.decode(cookieString);
+ if(!cookies.isEmpty()) {
+ // Reset the cookies if necessary.
+ CookieEncoder cookieEncoder = new CookieEncoder(true);
+ for(Cookie cookie: cookies) {
+ cookieEncoder.addCookie(cookie);
+ }
+ response.addHeader(SET_COOKIE, cookieEncoder.encode());
+ }
+ }
+
+ // Write the response.
+ ChannelFuture future = e.getChannel().write(response);
+
+ // Close the non-keep-alive connection after the write operation is
+ // done.
+ if(!keepAlive) {
+ future.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ e.getCause().printStackTrace();
+ e.getChannel().close();
+ }
+}
diff --git a/src/java/voldemort/coordinator/HttpResponseHandler.java b/src/java/voldemort/coordinator/HttpResponseHandler.java
new file mode 100644
index 0000000000..69660314a9
--- /dev/null
+++ b/src/java/voldemort/coordinator/HttpResponseHandler.java
@@ -0,0 +1,77 @@
+package voldemort.coordinator;
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *
+ * Red Hat licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.http.HttpChunk;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.util.CharsetUtil;
+
+/**
+ * @author The Netty Project
+ * @author Andy Taylor (andy.taylor@jboss.org)
+ * @author Trustin Lee
+ *
+ * @version $Rev: 2189 $, $Date: 2010-02-19 18:02:57 +0900 (Fri, 19 Feb 2010) $
+ */
+public class HttpResponseHandler extends SimpleChannelUpstreamHandler {
+
+ private boolean readingChunks;
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ if (!readingChunks) {
+ HttpResponse response = (HttpResponse) e.getMessage();
+
+ System.out.println("STATUS: " + response.getStatus());
+ System.out.println("VERSION: " + response.getProtocolVersion());
+ System.out.println();
+
+ if (!response.getHeaderNames().isEmpty()) {
+ for (String name: response.getHeaderNames()) {
+ for (String value: response.getHeaders(name)) {
+ System.out.println("HEADER: " + name + " = " + value);
+ }
+ }
+ System.out.println();
+ }
+
+ if (response.getStatus().getCode() == 200 && response.isChunked()) {
+ readingChunks = true;
+ System.out.println("CHUNKED CONTENT {");
+ } else {
+ ChannelBuffer content = response.getContent();
+ if (content.readable()) {
+ System.out.println("CONTENT {");
+ System.out.println(content.toString(CharsetUtil.UTF_8));
+ System.out.println("} END OF CONTENT");
+ }
+ }
+ } else {
+ HttpChunk chunk = (HttpChunk) e.getMessage();
+ if (chunk.isLast()) {
+ readingChunks = false;
+ System.out.println("} END OF CHUNKED CONTENT");
+ } else {
+ System.out.print(chunk.getContent().toString(CharsetUtil.UTF_8));
+ System.out.flush();
+ }
+ }
+ }
+}
diff --git a/src/java/voldemort/coordinator/HttpServer.java b/src/java/voldemort/coordinator/HttpServer.java
new file mode 100644
index 0000000000..17dde69c96
--- /dev/null
+++ b/src/java/voldemort/coordinator/HttpServer.java
@@ -0,0 +1,80 @@
+package voldemort.coordinator;
+
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *
+ * Red Hat licenses this file to you under the Apache License, version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+import voldemort.client.ClientConfig;
+import voldemort.client.SocketStoreClientFactory;
+import voldemort.client.ZenStoreClient;
+import voldemort.client.protocol.RequestFormatType;
+
+/**
+ * An HTTP server that sends back the content of the received HTTP request in a
+ * pretty plaintext form.
+ *
+ * @author The Netty Project
+ * @author Andy Taylor (andy.taylor@jboss.org)
+ * @author Trustin Lee
+ *
+ * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
+ */
+public class HttpServer {
+
+ private static boolean noop = false;
+
+ public static void main(String[] args) {
+
+ if(args.length < 1) {
+ System.err.println("Missing argument: ");
+ System.exit(-1);
+ }
+
+ if(args.length == 2) {
+ if(args[1].equals("noop")) {
+ noop = true;
+ }
+ }
+
+ // Configure the server.
+ ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool()));
+ bootstrap.setOption("backlog", 1000);
+
+ // Setup the Voldemort client
+ ClientConfig clientConfig = new ClientConfig().setBootstrapUrls(args[0])
+ .setEnableLazy(false)
+ .setRequestFormatType(RequestFormatType.VOLDEMORT_V3)
+ .setEnableCompressionLayer(false)
+ .setEnableSerializationLayer(false)
+ .setEnableInconsistencyResolvingLayer(false);
+
+ SocketStoreClientFactory factory = new SocketStoreClientFactory(clientConfig);
+ ZenStoreClient storeClient = (ZenStoreClient) factory.getStoreClient("test");
+ System.out.println("Successfully created ZenStoreClient");
+
+ // Set up the event pipeline factory.
+ bootstrap.setPipelineFactory(new HttpServerPipelineFactory(storeClient, noop));
+
+ // Bind and start to accept incoming connections.
+ bootstrap.bind(new InetSocketAddress(8080));
+ }
+}
diff --git a/src/java/voldemort/coordinator/HttpServerPipelineFactory.java b/src/java/voldemort/coordinator/HttpServerPipelineFactory.java
new file mode 100644
index 0000000000..1e8a1202e4
--- /dev/null
+++ b/src/java/voldemort/coordinator/HttpServerPipelineFactory.java
@@ -0,0 +1,71 @@
+package voldemort.coordinator;
+
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *
+ * Red Hat licenses this file to you under the Apache License, version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpContentCompressor;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+
+import voldemort.client.ZenStoreClient;
+
+/**
+ * @author The Netty Project
+ * @author Andy Taylor (andy.taylor@jboss.org)
+ * @author Trustin Lee
+ *
+ * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
+ */
+public class HttpServerPipelineFactory implements ChannelPipelineFactory {
+
+ private ZenStoreClient storeClient;
+ boolean noop = false;
+
+ public HttpServerPipelineFactory(ZenStoreClient storeClient, boolean noop) {
+ this.storeClient = storeClient;
+ this.noop = noop;
+ }
+
+ public ChannelPipeline getPipeline() throws Exception {
+ // Create a default pipeline implementation.
+ ChannelPipeline pipeline = pipeline();
+
+ // Uncomment the following line if you want HTTPS
+ // SSLEngine engine =
+ // SecureChatSslContextFactory.getServerContext().createSSLEngine();
+ // engine.setUseClientMode(false);
+ // pipeline.addLast("ssl", new SslHandler(engine));
+
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ // Uncomment the following line if you don't want to handle HttpChunks.
+ pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ // Remove the following line if you don't want automatic content
+ // compression.
+ pipeline.addLast("deflater", new HttpContentCompressor());
+ if(this.noop) {
+ pipeline.addLast("handler", new NoopHttpRequestHandler(null));
+ } else {
+ pipeline.addLast("handler", new HttpRequestHandler(storeClient));
+ }
+ return pipeline;
+ }
+}
diff --git a/src/java/voldemort/coordinator/NoopHttpRequestHandler.java b/src/java/voldemort/coordinator/NoopHttpRequestHandler.java
new file mode 100644
index 0000000000..f53e33687a
--- /dev/null
+++ b/src/java/voldemort/coordinator/NoopHttpRequestHandler.java
@@ -0,0 +1,58 @@
+package voldemort.coordinator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+
+import voldemort.client.ZenStoreClient;
+import voldemort.versioning.Versioned;
+
+public class NoopHttpRequestHandler extends HttpRequestHandler {
+
+ public NoopHttpRequestHandler(ZenStoreClient storeClient) {
+ super(storeClient);
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ String storeName = "";
+ List> results = new ArrayList>();
+
+ HttpRequest request = this.request = (HttpRequest) e.getMessage();
+ QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.getUri());
+
+ // Decode the operation type
+ OP_TYPE operation = getOperationType(queryStringDecoder.getPath());
+
+ ChannelBuffer content = request.getContent();
+
+ switch(operation) {
+ case GET:
+ Versioned responseVersioned = null;
+ byte[] nullByteArray = new byte[1];
+ nullByteArray[0] = 0;
+ responseVersioned = new Versioned(nullByteArray);
+ results.add(responseVersioned);
+ byte[] responseValue = (byte[]) responseVersioned.getValue();
+ this.responseContent = ChannelBuffers.dynamicBuffer(responseValue.length);
+ writeResults(results);
+ break;
+ case PUT:
+ this.responseContent = ChannelBuffers.EMPTY_BUFFER;
+ break;
+ default:
+ System.err.println("Illegal operation.");
+ this.responseContent = ChannelBuffers.copiedBuffer("Illegal operation.".getBytes());
+ return;
+ }
+
+ writeResponse(e);
+
+ }
+}
diff --git a/src/java/voldemort/coordinator/R2StoreWrapper.java b/src/java/voldemort/coordinator/R2StoreWrapper.java
new file mode 100644
index 0000000000..095d8dd64e
--- /dev/null
+++ b/src/java/voldemort/coordinator/R2StoreWrapper.java
@@ -0,0 +1,206 @@
+package voldemort.coordinator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import voldemort.VoldemortException;
+import voldemort.store.Store;
+import voldemort.store.StoreCapabilityType;
+import voldemort.store.StoreUtils;
+import voldemort.utils.ByteArray;
+import voldemort.utils.ByteUtils;
+import voldemort.versioning.VectorClock;
+import voldemort.versioning.Version;
+import voldemort.versioning.Versioned;
+
+import com.linkedin.common.callback.FutureCallback;
+import com.linkedin.common.util.None;
+import com.linkedin.data.ByteString;
+import com.linkedin.r2.message.rest.RestRequest;
+import com.linkedin.r2.message.rest.RestRequestBuilder;
+import com.linkedin.r2.message.rest.RestResponse;
+import com.linkedin.r2.transport.common.Client;
+import com.linkedin.r2.transport.common.bridge.client.TransportClient;
+import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
+import com.linkedin.r2.transport.http.client.HttpClientFactory;
+
+public class R2StoreWrapper implements Store {
+
+ private URL url = null;
+ HttpURLConnection conn = null;
+ private HttpClientFactory _clientFactory;
+ private Client client = null;
+ private String baseURL;
+
+ public R2StoreWrapper(String baseURL) {
+ try {
+ _clientFactory = new HttpClientFactory();
+ final TransportClient transportClient = _clientFactory.getClient(new HashMap());
+ client = new TransportClientAdapter(transportClient);
+ this.baseURL = baseURL;
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ @Override
+ public void close() throws VoldemortException {
+ final FutureCallback callback = new FutureCallback();
+ client.shutdown(callback);
+ try {
+ callback.get();
+ } catch(InterruptedException e) {
+ e.printStackTrace();
+ } catch(ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public boolean delete(ByteArray arg0, Version arg1) throws VoldemortException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public List> get(ByteArray key, byte[] transforms) throws VoldemortException {
+
+ List> resultList = new ArrayList>();
+
+ try {
+ // Create the byte[] array
+ ByteArrayOutputStream outputBytes = new ByteArrayOutputStream();
+ DataOutputStream outputStream = new DataOutputStream(outputBytes);
+ writeGetRequest(outputStream, key);
+
+ // Create the REST request with this byte array
+ RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL
+ + "/get?store_name=test"));
+ rb.setMethod("POST");
+ rb.setEntity(outputBytes.toByteArray());
+ rb.setHeader("CONTENT_TYPE", "application/pdf");
+
+ RestRequest request = rb.build();
+ Future f = client.restRequest(request);
+
+ // This will block
+ RestResponse response = f.get();
+ final ByteString entity = response.getEntity();
+ if(entity != null) {
+ // System.out.println(entity.asString("UTF-8"));
+ DataInputStream inputStream = new DataInputStream(entity.asInputStream());
+ resultList = readResults(inputStream);
+ } else {
+ System.out.println("NOTHING!");
+ }
+
+ } catch(VoldemortException ve) {
+ throw ve;
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+
+ return resultList;
+ }
+
+ @Override
+ public void put(ByteArray key, Versioned value, byte[] transform)
+ throws VoldemortException {
+ try {
+ // Create the byte[] array
+ ByteArrayOutputStream outputBytes = new ByteArrayOutputStream();
+ DataOutputStream outputStream = new DataOutputStream(outputBytes);
+ writePutRequest(outputStream, key, value.getValue());
+
+ // Create the REST request with this byte array
+ RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL
+ + "/put?store_name=test"));
+ rb.setMethod("POST");
+ rb.setEntity(outputBytes.toByteArray());
+ rb.setHeader("CONTENT_TYPE", "application/pdf");
+
+ RestRequest request = rb.build();
+ Future f = client.restRequest(request);
+
+ // This will block
+ RestResponse response = f.get();
+ final ByteString entity = response.getEntity();
+ if(entity != null) {
+ // System.out.println(entity.asString("UTF-8"));
+ } else {
+ System.out.println("NOTHING!");
+ }
+ } catch(VoldemortException ve) {
+ throw ve;
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void writeGetRequest(DataOutputStream outputStream, ByteArray key) throws IOException {
+ StoreUtils.assertValidKey(key);
+ outputStream.writeInt(key.length());
+ outputStream.write(key.get());
+ }
+
+ private List> readResults(DataInputStream inputStream) throws IOException {
+ int resultSize = inputStream.readInt();
+ List> results = new ArrayList>(resultSize);
+ for(int i = 0; i < resultSize; i++) {
+ int valueSize = inputStream.readInt();
+ byte[] bytes = new byte[valueSize];
+ ByteUtils.read(inputStream, bytes);
+ VectorClock clock = new VectorClock(bytes);
+ results.add(new Versioned(ByteUtils.copy(bytes,
+ clock.sizeInBytes(),
+ bytes.length), clock));
+ }
+ return results;
+ }
+
+ private void writePutRequest(DataOutputStream outputStream, ByteArray key, byte[] value)
+ throws IOException {
+ writeGetRequest(outputStream, key);
+ outputStream.writeInt(value.length);
+ outputStream.write(value);
+ }
+
+ @Override
+ public Map>> getAll(Iterable arg0,
+ Map arg1)
+ throws VoldemortException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Object getCapability(StoreCapabilityType arg0) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public String getName() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public List getVersions(ByteArray arg0) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/src/java/voldemort/coordinator/RESTClient.java b/src/java/voldemort/coordinator/RESTClient.java
new file mode 100644
index 0000000000..ca4695e66e
--- /dev/null
+++ b/src/java/voldemort/coordinator/RESTClient.java
@@ -0,0 +1,175 @@
+package voldemort.coordinator;
+
+import java.util.List;
+import java.util.Map;
+
+import voldemort.client.RoutingTier;
+import voldemort.client.StoreClient;
+import voldemort.client.UpdateAction;
+import voldemort.cluster.Node;
+import voldemort.routing.RoutingStrategyType;
+import voldemort.serialization.DefaultSerializerFactory;
+import voldemort.serialization.Serializer;
+import voldemort.serialization.SerializerDefinition;
+import voldemort.serialization.SerializerFactory;
+import voldemort.store.Store;
+import voldemort.store.StoreDefinition;
+import voldemort.store.StoreDefinitionBuilder;
+import voldemort.store.serialized.SerializingStore;
+import voldemort.store.versioned.InconsistencyResolvingStore;
+import voldemort.utils.ByteArray;
+import voldemort.versioning.ChainedResolver;
+import voldemort.versioning.InconsistencyResolver;
+import voldemort.versioning.ObsoleteVersionException;
+import voldemort.versioning.TimeBasedInconsistencyResolver;
+import voldemort.versioning.VectorClock;
+import voldemort.versioning.VectorClockInconsistencyResolver;
+import voldemort.versioning.Version;
+import voldemort.versioning.Versioned;
+
+public class RESTClient implements StoreClient {
+
+ private Store clientStore = null;
+ private SerializerFactory serializerFactory = new DefaultSerializerFactory();
+ private StoreDefinition storeDef;
+
+ public RESTClient(String bootstrapURL, String storeName) {
+
+ String baseURL = "http://" + bootstrapURL.split(":")[1].substring(2) + ":8080";
+ // The lowest layer : Transporting request to coordinator
+ Store store = (Store) new R2StoreWrapper(baseURL);
+
+ // TODO
+ // Get the store definition so that we can learn the Serialization
+ // and
+ // compression properties
+
+ // TODO
+ // Add compression layer
+
+ // Add Serialization layer
+ storeDef = new StoreDefinitionBuilder().setName(storeName)
+ .setType("bdb")
+ .setKeySerializer(new SerializerDefinition("string"))
+ .setValueSerializer(new SerializerDefinition("string"))
+ .setRoutingPolicy(RoutingTier.CLIENT)
+ .setRoutingStrategyType(RoutingStrategyType.CONSISTENT_STRATEGY)
+ .setReplicationFactor(1)
+ .setPreferredReads(1)
+ .setRequiredReads(1)
+ .setPreferredWrites(1)
+ .setRequiredWrites(1)
+ .build();
+ Serializer keySerializer = (Serializer) serializerFactory.getSerializer(storeDef.getKeySerializer());
+ Serializer valueSerializer = (Serializer) serializerFactory.getSerializer(storeDef.getValueSerializer());
+ clientStore = SerializingStore.wrap(store, keySerializer, valueSerializer, null);
+
+ // Add inconsistency Resolving layer
+ InconsistencyResolver> secondaryResolver = new TimeBasedInconsistencyResolver();
+ clientStore = new InconsistencyResolvingStore(clientStore,
+ new ChainedResolver>(new VectorClockInconsistencyResolver(),
+ secondaryResolver));
+ }
+
+ @Override
+ public V getValue(K key) {
+ return getValue(key, null);
+ }
+
+ @Override
+ public V getValue(K key, V defaultValue) {
+ Versioned retVal = get(key);
+ return retVal.getValue();
+ }
+
+ @Override
+ public Versioned get(K key) {
+ return get(key, null);
+ }
+
+ @Override
+ public Versioned get(K key, Object transforms) {
+ return this.clientStore.get(key, null).get(0);
+ }
+
+ @Override
+ public Map> getAll(Iterable keys) {
+ return null;
+ }
+
+ @Override
+ public Map> getAll(Iterable keys, Map transforms) {
+ return null;
+ }
+
+ @Override
+ public Versioned get(K key, Versioned defaultValue) {
+ return this.clientStore.get(key, null).get(0);
+ }
+
+ @Override
+ public Version put(K key, V value) {
+ clientStore.put(key, new Versioned(value), null);
+ return new VectorClock();
+ }
+
+ @Override
+ public Version put(K key, V value, Object transforms) {
+ return put(key, value);
+ }
+
+ @Override
+ public Version put(K key, Versioned versioned) throws ObsoleteVersionException {
+ clientStore.put(key, versioned, null);
+ return new VectorClock();
+ }
+
+ @Override
+ public boolean putIfNotObsolete(K key, Versioned versioned) {
+ return false;
+ }
+
+ @Override
+ public boolean applyUpdate(UpdateAction action) {
+ return applyUpdate(action, 3);
+ }
+
+ @Override
+ public boolean applyUpdate(UpdateAction action, int maxTries) {
+ boolean success = false;
+ try {
+ for(int i = 0; i < maxTries; i++) {
+ try {
+ action.update(this);
+ success = true;
+ return success;
+ } catch(ObsoleteVersionException e) {
+ // ignore for now
+ }
+ }
+ } finally {
+ if(!success)
+ action.rollback();
+ }
+
+ // if we got here we have seen too many ObsoleteVersionExceptions
+ // and have rolled back the updates
+ return false;
+ }
+
+ @Override
+ public boolean delete(K key) {
+ return false;
+ }
+
+ @Override
+ public boolean delete(K key, Version version) {
+ return false;
+ }
+
+ @Override
+ public List getResponsibleNodes(K key) {
+ return null;
+ }
+
+}
diff --git a/src/java/voldemort/coordinator/SampleRESTClient.java b/src/java/voldemort/coordinator/SampleRESTClient.java
new file mode 100644
index 0000000000..b709b55aee
--- /dev/null
+++ b/src/java/voldemort/coordinator/SampleRESTClient.java
@@ -0,0 +1,17 @@
+package voldemort.coordinator;
+
+public class SampleRESTClient {
+
+ public static void main(String[] args) {
+
+ // Create the client
+ RESTClient clientStore = new RESTClient("http://localhost:8080",
+ "test");
+
+ // Sample put
+ clientStore.put("a", "How goesasdfsafsadf it adsfasdfasdf!!!");
+
+ // Do a sample operation:
+ System.out.println("Received response : " + clientStore.get("a"));
+ }
+}
diff --git a/test/integration/voldemort/performance/benchmark/Benchmark.java b/test/integration/voldemort/performance/benchmark/Benchmark.java
index 165240c668..979e09564c 100644
--- a/test/integration/voldemort/performance/benchmark/Benchmark.java
+++ b/test/integration/voldemort/performance/benchmark/Benchmark.java
@@ -358,7 +358,8 @@ public void initializeStore(Props benchmarkProps) throws Exception {
}
SocketStoreClientFactory socketFactory = new SocketStoreClientFactory(clientConfig);
// this.storeClient = socketFactory.getStoreClient(storeName);
- this.storeClient = new RESTClient();
+ this.storeClient = new RESTClient(clientConfig.getBootstrapUrls()[0],
+ storeName);
StoreDefinition storeDef = getStoreDefinition(socketFactory, storeName);
this.keyType = findKeyType(storeDef);
benchmarkProps.put(Benchmark.KEY_TYPE, this.keyType);