From e6e9e7eb3091a5c84b245ecc88f70b0d30a8f3fd Mon Sep 17 00:00:00 2001 From: Chinmay Soman Date: Sun, 31 Mar 2013 16:23:50 -0700 Subject: [PATCH] * Added monitoring (JMX) for fat client wrapper, netty server and request handler * Fixed some bugs and cleanup * Added unit tests for dynamic timeout store and REST API validation --- .../coordinator/CoordinatorConfig.java | 77 ++++++ .../coordinator/CoordinatorErrorStats.java | 52 ++++ .../CoordinatorPipelineFactory.java | 9 +- .../coordinator/CoordinatorService.java | 140 ++++++++-- .../DynamicTimeoutStoreClient.java | 62 ++++- .../coordinator/FatClientWrapper.java | 133 ++++++---- .../HttpDeleteRequestExecutor.java | 41 +-- .../HttpGetAllRequestExecutor.java | 40 +-- .../coordinator/HttpGetRequestExecutor.java | 41 ++- .../coordinator/HttpPutRequestExecutor.java | 46 ++-- .../coordinator/NoopHttpRequestHandler.java | 15 +- .../coordinator/RESTErrorHandler.java | 27 +- .../VoldemortHttpRequestHandler.java | 60 ++--- .../CompositeDeleteVoldemortRequest.java | 26 +- .../CompositeGetAllVoldemortRequest.java | 28 +- .../store/CompositeGetVoldemortRequest.java | 33 ++- .../store/CompositePutVoldemortRequest.java | 25 +- ...CompositeVersionedPutVoldemortRequest.java | 26 +- .../store/CompositeVoldemortRequest.java | 32 ++- .../store/routed/PipelineRoutedStats.java | 10 +- .../voldemort/config/fat-client-config.avro | 11 + test/common/voldemort/config/single-store.xml | 16 ++ .../coordinator/CoordinatorRestAPITest.java | 245 ++++++++++++++++++ .../DynamicTimeoutStoreClientTest.java | 145 +++++++++++ 24 files changed, 1127 insertions(+), 213 deletions(-) create mode 100644 src/java/voldemort/coordinator/CoordinatorErrorStats.java create mode 100644 test/common/voldemort/config/fat-client-config.avro create mode 100644 test/unit/voldemort/coordinator/CoordinatorRestAPITest.java create mode 100644 test/unit/voldemort/coordinator/DynamicTimeoutStoreClientTest.java diff --git a/src/java/voldemort/coordinator/CoordinatorConfig.java b/src/java/voldemort/coordinator/CoordinatorConfig.java index 670f278b6d..4711ec9a72 100644 --- a/src/java/voldemort/coordinator/CoordinatorConfig.java +++ b/src/java/voldemort/coordinator/CoordinatorConfig.java @@ -1,3 +1,19 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package voldemort.coordinator; import java.io.BufferedInputStream; @@ -22,6 +38,7 @@ public class CoordinatorConfig { private volatile int fatClientWrapperCorePoolSize = 20; private volatile int fatClientWrapperKeepAliveInSecs = 60; private volatile int metadataCheckIntervalInMs = 5000; + private volatile int nettyServerPort = 8080; /* Propery names for propery-based configuration */ public static final String BOOTSTRAP_URLS_PROPERTY = "bootstrap_urls"; @@ -30,6 +47,7 @@ public class CoordinatorConfig { public static final String FAT_CLIENT_WRAPPER_CORE_POOL_SIZE_PROPERTY = "fat_client_wrapper_core_pool_size"; public static final String FAT_CLIENT_WRAPPER_POOL_KEEPALIVE_IN_SECS = "fat_client_wrapper_pool_keepalive_in_secs"; public static final String METADATA_CHECK_INTERVAL_IN_MS = "metadata_check_interval_in_ms"; + public static final String NETTY_SERVER_PORT = "netty_server_port"; /** * Instantiate the coordinator config using a properties file @@ -61,6 +79,17 @@ public CoordinatorConfig(Properties properties) { setProperties(properties); } + /** + * Dummy constructor for testing purposes + */ + public CoordinatorConfig() {} + + /** + * Set the values using the specified Properties object + * + * @param properties Properties object containing specific property values + * for the Coordinator config + */ private void setProperties(Properties properties) { Props props = new Props(properties); if(props.containsKey(BOOTSTRAP_URLS_PROPERTY)) { @@ -90,6 +119,10 @@ private void setProperties(Properties properties) { setMetadataCheckIntervalInMs(props.getInt(METADATA_CHECK_INTERVAL_IN_MS, this.metadataCheckIntervalInMs)); } + + if(props.containsKey(NETTY_SERVER_PORT)) { + setMetadataCheckIntervalInMs(props.getInt(NETTY_SERVER_PORT, this.nettyServerPort)); + } } public String[] getBootstrapURLs() { @@ -98,6 +131,14 @@ public String[] getBootstrapURLs() { return this.bootstrapURLs.toArray(new String[this.bootstrapURLs.size()]); } + /** + * Sets the bootstrap URLs used by the different Fat clients inside the + * Coordinator + * + * @param bootstrapUrls list of bootstrap URLs defining which cluster to + * connect to + * @return + */ public CoordinatorConfig setBootstrapURLs(List bootstrapUrls) { this.bootstrapURLs = Utils.notNull(bootstrapUrls); if(this.bootstrapURLs.size() <= 0) @@ -109,6 +150,13 @@ public String getFatClientConfigPath() { return fatClientConfigPath; } + /** + * Defines individual config for each of the fat clients managed by the + * Coordinator + * + * @param fatClientConfigPath The path of the file containing the fat client + * config in Avro format + */ public void setFatClientConfigPath(String fatClientConfigPath) { this.fatClientConfigPath = fatClientConfigPath; } @@ -117,6 +165,10 @@ public int getFatClientWrapperMaxPoolSize() { return fatClientWrapperMaxPoolSize; } + /** + * @param fatClientWrapperMaxPoolSize Defines the Maximum pool size for the + * thread pool used in the Fat client wrapper + */ public void setFatClientWrapperMaxPoolSize(int fatClientWrapperMaxPoolSize) { this.fatClientWrapperMaxPoolSize = fatClientWrapperMaxPoolSize; } @@ -125,6 +177,10 @@ public int getFatClientWrapperCorePoolSize() { return fatClientWrapperCorePoolSize; } + /** + * @param fatClientWrapperMaxPoolSize Defines the Core pool size for the + * thread pool used in the Fat client wrapper + */ public void setFatClientWrapperCorePoolSize(int fatClientWrapperCorePoolSize) { this.fatClientWrapperCorePoolSize = fatClientWrapperCorePoolSize; } @@ -133,6 +189,10 @@ public int getFatClientWrapperKeepAliveInSecs() { return fatClientWrapperKeepAliveInSecs; } + /** + * @param fatClientWrapperKeepAliveInSecs Defines the Keep alive period in + * seconds for the thread pool used in the Fat client wrapper + */ public void setFatClientWrapperKeepAliveInSecs(int fatClientWrapperKeepAliveInSecs) { this.fatClientWrapperKeepAliveInSecs = fatClientWrapperKeepAliveInSecs; } @@ -141,8 +201,25 @@ public int getMetadataCheckIntervalInMs() { return metadataCheckIntervalInMs; } + /** + * @param metadataCheckIntervalInMs Defines the frequency with which to + * check for updates in the cluster metadata (Eg: cluster.xml and + * stores.xml) + */ public void setMetadataCheckIntervalInMs(int metadataCheckIntervalInMs) { this.metadataCheckIntervalInMs = metadataCheckIntervalInMs; } + public int getServerPort() { + return nettyServerPort; + } + + /** + * @param serverPort Defines the port to use while bootstrapping the Netty + * server + */ + public void setServerPort(int serverPort) { + this.nettyServerPort = serverPort; + } + } diff --git a/src/java/voldemort/coordinator/CoordinatorErrorStats.java b/src/java/voldemort/coordinator/CoordinatorErrorStats.java new file mode 100644 index 0000000000..96a55ca917 --- /dev/null +++ b/src/java/voldemort/coordinator/CoordinatorErrorStats.java @@ -0,0 +1,52 @@ +package voldemort.coordinator; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import voldemort.VoldemortException; +import voldemort.annotations.jmx.JmxGetter; +import voldemort.store.InsufficientOperationalNodesException; +import voldemort.store.InsufficientZoneResponsesException; +import voldemort.store.InvalidMetadataException; +import voldemort.store.routed.PipelineRoutedStats; + +/** + * Class to keep track of all the errors in the Coordinator service + * + */ +public class CoordinatorErrorStats extends PipelineRoutedStats { + + CoordinatorErrorStats() { + super(); + this.errCountMap.put(RejectedExecutionException.class, new AtomicLong(0)); + this.errCountMap.put(IllegalArgumentException.class, new AtomicLong(0)); + this.errCountMap.put(VoldemortException.class, new AtomicLong(0)); + } + + @Override + public boolean isSevere(Exception ve) { + if(ve instanceof InsufficientOperationalNodesException + || ve instanceof InsufficientZoneResponsesException + || ve instanceof InvalidMetadataException || ve instanceof RejectedExecutionException + || ve instanceof IllegalArgumentException || ve instanceof VoldemortException) + return true; + else + return false; + } + + @JmxGetter(name = "numRejectedExecutionExceptions", description = "Number of rejected tasks by the Fat client") + public long getNumRejectedExecutionExceptions() { + return errCountMap.get(RejectedExecutionException.class).get(); + } + + @JmxGetter(name = "numIllegalArgumentExceptions", description = "Number of bad requests received by the Coordinator") + public long getNumIllegalArgumentExceptions() { + return errCountMap.get(IllegalArgumentException.class).get(); + } + + @JmxGetter(name = "numVoldemortExceptions", description = "Number of failed Voldemort operations") + public long getNumVoldemortExceptions() { + return errCountMap.get(VoldemortException.class).get(); + } + +} diff --git a/src/java/voldemort/coordinator/CoordinatorPipelineFactory.java b/src/java/voldemort/coordinator/CoordinatorPipelineFactory.java index 1646f02130..0252539a57 100644 --- a/src/java/voldemort/coordinator/CoordinatorPipelineFactory.java +++ b/src/java/voldemort/coordinator/CoordinatorPipelineFactory.java @@ -36,9 +36,13 @@ public class CoordinatorPipelineFactory implements ChannelPipelineFactory { private boolean noop = false; private Map fatClientMap; + private CoordinatorErrorStats errorStats = null; - public CoordinatorPipelineFactory(Map fatClientMap, boolean noop) { + public CoordinatorPipelineFactory(Map fatClientMap, + CoordinatorErrorStats errorStats, + boolean noop) { this.fatClientMap = fatClientMap; + this.errorStats = errorStats; this.noop = noop; } @@ -56,7 +60,8 @@ public ChannelPipeline getPipeline() throws Exception { if(this.noop) { pipeline.addLast("handler", new NoopHttpRequestHandler()); } else { - pipeline.addLast("handler", new VoldemortHttpRequestHandler(this.fatClientMap)); + pipeline.addLast("handler", new VoldemortHttpRequestHandler(this.fatClientMap, + this.errorStats)); } return pipeline; } diff --git a/src/java/voldemort/coordinator/CoordinatorService.java b/src/java/voldemort/coordinator/CoordinatorService.java index a6a8a508bc..56b57a9579 100644 --- a/src/java/voldemort/coordinator/CoordinatorService.java +++ b/src/java/voldemort/coordinator/CoordinatorService.java @@ -1,5 +1,5 @@ /* - * Copyright 2008-2013 LinkedIn, Inc + * Copyright 2013 LinkedIn, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -31,6 +31,7 @@ import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -40,8 +41,11 @@ import org.apache.commons.io.IOUtils; import org.apache.log4j.Logger; import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import voldemort.annotations.jmx.JmxGetter; +import voldemort.annotations.jmx.JmxManaged; import voldemort.client.ClientConfig; import voldemort.client.SocketStoreClientFactory; import voldemort.client.SystemStoreRepository; @@ -52,6 +56,9 @@ import voldemort.server.VoldemortServer; import voldemort.store.StoreDefinition; import voldemort.store.metadata.MetadataStore; +import voldemort.store.stats.StoreStats; +import voldemort.store.stats.Tracked; +import voldemort.utils.JmxUtils; import voldemort.utils.SystemTime; import voldemort.utils.Utils; import voldemort.xml.StoreDefinitionsMapper; @@ -63,24 +70,33 @@ * clients and invokes the corresponding Fat client API. * */ +@JmxManaged(description = "A Coordinator Service for proxying Voldemort HTTP requests") public class CoordinatorService extends AbstractService { private CoordinatorConfig config = null; - public CoordinatorService(CoordinatorConfig config) { - super(ServiceType.COORDINATOR); - this.config = config; - } - - private static boolean noop = false; - private static SocketStoreClientFactory storeClientFactory = null; - private static AsyncMetadataVersionManager asyncMetadataManager = null; - private static SchedulerService schedulerService = null; + private boolean noop = false; + private SocketStoreClientFactory storeClientFactory = null; + private AsyncMetadataVersionManager asyncMetadataManager = null; + private SchedulerService schedulerService = null; private static final Logger logger = Logger.getLogger(CoordinatorService.class); - private static Map fatClientMap = null; + private Map fatClientMap = null; public final static Schema CLIENT_CONFIGS_AVRO_SCHEMA = Schema.parse("{ \"name\": \"clientConfigs\", \"type\":\"array\"," + "\"items\": { \"name\": \"clientConfig\", \"type\": \"map\", \"values\":\"string\" }}}"); private static final String STORE_NAME_KEY = "store_name"; + protected ThreadPoolExecutor workerPool = null; + private final CoordinatorErrorStats errorStats; + private final StoreStats coordinatorPerfStats; + private ServerBootstrap bootstrap = null; + private Channel nettyServerChannel = null; + + public CoordinatorService(CoordinatorConfig config) { + super(ServiceType.COORDINATOR); + this.config = config; + this.coordinatorPerfStats = new StoreStats(); + this.errorStats = new CoordinatorErrorStats(); + RESTErrorHandler.setErrorStatsHandler(errorStats); + } /** * Initializes all the Fat clients (1 per store) for the cluster that this @@ -114,7 +130,9 @@ private void initializeFatClients() { this.config, fatClientConfigMap.get(storeName), storesXml, - clusterXml)); + clusterXml, + this.errorStats, + this.coordinatorPerfStats)); } } @@ -145,6 +163,8 @@ public Void call() throws Exception { }; + // For now track changes in cluster.xml only + // TODO: Modify this to track stores.xml in the future asyncMetadataManager = new AsyncMetadataVersionManager(sysRepository, rebootstrapCallback, null); @@ -156,15 +176,34 @@ public Void call() throws Exception { this.config.getMetadataCheckIntervalInMs()); // Configure the server. - ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), - Executors.newCachedThreadPool())); - bootstrap.setOption("backlog", 1000); + this.workerPool = (ThreadPoolExecutor) Executors.newCachedThreadPool(); + this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), + workerPool)); + this.bootstrap.setOption("backlog", 1000); + this.bootstrap.setOption("child.tcpNoDelay", true); + this.bootstrap.setOption("child.keepAlive", true); + this.bootstrap.setOption("child.reuseAddress", true); // Set up the event pipeline factory. - bootstrap.setPipelineFactory(new CoordinatorPipelineFactory(fatClientMap, noop)); + this.bootstrap.setPipelineFactory(new CoordinatorPipelineFactory(this.fatClientMap, + this.errorStats, + noop)); + + // Register the Mbean + // Netty Queue stats + JmxUtils.registerMbean(this, + JmxUtils.createObjectName(JmxUtils.getPackageName(this.getClass()), + JmxUtils.getClassName(this.getClass()))); + + // Error stats Mbean + JmxUtils.registerMbean(this.errorStats, + JmxUtils.createObjectName(JmxUtils.getPackageName(this.errorStats.getClass()), + JmxUtils.getClassName(this.errorStats.getClass()))); // Bind and start to accept incoming connections. - bootstrap.bind(new InetSocketAddress(8080)); + this.nettyServerChannel = this.bootstrap.bind(new InetSocketAddress(this.config.getServerPort())); + + logger.info("Coordinator service started on port " + this.config.getServerPort()); } /** @@ -231,7 +270,17 @@ private static Map readClientConfig(String configFilePath, } @Override - protected void stopInner() {} + protected void stopInner() { + if(this.nettyServerChannel != null) { + this.nettyServerChannel.close(); + } + + JmxUtils.unregisterMbean(JmxUtils.createObjectName(JmxUtils.getPackageName(this.getClass()), + JmxUtils.getClassName(this.getClass()))); + + JmxUtils.unregisterMbean(JmxUtils.createObjectName(JmxUtils.getPackageName(this.errorStats.getClass()), + JmxUtils.getClassName(this.errorStats.getClass()))); + } public static void main(String[] args) throws Exception { CoordinatorConfig config = null; @@ -264,4 +313,59 @@ public void run() { } }); } + + @JmxGetter(name = "numberOfActiveThreads", description = "The number of active Netty worker threads.") + public int getNumberOfActiveThreads() { + return this.workerPool.getActiveCount(); + } + + @JmxGetter(name = "numberOfThreads", description = "The total number of Netty worker threads, active and idle.") + public int getNumberOfThreads() { + return this.workerPool.getPoolSize(); + } + + @JmxGetter(name = "queuedRequests", description = "Number of requests in the Netty worker queue waiting to execute.") + public int getQueuedRequests() { + return this.workerPool.getQueue().size(); + } + + @JmxGetter(name = "averageGetCompletionTimeInMs", description = "The avg. time in ms for GET calls to complete.") + public double getAverageGetCompletionTimeInMs() { + return this.coordinatorPerfStats.getAvgTimeInMs(Tracked.GET); + } + + @JmxGetter(name = "averagePutCompletionTimeInMs", description = "The avg. time in ms for GET calls to complete.") + public double getAveragePutCompletionTimeInMs() { + return this.coordinatorPerfStats.getAvgTimeInMs(Tracked.PUT); + } + + @JmxGetter(name = "averageGetAllCompletionTimeInMs", description = "The avg. time in ms for GET calls to complete.") + public double getAverageGetAllCompletionTimeInMs() { + return this.coordinatorPerfStats.getAvgTimeInMs(Tracked.GET_ALL); + } + + @JmxGetter(name = "averageDeleteCompletionTimeInMs", description = "The avg. time in ms for GET calls to complete.") + public double getAverageDeleteCompletionTimeInMs() { + return this.coordinatorPerfStats.getAvgTimeInMs(Tracked.DELETE); + } + + @JmxGetter(name = "q99GetLatencyInMs", description = "") + public long getQ99GetLatency() { + return this.coordinatorPerfStats.getQ99LatencyInMs(Tracked.GET); + } + + @JmxGetter(name = "q99PutLatencyInMs", description = "") + public long getQ99PutLatency() { + return this.coordinatorPerfStats.getQ99LatencyInMs(Tracked.PUT); + } + + @JmxGetter(name = "q99GetAllLatencyInMs", description = "") + public long getQ99GetAllLatency() { + return this.coordinatorPerfStats.getQ99LatencyInMs(Tracked.GET_ALL); + } + + @JmxGetter(name = "q99DeleteLatencyInMs", description = "") + public long getQ99DeleteLatency() { + return this.coordinatorPerfStats.getQ99LatencyInMs(Tracked.DELETE); + } } diff --git a/src/java/voldemort/coordinator/DynamicTimeoutStoreClient.java b/src/java/voldemort/coordinator/DynamicTimeoutStoreClient.java index 17f3d71af5..7409c08fcd 100644 --- a/src/java/voldemort/coordinator/DynamicTimeoutStoreClient.java +++ b/src/java/voldemort/coordinator/DynamicTimeoutStoreClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2008-2013 LinkedIn, Inc + * Copyright 2013 LinkedIn, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -29,6 +29,7 @@ import voldemort.store.CompositeVersionedPutVoldemortRequest; import voldemort.store.CompositeVoldemortRequest; import voldemort.store.InvalidMetadataException; +import voldemort.store.Store; import voldemort.store.StoreTimeoutException; import voldemort.versioning.ObsoleteVersionException; import voldemort.versioning.VectorClock; @@ -68,6 +69,17 @@ public DynamicTimeoutStoreClient(String storeName, bootStrap(clusterXml, storesXml); } + /** + * Dummy constructor for Unit test purposes + * + * @param customStore A custom store object to use for performing the + * operations + */ + public DynamicTimeoutStoreClient(Store customStore) { + this.store = customStore; + this.metadataRefreshAttempts = 1; + } + // Bootstrap using the given cluster xml and stores xml // The super class bootStrap() method is used to handle the // InvalidMetadataException @@ -76,6 +88,13 @@ public void bootStrap(String customClusterXml, String customStoresXml) { this.store = factory.getRawStore(storeName, null, customStoresXml, customClusterXml, null); } + /** + * Performs a get operation with the specified composite request object + * + * @param requestWrapper A composite request object containing the key (and + * / or default value) and timeout. + * @return The Versioned value corresponding to the key + */ public Versioned getWithCustomTimeout(CompositeVoldemortRequest requestWrapper) { validateTimeout(requestWrapper.getRoutingTimeoutInMs()); for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) { @@ -92,14 +111,21 @@ public Versioned getWithCustomTimeout(CompositeVoldemortRequest request + " metadata refresh attempts failed."); } + /** + * Performs a put operation with the specified composite request object + * + * @param requestWrapper A composite request object containing the key and + * value + * @return Version of the value for the successful put + */ public Version putWithCustomTimeout(CompositeVoldemortRequest requestWrapper) { validateTimeout(requestWrapper.getRoutingTimeoutInMs()); Versioned versioned; long startTime = System.currentTimeMillis(); // We use the full timeout for doing the Get. In this, we're being - // optimistic that the subsequent put might be faster all the steps - // might finish within the alloted time + // optimistic that the subsequent put might be faster such that all the + // steps might finish within the alloted time versioned = getWithCustomTimeout(requestWrapper); long endTime = System.currentTimeMillis(); @@ -119,6 +145,15 @@ public Version putWithCustomTimeout(CompositeVoldemortRequest requestWrapp (requestWrapper.getRoutingTimeoutInMs() - (endTime - startTime)))); } + /** + * Performs a Versioned put operation with the specified composite request + * object + * + * @param requestWrapper Composite request object containing the key and the + * versioned object + * @return Version of the value for the successful put + * @throws ObsoleteVersionException + */ public Version putVersionedWithCustomTimeout(CompositeVoldemortRequest requestWrapper) throws ObsoleteVersionException { validateTimeout(requestWrapper.getRoutingTimeoutInMs()); @@ -136,6 +171,14 @@ public Version putVersionedWithCustomTimeout(CompositeVoldemortRequest req + " metadata refresh attempts failed."); } + /** + * Performs a get all operation with the specified composite request object + * + * @param requestWrapper Composite request object containing a reference to + * the Iterable keys + * + * @return Map of the keys to the corresponding versioned values + */ public Map> getAllWithCustomTimeout(CompositeVoldemortRequest requestWrapper) { validateTimeout(requestWrapper.getRoutingTimeoutInMs()); Map>> items = null; @@ -161,6 +204,13 @@ public Map> getAllWithCustomTimeout(CompositeVoldemortRequest deleteRequestObject) { validateTimeout(deleteRequestObject.getRoutingTimeoutInMs()); if(deleteRequestObject.getVersion() == null) { @@ -194,7 +244,11 @@ public boolean deleteWithCustomTimeout(CompositeVoldemortRequest deleteReq return store.delete(deleteRequestObject); } - // Make sure that the timeout specified is valid + /** + * Function to check that the timeout specified is valid + * + * @param opTimeoutInMs The specified timeout in milliseconds + */ private void validateTimeout(long opTimeoutInMs) { if(opTimeoutInMs <= 0) { throw new IllegalArgumentException("Illegal parameter: Timeout is too low: " diff --git a/src/java/voldemort/coordinator/FatClientWrapper.java b/src/java/voldemort/coordinator/FatClientWrapper.java index 81f0e08b31..7d299a3951 100644 --- a/src/java/voldemort/coordinator/FatClientWrapper.java +++ b/src/java/voldemort/coordinator/FatClientWrapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2008-2013 LinkedIn, Inc + * Copyright 2013 LinkedIn, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -16,10 +16,9 @@ package voldemort.coordinator; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -27,10 +26,14 @@ import org.apache.log4j.Logger; import org.jboss.netty.channel.MessageEvent; +import voldemort.annotations.jmx.JmxGetter; +import voldemort.annotations.jmx.JmxManaged; import voldemort.client.ClientConfig; import voldemort.client.SocketStoreClientFactory; import voldemort.store.CompositeVoldemortRequest; +import voldemort.store.stats.StoreStats; import voldemort.utils.ByteArray; +import voldemort.utils.JmxUtils; /** * A Wrapper class to provide asynchronous API for calling the fat client @@ -38,13 +41,17 @@ * of invoking the Fat Client methods on its own * */ +@JmxManaged(description = "A Wrapper for a Fat client in order to execute requests asynchronously") public class FatClientWrapper { - private ExecutorService fatClientExecutor; + private ThreadPoolExecutor fatClientExecutor; private SocketStoreClientFactory storeClientFactory; private DynamicTimeoutStoreClient dynamicTimeoutClient; private final CoordinatorConfig config; private final Logger logger = Logger.getLogger(FatClientWrapper.class); + private final String storeName; + private final CoordinatorErrorStats errorStats; + private final StoreStats coordinatorPerfStats; /** * @@ -53,12 +60,16 @@ public class FatClientWrapper { * @param clientConfig The config used to bootstrap the fat client * @param storesXml Stores XML used to bootstrap the fat client * @param clusterXml Cluster XML used to bootstrap the fat client + * @param errorStats + * @param coordinatorPerfStats */ public FatClientWrapper(String storeName, CoordinatorConfig config, ClientConfig clientConfig, String storesXml, - String clusterXml) { + String clusterXml, + CoordinatorErrorStats errorStats, + StoreStats coordinatorPerfStats) { this.config = config; @@ -68,10 +79,8 @@ public FatClientWrapper(String storeName, this.config.getFatClientWrapperKeepAliveInSecs(), // Keepalive TimeUnit.SECONDS, // Keepalive // Timeunit - new SynchronousQueue(), // Queue - // for - // pending - // tasks + new ArrayBlockingQueue(this.config.getFatClientWrapperMaxPoolSize(), + true), new ThreadFactory() { @@ -95,7 +104,6 @@ public void rejectedExecution(Runnable r, } }); - // this.fatClientRequestQueue = new SynchronousQueue(); this.storeClientFactory = new SocketStoreClientFactory(clientConfig); this.dynamicTimeoutClient = new DynamicTimeoutStoreClient(storeName, @@ -103,7 +111,24 @@ public void rejectedExecution(Runnable r, 1, storesXml, clusterXml); + this.errorStats = errorStats; + this.coordinatorPerfStats = coordinatorPerfStats; + this.storeName = storeName; + // Register the Mbean + JmxUtils.registerMbean(this, + JmxUtils.createObjectName(JmxUtils.getPackageName(this.getClass()), + JmxUtils.getClassName(this.getClass()) + + "-" + storeName)); + + } + + public void close() { + // Register the Mbean + JmxUtils.unregisterMbean(JmxUtils.createObjectName(JmxUtils.getPackageName(this.getClass()), + JmxUtils.getClassName(this.getClass()) + + "-" + this.storeName)); + this.storeClientFactory.close(); } /** @@ -111,22 +136,25 @@ public void rejectedExecution(Runnable r, * * @param getRequestObject Contains the key used in the get operation * @param getRequestMessageEvent MessageEvent to write the response back to + * @param startTimestampInNs The start timestamp used to measure turnaround + * time */ void submitGetRequest(final CompositeVoldemortRequest getRequestObject, - final MessageEvent getRequestMessageEvent) { + final MessageEvent getRequestMessageEvent, + long startTimestampInNs) { try { this.fatClientExecutor.submit(new HttpGetRequestExecutor(getRequestObject, getRequestMessageEvent, - this.dynamicTimeoutClient)); + this.dynamicTimeoutClient, + startTimestampInNs, + this.coordinatorPerfStats)); if(logger.isDebugEnabled()) { logger.debug("Submitted a get request"); } - // Keep track of this request for monitoring - // this.fatClientRequestQueue.add(f); } catch(RejectedExecutionException rej) { - handleRejectedException(getRequestMessageEvent); + handleRejectedException(rej, getRequestMessageEvent); } } @@ -136,50 +164,56 @@ void submitGetRequest(final CompositeVoldemortRequest getRequ * @param getAllRequestObject Contains the keys used in the getAll oepration * @param getAllRequestMessageEvent MessageEvent to write the response back * to + * @param storeName Name of the store to be specified in the response + * (header) + * @param startTimestampInNs The start timestamp used to measure turnaround + * time */ void submitGetAllRequest(final CompositeVoldemortRequest getAllRequestObject, final MessageEvent getAllRequestMessageEvent, - final String storeName) { + final String storeName, + long startTimestampInNs) { try { this.fatClientExecutor.submit(new HttpGetAllRequestExecutor(getAllRequestObject, getAllRequestMessageEvent, this.dynamicTimeoutClient, - storeName)); + storeName, + startTimestampInNs, + this.coordinatorPerfStats)); if(logger.isDebugEnabled()) { logger.debug("Submitted a get all request"); } - // Keep track of this request for monitoring - // this.fatClientRequestQueue.add(f); } catch(RejectedExecutionException rej) { - handleRejectedException(getAllRequestMessageEvent); + handleRejectedException(rej, getAllRequestMessageEvent); } } /** * Interface to perform put operation on the Fat client * - * @param key: ByteArray representation of the key to put - * @param value: value corresponding to the key to put - * @param putRequest: MessageEvent to write the response on. - * @param operationTimeoutInMs The timeout value for this operation + * @param putRequestObject Request object containing the key and value + * @param putRequestMessageEvent MessageEvent to write the response on. + * @param startTimestampInNs The start timestamp used to measure turnaround + * time */ void submitPutRequest(final CompositeVoldemortRequest putRequestObject, - final MessageEvent putRequest) { + final MessageEvent putRequestMessageEvent, + long startTimestampInNs) { try { this.fatClientExecutor.submit(new HttpPutRequestExecutor(putRequestObject, - putRequest, - this.dynamicTimeoutClient)); + putRequestMessageEvent, + this.dynamicTimeoutClient, + startTimestampInNs, + this.coordinatorPerfStats)); if(logger.isDebugEnabled()) { logger.debug("Submitted a put request"); } - // Keep track of this request for monitoring - // this.fatClientRequestQueue.add(f); } catch(RejectedExecutionException rej) { - handleRejectedException(putRequest); + handleRejectedException(rej, putRequestMessageEvent); } } @@ -189,37 +223,46 @@ void submitPutRequest(final CompositeVoldemortRequest putRequ * @param deleteRequestObject Contains the key and the version used in the * delete operation * @param deleteRequestEvent MessageEvent to write the response back to + * @param startTimestampInNs The start timestamp used to measure turnaround + * time */ public void submitDeleteRequest(CompositeVoldemortRequest deleteRequestObject, - MessageEvent deleteRequestEvent) { + MessageEvent deleteRequestEvent, + long startTimestampInNs) { try { this.fatClientExecutor.submit(new HttpDeleteRequestExecutor(deleteRequestObject, deleteRequestEvent, - this.dynamicTimeoutClient)); + this.dynamicTimeoutClient, + startTimestampInNs, + this.coordinatorPerfStats)); - // Keep track of this request for monitoring - // this.fatClientRequestQueue.add(f); } catch(RejectedExecutionException rej) { - handleRejectedException(deleteRequestEvent); + handleRejectedException(rej, deleteRequestEvent); } } // TODO: Add a custom HTTP Error status 429: Too many requests - private void handleRejectedException(MessageEvent getRequest) { + private void handleRejectedException(RejectedExecutionException rej, MessageEvent getRequest) { + this.errorStats.reportException(rej); logger.error("rejected !!!"); getRequest.getChannel().write(null); // Write error back to the thin // client - // String errorDescription = - // "Request queue for store " + - // this.dynamicTimeoutClient.getStoreName() - // + " is full !"); - // logger.error(errorDescription); - // RESTErrorHandler.handleError(REQUEST_TIMEOUT, - // this.getRequestMessageEvent, - // false, - // errorDescription); } + @JmxGetter(name = "numberOfActiveThreads", description = "The number of active Fat client wrapper threads.") + public int getNumberOfActiveThreads() { + return this.fatClientExecutor.getActiveCount(); + } + + @JmxGetter(name = "numberOfThreads", description = "The total number of Fat client wrapper threads, active and idle.") + public int getNumberOfThreads() { + return this.fatClientExecutor.getPoolSize(); + } + + @JmxGetter(name = "queuedRequests", description = "Number of requests in the Fat client wrapper queue waiting to execute.") + public int getQueuedRequests() { + return this.fatClientExecutor.getQueue().size(); + } } diff --git a/src/java/voldemort/coordinator/HttpDeleteRequestExecutor.java b/src/java/voldemort/coordinator/HttpDeleteRequestExecutor.java index 9009c19d16..c1b2e79aab 100644 --- a/src/java/voldemort/coordinator/HttpDeleteRequestExecutor.java +++ b/src/java/voldemort/coordinator/HttpDeleteRequestExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2008-2013 LinkedIn, Inc + * Copyright 2013 LinkedIn, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -17,17 +17,13 @@ package voldemort.coordinator; import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; -import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TRANSFER_ENCODING; -import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; -import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.REQUEST_TIMEOUT; import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; import org.apache.log4j.Logger; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -35,6 +31,8 @@ import voldemort.VoldemortException; import voldemort.store.CompositeVoldemortRequest; import voldemort.store.StoreTimeoutException; +import voldemort.store.stats.StoreStats; +import voldemort.store.stats.Tracked; import voldemort.utils.ByteArray; /** @@ -49,6 +47,8 @@ public class HttpDeleteRequestExecutor implements Runnable { DynamicTimeoutStoreClient storeClient; private final Logger logger = Logger.getLogger(HttpDeleteRequestExecutor.class); private final CompositeVoldemortRequest deleteRequestObject; + private final long startTimestampInNs; + private final StoreStats coordinatorPerfStats; /** * @@ -58,31 +58,37 @@ public class HttpDeleteRequestExecutor implements Runnable { * error * @param storeClient Reference to the fat client for performing this Delete * operation + * @param coordinatorPerfStats Stats object used to measure the turnaround + * time + * @param startTimestampInNs start timestamp of the request */ public HttpDeleteRequestExecutor(CompositeVoldemortRequest deleteRequestObject, MessageEvent requestEvent, - DynamicTimeoutStoreClient storeClient) { + DynamicTimeoutStoreClient storeClient, + long startTimestampInNs, + StoreStats coordinatorPerfStats) { this.deleteRequestMessageEvent = requestEvent; this.storeClient = storeClient; this.deleteRequestObject = deleteRequestObject; + this.startTimestampInNs = startTimestampInNs; + this.coordinatorPerfStats = coordinatorPerfStats; } public void writeResponse() { // 1. Create the Response object - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT); // 2. Set the right headers - response.setHeader(CONTENT_TYPE, "binary"); - response.setHeader(CONTENT_TRANSFER_ENCODING, "binary"); response.setHeader(CONTENT_LENGTH, "0"); - // Write the response to the Netty Channel - ChannelFuture future = this.deleteRequestMessageEvent.getChannel().write(response); - - // Close the non-keep-alive connection after the write operation is - // done. - future.addListener(ChannelFutureListener.CLOSE); + // Update the stats + if(this.coordinatorPerfStats != null) { + long durationInNs = System.nanoTime() - startTimestampInNs; + this.coordinatorPerfStats.recordTime(Tracked.DELETE, durationInNs); + } + // Write the response to the Netty Channel + this.deleteRequestMessageEvent.getChannel().write(response); } @Override @@ -94,7 +100,6 @@ public void run() { } else { RESTErrorHandler.handleError(NOT_FOUND, this.deleteRequestMessageEvent, - false, "Requested Key with the specified version does not exist"); } @@ -103,14 +108,12 @@ public void run() { logger.error(errorDescription); RESTErrorHandler.handleError(REQUEST_TIMEOUT, this.deleteRequestMessageEvent, - false, errorDescription); } catch(VoldemortException ve) { ve.printStackTrace(); String errorDescription = "Voldemort Exception: " + ve.getMessage(); RESTErrorHandler.handleError(INTERNAL_SERVER_ERROR, this.deleteRequestMessageEvent, - false, errorDescription); } } diff --git a/src/java/voldemort/coordinator/HttpGetAllRequestExecutor.java b/src/java/voldemort/coordinator/HttpGetAllRequestExecutor.java index 7ef491e677..c8a774b989 100644 --- a/src/java/voldemort/coordinator/HttpGetAllRequestExecutor.java +++ b/src/java/voldemort/coordinator/HttpGetAllRequestExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2008-2013 LinkedIn, Inc + * Copyright 2013 LinkedIn, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -44,8 +44,6 @@ import org.codehaus.jackson.map.ObjectMapper; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -53,6 +51,8 @@ import voldemort.VoldemortException; import voldemort.store.CompositeVoldemortRequest; import voldemort.store.StoreTimeoutException; +import voldemort.store.stats.StoreStats; +import voldemort.store.stats.Tracked; import voldemort.utils.ByteArray; import voldemort.versioning.VectorClock; import voldemort.versioning.Versioned; @@ -70,6 +70,8 @@ public class HttpGetAllRequestExecutor implements Runnable { private final Logger logger = Logger.getLogger(HttpGetRequestExecutor.class); private final CompositeVoldemortRequest getAllRequestObject; private final String storeName; + private final long startTimestampInNs; + private final StoreStats coordinatorPerfStats; /** * @@ -79,15 +81,24 @@ public class HttpGetAllRequestExecutor implements Runnable { * error * @param storeClient Reference to the fat client for performing this Get * operation + * @param storeName Name of the store intended to be included in the + * response (content-location) + * @param coordinatorPerfStats Stats object used to measure the turnaround + * time + * @param startTimestampInNs start timestamp of the request */ public HttpGetAllRequestExecutor(CompositeVoldemortRequest getAllRequestObject, MessageEvent requestMessageEvent, DynamicTimeoutStoreClient storeClient, - String storeName) { + String storeName, + long startTimestampInNs, + StoreStats coordinatorPerfStats) { this.getRequestMessageEvent = requestMessageEvent; this.storeClient = storeClient; this.getAllRequestObject = getAllRequestObject; this.storeName = storeName; + this.startTimestampInNs = startTimestampInNs; + this.coordinatorPerfStats = coordinatorPerfStats; } public void writeResponse(Map> responseVersioned) { @@ -160,13 +171,14 @@ public void writeResponse(Map> responseVersioned) { response.setContent(responseContent); response.setHeader(CONTENT_LENGTH, response.getContent().readableBytes()); - // Write the response to the Netty Channel - ChannelFuture future = this.getRequestMessageEvent.getChannel().write(response); - - // Close the non-keep-alive connection after the write operation is - // done. - future.addListener(ChannelFutureListener.CLOSE); + // Update the stats + if(this.coordinatorPerfStats != null) { + long durationInNs = System.nanoTime() - startTimestampInNs; + this.coordinatorPerfStats.recordTime(Tracked.GET_ALL, durationInNs); + } + // Write the response to the Netty Channel + this.getRequestMessageEvent.getChannel().write(response); } @Override @@ -176,7 +188,6 @@ public void run() { if(responseVersioned == null) { RESTErrorHandler.handleError(NOT_FOUND, this.getRequestMessageEvent, - false, "Requested Key does not exist"); } writeResponse(responseVersioned); @@ -184,22 +195,17 @@ public void run() { String errorDescription = "GETALL Failed !!! Illegal Arguments : " + illegalArgsException.getMessage(); logger.error(errorDescription); - RESTErrorHandler.handleError(BAD_REQUEST, - this.getRequestMessageEvent, - false, - errorDescription); + RESTErrorHandler.handleError(BAD_REQUEST, this.getRequestMessageEvent, errorDescription); } catch(StoreTimeoutException timeoutException) { String errorDescription = "GET Request timed out: " + timeoutException.getMessage(); logger.error(errorDescription); RESTErrorHandler.handleError(REQUEST_TIMEOUT, this.getRequestMessageEvent, - false, errorDescription); } catch(VoldemortException ve) { String errorDescription = "Voldemort Exception: " + ve.getMessage(); RESTErrorHandler.handleError(INTERNAL_SERVER_ERROR, this.getRequestMessageEvent, - false, errorDescription); } } diff --git a/src/java/voldemort/coordinator/HttpGetRequestExecutor.java b/src/java/voldemort/coordinator/HttpGetRequestExecutor.java index 61c4a543c8..c87490977d 100644 --- a/src/java/voldemort/coordinator/HttpGetRequestExecutor.java +++ b/src/java/voldemort/coordinator/HttpGetRequestExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2008-2013 LinkedIn, Inc + * Copyright 2013 LinkedIn, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -42,6 +42,8 @@ import voldemort.VoldemortException; import voldemort.store.CompositeVoldemortRequest; import voldemort.store.StoreTimeoutException; +import voldemort.store.stats.StoreStats; +import voldemort.store.stats.Tracked; import voldemort.utils.ByteArray; import voldemort.versioning.VectorClock; import voldemort.versioning.Versioned; @@ -59,6 +61,20 @@ public class HttpGetRequestExecutor implements Runnable { DynamicTimeoutStoreClient storeClient; private final Logger logger = Logger.getLogger(HttpGetRequestExecutor.class); private final CompositeVoldemortRequest getRequestObject; + private final long startTimestampInNs; + private final StoreStats coordinatorPerfStats; + + /** + * Dummy constructor invoked during a Noop Get operation + * + * @param requestEvent MessageEvent used to write the response + */ + public HttpGetRequestExecutor(MessageEvent requestEvent) { + this.getRequestMessageEvent = requestEvent; + this.getRequestObject = null; + this.startTimestampInNs = 0; + this.coordinatorPerfStats = null; + } /** * @@ -68,13 +84,20 @@ public class HttpGetRequestExecutor implements Runnable { * error * @param storeClient Reference to the fat client for performing this Get * operation + * @param coordinatorPerfStats Stats object used to measure the turnaround + * time + * @param startTimestampInNs start timestamp of the request */ public HttpGetRequestExecutor(CompositeVoldemortRequest getRequestObject, MessageEvent requestEvent, - DynamicTimeoutStoreClient storeClient) { + DynamicTimeoutStoreClient storeClient, + long startTimestampInNs, + StoreStats coordinatorPerfStats) { this.getRequestMessageEvent = requestEvent; this.storeClient = storeClient; this.getRequestObject = getRequestObject; + this.startTimestampInNs = startTimestampInNs; + this.coordinatorPerfStats = coordinatorPerfStats; } public void writeResponse(Versioned responseVersioned) { @@ -120,6 +143,12 @@ public void writeResponse(Versioned responseVersioned) { logger.debug("Response = " + response); } + // Update the stats + if(this.coordinatorPerfStats != null) { + long durationInNs = System.nanoTime() - startTimestampInNs; + this.coordinatorPerfStats.recordTime(Tracked.GET, durationInNs); + } + // Write the response to the Netty Channel this.getRequestMessageEvent.getChannel().write(response); } @@ -134,7 +163,6 @@ public void run() { } else { RESTErrorHandler.handleError(NOT_FOUND, this.getRequestMessageEvent, - false, "Requested Key does not exist"); } if(logger.isDebugEnabled()) { @@ -146,22 +174,17 @@ public void run() { String errorDescription = "PUT Failed !!! Illegal Arguments : " + illegalArgsException.getMessage(); logger.error(errorDescription); - RESTErrorHandler.handleError(BAD_REQUEST, - this.getRequestMessageEvent, - false, - errorDescription); + RESTErrorHandler.handleError(BAD_REQUEST, this.getRequestMessageEvent, errorDescription); } catch(StoreTimeoutException timeoutException) { String errorDescription = "GET Request timed out: " + timeoutException.getMessage(); logger.error(errorDescription); RESTErrorHandler.handleError(REQUEST_TIMEOUT, this.getRequestMessageEvent, - false, errorDescription); } catch(VoldemortException ve) { String errorDescription = "Voldemort Exception: " + ve.getMessage(); RESTErrorHandler.handleError(INTERNAL_SERVER_ERROR, this.getRequestMessageEvent, - false, errorDescription); } } diff --git a/src/java/voldemort/coordinator/HttpPutRequestExecutor.java b/src/java/voldemort/coordinator/HttpPutRequestExecutor.java index ebbc7acc0d..e3d69ff8d7 100644 --- a/src/java/voldemort/coordinator/HttpPutRequestExecutor.java +++ b/src/java/voldemort/coordinator/HttpPutRequestExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2008-2013 LinkedIn, Inc + * Copyright 2013 LinkedIn, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -17,10 +17,9 @@ package voldemort.coordinator; import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; -import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.CREATED; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; -import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.PRECONDITION_FAILED; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.REQUEST_TIMEOUT; import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; @@ -33,6 +32,8 @@ import voldemort.VoldemortException; import voldemort.store.CompositeVoldemortRequest; import voldemort.store.StoreTimeoutException; +import voldemort.store.stats.StoreStats; +import voldemort.store.stats.Tracked; import voldemort.utils.ByteArray; import voldemort.versioning.ObsoleteVersionException; @@ -48,10 +49,19 @@ public class HttpPutRequestExecutor implements Runnable { DynamicTimeoutStoreClient storeClient; private final Logger logger = Logger.getLogger(HttpPutRequestExecutor.class); private final CompositeVoldemortRequest putRequestObject; + private final long startTimestampInNs; + private final StoreStats coordinatorPerfStats; + /** + * Dummy constructor invoked during a Noop Put operation + * + * @param requestEvent MessageEvent used to write the response + */ public HttpPutRequestExecutor(MessageEvent requestEvent) { this.putRequestMessageEvent = requestEvent; this.putRequestObject = null; + this.startTimestampInNs = 0; + this.coordinatorPerfStats = null; } /** @@ -62,25 +72,37 @@ public HttpPutRequestExecutor(MessageEvent requestEvent) { * error * @param storeClient Reference to the fat client for performing this Get * operation + * @param coordinatorPerfStats Stats object used to measure the turnaround + * time + * @param startTimestampInNs start timestamp of the request */ public HttpPutRequestExecutor(CompositeVoldemortRequest putRequestObject, MessageEvent requestEvent, - DynamicTimeoutStoreClient storeClient) { + DynamicTimeoutStoreClient storeClient, + long startTimestampInNs, + StoreStats coordinatorPerfStats) { this.putRequestMessageEvent = requestEvent; this.storeClient = storeClient; this.putRequestObject = putRequestObject; + this.startTimestampInNs = startTimestampInNs; + this.coordinatorPerfStats = coordinatorPerfStats; } public void writeResponse() { // 1. Create the Response object - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, CREATED); // 2. Set the right headers - response.setHeader(CONTENT_TYPE, "application/json"); - - // 3. Copy the data into the payload response.setHeader(CONTENT_LENGTH, 0); + // TODO: return the Version back to the client + + // Update the stats + if(this.coordinatorPerfStats != null) { + long durationInNs = System.nanoTime() - startTimestampInNs; + this.coordinatorPerfStats.recordTime(Tracked.PUT, durationInNs); + } + // Write the response to the Netty Channel this.putRequestMessageEvent.getChannel().write(response); } @@ -99,16 +121,12 @@ public void run() { String errorDescription = "PUT Failed !!! Illegal Arguments : " + illegalArgsException.getMessage(); logger.error(errorDescription); - RESTErrorHandler.handleError(BAD_REQUEST, - this.putRequestMessageEvent, - false, - errorDescription); + RESTErrorHandler.handleError(BAD_REQUEST, this.putRequestMessageEvent, errorDescription); } catch(ObsoleteVersionException oe) { String errorDescription = "PUT Failed !!! Obsolete version exception: " + oe.getMessage(); RESTErrorHandler.handleError(PRECONDITION_FAILED, this.putRequestMessageEvent, - false, errorDescription); } catch(StoreTimeoutException timeoutException) { @@ -116,14 +134,12 @@ public void run() { logger.error(errorDescription); RESTErrorHandler.handleError(REQUEST_TIMEOUT, this.putRequestMessageEvent, - false, errorDescription); } catch(VoldemortException ve) { String errorDescription = "Voldemort Exception: " + ve.getMessage(); RESTErrorHandler.handleError(INTERNAL_SERVER_ERROR, this.putRequestMessageEvent, - false, errorDescription); } } diff --git a/src/java/voldemort/coordinator/NoopHttpRequestHandler.java b/src/java/voldemort/coordinator/NoopHttpRequestHandler.java index 7b0574453a..0c0ceb56aa 100644 --- a/src/java/voldemort/coordinator/NoopHttpRequestHandler.java +++ b/src/java/voldemort/coordinator/NoopHttpRequestHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2008-2013 LinkedIn, Inc + * Copyright 2013 LinkedIn, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -22,8 +22,6 @@ import org.jboss.netty.handler.codec.http.HttpRequest; import voldemort.common.VoldemortOpCode; -import voldemort.store.CompositeGetVoldemortRequest; -import voldemort.utils.ByteArray; import voldemort.versioning.Versioned; /** @@ -43,16 +41,11 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex switch(operationType) { case VoldemortOpCode.GET_OP_CODE: - HttpGetRequestExecutor getExecutor = new HttpGetRequestExecutor(new CompositeGetVoldemortRequest(null, - 0l, - false), - e, - null); + HttpGetRequestExecutor getExecutor = new HttpGetRequestExecutor(e); Versioned responseVersioned = null; - byte[] nullByteArray = new byte[1]; - nullByteArray[0] = 0; - responseVersioned = new Versioned(nullByteArray); + byte[] sampleByteArray = "a".getBytes(); + responseVersioned = new Versioned(sampleByteArray); getExecutor.writeResponse(responseVersioned); break; case VoldemortOpCode.PUT_OP_CODE: diff --git a/src/java/voldemort/coordinator/RESTErrorHandler.java b/src/java/voldemort/coordinator/RESTErrorHandler.java index f83e923248..d57b835aa7 100644 --- a/src/java/voldemort/coordinator/RESTErrorHandler.java +++ b/src/java/voldemort/coordinator/RESTErrorHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2008-2013 LinkedIn, Inc + * Copyright 2013 LinkedIn, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -20,14 +20,14 @@ import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpResponse; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.util.CharsetUtil; +import voldemort.VoldemortException; + /** * A Generic class used to propagate the error back to the client over the Netty * channel @@ -35,10 +35,15 @@ */ public class RESTErrorHandler { - public static void handleError(HttpResponseStatus status, - MessageEvent e, - boolean keepAlive, - String message) { + static CoordinatorErrorStats errorStats; + + public static void setErrorStatsHandler(CoordinatorErrorStats errorStatsObj) { + errorStats = errorStatsObj; + } + + public static void handleError(HttpResponseStatus status, MessageEvent e, String message) { + errorStats.reportException(new VoldemortException()); + // 1. Create the Response object HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); @@ -47,12 +52,6 @@ public static void handleError(HttpResponseStatus status, + message + "\r\n", CharsetUtil.UTF_8)); // Write the response to the Netty Channel - ChannelFuture future = e.getChannel().write(response); - - // Close the non-keep-alive connection after the write operation is - // done. - if(!keepAlive) { - future.addListener(ChannelFutureListener.CLOSE); - } + e.getChannel().write(response); } } diff --git a/src/java/voldemort/coordinator/VoldemortHttpRequestHandler.java b/src/java/voldemort/coordinator/VoldemortHttpRequestHandler.java index 0af1a280c0..fe10519a8c 100644 --- a/src/java/voldemort/coordinator/VoldemortHttpRequestHandler.java +++ b/src/java/voldemort/coordinator/VoldemortHttpRequestHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2008-2013 LinkedIn, Inc + * Copyright 2013 LinkedIn, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -16,7 +16,6 @@ package voldemort.coordinator; -import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import java.io.IOException; @@ -35,10 +34,8 @@ import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.handler.codec.http.HttpChunk; -import org.jboss.netty.handler.codec.http.HttpChunkTrailer; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpRequest; -import org.jboss.netty.util.CharsetUtil; import voldemort.common.VoldemortOpCode; import voldemort.store.CompositeDeleteVoldemortRequest; @@ -60,8 +57,6 @@ public class VoldemortHttpRequestHandler extends SimpleChannelUpstreamHandler { public HttpRequest request; private boolean readingChunks; - /** Buffer that stores the response content */ - private final StringBuilder buf = new StringBuilder(); private Map fatClientMap; private final Logger logger = Logger.getLogger(VoldemortHttpRequestHandler.class); public static final String X_VOLD_REQUEST_TIMEOUT_MS = "X-VOLD-Request-Timeout-ms"; @@ -70,20 +65,25 @@ public class VoldemortHttpRequestHandler extends SimpleChannelUpstreamHandler { public static final String CUSTOM_RESOLVING_STRATEGY = "custom"; public static final String DEFAULT_RESOLVING_STRATEGY = "timestamp"; + private CoordinatorErrorStats errorStats = null; + // Implicit constructor defined for the derived classes public VoldemortHttpRequestHandler() {} - public VoldemortHttpRequestHandler(Map fatClientMap) { + public VoldemortHttpRequestHandler(Map fatClientMap, + CoordinatorErrorStats errorStats) { this.fatClientMap = fatClientMap; + this.errorStats = errorStats; } /** - * Function to parse the HTTP headers and build a Voldemort request object + * Function to parse (and validate) the HTTP headers and build a Voldemort + * request object * * @param requestURI URI of the REST request * @param httpMethod Message Event object used to write the response to * @param e The REST (Voldemort) operation type - * @return true if a valid request was received. False otherwise + * @return A composite request object corresponding to the incoming request */ private CompositeVoldemortRequest parseRequest(String requestURI, MessageEvent e, @@ -160,10 +160,6 @@ private CompositeVoldemortRequest parseRequest(String request break; case VoldemortOpCode.DELETE_OP_CODE: VectorClock vc = getVectorClock(this.request.getHeader(X_VOLD_VECTOR_CLOCK)); - if(vc == null) { - // handleBadRequest(e, - // "Incorrect vector clock specified in the request"); - } requestWrapper = new CompositeDeleteVoldemortRequest(keyList.get(0), vc, operationTimeoutInMs); @@ -192,6 +188,8 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex readingChunks = true; } else { + long startTimeStampInNs = System.nanoTime(); + CompositeVoldemortRequest requestObject = parseRequest(requestURI, e, this.request.getMethod()); @@ -205,11 +203,13 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex } if(storeName == null || fatClientWrapper == null) { + this.errorStats.reportException(new IllegalArgumentException()); handleBadRequest(e, "Invalid store name. Critical error."); return; } if(requestObject == null) { + this.errorStats.reportException(new IllegalArgumentException()); handleBadRequest(e, "Illegal request."); return; } @@ -219,28 +219,28 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex if(logger.isDebugEnabled()) { logger.debug("Incoming get request"); } - fatClientWrapper.submitGetRequest(requestObject, e); + fatClientWrapper.submitGetRequest(requestObject, e, startTimeStampInNs); break; case VoldemortOpCode.GET_ALL_OP_CODE: - fatClientWrapper.submitGetAllRequest(requestObject, e, storeName); + fatClientWrapper.submitGetAllRequest(requestObject, + e, + storeName, + startTimeStampInNs); break; case VoldemortOpCode.PUT_OP_CODE: if(logger.isDebugEnabled()) { logger.debug("Incoming put request"); } - fatClientWrapper.submitPutRequest(requestObject, e); + fatClientWrapper.submitPutRequest(requestObject, e, startTimeStampInNs); break; case VoldemortOpCode.DELETE_OP_CODE: - fatClientWrapper.submitDeleteRequest(requestObject, e); + fatClientWrapper.submitDeleteRequest(requestObject, e, startTimeStampInNs); break; default: String errorMessage = "Illegal operation."; logger.error(errorMessage); - RESTErrorHandler.handleError(BAD_REQUEST, - e, - isKeepAlive(request), - errorMessage); + RESTErrorHandler.handleError(BAD_REQUEST, e, errorMessage); return; } @@ -249,23 +249,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex HttpChunk chunk = (HttpChunk) e.getMessage(); if(chunk.isLast()) { readingChunks = false; - buf.append("END OF CONTENT\r\n"); - - HttpChunkTrailer trailer = (HttpChunkTrailer) chunk; - if(!trailer.getHeaderNames().isEmpty()) { - buf.append("\r\n"); - for(String name: trailer.getHeaderNames()) { - for(String value: trailer.getHeaders(name)) { - buf.append("TRAILING HEADER: " + name + " = " + value + "\r\n"); - } - } - buf.append("\r\n"); - } - - } else { - buf.append("CHUNK: " + chunk.getContent().toString(CharsetUtil.UTF_8) + "\r\n"); } - } } @@ -312,7 +296,7 @@ private VectorClock getVectorClock(String vectorClockHeader) { private void handleBadRequest(MessageEvent e, String msg) { String errorMessage = msg; logger.error(errorMessage); - RESTErrorHandler.handleError(BAD_REQUEST, e, false, errorMessage); + RESTErrorHandler.handleError(BAD_REQUEST, e, errorMessage); } /** diff --git a/src/java/voldemort/store/CompositeDeleteVoldemortRequest.java b/src/java/voldemort/store/CompositeDeleteVoldemortRequest.java index 08c70a7927..4a5e7a458d 100644 --- a/src/java/voldemort/store/CompositeDeleteVoldemortRequest.java +++ b/src/java/voldemort/store/CompositeDeleteVoldemortRequest.java @@ -1,11 +1,33 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package voldemort.store; import voldemort.common.VoldemortOpCode; import voldemort.versioning.Version; +/** + * A class that defines a composite delete request containing the key to delete, + * corresponding version (if present in the incoming HTTP request) and the + * timeout + * + */ public class CompositeDeleteVoldemortRequest extends CompositeVoldemortRequest { - public CompositeDeleteVoldemortRequest(K key, Version version, long timeout) { - super(key, null, null, null, version, timeout, true, VoldemortOpCode.DELETE_OP_CODE); + public CompositeDeleteVoldemortRequest(K key, Version version, long timeoutInMs) { + super(key, null, null, null, version, timeoutInMs, true, VoldemortOpCode.DELETE_OP_CODE); } } diff --git a/src/java/voldemort/store/CompositeGetAllVoldemortRequest.java b/src/java/voldemort/store/CompositeGetAllVoldemortRequest.java index 2c548b9bbd..0999581c56 100644 --- a/src/java/voldemort/store/CompositeGetAllVoldemortRequest.java +++ b/src/java/voldemort/store/CompositeGetAllVoldemortRequest.java @@ -1,16 +1,40 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package voldemort.store; import voldemort.common.VoldemortOpCode; +/** + * A class that defines a composite get all request containing a reference to + * the iterable keys, a flag to indicate if the conflicts should be resolved and + * the timeout + * + */ public class CompositeGetAllVoldemortRequest extends CompositeVoldemortRequest { - public CompositeGetAllVoldemortRequest(Iterable keys, long timeout, boolean resolveConflicts) { + public CompositeGetAllVoldemortRequest(Iterable keys, + long timeoutInMs, + boolean resolveConflicts) { super(null, null, keys, null, null, - timeout, + timeoutInMs, resolveConflicts, VoldemortOpCode.GET_ALL_OP_CODE); } diff --git a/src/java/voldemort/store/CompositeGetVoldemortRequest.java b/src/java/voldemort/store/CompositeGetVoldemortRequest.java index 3826d06760..8cc9f2613a 100644 --- a/src/java/voldemort/store/CompositeGetVoldemortRequest.java +++ b/src/java/voldemort/store/CompositeGetVoldemortRequest.java @@ -1,10 +1,39 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package voldemort.store; import voldemort.common.VoldemortOpCode; +/** + * A class that defines a composite get request containing the key, a flag to + * indicate whether the conflicts should be resolved and the timeout + * + */ + public class CompositeGetVoldemortRequest extends CompositeVoldemortRequest { - public CompositeGetVoldemortRequest(K key, long timeout, boolean resolveConflicts) { - super(key, null, null, null, null, timeout, resolveConflicts, VoldemortOpCode.GET_OP_CODE); + public CompositeGetVoldemortRequest(K key, long timeoutInMs, boolean resolveConflicts) { + super(key, + null, + null, + null, + null, + timeoutInMs, + resolveConflicts, + VoldemortOpCode.GET_OP_CODE); } } diff --git a/src/java/voldemort/store/CompositePutVoldemortRequest.java b/src/java/voldemort/store/CompositePutVoldemortRequest.java index e187993390..723404fb67 100644 --- a/src/java/voldemort/store/CompositePutVoldemortRequest.java +++ b/src/java/voldemort/store/CompositePutVoldemortRequest.java @@ -1,10 +1,31 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package voldemort.store; import voldemort.common.VoldemortOpCode; +/** + * A class that defines a composite put request containing the key, the value + * and the timeout + * + */ public class CompositePutVoldemortRequest extends CompositeVoldemortRequest { - public CompositePutVoldemortRequest(K key, V rawValue, long timeout) { - super(key, rawValue, null, null, null, timeout, true, VoldemortOpCode.PUT_OP_CODE); + public CompositePutVoldemortRequest(K key, V rawValue, long timeoutInMs) { + super(key, rawValue, null, null, null, timeoutInMs, true, VoldemortOpCode.PUT_OP_CODE); } } diff --git a/src/java/voldemort/store/CompositeVersionedPutVoldemortRequest.java b/src/java/voldemort/store/CompositeVersionedPutVoldemortRequest.java index cc96d6e2b3..8b04b3dc66 100644 --- a/src/java/voldemort/store/CompositeVersionedPutVoldemortRequest.java +++ b/src/java/voldemort/store/CompositeVersionedPutVoldemortRequest.java @@ -1,12 +1,34 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package voldemort.store; import voldemort.common.VoldemortOpCode; import voldemort.versioning.Versioned; +/** + * A class that defines a composite put request containing the key, the + * versioned value and the timeout + * + */ + public class CompositeVersionedPutVoldemortRequest extends CompositeVoldemortRequest { - public CompositeVersionedPutVoldemortRequest(K key, Versioned value, long timeout) { - super(key, null, null, value, null, timeout, true, VoldemortOpCode.PUT_OP_CODE); + public CompositeVersionedPutVoldemortRequest(K key, Versioned value, long timeoutInMs) { + super(key, null, null, value, null, timeoutInMs, true, VoldemortOpCode.PUT_OP_CODE); } } diff --git a/src/java/voldemort/store/CompositeVoldemortRequest.java b/src/java/voldemort/store/CompositeVoldemortRequest.java index f8e834f0a2..822251e3f5 100644 --- a/src/java/voldemort/store/CompositeVoldemortRequest.java +++ b/src/java/voldemort/store/CompositeVoldemortRequest.java @@ -1,8 +1,28 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package voldemort.store; import voldemort.versioning.Version; import voldemort.versioning.Versioned; +/** + * A base class for the different types of Voldemort requests containing all the + * necessary components + */ public class CompositeVoldemortRequest { private final K key; @@ -10,7 +30,7 @@ public class CompositeVoldemortRequest { private final Iterable getAllIterableKeys; private final Versioned value; private Version version; - private long routingTimeout; + private long routingTimeoutInMs; private final boolean resolveConflicts; private final byte operationType; @@ -19,13 +39,13 @@ public CompositeVoldemortRequest(K key, Iterable keys, Versioned value, Version version, - long timeout, + long timeoutInMs, boolean resolveConflicts, byte operationType) { this.key = key; this.rawValue = rawValue; this.getAllIterableKeys = keys; - this.routingTimeout = timeout; + this.routingTimeoutInMs = timeoutInMs; this.value = value; this.version = version; this.resolveConflicts = resolveConflicts; @@ -49,11 +69,11 @@ public void setVersion(Version version) { } public long getRoutingTimeoutInMs() { - return routingTimeout; + return routingTimeoutInMs; } - public void setRoutingTimeoutInMs(long timeout) { - this.routingTimeout = timeout; + public void setRoutingTimeoutInMs(long timeoutInMs) { + this.routingTimeoutInMs = timeoutInMs; } public boolean resolveConflicts() { diff --git a/src/java/voldemort/store/routed/PipelineRoutedStats.java b/src/java/voldemort/store/routed/PipelineRoutedStats.java index 77918d28c6..d3a41ae2ef 100644 --- a/src/java/voldemort/store/routed/PipelineRoutedStats.java +++ b/src/java/voldemort/store/routed/PipelineRoutedStats.java @@ -21,11 +21,11 @@ */ public class PipelineRoutedStats { - private ConcurrentHashMap, AtomicLong> errCountMap; - private AtomicLong severeExceptionCount; - private AtomicLong benignExceptionCount; + protected ConcurrentHashMap, AtomicLong> errCountMap; + protected AtomicLong severeExceptionCount; + protected AtomicLong benignExceptionCount; - PipelineRoutedStats() { + protected PipelineRoutedStats() { errCountMap = new ConcurrentHashMap, AtomicLong>(); errCountMap.put(InvalidMetadataException.class, new AtomicLong(0)); errCountMap.put(InsufficientOperationalNodesException.class, new AtomicLong(0)); @@ -99,7 +99,7 @@ public void reportException(Exception e) { errCountMap.get(e.getClass()).incrementAndGet(); } - private boolean isSevere(Exception ve) { + public boolean isSevere(Exception ve) { if(ve instanceof InsufficientOperationalNodesException || ve instanceof InsufficientZoneResponsesException || ve instanceof InvalidMetadataException) diff --git a/test/common/voldemort/config/fat-client-config.avro b/test/common/voldemort/config/fat-client-config.avro new file mode 100644 index 0000000000..097bf03d1c --- /dev/null +++ b/test/common/voldemort/config/fat-client-config.avro @@ -0,0 +1,11 @@ +[ +{ + "store_name": "test", + "socket_timeout_ms": "1500", + "routing_timeout_ms": "1500" +}, +{ + "store_name": "slow-store-test", + "connection_timeout_ms": "500" +} +] diff --git a/test/common/voldemort/config/single-store.xml b/test/common/voldemort/config/single-store.xml index a0be7311f6..07ec63da39 100644 --- a/test/common/voldemort/config/single-store.xml +++ b/test/common/voldemort/config/single-store.xml @@ -18,4 +18,20 @@ UTF-8 + + slow-store-test + slow + Test slow store + consistent-routing + client + 1 + 1 + 1 + + string + + + string + + diff --git a/test/unit/voldemort/coordinator/CoordinatorRestAPITest.java b/test/unit/voldemort/coordinator/CoordinatorRestAPITest.java new file mode 100644 index 0000000000..7b287f9be1 --- /dev/null +++ b/test/unit/voldemort/coordinator/CoordinatorRestAPITest.java @@ -0,0 +1,245 @@ +package voldemort.coordinator; + +import static org.junit.Assert.fail; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.codec.binary.Base64; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import voldemort.ServerTestUtils; +import voldemort.cluster.Cluster; +import voldemort.server.VoldemortServer; +import voldemort.store.socket.SocketStoreFactory; +import voldemort.store.socket.clientrequest.ClientRequestExecutorPool; + +public class CoordinatorRestAPITest { + + private VoldemortServer[] servers; + private Cluster cluster; + public static String socketUrl = ""; + private static final String STORE_NAME = "test"; + private static final String STORES_XML = "test/common/voldemort/config/single-store.xml"; + private static final String FAT_CLIENT_CONFIG_FILE_PATH = "test/common/voldemort/config/fat-client-config.avro"; + private final SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(2, + 10000, + 100000, + 32 * 1024); + private CoordinatorService coordinator = null; + private final String coordinatorURL = "http://localhost:8080"; + + @Before + public void setUp() throws Exception { + int numServers = 1; + servers = new VoldemortServer[numServers]; + int partitionMap[][] = { { 0, 2, 4, 6, 1, 3, 5, 7 } }; + Properties props = new Properties(); + props.setProperty("storage.configs", + "voldemort.store.bdb.BdbStorageConfiguration,voldemort.store.slow.SlowStorageConfiguration"); + + cluster = ServerTestUtils.startVoldemortCluster(numServers, + servers, + partitionMap, + socketStoreFactory, + true, // useNio + null, + STORES_XML, + props); + + CoordinatorConfig config = new CoordinatorConfig(); + List bootstrapUrls = new ArrayList(); + socketUrl = servers[0].getIdentityNode().getSocketUrl().toString(); + bootstrapUrls.add(socketUrl); + + System.out.println("\n\n************************ Starting the Coordinator *************************"); + + config.setBootstrapURLs(bootstrapUrls); + config.setFatClientConfigPath(FAT_CLIENT_CONFIG_FILE_PATH); + + this.coordinator = new CoordinatorService(config); + if(!this.coordinator.isStarted()) { + this.coordinator.start(); + } + } + + @After + public void tearDown() throws Exception { + if(this.socketStoreFactory != null) { + this.socketStoreFactory.close(); + } + + if(this.coordinator != null && this.coordinator.isStarted()) { + this.coordinator.stop(); + } + } + + private void doPut(String key, String payload) { + try { + // Create the right URL and Http connection + HttpURLConnection conn = null; + String base64Key = new String(Base64.encodeBase64(key.getBytes())); + URL url = new URL(this.coordinatorURL + "/" + STORE_NAME + "/" + base64Key); + conn = (HttpURLConnection) url.openConnection(); + + // Set the right headers + conn.setRequestMethod("POST"); + conn.setDoOutput(true); + conn.setDoInput(true); + conn.setRequestProperty("Content-Type", "binary"); + conn.setRequestProperty("Content-Length", "" + payload.length()); + conn.setRequestProperty(VoldemortHttpRequestHandler.X_VOLD_REQUEST_TIMEOUT_MS, "1000"); + + // Write the payload + OutputStream out = conn.getOutputStream(); + out.write(payload.getBytes()); + out.close(); + + // Check for the right response code + if(conn.getResponseCode() != 201) { + System.err.println("Illegal response during PUT : " + conn.getResponseMessage()); + fail("Incorrect response received for a HTTP put request :" + + conn.getResponseCode()); + } + } catch(Exception e) { + e.printStackTrace(); + fail("Error in sending the REST request"); + } + } + + private boolean doDelete(String key) { + try { + + // Create the right URL and Http connection + HttpURLConnection conn = null; + String base64Key = new String(Base64.encodeBase64(key.getBytes())); + URL url = new URL(this.coordinatorURL + "/" + STORE_NAME + "/" + base64Key); + conn = (HttpURLConnection) url.openConnection(); + + // Set the right headers + conn.setRequestMethod("DELETE"); + conn.setDoInput(true); + conn.setRequestProperty(VoldemortHttpRequestHandler.X_VOLD_REQUEST_TIMEOUT_MS, "1000"); + + // Check for the right response code + if(conn.getResponseCode() != 204) { + System.err.println("Illegal response during DELETE : " + conn.getResponseMessage()); + fail("Incorrect response received for a HTTP put request :" + + conn.getResponseCode()); + } else { + return true; + } + + } catch(Exception e) { + e.printStackTrace(); + fail("Error in sending the REST request"); + } + + return false; + } + + private String doGet(String key) { + String response = null; + try { + + // Create the right URL and Http connection + HttpURLConnection conn = null; + String base64Key = new String(Base64.encodeBase64(key.getBytes())); + URL url = new URL(this.coordinatorURL + "/" + STORE_NAME + "/" + base64Key); + conn = (HttpURLConnection) url.openConnection(); + + // Set the right headers + conn.setRequestMethod("GET"); + conn.setDoInput(true); + conn.setRequestProperty(VoldemortHttpRequestHandler.X_VOLD_REQUEST_TIMEOUT_MS, "1000"); + + if(conn.getResponseCode() == 404) { + return null; + } + + // Check for the right response code + if(conn.getResponseCode() != 200) { + System.err.println("Illegal response during GET : " + conn.getResponseMessage()); + fail("Incorrect response received for a HTTP put request :" + + conn.getResponseCode()); + } + + // Buffer the result into a string + BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream())); + StringBuilder sb = new StringBuilder(); + String line; + while((line = rd.readLine()) != null) { + sb.append(line); + } + rd.close(); + conn.disconnect(); + + response = sb.toString(); + + } catch(Exception e) { + e.printStackTrace(); + fail("Error in sending the REST request"); + } + + return response; + } + + @Test + public void testRESTReadAfterWrite() { + String key = "Which_Imperial_IPA_do_I_want_to_drink"; + String payload = "Pliny the Younger"; + + // 1. Do a put + doPut(key, payload); + + // 2. Do a get on the same key + String response = doGet(key); + if(response == null) { + fail("key does not exist after a put. "); + } + System.out.println("Received value: " + response); + if(!response.equals(payload)) { + fail("Received value is incorrect ! Expected : " + payload + " but got : " + response); + } + } + + @Test + public void testDelete() { + String key = "Which_sour_beer_do_I_want_to_drink"; + String payload = "Duchesse De Bourgogne"; + + // 1. Do a put + doPut(key, payload); + + // 2. Do a get on the same key + String response = doGet(key); + if(response == null) { + fail("key does not exist after a put. "); + } + System.out.println("Received value: " + response); + if(!response.equals(payload)) { + fail("Received value is incorrect ! Expected : " + payload + " but got : " + response); + } + + // 3. Do a delete + boolean isDeleted = doDelete(key); + if(!isDeleted) { + fail("Could not delete the key. Error !"); + } + + // 4. Do a get on the same key : this should fail + response = doGet(key); + if(response != null) { + fail("key still exists after deletion. "); + } + } + +} diff --git a/test/unit/voldemort/coordinator/DynamicTimeoutStoreClientTest.java b/test/unit/voldemort/coordinator/DynamicTimeoutStoreClientTest.java new file mode 100644 index 0000000000..62d67a9f40 --- /dev/null +++ b/test/unit/voldemort/coordinator/DynamicTimeoutStoreClientTest.java @@ -0,0 +1,145 @@ +package voldemort.coordinator; + +import static org.junit.Assert.fail; + +import java.io.File; +import java.util.Properties; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import voldemort.ServerTestUtils; +import voldemort.client.ClientConfig; +import voldemort.client.SocketStoreClientFactory; +import voldemort.cluster.Cluster; +import voldemort.server.VoldemortServer; +import voldemort.store.CompositeGetVoldemortRequest; +import voldemort.store.CompositePutVoldemortRequest; +import voldemort.store.InsufficientOperationalNodesException; +import voldemort.store.socket.SocketStoreFactory; +import voldemort.store.socket.clientrequest.ClientRequestExecutorPool; +import voldemort.utils.ByteArray; +import voldemort.versioning.Versioned; +import voldemort.xml.ClusterMapper; + +/** + * Class to test the Fat Client wrapper + */ +public class DynamicTimeoutStoreClientTest { + + private VoldemortServer[] servers; + private Cluster cluster; + public static String socketUrl = ""; + private static final String STORE_NAME = "slow-store-test"; + private static final String STORES_XML = "test/common/voldemort/config/single-store.xml"; + private static final String SLOW_STORE_DELAY = "500"; + private final SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(2, + 10000, + 100000, + 32 * 1024); + private DynamicTimeoutStoreClient dynamicTimeoutClient = null; + + /** + * Setup a one node Voldemort cluster with a 'slow' store + * (SlowStorageEngine) with a delay of 500 ms for get and put. + * + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + int numServers = 1; + servers = new VoldemortServer[numServers]; + int partitionMap[][] = { { 0, 2, 4, 6, 1, 3, 5, 7 } }; + Properties props = new Properties(); + props.setProperty("storage.configs", + "voldemort.store.bdb.BdbStorageConfiguration,voldemort.store.slow.SlowStorageConfiguration"); + props.setProperty("testing.slow.queueing.get.ms", SLOW_STORE_DELAY); + props.setProperty("testing.slow.queueing.put.ms", SLOW_STORE_DELAY); + + cluster = ServerTestUtils.startVoldemortCluster(numServers, + servers, + partitionMap, + socketStoreFactory, + true, // useNio + null, + STORES_XML, + props); + + socketUrl = servers[0].getIdentityNode().getSocketUrl().toString(); + String bootstrapUrl = socketUrl; + ClientConfig clientConfig = new ClientConfig().setBootstrapUrls(bootstrapUrl) + .setEnableCompressionLayer(false) + .setEnableSerializationLayer(false) + .enableDefaultClient(true) + .setEnableLazy(false); + + String storesXml = FileUtils.readFileToString(new File(STORES_XML), "UTF-8"); + ClusterMapper mapper = new ClusterMapper(); + + this.dynamicTimeoutClient = new DynamicTimeoutStoreClient(STORE_NAME, + new SocketStoreClientFactory(clientConfig), + 1, + storesXml, + mapper.writeCluster(cluster)); + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + if(this.socketStoreFactory != null) { + this.socketStoreFactory.close(); + } + } + + /** + * Test the dynamic per call timeout. We do a regular put with the default + * configured timeout. We then do a put with a dynamic timeout of 200 ms + * which is less than the delay at the server side. After this we do a get + * with a dynamic timeout of 1500 ms which should succeed and return the + * value from the first put. + */ + @Test + public void test() { + long incorrectTimeout = 200; + long correctTimeout = 1500; + String key = "a"; + String value = "First"; + String newValue = "Second"; + + try { + this.dynamicTimeoutClient.put(new ByteArray(key.getBytes()), value.getBytes()); + } catch(Exception e) { + fail("Error in regular put."); + } + + long startTime = System.currentTimeMillis(); + try { + this.dynamicTimeoutClient.putWithCustomTimeout(new CompositePutVoldemortRequest(new ByteArray(key.getBytes()), + newValue.getBytes(), + incorrectTimeout)); + fail("Should not reach this point. The small (incorrect) timeout did not work."); + } catch(InsufficientOperationalNodesException ion) { + System.out.println("This failed as Expected."); + } + + try { + Versioned versionedValue = this.dynamicTimeoutClient.getWithCustomTimeout(new CompositeGetVoldemortRequest(new ByteArray(key.getBytes()), + correctTimeout, + true)); + long endTime = System.currentTimeMillis(); + System.out.println("Total time taken = " + (endTime - startTime)); + String response = new String(versionedValue.getValue()); + if(!response.equals(value)) { + fail("The returned value does not match. Expected: " + value + " but Received: " + + response); + } + } catch(Exception e) { + e.printStackTrace(); + fail("The dynamic per call timeout did not work !"); + } + } +}