Skip to content

Commit

Permalink
* Added monitoring (JMX) for fat client wrapper, netty server and req…
Browse files Browse the repository at this point in the history
…uest handler

* Fixed some bugs and cleanup
* Added unit tests for dynamic timeout store and REST API validation
  • Loading branch information
Chinmay Soman committed Apr 26, 2013
1 parent 6ca2f83 commit e6e9e7e
Show file tree
Hide file tree
Showing 24 changed files with 1,127 additions and 213 deletions.
77 changes: 77 additions & 0 deletions src/java/voldemort/coordinator/CoordinatorConfig.java
@@ -1,3 +1,19 @@
/*
* Copyright 2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package voldemort.coordinator;

import java.io.BufferedInputStream;
Expand All @@ -22,6 +38,7 @@ public class CoordinatorConfig {
private volatile int fatClientWrapperCorePoolSize = 20;
private volatile int fatClientWrapperKeepAliveInSecs = 60;
private volatile int metadataCheckIntervalInMs = 5000;
private volatile int nettyServerPort = 8080;

/* Propery names for propery-based configuration */
public static final String BOOTSTRAP_URLS_PROPERTY = "bootstrap_urls";
Expand All @@ -30,6 +47,7 @@ public class CoordinatorConfig {
public static final String FAT_CLIENT_WRAPPER_CORE_POOL_SIZE_PROPERTY = "fat_client_wrapper_core_pool_size";
public static final String FAT_CLIENT_WRAPPER_POOL_KEEPALIVE_IN_SECS = "fat_client_wrapper_pool_keepalive_in_secs";
public static final String METADATA_CHECK_INTERVAL_IN_MS = "metadata_check_interval_in_ms";
public static final String NETTY_SERVER_PORT = "netty_server_port";

/**
* Instantiate the coordinator config using a properties file
Expand Down Expand Up @@ -61,6 +79,17 @@ public CoordinatorConfig(Properties properties) {
setProperties(properties);
}

/**
* Dummy constructor for testing purposes
*/
public CoordinatorConfig() {}

/**
* Set the values using the specified Properties object
*
* @param properties Properties object containing specific property values
* for the Coordinator config
*/
private void setProperties(Properties properties) {
Props props = new Props(properties);
if(props.containsKey(BOOTSTRAP_URLS_PROPERTY)) {
Expand Down Expand Up @@ -90,6 +119,10 @@ private void setProperties(Properties properties) {
setMetadataCheckIntervalInMs(props.getInt(METADATA_CHECK_INTERVAL_IN_MS,
this.metadataCheckIntervalInMs));
}

if(props.containsKey(NETTY_SERVER_PORT)) {
setMetadataCheckIntervalInMs(props.getInt(NETTY_SERVER_PORT, this.nettyServerPort));
}
}

public String[] getBootstrapURLs() {
Expand All @@ -98,6 +131,14 @@ public String[] getBootstrapURLs() {
return this.bootstrapURLs.toArray(new String[this.bootstrapURLs.size()]);
}

/**
* Sets the bootstrap URLs used by the different Fat clients inside the
* Coordinator
*
* @param bootstrapUrls list of bootstrap URLs defining which cluster to
* connect to
* @return
*/
public CoordinatorConfig setBootstrapURLs(List<String> bootstrapUrls) {
this.bootstrapURLs = Utils.notNull(bootstrapUrls);
if(this.bootstrapURLs.size() <= 0)
Expand All @@ -109,6 +150,13 @@ public String getFatClientConfigPath() {
return fatClientConfigPath;
}

/**
* Defines individual config for each of the fat clients managed by the
* Coordinator
*
* @param fatClientConfigPath The path of the file containing the fat client
* config in Avro format
*/
public void setFatClientConfigPath(String fatClientConfigPath) {
this.fatClientConfigPath = fatClientConfigPath;
}
Expand All @@ -117,6 +165,10 @@ public int getFatClientWrapperMaxPoolSize() {
return fatClientWrapperMaxPoolSize;
}

/**
* @param fatClientWrapperMaxPoolSize Defines the Maximum pool size for the
* thread pool used in the Fat client wrapper
*/
public void setFatClientWrapperMaxPoolSize(int fatClientWrapperMaxPoolSize) {
this.fatClientWrapperMaxPoolSize = fatClientWrapperMaxPoolSize;
}
Expand All @@ -125,6 +177,10 @@ public int getFatClientWrapperCorePoolSize() {
return fatClientWrapperCorePoolSize;
}

/**
* @param fatClientWrapperMaxPoolSize Defines the Core pool size for the
* thread pool used in the Fat client wrapper
*/
public void setFatClientWrapperCorePoolSize(int fatClientWrapperCorePoolSize) {
this.fatClientWrapperCorePoolSize = fatClientWrapperCorePoolSize;
}
Expand All @@ -133,6 +189,10 @@ public int getFatClientWrapperKeepAliveInSecs() {
return fatClientWrapperKeepAliveInSecs;
}

/**
* @param fatClientWrapperKeepAliveInSecs Defines the Keep alive period in
* seconds for the thread pool used in the Fat client wrapper
*/
public void setFatClientWrapperKeepAliveInSecs(int fatClientWrapperKeepAliveInSecs) {
this.fatClientWrapperKeepAliveInSecs = fatClientWrapperKeepAliveInSecs;
}
Expand All @@ -141,8 +201,25 @@ public int getMetadataCheckIntervalInMs() {
return metadataCheckIntervalInMs;
}

/**
* @param metadataCheckIntervalInMs Defines the frequency with which to
* check for updates in the cluster metadata (Eg: cluster.xml and
* stores.xml)
*/
public void setMetadataCheckIntervalInMs(int metadataCheckIntervalInMs) {
this.metadataCheckIntervalInMs = metadataCheckIntervalInMs;
}

public int getServerPort() {
return nettyServerPort;
}

/**
* @param serverPort Defines the port to use while bootstrapping the Netty
* server
*/
public void setServerPort(int serverPort) {
this.nettyServerPort = serverPort;
}

}
52 changes: 52 additions & 0 deletions src/java/voldemort/coordinator/CoordinatorErrorStats.java
@@ -0,0 +1,52 @@
package voldemort.coordinator;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;

import voldemort.VoldemortException;
import voldemort.annotations.jmx.JmxGetter;
import voldemort.store.InsufficientOperationalNodesException;
import voldemort.store.InsufficientZoneResponsesException;
import voldemort.store.InvalidMetadataException;
import voldemort.store.routed.PipelineRoutedStats;

/**
* Class to keep track of all the errors in the Coordinator service
*
*/
public class CoordinatorErrorStats extends PipelineRoutedStats {

CoordinatorErrorStats() {
super();
this.errCountMap.put(RejectedExecutionException.class, new AtomicLong(0));
this.errCountMap.put(IllegalArgumentException.class, new AtomicLong(0));
this.errCountMap.put(VoldemortException.class, new AtomicLong(0));
}

@Override
public boolean isSevere(Exception ve) {
if(ve instanceof InsufficientOperationalNodesException
|| ve instanceof InsufficientZoneResponsesException
|| ve instanceof InvalidMetadataException || ve instanceof RejectedExecutionException
|| ve instanceof IllegalArgumentException || ve instanceof VoldemortException)
return true;
else
return false;
}

@JmxGetter(name = "numRejectedExecutionExceptions", description = "Number of rejected tasks by the Fat client")
public long getNumRejectedExecutionExceptions() {
return errCountMap.get(RejectedExecutionException.class).get();
}

@JmxGetter(name = "numIllegalArgumentExceptions", description = "Number of bad requests received by the Coordinator")
public long getNumIllegalArgumentExceptions() {
return errCountMap.get(IllegalArgumentException.class).get();
}

@JmxGetter(name = "numVoldemortExceptions", description = "Number of failed Voldemort operations")
public long getNumVoldemortExceptions() {
return errCountMap.get(VoldemortException.class).get();
}

}
Expand Up @@ -36,9 +36,13 @@ public class CoordinatorPipelineFactory implements ChannelPipelineFactory {

private boolean noop = false;
private Map<String, FatClientWrapper> fatClientMap;
private CoordinatorErrorStats errorStats = null;

public CoordinatorPipelineFactory(Map<String, FatClientWrapper> fatClientMap, boolean noop) {
public CoordinatorPipelineFactory(Map<String, FatClientWrapper> fatClientMap,
CoordinatorErrorStats errorStats,
boolean noop) {
this.fatClientMap = fatClientMap;
this.errorStats = errorStats;
this.noop = noop;
}

Expand All @@ -56,7 +60,8 @@ public ChannelPipeline getPipeline() throws Exception {
if(this.noop) {
pipeline.addLast("handler", new NoopHttpRequestHandler());
} else {
pipeline.addLast("handler", new VoldemortHttpRequestHandler(this.fatClientMap));
pipeline.addLast("handler", new VoldemortHttpRequestHandler(this.fatClientMap,
this.errorStats));
}
return pipeline;
}
Expand Down

0 comments on commit e6e9e7e

Please sign in to comment.