Skip to content
Browse files

added bucket UUID cache with configurable expiration

new config option default value 300000 (5 minutes in ms)
couchbase.bucketUUIDCacheEvictMs: 300000
  • Loading branch information...
1 parent 5d853d3 commit d4f638938bf2154f70f6d8542af430c5aedf230c @mschoch mschoch committed Apr 9, 2014
View
16 src/main/java/org/elasticsearch/transport/couchbase/capi/CouchbaseCAPITransportImpl.java
@@ -16,11 +16,14 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
+import org.elasticsearch.common.cache.Cache;
+import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkService;
@@ -71,6 +74,8 @@
private long bulkIndexRetries;
private long bulkIndexRetryWaitMs;
+ private long bucketUUIDCacheEvictMs;
+ private Cache<String, String> bucketUUIDCache;
@Inject
public CouchbaseCAPITransportImpl(Settings settings, RestController restController, NetworkService networkService, IndicesService indicesService, MetaDataMappingService metaDataMappingService, Client client) {
@@ -91,6 +96,8 @@ public CouchbaseCAPITransportImpl(Settings settings, RestController restControll
this.maxConcurrentRequests = settings.getAsLong("couchbase.maxConcurrentRequests", 1024L);
this.bulkIndexRetries = settings.getAsLong("couchbase.bulkIndexRetries", 1024L);
this.bulkIndexRetryWaitMs = settings.getAsLong("couchbase.bulkIndexRetryWaitMs", 1000L);
+ this.bucketUUIDCacheEvictMs = settings.getAsLong("couchbase.bucketUUIDCacheEvictMs", 300000L);
+
int defaultNumVbuckets = 1024;
if(System.getProperty("os.name").toLowerCase().contains("mac")) {
@@ -99,11 +106,16 @@ public CouchbaseCAPITransportImpl(Settings settings, RestController restControll
}
this.numVbuckets = settings.getAsInt("couchbase.num_vbuckets", defaultNumVbuckets);
+
+
+ this.bucketUUIDCache = CacheBuilder.newBuilder().expireAfterWrite(this.bucketUUIDCacheEvictMs, TimeUnit.MILLISECONDS).build();
}
@Override
protected void doStart() throws ElasticsearchException {
+
+
// Bind and start to accept incoming connections.
InetAddress hostAddressX;
try {
@@ -123,8 +135,8 @@ protected void doStart() throws ElasticsearchException {
final InetAddress publishAddressHost = publishAddressHostX;
- capiBehavior = new ElasticSearchCAPIBehavior(client, logger, defaultDocumentType, checkpointDocumentType, dynamicTypePath, resolveConflicts.booleanValue(), maxConcurrentRequests, bulkIndexRetries, bulkIndexRetryWaitMs);
- couchbaseBehavior = new ElasticSearchCouchbaseBehavior(client, logger, checkpointDocumentType);
+ capiBehavior = new ElasticSearchCAPIBehavior(client, logger, defaultDocumentType, checkpointDocumentType, dynamicTypePath, resolveConflicts.booleanValue(), maxConcurrentRequests, bulkIndexRetries, bulkIndexRetryWaitMs, bucketUUIDCache);
+ couchbaseBehavior = new ElasticSearchCouchbaseBehavior(client, logger, checkpointDocumentType, bucketUUIDCache);
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
View
65 src/main/java/org/elasticsearch/transport/couchbase/capi/ElasticSearchCAPIBehavior.java
@@ -45,6 +45,7 @@
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Base64;
+import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
@@ -72,7 +73,9 @@
protected long bulkIndexRetries;
protected long bulkIndexRetryWaitMs;
- public ElasticSearchCAPIBehavior(Client client, ESLogger logger, String defaultDocumentType, String checkpointDocumentType, String dynamicTypePath, boolean resolveConflicts, long maxConcurrentRequests, long bulkIndexRetries, long bulkIndexRetryWaitMs) {
+ protected Cache<String, String> bucketUUIDCache;
+
+ public ElasticSearchCAPIBehavior(Client client, ESLogger logger, String defaultDocumentType, String checkpointDocumentType, String dynamicTypePath, boolean resolveConflicts, long maxConcurrentRequests, long bulkIndexRetries, long bulkIndexRetryWaitMs, Cache<String, String> bucketUUIDCache) {
this.client = client;
this.logger = logger;
this.defaultDocumentType = defaultDocumentType;
@@ -89,6 +92,7 @@ public ElasticSearchCAPIBehavior(Client client, ESLogger logger, String defaultD
this.maxConcurrentRequests = maxConcurrentRequests;
this.bulkIndexRetries = bulkIndexRetries;
this.bulkIndexRetryWaitMs = bulkIndexRetryWaitMs;
+ this.bucketUUIDCache = bucketUUIDCache;
}
@Override
@@ -99,19 +103,30 @@ public ElasticSearchCAPIBehavior(Client client, ESLogger logger, String defaultD
}
@Override
- public boolean databaseExists(String database) {
+ public String databaseExists(String database) {
String index = getElasticSearchIndexNameFromDatabase(database);
IndicesExistsRequestBuilder existsBuilder = client.admin().indices().prepareExists(index);
IndicesExistsResponse response = existsBuilder.execute().actionGet();
if(response.isExists()) {
- return true;
+ String uuid = getBucketUUIDFromDatabase(database);
+ if(uuid != null) {
+ logger.debug("included uuid, validating");
+ String actualUUID = getBucketUUID("default", index);
+ if(!uuid.equals(actualUUID)) {
+ return "uuids_dont_match";
+ }
+ } else {
+ logger.debug("no uuid in database name");
+ }
+ return null;
}
- return false;
+ return "missing";
}
@Override
public Map<String, Object> getDatabaseDetails(String database) {
- if(databaseExists(database)) {
+ String doesNotExistReason = databaseExists(database);
+ if(doesNotExistReason == null) {
Map<String, Object> responseMap = new HashMap<String, Object>();
responseMap.put("db_name", getDatabaseNameWithoutUUID(database));
return responseMap;
@@ -466,6 +481,15 @@ protected String getElasticSearchIndexNameFromDatabase(String database) {
}
}
+ protected String getBucketUUIDFromDatabase(String database) {
+ String[] pieces = database.split(";", 2);
+ if(pieces.length < 2) {
+ return null;
+ } else {
+ return pieces[1];
+ }
+ }
+
protected String getDatabaseNameWithoutUUID(String database) {
int semicolonIndex = database.indexOf(';');
if(semicolonIndex >= 0) {
@@ -570,4 +594,35 @@ public String getVBucketUUID(String pool, String bucket, int vbucket) {
return null;
}
+ @Override
+ public String getBucketUUID(String pool, String bucket) {
+ // first look for bucket UUID in cache
+ String bucketUUID = this.bucketUUIDCache.getIfPresent(bucket);
+ if (bucketUUID != null) {
+ logger.debug("found bucket UUID in cache");
+ return bucketUUID;
+ }
+
+ logger.debug("bucket UUID not in cache, looking up");
+ IndicesExistsRequestBuilder existsBuilder = client.admin().indices().prepareExists(bucket);
+ IndicesExistsResponse response = existsBuilder.execute().actionGet();
+ if(response.isExists()) {
+ int tries = 0;
+ bucketUUID = this.lookupUUID(bucket, "bucketUUID");
+ while(bucketUUID == null && tries < 100) {
+ logger.debug("bucket UUID doesn't exist yet, creaating, attempt: {}", tries+1);
+ String newUUID = UUID.randomUUID().toString().replace("-", "");
+ storeUUID(bucket, "bucketUUID", newUUID);
+ bucketUUID = this.lookupUUID(bucket, "bucketUUID");
+ tries++;
+ }
+
+ if(bucketUUID != null) {
+ // store it in the cache
+ bucketUUIDCache.put(bucket, bucketUUID);
+ return bucketUUID;
+ }
+ }
+ throw new RuntimeException("failed to find/create bucket uuid");
+ }
}
View
27 src/main/java/org/elasticsearch/transport/couchbase/capi/ElasticSearchCouchbaseBehavior.java
@@ -36,6 +36,7 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.hppc.cursors.ObjectCursor;
import org.elasticsearch.common.logging.ESLogger;
@@ -47,11 +48,13 @@
protected Client client;
protected ESLogger logger;
protected String checkpointDocumentType;
+ protected Cache<String, String> bucketUUIDCache;
- public ElasticSearchCouchbaseBehavior(Client client, ESLogger logger, String checkpointDocumentType) {
+ public ElasticSearchCouchbaseBehavior(Client client, ESLogger logger, String checkpointDocumentType, Cache<String, String> bucketUUIDCache) {
this.client = client;
this.logger = logger;
this.checkpointDocumentType = checkpointDocumentType;
+ this.bucketUUIDCache = bucketUUIDCache;
}
@Override
@@ -160,26 +163,34 @@ protected void storeUUID(String bucket, String id, String uuid) {
@Override
public String getBucketUUID(String pool, String bucket) {
+ // first look for bucket UUID in cache
+ String bucketUUID = this.bucketUUIDCache.getIfPresent(bucket);
+ if (bucketUUID != null) {
+ logger.debug("found bucket UUID in cache");
+ return bucketUUID;
+ }
+
+ logger.debug("bucket UUID not in cache, looking up");
IndicesExistsRequestBuilder existsBuilder = client.admin().indices().prepareExists(bucket);
IndicesExistsResponse response = existsBuilder.execute().actionGet();
if(response.isExists()) {
int tries = 0;
- String bucketUUID = this.lookupUUID(bucket, "bucketUUID");
+ bucketUUID = this.lookupUUID(bucket, "bucketUUID");
while(bucketUUID == null && tries < 100) {
- logger.debug("bucket UUID doesn't exist yet, creaating");
+ logger.debug("bucket UUID doesn't exist yet, creaating, attempt: {}", tries+1);
String newUUID = UUID.randomUUID().toString().replace("-", "");
storeUUID(bucket, "bucketUUID", newUUID);
bucketUUID = this.lookupUUID(bucket, "bucketUUID");
tries++;
}
- if(bucketUUID == null) {
- throw new RuntimeException("failed to find/create bucket uuid after 100 tries");
+ if(bucketUUID != null) {
+ // store it in the cache
+ bucketUUIDCache.put(bucket, bucketUUID);
+ return bucketUUID;
}
-
- return bucketUUID;
}
- return null;
+ throw new RuntimeException("failed to find/create bucket uuid");
}
@Override

0 comments on commit d4f6389

Please sign in to comment.
Something went wrong with that request. Please try again.