Skip to content

Commit

Permalink
Add persist/replicate properties
Browse files Browse the repository at this point in the history
  • Loading branch information
daschl committed Apr 20, 2015
1 parent 6717261 commit f4acfee
Showing 1 changed file with 56 additions and 9 deletions.
65 changes: 56 additions & 9 deletions couchbase-2/src/main/java/com/yahoo/ycsb/db/Couchbase2Client.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.yahoo.ycsb.db;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.ReplicateTo;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.yahoo.ycsb.ByteIterator;
Expand All @@ -11,7 +14,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Vector;

Expand All @@ -23,15 +25,55 @@ public class Couchbase2Client extends DB {
private static final int ERROR = 1;

private static Bucket bucket;
private static Cluster cluster;

private static PersistTo persistTo;
private static ReplicateTo replicateTo;

@Override
public void init() throws DBException {
CouchbaseCluster cluster = CouchbaseCluster.create();
bucket = cluster.openBucket();
String host = getProperties().getProperty("couchbase.host", "127.0.0.1");
String bucketName = getProperties().getProperty("couchbase.bucket", "default");
String bucketPassword = getProperties().getProperty("couchbase.password", "");

cluster = CouchbaseCluster.create(host);
bucket = cluster.openBucket(bucketName, bucketPassword);

persistTo = parsePersistTo(getProperties().getProperty("couchbase.persistTo", "0"));
replicateTo = parseReplicateTo(getProperties().getProperty("couchbase.replicateTo", "0"));
}

private static PersistTo parsePersistTo(final String persistTo) {
if (persistTo.toLowerCase().equals("master")) {
return PersistTo.MASTER;
} else if (persistTo.equals("1")) {
return PersistTo.ONE;
} else if (persistTo.equals("2")) {
return PersistTo.TWO;
} else if (persistTo.equals("3")) {
return PersistTo.THREE;
} else if (persistTo.equals("4")) {
return PersistTo.FOUR;
} else {
return PersistTo.NONE;
}
}

private static ReplicateTo parseReplicateTo(final String replicateTo) {
if (replicateTo.equals("1")) {
return ReplicateTo.ONE;
} else if (replicateTo.equals("2")) {
return ReplicateTo.TWO;
} else if (replicateTo.equals("3")) {
return ReplicateTo.THREE;
} else {
return ReplicateTo.NONE;
}
}

@Override
public void cleanup() throws DBException {
cluster.disconnect();
}

@Override
Expand All @@ -40,9 +82,14 @@ public int read(String table, String key, Set<String> fields, HashMap<String, By

try {
JsonDocument loaded = bucket.get(formattedKey);
if (loaded == null) {
return ERROR;
}

if (fields == null) {
result.putAll((Map) loaded.content().toMap());
for (String field : loaded.content().getNames()) {
result.put(field, new StringByteIterator(loaded.content().getString(field)));
}
} else {
for (String field : fields) {
result.put(field, new StringByteIterator(loaded.content().getString(field)));
Expand Down Expand Up @@ -73,11 +120,11 @@ public int update(String table, String key, HashMap<String, ByteIterator> values
for (String k : values.keySet()) {
payload.put(k, values.get(k).toString());
}
bucket.replace(JsonDocument.create(formattedKey, payload));
bucket.replace(JsonDocument.create(formattedKey, payload), persistTo, replicateTo);
return SUCCESS;
} catch (Exception ex) {
if (LOGGER.isErrorEnabled()) {
LOGGER.error("Could not insert key " + formattedKey, ex);
LOGGER.error("Could not update key " + formattedKey, ex);
}
return ERROR;
}
Expand All @@ -92,7 +139,7 @@ public int insert(String table, String key, HashMap<String, ByteIterator> values
for (String k : values.keySet()) {
payload.put(k, values.get(k).toString());
}
bucket.insert(JsonDocument.create(formattedKey, payload));
bucket.insert(JsonDocument.create(formattedKey, payload), persistTo, replicateTo);
return SUCCESS;
} catch (Exception ex) {
if (LOGGER.isErrorEnabled()) {
Expand All @@ -106,7 +153,7 @@ public int insert(String table, String key, HashMap<String, ByteIterator> values
public int delete(String table, String key) {
String formattedKey = formatKey(table, key);
try {
bucket.remove(formattedKey);
bucket.remove(formattedKey, persistTo, replicateTo);
return SUCCESS;
} catch (Exception ex) {
if (LOGGER.isErrorEnabled()) {
Expand All @@ -117,7 +164,7 @@ public int delete(String table, String key) {
}

private static String formatKey(final String table, final String key) {
return table + "::" + key;
return table + "-" + key;
}

}

0 comments on commit f4acfee

Please sign in to comment.