diff --git a/riak/pom.xml b/riak/pom.xml
index 2819505953..c073641cfa 100644
--- a/riak/pom.xml
+++ b/riak/pom.xml
@@ -16,7 +16,7 @@
com.basho.riak
riak-client
- 1.0.5
+ 1.1.0
jar
@@ -29,6 +29,18 @@
core
${project.version}
+
+ org.mockito
+ mockito-all
+ 1.8.0
+ test
+
+
+ org.testng
+ testng
+ 6.1.1
+ test
+
diff --git a/riak/src/main/java/com/yahoo/ycsb/db/RiakClient12.java b/riak/src/main/java/com/yahoo/ycsb/db/RiakClient12.java
deleted file mode 100644
index 1ff77508a9..0000000000
--- a/riak/src/main/java/com/yahoo/ycsb/db/RiakClient12.java
+++ /dev/null
@@ -1,123 +0,0 @@
-package com.yahoo.ycsb.db;
-
-import com.basho.riak.client.IRiakClient;
-import com.basho.riak.client.IRiakObject;
-import com.basho.riak.client.RiakFactory;
-import com.basho.riak.client.bucket.Bucket;
-import com.basho.riak.client.convert.JSONConverter;
-import com.basho.riak.client.raw.pbc.PBClientConfig;
-import com.basho.riak.client.raw.pbc.PBClusterConfig;
-import com.yahoo.ycsb.ByteIterator;
-import com.yahoo.ycsb.DB;
-import com.yahoo.ycsb.DBException;
-import com.yahoo.ycsb.StringByteIterator;
-
-import java.util.*;
-
-/*
- This is considered pre-alpha, gin-inspired code.
- Use at your own risk. It's currently awaiting review.
-*/
-
-public class RiakClient12 extends DB {
- IRiakClient riakClient;
- public static final int OK = 0;
- public static final int ERROR = -1;
- public static final String RIAK_CLUSTER_HOSTS = "riak_cluster_hosts";
- public static final String RIAK_CLUSTER_HOST_DEFAULT = "127.0.0.1:8087";
-
- public void init() throws DBException {
- try {
- Properties props = getProperties();
- String cluster_hosts = props.getProperty(RIAK_CLUSTER_HOSTS, RIAK_CLUSTER_HOST_DEFAULT);
- String[] servers = cluster_hosts.split(",");
- if(servers.length == 1) {
- String[] ipAndPort = servers[0].split(":");
- String ip = ipAndPort[0].trim();
- int port = Integer.parseInt(ipAndPort[1].trim());
- System.out.println("Riak connection to " + ip + ":" + port);
- riakClient = RiakFactory.pbcClient(ip, port);
- } else {
- PBClusterConfig clusterConf = new PBClusterConfig(200);
- for(String server:servers) {
- String[] ipAndPort = server.split(":");
- String ip = ipAndPort[0].trim();
- int port = Integer.parseInt(ipAndPort[1].trim());
- System.out.println("Riak connection to " + ip + ":" + port);
- PBClientConfig node = PBClientConfig.Builder
- .from(PBClientConfig.defaults())
- .withHost(ip)
- .withPort(port).build();
- clusterConf.addClient(node);
- }
- riakClient = RiakFactory.newClient(clusterConf);
- }
- } catch (Exception e) {
- e.printStackTrace();
- throw new DBException("Error connecting to Riak: " + e.getMessage());
- }
- }
-
- public void cleanup() throws DBException {
- riakClient.shutdown();
- }
-
- public int read(String table, String key, Set fields,
- HashMap result) {
- try {
- Bucket bucket = riakClient.fetchBucket(table).lazyLoadBucketProperties().execute();
- Map m = bucket.fetch(key, Map.class).execute();
- StringByteIterator.putAllAsStrings(m, result);
- } catch (Exception e) {
- e.printStackTrace();
- return ERROR;
- }
- return OK;
- }
-
- public int scan(String table, String startkey, int recordcount,
- Set fields, Vector> result) {
- // NOT implemented
- return OK;
- }
-
- public int update(String table, String key,
- HashMap values) {
- try {
- Bucket bucket = riakClient.fetchBucket(table).lazyLoadBucketProperties().execute();
- IRiakObject robj = bucket.fetch(key).execute();
- HashMap m = StringByteIterator.getStringMap(values);
- @SuppressWarnings("unchecked")
- IRiakObject obj = new JSONConverter(m.getClass(), table, key).fromDomain(m,robj.getVClock());
- bucket.store(obj);
- } catch (Exception e) {
- insert(table, key, values);
- }
- return OK;
- }
-
- public int insert(String table, String key,
- HashMap values) {
- try {
- Bucket bucket = riakClient.fetchBucket(table).lazyLoadBucketProperties().execute();
- HashMap m = StringByteIterator.getStringMap(values);
- @SuppressWarnings("unchecked")
- IRiakObject obj = new JSONConverter(m.getClass(), table, key).fromDomain(m,null);
- bucket.store(obj);
- } catch (Exception e) {
- e.printStackTrace();
- return ERROR;
- }
- return OK;
- }
-
- public int delete(String table, String key) {
- try {
- riakClient.fetchBucket(table).execute().delete(key);
- } catch (Exception e) {
- e.printStackTrace();
- return ERROR;
- }
- return OK;
- }
-}
diff --git a/riak/src/main/java/com/yahoo/ycsb/db/RiakClient13.java b/riak/src/main/java/com/yahoo/ycsb/db/RiakClient13.java
new file mode 100644
index 0000000000..f8a1bca2cf
--- /dev/null
+++ b/riak/src/main/java/com/yahoo/ycsb/db/RiakClient13.java
@@ -0,0 +1,284 @@
+package com.yahoo.ycsb.db;
+
+import com.basho.riak.client.IRiakObject;
+import com.basho.riak.client.builders.RiakObjectBuilder;
+import com.basho.riak.client.raw.RawClient;
+import com.basho.riak.client.raw.RiakResponse;
+import com.basho.riak.client.raw.pbc.PBClientConfig;
+import com.basho.riak.client.raw.pbc.PBClusterClientFactory;
+import com.basho.riak.client.raw.pbc.PBClusterConfig;
+import com.basho.riak.client.util.CharsetUtils;
+import com.yahoo.ycsb.ByteIterator;
+import com.yahoo.ycsb.DB;
+import com.yahoo.ycsb.DBException;
+import com.yahoo.ycsb.StringByteIterator;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.*;
+
+public class RiakClient13 extends DB {
+ private RawClient rawClient;
+ public static final int OK = 0;
+ public static final int ERROR = -1;
+
+ public static final String RIAK_CLUSTER_HOSTS = "riak_cluster_hosts";
+ public static final String RIAK_CLUSTER_HOST_DEFAULT = "127.0.0.1:10017";
+ public static final int RIAK_POOL_TOTAL_MAX_CONNECTIONS = 500;
+ public static final int RIAK_POOL_IDLE_CONNETION_TTL_MILLIS = 1000;
+ public static final int RIAK_POOL_INITIAL_POOL_SIZE = 50;
+ public static final int RIAK_POOL_REQUEST_TIMEOUT_MILLIS = 1000;
+ public static final int RIAK_POOL_CONNECTION_TIMEOUT_MILLIS = 1000;
+ private static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+ private static final String CONTENT_TYPE_JSON_UTF8 = "application/json;charset=UTF-8";
+
+ ObjectMapper om = new ObjectMapper();
+
+ public void init() throws DBException {
+ try {
+ Properties props = getProperties();
+ String cluster_hosts = props.getProperty(RIAK_CLUSTER_HOSTS, RIAK_CLUSTER_HOST_DEFAULT);
+ String[] servers = cluster_hosts.split(",");
+ PBClusterConfig clusterConf = new PBClusterConfig(RIAK_POOL_TOTAL_MAX_CONNECTIONS);
+ for(String server:servers) {
+ String[] ipAndPort = server.split(":");
+ String ip = ipAndPort[0].trim();
+ int port = Integer.parseInt(ipAndPort[1].trim());
+ System.out.println("Riak connection to " + ip + ":" + port);
+ PBClientConfig node = PBClientConfig.Builder
+ .from(PBClientConfig.defaults())
+ .withHost(ip)
+ .withPort(port)
+ .withIdleConnectionTTLMillis(RIAK_POOL_IDLE_CONNETION_TTL_MILLIS)
+ .withInitialPoolSize(RIAK_POOL_INITIAL_POOL_SIZE)
+ .withRequestTimeoutMillis(RIAK_POOL_REQUEST_TIMEOUT_MILLIS)
+ .withConnectionTimeoutMillis(RIAK_POOL_CONNECTION_TIMEOUT_MILLIS)
+ .build();
+ clusterConf.addClient(node);
+ }
+ rawClient = PBClusterClientFactory.getInstance().newClient(clusterConf);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new DBException("Error connecting to Riak: " + e.getMessage());
+ }
+ }
+
+ public void cleanup() throws DBException {
+ rawClient.shutdown();
+ }
+
+ public int read(String bucket, String key, Set fields,
+ HashMap result) {
+
+ try {
+
+ RiakResponse response = rawClient.fetch(bucket, key);
+
+ if(response.hasSiblings()) {
+ System.out.println("Siblings detected");
+ }
+ if(response.hasValue()) {
+ IRiakObject obj = response.getRiakObjects()[0];
+ riakObjToJson(obj, fields, result);
+ }
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ return OK;
+ }
+
+ public int scan(String bucket, String startkey, int recordcount,
+ Set fields, Vector> result) {
+ return OK;
+ }
+
+ public int update(String bucket, String key,
+ HashMap values) {
+
+ try {
+ RiakResponse response = rawClient.fetch(bucket, key);
+ if(response.hasSiblings()) {
+ System.out.println("Siblings detected");
+ }
+ if(response.hasValue()) {
+ IRiakObject obj = response.getRiakObjects()[0];
+ byte[] data = updateJson(obj, values);
+ RiakObjectBuilder builder =
+ RiakObjectBuilder.newBuilder(bucket, key)
+ .withContentType(CONTENT_TYPE_JSON_UTF8)
+ .withValue(data)
+ .withVClock(response.getVclock());
+ rawClient.store(builder.build());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ return ERROR;
+ }
+
+ return OK;
+ }
+
+
+ public int insert(String bucket, String key,
+ HashMap values) {
+ RiakObjectBuilder builder =
+ RiakObjectBuilder.newBuilder(bucket, key)
+ .withContentType(CONTENT_TYPE_JSON_UTF8);
+ try {
+ byte[] rawValue = jsonToBytes(values);
+ rawClient.store(builder.withValue(rawValue).build());
+ } catch (Exception e) {
+ e.printStackTrace();
+ return ERROR;
+ }
+
+ return OK;
+ }
+
+ public int delete(String bucket, String key) {
+ try {
+ rawClient.delete(bucket, key);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return ERROR;
+ }
+ return OK;
+ }
+
+ private byte[] updateJson(IRiakObject object, Map values) throws IOException {
+ String contentType = object.getContentType();
+ Charset charSet = CharsetUtils.getCharset(contentType);
+ byte[] data = object.getValue();
+ String dataInCharset = CharsetUtils.asString(data, charSet);
+
+ JsonNode jsonNode = om.readTree(dataInCharset);
+ for (Map.Entry entry : values.entrySet()) {
+ ((ObjectNode) jsonNode).put(entry.getKey(), entry.getValue().toString());
+ }
+ return jsonNode.toString().getBytes(CHARSET_UTF8);
+ }
+
+ private byte[] jsonToBytes(Map values) {
+ ObjectNode objNode = om.createObjectNode();
+ for (Map.Entry entry : values.entrySet()) {
+ objNode.put(entry.getKey(), entry.getValue().toString());
+ }
+ return objNode.toString().getBytes(CHARSET_UTF8);
+ }
+
+ public void riakObjToJson(IRiakObject object, Set fields, Map result)
+ throws IOException {
+ String contentType = object.getContentType();
+ Charset charSet = CharsetUtils.getCharset(contentType);
+ byte[] data = object.getValue();
+ String dataInCharset = CharsetUtils.asString(data, charSet);
+
+ JsonNode jsonNode = om.readTree(dataInCharset);
+ if(fields != null) {
+ // return a subset of all available fields in the json node
+ for(String field: fields) {
+ JsonNode f = jsonNode.get(field);
+ result.put(field, new StringByteIterator(f.toString()));
+ }
+ } else {
+ // no fields specified, just return them all
+ Iterator> jsonFields = jsonNode.getFields();
+ while(jsonFields.hasNext()) {
+ Map.Entry field = jsonFields.next();
+ result.put(field.getKey(), new StringByteIterator(field.getValue().toString()));
+ }
+ }
+ }
+
+
+
+ public static void main(String[] args)
+ {
+ RiakClient13 cli = new RiakClient13();
+
+ Properties props = new Properties();
+ props.setProperty(RIAK_CLUSTER_HOSTS, "localhost:10017, localhost:10027, localhost:10037");
+
+ cli.setProperties(props);
+
+ try {
+ cli.init();
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+
+
+ String bucket = "people";
+ String key = "person1";
+
+ {
+ HashMap values = new HashMap();
+ values.put("first_name", "Dave");
+ values.put("last_name", "Parfitt");
+ values.put("city", "Buffalo, NY");
+ cli.insert(bucket, key, StringByteIterator.getByteIteratorMap(values));
+ System.out.println("Added person");
+ }
+
+ {
+ Set fields = new HashSet();
+ fields.add("first_name");
+ fields.add("last_name");
+ HashMap results = new HashMap();
+ cli.read(bucket, key,fields, results);
+ System.out.println(results.toString());
+ System.out.println("Read person");
+ }
+
+ {
+ HashMap updateValues = new HashMap();
+ updateValues.put("twitter", "@metadave");
+ cli.update("people", "person1", StringByteIterator.getByteIteratorMap(updateValues));
+ System.out.println("Updated person");
+ }
+
+ {
+ HashMap finalResults = new HashMap();
+ cli.read(bucket, key, null, finalResults);
+ System.out.println(finalResults.toString());
+ System.out.println("Read person");
+ }
+ }
+
+// public static void main(String[] args) throws Exception {
+// PBClusterConfig clusterConf = new PBClusterConfig(200);
+// String[] servers = {"localhost:10017", "localhost:10027", "localhost:10037"};
+// for(String server:servers) {
+// String[] ipAndPort = server.split(":");
+// String ip = ipAndPort[0].trim();
+// int port = Integer.parseInt(ipAndPort[1].trim());
+// System.out.println("Riak connection to " + ip + ":" + port);
+// PBClientConfig node = PBClientConfig.Builder
+// .from(PBClientConfig.defaults())
+// .withHost(ip)
+// .withPort(port)
+// .withIdleConnectionTTLMillis(100)
+// .withInitialPoolSize(10)
+// .withRequestTimeoutMillis(1000)
+// .withConnectionTimeoutMillis(1000)
+// .build();
+// clusterConf.addClient(node);
+// }
+//
+// RawClient rawClient = PBClusterClientFactory.getInstance().newClient(clusterConf);
+// //RiakClient pbcClient = new RiakClient("127.0.0.1",100);
+// //RawClient rawClient = new PBClientAdapter(pbcClient);
+// //IRiakObject obj = RiakObjectBuilder.newBuilder("FOO","BAR").withValue("THIS IS A TEST").build();
+// //rawClient.store(obj);
+// RiakResponse resp = rawClient.fetch("FOO","BAR");
+// String ct = resp.getRiakObjects()[0].getContentType();
+// System.out.println(ct);
+// System.out.println(CharsetUtils.getCharset(ct));
+//
+// //String[] chunks = ct.split(";");
+// }
+}
diff --git a/riak/src/main/test/com/yahoo/ycsb/db/RiakClient13Test.java b/riak/src/main/test/com/yahoo/ycsb/db/RiakClient13Test.java
new file mode 100644
index 0000000000..50497bfd52
--- /dev/null
+++ b/riak/src/main/test/com/yahoo/ycsb/db/RiakClient13Test.java
@@ -0,0 +1,20 @@
+package com.yahoo.ycsb.db;
+
+import com.yahoo.ycsb.ByteIterator;
+import com.yahoo.ycsb.DBException;
+import com.yahoo.ycsb.StringByteIterator;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import static org.testng.AssertJUnit.assertEquals;
+
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Vector;
+
+
+public class RiakClient13Test {
+
+}