Skip to content

Commit

Permalink
Standardized config and cleaned up some code
Browse files Browse the repository at this point in the history
  • Loading branch information
Chinmay Soman committed Apr 26, 2013
1 parent 4f097fe commit 18e3346
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 80 deletions.
65 changes: 65 additions & 0 deletions src/java/voldemort/client/ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ public class ClientConfig {
private volatile boolean enablePipelineRoutedStore = true;
private volatile int clientZoneId = Zone.DEFAULT_ZONE_ID;

/*
* Following properties are required for the Fat client wrapper to be
* embedded inside the CoordinatorService
*/
private volatile int fatClientWrapperMaxPoolSize = 20;
private volatile int fatClientWrapperCorePoolSize = 20;
private volatile int fatClientWrapperKeepAliveInSecs = 60;

/*
* The following are only used with a non pipe line routed, i.e non NIO
* based client
Expand Down Expand Up @@ -165,6 +173,9 @@ public ClientConfig() {}
public static final String ENABLE_COMPRESSION_LAYER = "enable_compression_layer";
public static final String ENABLE_SERIALIZATION_LAYER = "enable_serialization_layer";
public static final String ENABLE_INCONSISTENCY_RESOLVING_LAYER = "enable_inconsistency_resolving_layer";
public static final String FAT_CLIENT_WRAPPER_MAX_POOL_SIZE_PROPERTY = "fat_client_wrapper_max_pool_size";
public static final String FAT_CLIENT_WRAPPER_CORE_POOL_SIZE_PROPERTY = "fat_client_wrapper_core_pool_size";
public static final String FAT_CLIENT_WRAPPER_POOL_KEEPALIVE_IN_SECS = "fat_client_wrapper_pool_keepalive_in_secs";

/**
* Instantiate the client config using a properties file
Expand Down Expand Up @@ -380,6 +391,21 @@ private void setProperties(Properties properties) {
this.setEnableInconsistencyResolvingLayer(props.getBoolean(ENABLE_INCONSISTENCY_RESOLVING_LAYER));
}

if(props.containsKey(FAT_CLIENT_WRAPPER_CORE_POOL_SIZE_PROPERTY)) {
this.setFatClientWrapperCorePoolSize(props.getInt(FAT_CLIENT_WRAPPER_CORE_POOL_SIZE_PROPERTY,
this.fatClientWrapperCorePoolSize));
}

if(props.containsKey(FAT_CLIENT_WRAPPER_MAX_POOL_SIZE_PROPERTY)) {
this.setFatClientWrapperMaxPoolSize(props.getInt(FAT_CLIENT_WRAPPER_MAX_POOL_SIZE_PROPERTY,
this.fatClientWrapperMaxPoolSize));
}

if(props.containsKey(FAT_CLIENT_WRAPPER_POOL_KEEPALIVE_IN_SECS)) {
this.setFatClientWrapperKeepAliveInSecs(props.getInt(FAT_CLIENT_WRAPPER_POOL_KEEPALIVE_IN_SECS,
this.fatClientWrapperKeepAliveInSecs));
}

}

/**
Expand Down Expand Up @@ -1105,6 +1131,45 @@ public ClientConfig setEnableInconsistencyResolvingLayer(boolean enableInconsist
return this;
}

public int getFatClientWrapperMaxPoolSize() {
return fatClientWrapperMaxPoolSize;
}

/**
* @param fatClientWrapperMaxPoolSize Defines the Maximum pool size for the
* thread pool used in the Fat client wrapper
*/
public ClientConfig setFatClientWrapperMaxPoolSize(int fatClientWrapperMaxPoolSize) {
this.fatClientWrapperMaxPoolSize = fatClientWrapperMaxPoolSize;
return this;
}

public int getFatClientWrapperCorePoolSize() {
return fatClientWrapperCorePoolSize;
}

/**
* @param fatClientWrapperMaxPoolSize Defines the Core pool size for the
* thread pool used in the Fat client wrapper
*/
public ClientConfig setFatClientWrapperCorePoolSize(int fatClientWrapperCorePoolSize) {
this.fatClientWrapperCorePoolSize = fatClientWrapperCorePoolSize;
return this;
}

public int getFatClientWrapperKeepAliveInSecs() {
return fatClientWrapperKeepAliveInSecs;
}

/**
* @param fatClientWrapperKeepAliveInSecs Defines the Keep alive period in
* seconds for the thread pool used in the Fat client wrapper
*/
public ClientConfig setFatClientWrapperKeepAliveInSecs(int fatClientWrapperKeepAliveInSecs) {
this.fatClientWrapperKeepAliveInSecs = fatClientWrapperKeepAliveInSecs;
return this;
}

public String toString() {
StringBuilder clientConfigInfo = new StringBuilder();
clientConfigInfo.append("Max connections per node: " + this.maxConnectionsPerNode + "\n");
Expand Down
75 changes: 18 additions & 57 deletions src/java/voldemort/coordinator/CoordinatorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,16 @@ public class CoordinatorConfig {

private volatile List<String> bootstrapURLs = null;
private volatile String fatClientConfigPath = null;
private volatile int fatClientWrapperMaxPoolSize = 20;
private volatile int fatClientWrapperCorePoolSize = 20;
private volatile int fatClientWrapperKeepAliveInSecs = 60;
private volatile int metadataCheckIntervalInMs = 5000;
private volatile int nettyServerPort = 8080;
private volatile int nettyServerBacklog = 1000;

/* Propery names for propery-based configuration */
public static final String BOOTSTRAP_URLS_PROPERTY = "bootstrap_urls";
public static final String FAT_CLIENTS_CONFIG_FILE_PATH_PROPERTY = "fat_clients_config_file_path";
public static final String FAT_CLIENT_WRAPPER_MAX_POOL_SIZE_PROPERTY = "fat_client_wrapper_max_pool_size";
public static final String FAT_CLIENT_WRAPPER_CORE_POOL_SIZE_PROPERTY = "fat_client_wrapper_core_pool_size";
public static final String FAT_CLIENT_WRAPPER_POOL_KEEPALIVE_IN_SECS = "fat_client_wrapper_pool_keepalive_in_secs";
public static final String METADATA_CHECK_INTERVAL_IN_MS = "metadata_check_interval_in_ms";
public static final String NETTY_SERVER_PORT = "netty_server_port";
public static final String NETTY_SERVER_BACKLOG = "netty_server_backlog";

/**
* Instantiate the coordinator config using a properties file
Expand Down Expand Up @@ -100,21 +96,6 @@ private void setProperties(Properties properties) {
setFatClientConfigPath(props.getString(FAT_CLIENTS_CONFIG_FILE_PATH_PROPERTY));
}

if(props.containsKey(FAT_CLIENT_WRAPPER_CORE_POOL_SIZE_PROPERTY)) {
setFatClientWrapperCorePoolSize(props.getInt(FAT_CLIENT_WRAPPER_CORE_POOL_SIZE_PROPERTY,
this.fatClientWrapperCorePoolSize));
}

if(props.containsKey(FAT_CLIENT_WRAPPER_MAX_POOL_SIZE_PROPERTY)) {
setFatClientWrapperMaxPoolSize(props.getInt(FAT_CLIENT_WRAPPER_MAX_POOL_SIZE_PROPERTY,
this.fatClientWrapperMaxPoolSize));
}

if(props.containsKey(FAT_CLIENT_WRAPPER_POOL_KEEPALIVE_IN_SECS)) {
setFatClientWrapperKeepAliveInSecs(props.getInt(FAT_CLIENT_WRAPPER_POOL_KEEPALIVE_IN_SECS,
this.fatClientWrapperKeepAliveInSecs));
}

if(props.containsKey(METADATA_CHECK_INTERVAL_IN_MS)) {
setMetadataCheckIntervalInMs(props.getInt(METADATA_CHECK_INTERVAL_IN_MS,
this.metadataCheckIntervalInMs));
Expand All @@ -123,6 +104,10 @@ private void setProperties(Properties properties) {
if(props.containsKey(NETTY_SERVER_PORT)) {
setMetadataCheckIntervalInMs(props.getInt(NETTY_SERVER_PORT, this.nettyServerPort));
}

if(props.containsKey(NETTY_SERVER_BACKLOG)) {
setMetadataCheckIntervalInMs(props.getInt(NETTY_SERVER_BACKLOG, this.nettyServerBacklog));
}
}

public String[] getBootstrapURLs() {
Expand Down Expand Up @@ -161,42 +146,6 @@ public void setFatClientConfigPath(String fatClientConfigPath) {
this.fatClientConfigPath = fatClientConfigPath;
}

public int getFatClientWrapperMaxPoolSize() {
return fatClientWrapperMaxPoolSize;
}

/**
* @param fatClientWrapperMaxPoolSize Defines the Maximum pool size for the
* thread pool used in the Fat client wrapper
*/
public void setFatClientWrapperMaxPoolSize(int fatClientWrapperMaxPoolSize) {
this.fatClientWrapperMaxPoolSize = fatClientWrapperMaxPoolSize;
}

public int getFatClientWrapperCorePoolSize() {
return fatClientWrapperCorePoolSize;
}

/**
* @param fatClientWrapperMaxPoolSize Defines the Core pool size for the
* thread pool used in the Fat client wrapper
*/
public void setFatClientWrapperCorePoolSize(int fatClientWrapperCorePoolSize) {
this.fatClientWrapperCorePoolSize = fatClientWrapperCorePoolSize;
}

public int getFatClientWrapperKeepAliveInSecs() {
return fatClientWrapperKeepAliveInSecs;
}

/**
* @param fatClientWrapperKeepAliveInSecs Defines the Keep alive period in
* seconds for the thread pool used in the Fat client wrapper
*/
public void setFatClientWrapperKeepAliveInSecs(int fatClientWrapperKeepAliveInSecs) {
this.fatClientWrapperKeepAliveInSecs = fatClientWrapperKeepAliveInSecs;
}

public int getMetadataCheckIntervalInMs() {
return metadataCheckIntervalInMs;
}
Expand All @@ -222,4 +171,16 @@ public void setServerPort(int serverPort) {
this.nettyServerPort = serverPort;
}

public int getNettyServerBacklog() {
return nettyServerBacklog;
}

/**
* @param nettyServerBacklog Defines the netty server backlog value
*
*/
public void setNettyServerBacklog(int nettyServerBacklog) {
this.nettyServerBacklog = nettyServerBacklog;
}

}
34 changes: 17 additions & 17 deletions src/java/voldemort/coordinator/CoordinatorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
@JmxManaged(description = "A Coordinator Service for proxying Voldemort HTTP requests")
public class CoordinatorService extends AbstractService {

private CoordinatorConfig config = null;
private CoordinatorConfig coordinatorConfig = null;

private boolean noop = false;
private SocketStoreClientFactory storeClientFactory = null;
Expand All @@ -92,7 +92,7 @@ public class CoordinatorService extends AbstractService {

public CoordinatorService(CoordinatorConfig config) {
super(ServiceType.COORDINATOR);
this.config = config;
this.coordinatorConfig = config;
this.coordinatorPerfStats = new StoreStats();
this.errorStats = new CoordinatorErrorStats();
RESTErrorHandler.setErrorStatsHandler(errorStats);
Expand All @@ -113,8 +113,8 @@ private void initializeFatClients() {

List<StoreDefinition> storeDefList = storeMapper.readStoreList(new StringReader(storesXml),
false);
Map<String, ClientConfig> fatClientConfigMap = readClientConfig(this.config.getFatClientConfigPath(),
this.config.getBootstrapURLs());
Map<String, ClientConfig> fatClientConfigMap = readClientConfig(this.coordinatorConfig.getFatClientConfigPath(),
this.coordinatorConfig.getBootstrapURLs());
// For now Simply create the map of store definition to
// FatClientWrappers
// TODO: After the fat client improvements is done, modify this to
Expand All @@ -127,7 +127,7 @@ private void initializeFatClients() {
logger.info("Creating a Fat client wrapper for store: " + storeName);
logger.info("Using config: " + fatClientConfigMap.get(storeName));
fatClientMap.put(storeName, new FatClientWrapper(storeName,
this.config,
this.coordinatorConfig,
fatClientConfigMap.get(storeName),
storesXml,
clusterXml,
Expand All @@ -141,7 +141,7 @@ protected void startInner() {

// Initialize the Voldemort Metadata
ClientConfig clientConfig = new ClientConfig();
clientConfig.setBootstrapUrls(this.config.getBootstrapURLs());
clientConfig.setBootstrapUrls(this.coordinatorConfig.getBootstrapURLs());
storeClientFactory = new SocketStoreClientFactory(clientConfig);
initializeFatClients();

Expand Down Expand Up @@ -173,13 +173,13 @@ public Void call() throws Exception {
schedulerService.schedule(asyncMetadataManager.getClass().getName(),
asyncMetadataManager,
new Date(),
this.config.getMetadataCheckIntervalInMs());
this.coordinatorConfig.getMetadataCheckIntervalInMs());

// Configure the server.
this.workerPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
workerPool));
this.bootstrap.setOption("backlog", 1000);
this.bootstrap.setOption("backlog", this.coordinatorConfig.getNettyServerBacklog());
this.bootstrap.setOption("child.tcpNoDelay", true);
this.bootstrap.setOption("child.keepAlive", true);
this.bootstrap.setOption("child.reuseAddress", true);
Expand All @@ -201,9 +201,9 @@ public Void call() throws Exception {
JmxUtils.getClassName(this.errorStats.getClass())));

// Bind and start to accept incoming connections.
this.nettyServerChannel = this.bootstrap.bind(new InetSocketAddress(this.config.getServerPort()));
this.nettyServerChannel = this.bootstrap.bind(new InetSocketAddress(this.coordinatorConfig.getServerPort()));

logger.info("Coordinator service started on port " + this.config.getServerPort());
logger.info("Coordinator service started on port " + this.coordinatorConfig.getServerPort());
}

/**
Expand Down Expand Up @@ -244,14 +244,14 @@ private static Map<String, ClientConfig> readClientConfig(String configFilePath,
throw new Exception("Illegal Store Name !!!");
}

ClientConfig config = new ClientConfig(props);
config.setBootstrapUrls(bootstrapURLs)
.setEnableCompressionLayer(false)
.setEnableSerializationLayer(false)
.enableDefaultClient(true)
.setEnableLazy(false);
ClientConfig fatClientConfig = new ClientConfig(props);
fatClientConfig.setBootstrapUrls(bootstrapURLs)
.setEnableCompressionLayer(false)
.setEnableSerializationLayer(false)
.enableDefaultClient(true)
.setEnableLazy(false);

storeNameConfigMap.put(storeName, config);
storeNameConfigMap.put(storeName, fatClientConfig);

}
}
Expand Down
2 changes: 2 additions & 0 deletions src/java/voldemort/coordinator/DynamicTimeoutStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
* features: 1) Per call timeout facility 2) Ability to disable resolution per
* call
*
* TODO: Merge this with DefaultStoreClient eventually.
*
* @param <K> Type of the Key
* @param <V> Type of the Value
*/
Expand Down
12 changes: 6 additions & 6 deletions src/java/voldemort/coordinator/FatClientWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class FatClientWrapper {
private ThreadPoolExecutor fatClientExecutor;
private SocketStoreClientFactory storeClientFactory;
private DynamicTimeoutStoreClient<ByteArray, byte[]> dynamicTimeoutClient;
private final CoordinatorConfig config;
private final CoordinatorConfig coordinatorConfig;
private final Logger logger = Logger.getLogger(FatClientWrapper.class);
private final String storeName;
private final CoordinatorErrorStats errorStats;
Expand All @@ -71,15 +71,15 @@ public FatClientWrapper(String storeName,
CoordinatorErrorStats errorStats,
StoreStats coordinatorPerfStats) {

this.config = config;
this.coordinatorConfig = config;

// TODO: Import this from Config
this.fatClientExecutor = new ThreadPoolExecutor(this.config.getFatClientWrapperCorePoolSize(),
this.config.getFatClientWrapperMaxPoolSize(),
this.config.getFatClientWrapperKeepAliveInSecs(), // Keepalive
this.fatClientExecutor = new ThreadPoolExecutor(clientConfig.getFatClientWrapperCorePoolSize(),
clientConfig.getFatClientWrapperMaxPoolSize(),
clientConfig.getFatClientWrapperKeepAliveInSecs(), // Keepalive
TimeUnit.SECONDS, // Keepalive
// Timeunit
new ArrayBlockingQueue<Runnable>(this.config.getFatClientWrapperMaxPoolSize(),
new ArrayBlockingQueue<Runnable>(clientConfig.getFatClientWrapperMaxPoolSize(),
true),

new ThreadFactory() {
Expand Down

0 comments on commit 18e3346

Please sign in to comment.