Skip to content

Commit

Permalink
add support for conflict resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
mschoch committed Dec 4, 2012
1 parent f509f5a commit 3694d52
Showing 1 changed file with 47 additions and 3 deletions.
Expand Up @@ -17,6 +17,7 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -30,6 +31,8 @@
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
Expand All @@ -48,13 +51,15 @@ public class ElasticSearchCAPIBehavior implements CAPIBehavior {
protected String defaultDocumentType;
protected String checkpointDocumentType;
protected String dynamicTypePath;
protected boolean resolveConflicts;

public ElasticSearchCAPIBehavior(Client client, ESLogger logger, String defaultDocumentType, String checkpointDocumentType, String dynamicTypePath) {
public ElasticSearchCAPIBehavior(Client client, ESLogger logger, String defaultDocumentType, String checkpointDocumentType, String dynamicTypePath, boolean resolveConflicts) {
this.client = client;
this.logger = logger;
this.defaultDocumentType = defaultDocumentType;
this.checkpointDocumentType = checkpointDocumentType;
this.dynamicTypePath = dynamicTypePath;
this.resolveConflicts = resolveConflicts;
}

@Override
Expand Down Expand Up @@ -97,14 +102,53 @@ public boolean ensureFullCommit(String database) {
public Map<String, Object> revsDiff(String database,
Map<String, Object> revsMap) {

logger.trace("_revs_diff request for: {}", revsMap);

// start with all entries in the response map
Map<String, Object> responseMap = new HashMap<String, Object>();
for (Entry<String, Object> entry : revsMap.entrySet()) {
String id = entry.getKey();
Object revs = entry.getValue();
Map<String, Object> rev = new HashMap<String, Object>();
String revs = (String)entry.getValue();
Map<String, String> rev = new HashMap<String, String>();
rev.put("missing", revs);
responseMap.put(id, rev);
}
logger.trace("_revs_diff response is: {}", responseMap);

// if resolve conflicts mode is enabled
// perform a multi-get query to find information
// about revisions we already have
if (resolveConflicts) {
String index = getElasticSearchIndexNameFromDatabase(database);
MultiGetResponse response = client.prepareMultiGet().add(index, defaultDocumentType, responseMap.keySet()).execute().actionGet();
if(response != null) {
Iterator<MultiGetItemResponse> iterator = response.iterator();
while(iterator.hasNext()) {
MultiGetItemResponse item = iterator.next();
if(item.response().exists()) {
String itemId = item.id();
Map<String, Object> source = item.response().sourceAsMap();
if(source != null) {
Map<String, Object> meta = (Map<String, Object>)source.get("meta");
if(meta != null) {
String rev = (String)meta.get("rev");
//retrieve the revision passed in from Couchbase
Map<String, String> sourceRevMap = (Map<String, String>)responseMap.get(itemId);
String sourceRev = sourceRevMap.get("missing");
if(rev.equals(sourceRev)) {
// if our revision is the same as the source rev
// remove it from the response map
responseMap.remove(itemId);
logger.trace("_revs_diff already have id: {} rev: {}", itemId, rev);
}
}
}
}
}
}
logger.trace("_revs_diff response AFTER conflict resolution {}", responseMap);
}

return responseMap;
}

Expand Down

0 comments on commit 3694d52

Please sign in to comment.