Skip to content

Commit

Permalink
added new parameters to monitor health of node
Browse files Browse the repository at this point in the history
couchbase.healthCheckInterval determines how often we check the health of the node
couchbase.healthCheckThreshold determines how long the health check operation can take
if it takes longer than this, the service will be marked unavailable until it
returns to a value below the threshold
current the healthcheck operation is retreiving the bucket map
aka :9091/pools/default/buckets
  • Loading branch information
mschoch committed Feb 20, 2013
1 parent 32e6f1a commit ea89ae8
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 21 deletions.
Expand Up @@ -33,17 +33,15 @@
import org.elasticsearch.rest.RestController;
import org.elasticsearch.transport.couchbase.CouchbaseCAPITransport;

import com.couchbase.capi.CAPIBehavior;
import com.couchbase.capi.CAPIServer;
import com.couchbase.capi.CouchbaseBehavior;

public class CouchbaseCAPITransportImpl extends AbstractLifecycleComponent<CouchbaseCAPITransport> implements CouchbaseCAPITransport {

public static final String DEFAULT_DOCUMENT_TYPE_DOCUMENT = "couchbaseDocument";
public static final String DEFAULT_DOCUMENT_TYPE_CHECKPOINT = "couchbaseCheckpoint";

private CAPIBehavior capiBehavior;
private CouchbaseBehavior couchbaseBehavior;
private ElasticSearchCAPIBehavior capiBehavior;
private ElasticSearchCouchbaseBehavior couchbaseBehavior;
private CAPIServer server;
private Client client;
private final NetworkService networkService;
Expand All @@ -67,7 +65,10 @@ public class CouchbaseCAPITransportImpl extends AbstractLifecycleComponent<Couch

private final int numVbuckets;

private final long maxConcurrentBulkDocs;
private final long healthCheckInterval;
private final long healthCheckThreshold;

private HealthChecker healthChecker;

@Inject
public CouchbaseCAPITransportImpl(Settings settings, RestController restController, NetworkService networkService, IndicesService indicesService, MetaDataMappingService metaDataMappingService, Client client) {
Expand All @@ -85,7 +86,9 @@ public CouchbaseCAPITransportImpl(Settings settings, RestController restControll
this.checkpointDocumentType = settings.get("couchbase.checkpointDocumentType", DEFAULT_DOCUMENT_TYPE_CHECKPOINT);
this.dynamicTypePath = settings.get("couchbase.dynamicTypePath");
this.resolveConflicts = settings.getAsBoolean("couchbase.resolveConflicts", true);
this.maxConcurrentBulkDocs = settings.getAsLong("couchbase.maxConcurrentBulkDocs", 64L);

this.healthCheckInterval = settings.getAsLong("couchbase.healthCheckInterval", 30000L);
this.healthCheckThreshold = settings.getAsLong("couchbase.healthCheckThreshold", 10000L);

int defaultNumVbuckets = 1024;
if(System.getProperty("os.name").toLowerCase().contains("mac")) {
Expand Down Expand Up @@ -118,8 +121,11 @@ protected void doStart() throws ElasticSearchException {
final InetAddress publishAddressHost = publishAddressHostX;


capiBehavior = new ElasticSearchCAPIBehavior(client, logger, defaultDocumentType, checkpointDocumentType, dynamicTypePath, resolveConflicts.booleanValue(), maxConcurrentBulkDocs);
capiBehavior = new ElasticSearchCAPIBehavior(client, logger, defaultDocumentType, checkpointDocumentType, dynamicTypePath, resolveConflicts.booleanValue());
couchbaseBehavior = new ElasticSearchCouchbaseBehavior(client);
this.healthChecker = new HealthChecker(healthCheckInterval, healthCheckThreshold, logger, couchbaseBehavior, capiBehavior);
Thread healthCheckerThread = new Thread(healthChecker);
healthCheckerThread.start();

PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
Expand Down Expand Up @@ -159,6 +165,7 @@ public boolean onPortNumber(int portNumber) {

@Override
protected void doStop() throws ElasticSearchException {
this.healthChecker.stop();
if(server != null) {
try {
server.stop();
Expand Down
Expand Up @@ -23,8 +23,6 @@
import java.util.Map.Entry;
import java.util.UUID;

import javax.servlet.UnavailableException;

import org.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
Expand Down Expand Up @@ -61,11 +59,15 @@ public class ElasticSearchCAPIBehavior implements CAPIBehavior {
protected MeanMetric meanRevsDiffRequests;
protected CounterMetric activeBulkDocsRequests;
protected MeanMetric meanBulkDocsRequests;
protected CounterMetric totalTooManyConcurrentBulkDocsErrors;
protected CounterMetric totalServiceUnavailableErrors;

protected boolean available = true;

protected long maxConcurrentBulkDocs;
public void setAvailable(boolean available) {
this.available = available;
}

public ElasticSearchCAPIBehavior(Client client, ESLogger logger, String defaultDocumentType, String checkpointDocumentType, String dynamicTypePath, boolean resolveConflicts, long maxConcurrentBulkDocs) {
public ElasticSearchCAPIBehavior(Client client, ESLogger logger, String defaultDocumentType, String checkpointDocumentType, String dynamicTypePath, boolean resolveConflicts) {
this.client = client;
this.logger = logger;
this.defaultDocumentType = defaultDocumentType;
Expand All @@ -77,9 +79,7 @@ public ElasticSearchCAPIBehavior(Client client, ESLogger logger, String defaultD
this.meanRevsDiffRequests = new MeanMetric();
this.activeBulkDocsRequests = new CounterMetric();
this.meanBulkDocsRequests = new MeanMetric();
this.totalTooManyConcurrentBulkDocsErrors = new CounterMetric();

this.maxConcurrentBulkDocs = maxConcurrentBulkDocs;
this.totalServiceUnavailableErrors = new CounterMetric();
}

@Override
Expand Down Expand Up @@ -122,6 +122,12 @@ public boolean ensureFullCommit(String database) {
public Map<String, Object> revsDiff(String database,
Map<String, Object> revsMap) {

// check to see if we're too busy
if(!available) {
totalServiceUnavailableErrors.inc();
throw new IllegalStateException("Service Temporarily Unavailable");
}

long start = System.currentTimeMillis();
activeRevsDiffRequests.inc();
logger.trace("_revs_diff request for {} : {}", database, revsMap);
Expand Down Expand Up @@ -178,11 +184,11 @@ public Map<String, Object> revsDiff(String database,
}

@Override
public List<Object> bulkDocs(String database, List<Map<String, Object>> docs) throws UnavailableException {
// check to see if too many bulk docs requests are already active
if(activeBulkDocsRequests.count() > maxConcurrentBulkDocs) {
totalTooManyConcurrentBulkDocsErrors.inc();
throw new UnavailableException("Too many concurrent _bulk_docs requests");
public List<Object> bulkDocs(String database, List<Map<String, Object>> docs) {
// check to see if we're too busy
if(!available) {
totalServiceUnavailableErrors.inc();
throw new IllegalStateException("Service Temporarily Unavailable");
}

long start = System.currentTimeMillis();
Expand Down Expand Up @@ -396,7 +402,6 @@ public Map<String, Object> getStats() {
bulkDocsStats.put("totalCount", meanBulkDocsRequests.count());
bulkDocsStats.put("totalTime", meanBulkDocsRequests.sum());
bulkDocsStats.put("avgTime", meanBulkDocsRequests.mean());
bulkDocsStats.put("tooManyConcurrentBulkDocsErrors", totalTooManyConcurrentBulkDocsErrors.count());

Map<String, Object> revsDiffStats = new HashMap<String, Object>();
revsDiffStats.put("activeCount", activeRevsDiffRequests.count());
Expand All @@ -406,6 +411,7 @@ public Map<String, Object> getStats() {

stats.put("_bulk_docs", bulkDocsStats);
stats.put("_revs_diff", revsDiffStats);
stats.put("totalServiceUnavailableErrors", totalServiceUnavailableErrors.count());

return stats;
}
Expand Down
Expand Up @@ -37,6 +37,12 @@ public class ElasticSearchCouchbaseBehavior implements CouchbaseBehavior {

protected Client client;

protected boolean available = true;

public void setAvailable(boolean available) {
this.available = available;
}

public ElasticSearchCouchbaseBehavior(Client client) {
this.client = client;
}
Expand Down
@@ -0,0 +1,55 @@
package org.elasticsearch.transport.couchbase.capi;

import org.elasticsearch.common.logging.ESLogger;


public class HealthChecker implements Runnable {

private boolean running = true;
private ESLogger logger;
private ElasticSearchCAPIBehavior capiBehavior;
private ElasticSearchCouchbaseBehavior couchbaseBehavior;
private long checkInterval;
private long threshold;

public HealthChecker(long checkInterval, long threshold, ESLogger logger, ElasticSearchCouchbaseBehavior couchbaseBehavior, ElasticSearchCAPIBehavior capiBehavior) {
this.couchbaseBehavior = couchbaseBehavior;
this.capiBehavior = capiBehavior;
this.checkInterval = checkInterval;
this.threshold = threshold;
this.logger = logger;
}

@Override
public void run() {
while(running) {
try {
Thread.sleep(checkInterval);

long start = System.currentTimeMillis();
couchbaseBehavior.getBucketsInPool("default");
long end = System.currentTimeMillis();

long took = end - start;
if(took > threshold) {
logger.info("Health check took {} exceeding threshold {}, marking service unavailable.", took, threshold);
couchbaseBehavior.setAvailable(false);
capiBehavior.setAvailable(false);
} else {
logger.info("Health check passed, took {}", took);
couchbaseBehavior.setAvailable(true);
capiBehavior.setAvailable(true);
}


} catch (InterruptedException e) {
continue;
}
}
}

public void stop() {
running = false;
}

}

0 comments on commit ea89ae8

Please sign in to comment.