Permalink
Browse files

Revert "added new parameters to monitor health of node"

This reverts commit ea89ae8.
  • Loading branch information...
1 parent ea89ae8 commit 037f11a6db19e0e2ab4c974695d08a2cbb45d0b7 @mschoch mschoch committed Feb 21, 2013
@@ -33,15 +33,17 @@
import org.elasticsearch.rest.RestController;
import org.elasticsearch.transport.couchbase.CouchbaseCAPITransport;
+import com.couchbase.capi.CAPIBehavior;
import com.couchbase.capi.CAPIServer;
+import com.couchbase.capi.CouchbaseBehavior;
public class CouchbaseCAPITransportImpl extends AbstractLifecycleComponent<CouchbaseCAPITransport> implements CouchbaseCAPITransport {
public static final String DEFAULT_DOCUMENT_TYPE_DOCUMENT = "couchbaseDocument";
public static final String DEFAULT_DOCUMENT_TYPE_CHECKPOINT = "couchbaseCheckpoint";
- private ElasticSearchCAPIBehavior capiBehavior;
- private ElasticSearchCouchbaseBehavior couchbaseBehavior;
+ private CAPIBehavior capiBehavior;
+ private CouchbaseBehavior couchbaseBehavior;
private CAPIServer server;
private Client client;
private final NetworkService networkService;
@@ -65,10 +67,7 @@
private final int numVbuckets;
- private final long healthCheckInterval;
- private final long healthCheckThreshold;
-
- private HealthChecker healthChecker;
+ private final long maxConcurrentBulkDocs;
@Inject
public CouchbaseCAPITransportImpl(Settings settings, RestController restController, NetworkService networkService, IndicesService indicesService, MetaDataMappingService metaDataMappingService, Client client) {
@@ -86,9 +85,7 @@ public CouchbaseCAPITransportImpl(Settings settings, RestController restControll
this.checkpointDocumentType = settings.get("couchbase.checkpointDocumentType", DEFAULT_DOCUMENT_TYPE_CHECKPOINT);
this.dynamicTypePath = settings.get("couchbase.dynamicTypePath");
this.resolveConflicts = settings.getAsBoolean("couchbase.resolveConflicts", true);
-
- this.healthCheckInterval = settings.getAsLong("couchbase.healthCheckInterval", 30000L);
- this.healthCheckThreshold = settings.getAsLong("couchbase.healthCheckThreshold", 10000L);
+ this.maxConcurrentBulkDocs = settings.getAsLong("couchbase.maxConcurrentBulkDocs", 64L);
int defaultNumVbuckets = 1024;
if(System.getProperty("os.name").toLowerCase().contains("mac")) {
@@ -121,11 +118,8 @@ protected void doStart() throws ElasticSearchException {
final InetAddress publishAddressHost = publishAddressHostX;
- capiBehavior = new ElasticSearchCAPIBehavior(client, logger, defaultDocumentType, checkpointDocumentType, dynamicTypePath, resolveConflicts.booleanValue());
+ capiBehavior = new ElasticSearchCAPIBehavior(client, logger, defaultDocumentType, checkpointDocumentType, dynamicTypePath, resolveConflicts.booleanValue(), maxConcurrentBulkDocs);
couchbaseBehavior = new ElasticSearchCouchbaseBehavior(client);
- this.healthChecker = new HealthChecker(healthCheckInterval, healthCheckThreshold, logger, couchbaseBehavior, capiBehavior);
- Thread healthCheckerThread = new Thread(healthChecker);
- healthCheckerThread.start();
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
@@ -165,7 +159,6 @@ public boolean onPortNumber(int portNumber) {
@Override
protected void doStop() throws ElasticSearchException {
- this.healthChecker.stop();
if(server != null) {
try {
server.stop();
@@ -23,6 +23,8 @@
import java.util.Map.Entry;
import java.util.UUID;
+import javax.servlet.UnavailableException;
+
import org.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
@@ -59,15 +61,11 @@
protected MeanMetric meanRevsDiffRequests;
protected CounterMetric activeBulkDocsRequests;
protected MeanMetric meanBulkDocsRequests;
- protected CounterMetric totalServiceUnavailableErrors;
-
- protected boolean available = true;
+ protected CounterMetric totalTooManyConcurrentBulkDocsErrors;
- public void setAvailable(boolean available) {
- this.available = available;
- }
+ protected long maxConcurrentBulkDocs;
- public ElasticSearchCAPIBehavior(Client client, ESLogger logger, String defaultDocumentType, String checkpointDocumentType, String dynamicTypePath, boolean resolveConflicts) {
+ public ElasticSearchCAPIBehavior(Client client, ESLogger logger, String defaultDocumentType, String checkpointDocumentType, String dynamicTypePath, boolean resolveConflicts, long maxConcurrentBulkDocs) {
this.client = client;
this.logger = logger;
this.defaultDocumentType = defaultDocumentType;
@@ -79,7 +77,9 @@ public ElasticSearchCAPIBehavior(Client client, ESLogger logger, String defaultD
this.meanRevsDiffRequests = new MeanMetric();
this.activeBulkDocsRequests = new CounterMetric();
this.meanBulkDocsRequests = new MeanMetric();
- this.totalServiceUnavailableErrors = new CounterMetric();
+ this.totalTooManyConcurrentBulkDocsErrors = new CounterMetric();
+
+ this.maxConcurrentBulkDocs = maxConcurrentBulkDocs;
}
@Override
@@ -122,12 +122,6 @@ public boolean ensureFullCommit(String database) {
public Map<String, Object> revsDiff(String database,
Map<String, Object> revsMap) {
- // check to see if we're too busy
- if(!available) {
- totalServiceUnavailableErrors.inc();
- throw new IllegalStateException("Service Temporarily Unavailable");
- }
-
long start = System.currentTimeMillis();
activeRevsDiffRequests.inc();
logger.trace("_revs_diff request for {} : {}", database, revsMap);
@@ -184,11 +178,11 @@ public boolean ensureFullCommit(String database) {
}
@Override
- public List<Object> bulkDocs(String database, List<Map<String, Object>> docs) {
- // check to see if we're too busy
- if(!available) {
- totalServiceUnavailableErrors.inc();
- throw new IllegalStateException("Service Temporarily Unavailable");
+ public List<Object> bulkDocs(String database, List<Map<String, Object>> docs) throws UnavailableException {
+ // check to see if too many bulk docs requests are already active
+ if(activeBulkDocsRequests.count() > maxConcurrentBulkDocs) {
+ totalTooManyConcurrentBulkDocsErrors.inc();
+ throw new UnavailableException("Too many concurrent _bulk_docs requests");
}
long start = System.currentTimeMillis();
@@ -402,6 +396,7 @@ protected String getDatabaseNameWithoutUUID(String database) {
bulkDocsStats.put("totalCount", meanBulkDocsRequests.count());
bulkDocsStats.put("totalTime", meanBulkDocsRequests.sum());
bulkDocsStats.put("avgTime", meanBulkDocsRequests.mean());
+ bulkDocsStats.put("tooManyConcurrentBulkDocsErrors", totalTooManyConcurrentBulkDocsErrors.count());
Map<String, Object> revsDiffStats = new HashMap<String, Object>();
revsDiffStats.put("activeCount", activeRevsDiffRequests.count());
@@ -411,7 +406,6 @@ protected String getDatabaseNameWithoutUUID(String database) {
stats.put("_bulk_docs", bulkDocsStats);
stats.put("_revs_diff", revsDiffStats);
- stats.put("totalServiceUnavailableErrors", totalServiceUnavailableErrors.count());
return stats;
}
@@ -37,12 +37,6 @@
protected Client client;
- protected boolean available = true;
-
- public void setAvailable(boolean available) {
- this.available = available;
- }
-
public ElasticSearchCouchbaseBehavior(Client client) {
this.client = client;
}
@@ -1,55 +0,0 @@
-package org.elasticsearch.transport.couchbase.capi;
-
-import org.elasticsearch.common.logging.ESLogger;
-
-
-public class HealthChecker implements Runnable {
-
- private boolean running = true;
- private ESLogger logger;
- private ElasticSearchCAPIBehavior capiBehavior;
- private ElasticSearchCouchbaseBehavior couchbaseBehavior;
- private long checkInterval;
- private long threshold;
-
- public HealthChecker(long checkInterval, long threshold, ESLogger logger, ElasticSearchCouchbaseBehavior couchbaseBehavior, ElasticSearchCAPIBehavior capiBehavior) {
- this.couchbaseBehavior = couchbaseBehavior;
- this.capiBehavior = capiBehavior;
- this.checkInterval = checkInterval;
- this.threshold = threshold;
- this.logger = logger;
- }
-
- @Override
- public void run() {
- while(running) {
- try {
- Thread.sleep(checkInterval);
-
- long start = System.currentTimeMillis();
- couchbaseBehavior.getBucketsInPool("default");
- long end = System.currentTimeMillis();
-
- long took = end - start;
- if(took > threshold) {
- logger.info("Health check took {} exceeding threshold {}, marking service unavailable.", took, threshold);
- couchbaseBehavior.setAvailable(false);
- capiBehavior.setAvailable(false);
- } else {
- logger.info("Health check passed, took {}", took);
- couchbaseBehavior.setAvailable(true);
- capiBehavior.setAvailable(true);
- }
-
-
- } catch (InterruptedException e) {
- continue;
- }
- }
- }
-
- public void stop() {
- running = false;
- }
-
-}

0 comments on commit 037f11a

Please sign in to comment.