Browse files

Make bench_shim support a pre-new-api riak client

  • Loading branch information...
1 parent 6a0e19a commit c72e5951c49f0e9ee4f8fda11cd2e2f6c32113e7 @russelldb russelldb committed Aug 23, 2011
View
2 pom.xml
@@ -23,7 +23,7 @@
<dependency>
<groupId>com.basho.riak</groupId>
<artifactId>riak-client</artifactId>
- <version>0.14.2-SNAPSHOT</version>
+ <version>0.14.1-Trifork</version>
</dependency>
</dependencies>
View
36 src/main/java/com/basho/riak/bench/ClientFactory.java
@@ -16,55 +16,35 @@
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
-import com.basho.riak.client.http.RiakConfig;
-import com.basho.riak.client.raw.RawClient;
-import com.basho.riak.client.raw.http.HTTPClientAdapter;
-import com.basho.riak.client.raw.pbc.PBClientAdapter;
-import com.basho.riak.pbc.RiakClient;
+import com.basho.riak.client.RiakConfig;
+import com.basho.riak.client.RiakClient;
/**
* @author russell
*
*/
public class ClientFactory {
- private static final ConcurrentHashMap<String, RawClient> HTTP_CLIENTS = new ConcurrentHashMap<String, RawClient>();
-
- private static final RawClient httpClient;
-
+ private static final RiakClient httpClient;
+
static {
RiakConfig conf = new RiakConfig(makeUrl("den-test-01.den.basho", 80));
- com.basho.riak.client.http.RiakClient delegate = new com.basho.riak.client.http.RiakClient(conf);
- httpClient = new HTTPClientAdapter(delegate);
+ httpClient = new RiakClient(conf);
}
/**
* @param config
* @return
* @throws IOException
*/
- public static RawClient newClient(ClientConfig config) throws IOException {
- RawClient client = null;
+ public static RiakClient newClient(ClientConfig config) throws IOException {
+ RiakClient client = null;
Transport transport = config.getTransport();
switch (transport) {
case PB:
- client = new PBClientAdapter(new RiakClient(config.getHost(), config.getPort(), config.getBufferSizeKb()));
- break;
+ throw new UnsupportedOperationException("Only HTTP for 0.14.1 builds");
case HTTP:
-// String key = config.getHost() + ":" + config.getPort();
-// RawClient cachedClient = HTTP_CLIENTS.get(key);
-// if (cachedClient == null) {
-// RiakConfig conf = new RiakConfig(makeUrl(config.getHost(), config.getPort()));
-// com.basho.riak.client.http.RiakClient del = new com.basho.riak.client.http.RiakClient(conf);
-// client = new HTTPClientAdapter(del);
-//
-// client = HTTP_CLIENTS.putIfAbsent(key, cachedClient);
-// if(client == null) {
-// client = cachedClient;
-// }
-//
-// }
client = httpClient;
break;
default:
View
83 src/main/java/com/basho/riak/bench/ClientShim.java
@@ -17,10 +17,11 @@
import java.io.IOException;
-import com.basho.riak.client.builders.RiakObjectBuilder;
-import com.basho.riak.client.raw.RawClient;
-import com.basho.riak.client.raw.RiakResponse;
-import com.basho.riak.client.raw.StoreMeta;
+import com.basho.riak.client.RiakClient;
+import com.basho.riak.client.RiakObject;
+import com.basho.riak.client.request.RequestMeta;
+import com.basho.riak.client.response.FetchResponse;
+import com.basho.riak.client.response.StoreResponse;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangDecodeException;
import com.ericsson.otp.erlang.OtpErlangExit;
@@ -41,7 +42,7 @@
public class ClientShim implements Runnable {
private final OtpMbox mbox;
- private final RawClient rawClient;
+ private final RiakClient client;
private final String host;
/**
@@ -62,8 +63,7 @@ public ClientShim(final OtpMbox mbox, String host, int port, int bufferSizeKb, T
throws IOException {
this.mbox = mbox;
final ClientConfig clientConfig = new ClientConfig(host, port, transport, bufferSizeKb);
- this.rawClient = ClientFactory.newClient(clientConfig);
- this.rawClient.generateAndSetClientId();
+ this.client = ClientFactory.newClient(clientConfig);
this.host = host;
}
@@ -90,20 +90,21 @@ public void run() {
final PutArgs putArgs = PutArgs.from(args);
final GetArgs getArgs = GetArgs.from(args);
- RiakObjectBuilder rob = RiakObjectBuilder.newBuilder(putArgs.getBucket(), putArgs.getKey());
-
switch (op) {
case GET:
try {
- RiakResponse response = rawClient.fetch(getArgs.getBucket(), getArgs.getKey(), getArgs.getR());
+ FetchResponse response = client.fetch(getArgs.getBucket(), getArgs.getKey(),
+ RequestMeta.readParams(getArgs.getR()));
- if (response == null || (!response.hasValue() && response.getVclock() == null)) {
+ if (response.getStatusCode() == 404) {
// send not found
reply = reply("ok", "notfound");
- } else {
+ } else if (response.isSuccess()) {
// send found message back
reply = reply("ok", "found");
+ } else {
+ reply = errorReply(new Exception(response.getBodyAsString()), putArgs, getArgs);
}
} catch (Exception e) {
// send error message
@@ -112,49 +113,71 @@ public void run() {
break;
case PUT:
try {
- rawClient.store(rob.withValue(putArgs.getValue()).build(),
- new StoreMeta(putArgs.getW(), putArgs.getDw(), false));
- reply = reply("ok");
+ StoreResponse resp = client.store(new RiakObject(putArgs.getBucket(), putArgs.getKey(),
+ putArgs.getValue()),
+ RequestMeta.writeParams(putArgs.getW(), putArgs.getDw()));
+
+ if (resp.isSuccess()) {
+ reply = reply("ok");
+ } else {
+ reply = errorReply(new Exception(resp.getBodyAsString()), putArgs, getArgs);
+ }
} catch (Exception e) {
reply = errorReply(e, putArgs, getArgs);
}
break;
case DELETE:
try {
- rawClient.delete(getArgs.getBucket(), getArgs.getKey(), getArgs.getR());
+ client.delete(getArgs.getBucket(), getArgs.getKey(), RequestMeta.readParams(getArgs.getR()));
reply = reply("ok");
- } catch (IOException e) {
+ } catch (Exception e) {
reply = errorReply(e, putArgs, getArgs);
}
break;
case CREATE_UPDATE:
try {
- RiakResponse response = rawClient.fetch(getArgs.getBucket(), getArgs.getKey(), getArgs.getR());
-
- rob.withValue(putArgs.getValue());
+ RiakObject obj = null;
+ FetchResponse response = client.fetch(getArgs.getBucket(), getArgs.getKey(),
+ RequestMeta.readParams(getArgs.getR()));
- if (response != null && (response.hasValue() && response.getVclock() != null)) {
- rob.withVClock(response.getVclock()).build();
+ if (response != null && response.hasObject()) {
+ obj = response.getObject();
+ obj.setValue(putArgs.getValue());
+ } else {
+ obj = new RiakObject(putArgs.getBucket(), putArgs.getKey(), putArgs.getValue());
}
- rawClient.store(rob.build(), new StoreMeta(putArgs.getW(), putArgs.getDw(), false));
- reply = reply("ok");
+ StoreResponse resp = client.store(obj, RequestMeta.writeParams(putArgs.getW(), putArgs.getDw()));
+
+ if (resp.isSuccess()) {
+ reply = reply("ok");
+ } else {
+ reply = errorReply(new Exception(resp.getBodyAsString()), putArgs, getArgs);
+ }
} catch (Exception e) {
reply = errorReply(e, putArgs, getArgs);
}
break;
case UPDATE:
try {
- RiakResponse response = rawClient.fetch(getArgs.getBucket(), getArgs.getKey(), getArgs.getR());
- if (response == null || (!response.hasValue() && response.getVclock() == null)) {
+ FetchResponse response = client.fetch(getArgs.getBucket(), getArgs.getKey(),
+ RequestMeta.readParams(getArgs.getR()));
+
+ if (!response.hasObject()) {
reply = reply("error", "notfound");
} else {
- rob.withValue(putArgs.getValue());
- rawClient.store(rob.withVClock(response.getVclock()).build(),
- new StoreMeta(putArgs.getW(), putArgs.getDw(), false));
- reply = reply("ok");
+ RiakObject obj = response.getObject();
+ obj.setValue(putArgs.getValue());
+
+ StoreResponse resp = client.store(obj, RequestMeta.writeParams(putArgs.getW(), putArgs.getDw()));
+
+ if (resp.isSuccess()) {
+ reply = reply("ok");
+ } else {
+ reply = errorReply(new Exception(resp.getBodyAsString()), putArgs, getArgs);
+ }
}
} catch (Exception e) {
reply = errorReply(e, putArgs, getArgs);
View
6 src/main/java/com/basho/riak/bench/GetArgs.java
@@ -17,7 +17,7 @@
import org.apache.commons.codec.binary.Base64;
-import com.basho.riak.client.util.CharsetUtils;
+import com.basho.riak.client.util.ClientUtils;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangBinary;
import com.ericsson.otp.erlang.OtpErlangList;
@@ -58,14 +58,14 @@ public int getR() {
* @return the bucket
*/
public String getBucket() {
- return CharsetUtils.asString(Base64.encodeBase64Chunked(bucket), CharsetUtils.ISO_8859_1);
+ return new String(Base64.encodeBase64Chunked(bucket), ClientUtils.ISO_8859_1);
}
/**
* @return the key
*/
public String getKey() {
- return CharsetUtils.asString(Base64.encodeBase64Chunked(key), CharsetUtils.ISO_8859_1);
+ return new String(Base64.encodeBase64Chunked(key), ClientUtils.ISO_8859_1);
}
/**
View
6 src/main/java/com/basho/riak/bench/PutArgs.java
@@ -17,7 +17,7 @@
import org.apache.commons.codec.binary.Base64;
-import com.basho.riak.client.util.CharsetUtils;
+import com.basho.riak.client.util.ClientUtils;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangBinary;
import com.ericsson.otp.erlang.OtpErlangList;
@@ -57,14 +57,14 @@ public PutArgs(byte[] bucket, byte[] key, byte[] value, int w, int dw) {
* @return the bucket
*/
public String getBucket() {
- return CharsetUtils.asString(Base64.encodeBase64Chunked(bucket), CharsetUtils.ISO_8859_1);
+ return new String(Base64.encodeBase64Chunked(bucket), ClientUtils.ISO_8859_1);
}
/**
* @return the key
*/
public String getKey() {
- return CharsetUtils.asString(Base64.encodeBase64Chunked(key), CharsetUtils.ISO_8859_1);
+ return new String(Base64.encodeBase64Chunked(key), ClientUtils.ISO_8859_1);
}
/**

0 comments on commit c72e595

Please sign in to comment.