Permalink
Browse files

added bucket and vbucket uuid support

bucket uuid is now non-zero
but still not checked on other endpoints
vbucket uuid is used for _pre_replicate
  • Loading branch information...
1 parent e48b608 commit 2a7c186c6ba84efbd165f4d7390419feb60784a1 @mschoch mschoch committed Apr 3, 2014
@@ -113,13 +113,13 @@ protected void doStart() throws ElasticsearchException {
try {
publishAddressHostX = networkService.resolvePublishHostAddress(publishHost);
} catch (IOException e) {
- throw new BindHttpException("FAiled to resolve publish address host [" + publishHost + "]", e);
+ throw new BindHttpException("Failed to resolve publish address host [" + publishHost + "]", e);
}
final InetAddress publishAddressHost = publishAddressHostX;
capiBehavior = new ElasticSearchCAPIBehavior(client, logger, defaultDocumentType, checkpointDocumentType, dynamicTypePath, resolveConflicts.booleanValue(), maxConcurrentRequests);
- couchbaseBehavior = new ElasticSearchCouchbaseBehavior(client);
+ couchbaseBehavior = new ElasticSearchCouchbaseBehavior(client, logger, checkpointDocumentType);
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
@@ -33,11 +33,13 @@
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexRequest.OpType;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
@@ -453,4 +455,79 @@ protected String getDatabaseNameWithoutUUID(String database) {
return stats;
}
+
+ protected String getUUIDFromCheckpointDocSource(Map<String, Object> source) {
+ Map<String,Object> docMap = (Map<String,Object>)source.get("doc");
+ String uuid = (String)docMap.get("uuid");
+ return uuid;
+ }
+
+ protected String lookupUUID(String bucket, String id) {
+ GetRequestBuilder builder = client.prepareGet();
+ builder.setIndex(bucket);
+ builder.setId(id);
+ builder.setType(this.checkpointDocumentType);
+ builder.setFetchSource(true);
+
+ String bucketUUID = null;
+ GetResponse response;
+ ListenableActionFuture<GetResponse> laf = builder.execute();
+ if(laf != null) {
+ response = laf.actionGet();
+ if(response.isExists()) {
+ Map<String,Object> responseMap = response.getSourceAsMap();
+ bucketUUID = this.getUUIDFromCheckpointDocSource(responseMap);
+ }
+ }
+
+ return bucketUUID;
+ }
+
+ protected void storeUUID(String bucket, String id, String uuid) {
+ Map<String,Object> doc = new HashMap<String, Object>();
+ doc.put("uuid", uuid);
+ Map<String, Object> toBeIndexed = new HashMap<String, Object>();
+ toBeIndexed.put("doc", doc);
+
+ IndexRequestBuilder builder = client.prepareIndex();
+ builder.setIndex(bucket);
+ builder.setId(id);
+ builder.setType(this.checkpointDocumentType);
+ builder.setSource(toBeIndexed);
+ builder.setOpType(OpType.CREATE);
+
+ IndexResponse response;
+ ListenableActionFuture<IndexResponse> laf = builder.execute();
+ if(laf != null) {
+ response = laf.actionGet();
+ if(!response.isCreated()) {
+ logger.error("did not succeed creating uuid");
+ }
+ }
+ }
+
+ public String getVBucketUUID(String pool, String bucket, int vbucket) {
+ IndicesExistsRequestBuilder existsBuilder = client.admin().indices().prepareExists(bucket);
+ IndicesExistsResponse response = existsBuilder.execute().actionGet();
+ if(response.isExists()) {
+ int tries = 0;
+ String key = String.format("vbucket%dUUID",vbucket);
+ String bucketUUID = this.lookupUUID(bucket, key);
+ while(bucketUUID == null && tries < 100) {
+ logger.debug("vbucket {} UUID doesn't exist yet, creaating", vbucket);
+ String newUUID = UUID.randomUUID().toString().replace("-", "");
+ storeUUID(bucket, key, newUUID);
+ bucketUUID = this.lookupUUID(bucket, key);
+ tries++;
+ }
+
+ if(bucketUUID == null) {
+ throw new RuntimeException("failed to find/create bucket uuid after 100 tries");
+ }
+
+ return bucketUUID;
+ }
+ return null;
+ }
+
}
@@ -19,28 +19,39 @@
import java.util.Map;
import java.util.UUID;
+import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.get.GetRequestBuilder;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest.OpType;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.hppc.cursors.ObjectCursor;
+import org.elasticsearch.common.logging.ESLogger;
import com.couchbase.capi.CouchbaseBehavior;
public class ElasticSearchCouchbaseBehavior implements CouchbaseBehavior {
protected Client client;
+ protected ESLogger logger;
+ protected String checkpointDocumentType;
- public ElasticSearchCouchbaseBehavior(Client client) {
+ public ElasticSearchCouchbaseBehavior(Client client, ESLogger logger, String checkpointDocumentType) {
this.client = client;
+ this.logger = logger;
+ this.checkpointDocumentType = checkpointDocumentType;
}
@Override
@@ -97,12 +108,76 @@ public String getPoolUUID(String pool) {
return null;
}
+ protected String getUUIDFromCheckpointDocSource(Map<String, Object> source) {
+ Map<String,Object> docMap = (Map<String,Object>)source.get("doc");
+ String uuid = (String)docMap.get("uuid");
+ return uuid;
+ }
+
+ protected String lookupUUID(String bucket, String id) {
+ GetRequestBuilder builder = client.prepareGet();
+ builder.setIndex(bucket);
+ builder.setId(id);
+ builder.setType(this.checkpointDocumentType);
+ builder.setFetchSource(true);
+
+ String bucketUUID = null;
+ GetResponse response;
+ ListenableActionFuture<GetResponse> laf = builder.execute();
+ if(laf != null) {
+ response = laf.actionGet();
+ if(response.isExists()) {
+ Map<String,Object> responseMap = response.getSourceAsMap();
+ bucketUUID = this.getUUIDFromCheckpointDocSource(responseMap);
+ }
+ }
+
+ return bucketUUID;
+ }
+
+ protected void storeUUID(String bucket, String id, String uuid) {
+ Map<String,Object> doc = new HashMap<String, Object>();
+ doc.put("uuid", uuid);
+ Map<String, Object> toBeIndexed = new HashMap<String, Object>();
+ toBeIndexed.put("doc", doc);
+
+ IndexRequestBuilder builder = client.prepareIndex();
+ builder.setIndex(bucket);
+ builder.setId(id);
+ builder.setType(this.checkpointDocumentType);
+ builder.setSource(toBeIndexed);
+ builder.setOpType(OpType.CREATE);
+
+ IndexResponse response;
+ ListenableActionFuture<IndexResponse> laf = builder.execute();
+ if(laf != null) {
+ response = laf.actionGet();
+ if(!response.isCreated()) {
+ logger.error("did not succeed creating uuid");
+ }
+ }
+ }
+
@Override
public String getBucketUUID(String pool, String bucket) {
IndicesExistsRequestBuilder existsBuilder = client.admin().indices().prepareExists(bucket);
IndicesExistsResponse response = existsBuilder.execute().actionGet();
if(response.isExists()) {
- return "00000000000000000000000000000000";
+ int tries = 0;
+ String bucketUUID = this.lookupUUID(bucket, "bucketUUID");
+ while(bucketUUID == null && tries < 100) {
+ logger.debug("bucket UUID doesn't exist yet, creaating");
+ String newUUID = UUID.randomUUID().toString().replace("-", "");
+ storeUUID(bucket, "bucketUUID", newUUID);
+ bucketUUID = this.lookupUUID(bucket, "bucketUUID");
+ tries++;
+ }
+
+ if(bucketUUID == null) {
+ throw new RuntimeException("failed to find/create bucket uuid after 100 tries");
+ }
+
+ return bucketUUID;
}
return null;
}

0 comments on commit 2a7c186

Please sign in to comment.