From 903e7495abd20a41f220fa254518fbca9be65800 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Thu, 10 Jan 2013 13:31:27 -0800 Subject: [PATCH] Monitoring for streaming operations --- .../voldemort/server/StoreRepository.java | 56 +++++ .../voldemort/server/VoldemortServer.java | 4 +- .../protocol/SocketRequestHandlerFactory.java | 18 +- .../server/protocol/StreamRequestHandler.java | 2 + .../admin/AdminServiceRequestHandler.java | 29 +-- .../FetchEntriesStreamRequestHandler.java | 27 ++- .../admin/FetchKeysStreamRequestHandler.java | 21 +- ...hPartitionEntriesStreamRequestHandler.java | 25 +- ...etchPartitionFileStreamRequestHandler.java | 22 +- ...etchPartitionKeysStreamRequestHandler.java | 24 +- .../admin/FetchStreamRequestHandler.java | 25 +- ...ePartitionEntriesStreamRequestHandler.java | 43 ++-- .../UpdateSlopEntriesRequestHandler.java | 49 ++-- .../voldemort/store/stats/SimpleCounter.java | 132 +++++++++++ .../voldemort/store/stats/StreamStats.java | 220 ------------------ .../voldemort/store/stats/StreamStatsJmx.java | 107 --------- .../voldemort/store/stats/StreamingStats.java | 188 +++++++++++++++ .../store/stats/SimpleCounterTest.java | 154 ++++++++++++ 18 files changed, 677 insertions(+), 469 deletions(-) create mode 100644 src/java/voldemort/store/stats/SimpleCounter.java delete mode 100644 src/java/voldemort/store/stats/StreamStats.java delete mode 100644 src/java/voldemort/store/stats/StreamStatsJmx.java create mode 100644 src/java/voldemort/store/stats/StreamingStats.java create mode 100644 test/unit/voldemort/store/stats/SimpleCounterTest.java diff --git a/src/java/voldemort/server/StoreRepository.java b/src/java/voldemort/server/StoreRepository.java index 94cfaaaf21..5325385d9b 100644 --- a/src/java/voldemort/server/StoreRepository.java +++ b/src/java/voldemort/server/StoreRepository.java @@ -12,7 +12,9 @@ import voldemort.store.StorageEngine; import voldemort.store.Store; import voldemort.store.slop.SlopStorageEngine; +import 
voldemort.store.stats.StreamingStats; import voldemort.utils.ByteArray; +import voldemort.utils.JmxUtils; import voldemort.utils.Pair; /** @@ -59,6 +61,16 @@ public class StoreRepository { */ private final ConcurrentMap> storageEngines; + /** + * Aggregated statistics about streaming operations + */ + private StreamingStats aggregatedStreamStats; + /** + * Maintains statistics about streaming reads/writes performed against all + * the local storage engines in this node + */ + private ConcurrentMap streamingStatsMap; + /* * Routed stores that write and read from multiple nodes */ @@ -80,10 +92,21 @@ public class StoreRepository { */ private RepairJob repairJob; + /** + * Constructor invoked by tests + */ public StoreRepository() { + this(true); + } + + public StoreRepository(boolean jmxEnabled) { super(); this.localStores = new ConcurrentHashMap>(); this.storageEngines = new ConcurrentHashMap>(); + if(jmxEnabled) { + this.streamingStatsMap = new ConcurrentHashMap(); + this.aggregatedStreamStats = new StreamingStats(); + } this.routedStores = new ConcurrentHashMap>(); this.nodeStores = new ConcurrentHashMap, Store>(); this.redirectingSocketStores = new ConcurrentHashMap, Store>(); @@ -127,9 +150,38 @@ public void addStorageEngine(StorageEngine engine) { if(found != null) throw new VoldemortException("Storage Engine '" + engine.getName() + "' has already been initialized."); + + // register streaming stats object for the store + if(streamingStatsMap != null) { + // lazily register the aggregated mbean + if(storageEngines.size() == 1) { + JmxUtils.registerMbean(aggregatedStreamStats, + JmxUtils.createObjectName(this.getClass().getCanonicalName(), + "aggregated-streaming-stats")); + } + + StreamingStats stat = new StreamingStats(aggregatedStreamStats); + JmxUtils.registerMbean(stat, JmxUtils.createObjectName(this.getClass() + .getCanonicalName(), + engine.getName() + + "-streaming-stats")); + streamingStatsMap.putIfAbsent(engine.getName(), stat); + } } public Store 
removeStorageEngine(String storeName) { + // unregister streaming stats object for the store + if(streamingStatsMap != null) { + JmxUtils.unregisterMbean(JmxUtils.createObjectName(this.getClass().getCanonicalName(), + storeName + "-streaming-stats")); + streamingStatsMap.remove(storeName); + // lazily unregister the aggregated mbean + if(storageEngines.size() == 1) { + JmxUtils.unregisterMbean(JmxUtils.createObjectName(this.getClass() + .getCanonicalName(), + "aggregated-streaming-stats")); + } + } return this.storageEngines.remove(storeName); } @@ -234,4 +286,8 @@ public RepairJob getRepairJob() { public void registerRepairJob(RepairJob job) { repairJob = job; } + + public StreamingStats getStreamingStats(String store) { + return streamingStatsMap.get(store); + } } diff --git a/src/java/voldemort/server/VoldemortServer.java b/src/java/voldemort/server/VoldemortServer.java index fd22f46f6f..613a2e0b82 100644 --- a/src/java/voldemort/server/VoldemortServer.java +++ b/src/java/voldemort/server/VoldemortServer.java @@ -83,7 +83,7 @@ public class VoldemortServer extends AbstractService { public VoldemortServer(VoldemortConfig config) { super(ServiceType.VOLDEMORT); this.voldemortConfig = config; - this.storeRepository = new StoreRepository(); + this.storeRepository = new StoreRepository(config.isJmxEnabled()); this.metadata = MetadataStore.readFromDirectory(new File(this.voldemortConfig.getMetadataDirectory()), voldemortConfig.getNodeId()); this.identityNode = metadata.getCluster().getNodeById(voldemortConfig.getNodeId()); @@ -97,7 +97,7 @@ public VoldemortServer(VoldemortConfig config, Cluster cluster) { this.identityNode = cluster.getNodeById(voldemortConfig.getNodeId()); this.checkHostName(); - this.storeRepository = new StoreRepository(); + this.storeRepository = new StoreRepository(config.isJmxEnabled()); // update cluster details in metaDataStore ConfigurationStorageEngine metadataInnerEngine = new ConfigurationStorageEngine("metadata-config-store", 
voldemortConfig.getMetadataDirectory()); diff --git a/src/java/voldemort/server/protocol/SocketRequestHandlerFactory.java b/src/java/voldemort/server/protocol/SocketRequestHandlerFactory.java index e3c746eaf7..51c1f01996 100644 --- a/src/java/voldemort/server/protocol/SocketRequestHandlerFactory.java +++ b/src/java/voldemort/server/protocol/SocketRequestHandlerFactory.java @@ -12,9 +12,6 @@ import voldemort.server.storage.StorageService; import voldemort.store.ErrorCodeMapper; import voldemort.store.metadata.MetadataStore; -import voldemort.store.stats.StreamStats; -import voldemort.store.stats.StreamStatsJmx; -import voldemort.utils.JmxUtils; /** * A factory that gets the appropriate request handler for a given @@ -30,7 +27,6 @@ public class SocketRequestHandlerFactory implements RequestHandlerFactory { private final VoldemortConfig voldemortConfig; private final AsyncOperationService asyncService; private final Rebalancer rebalancer; - private final StreamStats stats; public SocketRequestHandlerFactory(StorageService storageService, StoreRepository repository, @@ -44,17 +40,6 @@ public SocketRequestHandlerFactory(StorageService storageService, this.voldemortConfig = voldemortConfig; this.asyncService = asyncService; this.rebalancer = rebalancer; - this.stats = new StreamStats(); - if(null != voldemortConfig && voldemortConfig.isJmxEnabled()) - if(this.voldemortConfig.isEnableJmxClusterName()) - JmxUtils.registerMbean(new StreamStatsJmx(stats), - JmxUtils.createObjectName(metadata.getCluster().getName() - + ".voldemort.store.stats", - "admin-streaming")); - else - JmxUtils.registerMbean(new StreamStatsJmx(stats), - JmxUtils.createObjectName("voldemort.store.stats", - "admin-streaming")); } public RequestHandler getRequestHandler(RequestFormatType type) { @@ -76,8 +61,7 @@ public RequestHandler getRequestHandler(RequestFormatType type) { metadata, voldemortConfig, asyncService, - rebalancer, - stats); + rebalancer); default: throw new VoldemortException("Unknown 
wire format " + type); } diff --git a/src/java/voldemort/server/protocol/StreamRequestHandler.java b/src/java/voldemort/server/protocol/StreamRequestHandler.java index 39b1ecbe57..91f7d7eac3 100644 --- a/src/java/voldemort/server/protocol/StreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/StreamRequestHandler.java @@ -15,6 +15,8 @@ public interface StreamRequestHandler { + public final static int STAT_RECORDS_INTERVAL = 100000; + /** * Handles a "segment" of a streaming request. * diff --git a/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java b/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java index 3cf1783e42..bc1851abfb 100644 --- a/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java @@ -60,7 +60,6 @@ import voldemort.store.readonly.ReadOnlyStorageEngine; import voldemort.store.readonly.ReadOnlyUtils; import voldemort.store.slop.SlopStorageEngine; -import voldemort.store.stats.StreamStats; import voldemort.utils.ByteArray; import voldemort.utils.ByteUtils; import voldemort.utils.ClosableIterator; @@ -97,7 +96,6 @@ public class AdminServiceRequestHandler implements RequestHandler { private final VoldemortConfig voldemortConfig; private final AsyncOperationService asyncService; private final Rebalancer rebalancer; - private final StreamStats stats; private FileFetcher fileFetcher; public AdminServiceRequestHandler(ErrorCodeMapper errorCodeMapper, @@ -106,8 +104,7 @@ public AdminServiceRequestHandler(ErrorCodeMapper errorCodeMapper, MetadataStore metadataStore, VoldemortConfig voldemortConfig, AsyncOperationService asyncService, - Rebalancer rebalancer, - StreamStats stats) { + Rebalancer rebalancer) { this.errorCodeMapper = errorCodeMapper; this.storageService = storageService; this.metadataStore = metadataStore; @@ -117,7 +114,6 @@ public AdminServiceRequestHandler(ErrorCodeMapper errorCodeMapper, 
.getContextClassLoader()); this.asyncService = asyncService; this.rebalancer = rebalancer; - this.stats = stats; setFetcherClass(voldemortConfig); } @@ -532,12 +528,14 @@ public StreamRequestHandler handleFetchROPartitionFiles(VAdminProto.FetchPartiti return new FetchPartitionFileStreamRequestHandler(request, metadataStore, voldemortConfig, - storeRepository, - stats); + storeRepository); } public StreamRequestHandler handleUpdateSlopEntries(VAdminProto.UpdateSlopEntriesRequest request) { - return new UpdateSlopEntriesRequestHandler(request, errorCodeMapper, storeRepository, stats); + return new UpdateSlopEntriesRequestHandler(request, + errorCodeMapper, + storeRepository, + voldemortConfig); } public StreamRequestHandler handleFetchPartitionEntries(VAdminProto.FetchPartitionEntriesRequest request) { @@ -552,16 +550,14 @@ public StreamRequestHandler handleFetchPartitionEntries(VAdminProto.FetchPartiti errorCodeMapper, voldemortConfig, storeRepository, - networkClassLoader, - stats); + networkClassLoader); else return new FetchEntriesStreamRequestHandler(request, metadataStore, errorCodeMapper, voldemortConfig, storeRepository, - networkClassLoader, - stats); + networkClassLoader); } else { if(storageEngine.isPartitionScanSupported()) return new FetchPartitionKeysStreamRequestHandler(request, @@ -569,16 +565,14 @@ public StreamRequestHandler handleFetchPartitionEntries(VAdminProto.FetchPartiti errorCodeMapper, voldemortConfig, storeRepository, - networkClassLoader, - stats); + networkClassLoader); else return new FetchKeysStreamRequestHandler(request, metadataStore, errorCodeMapper, voldemortConfig, storeRepository, - networkClassLoader, - stats); + networkClassLoader); } } @@ -587,8 +581,7 @@ public StreamRequestHandler handleUpdatePartitionEntries(VAdminProto.UpdateParti errorCodeMapper, voldemortConfig, storeRepository, - networkClassLoader, - stats); + networkClassLoader); } public VAdminProto.AsyncOperationListResponse 
handleAsyncOperationList(VAdminProto.AsyncOperationListRequest request) { diff --git a/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java index bf4c8e6857..f6ae543a97 100644 --- a/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java @@ -12,12 +12,12 @@ import voldemort.server.VoldemortConfig; import voldemort.store.ErrorCodeMapper; import voldemort.store.metadata.MetadataStore; -import voldemort.store.stats.StreamStats; -import voldemort.store.stats.StreamStats.Operation; +import voldemort.store.stats.StreamingStats.Operation; import voldemort.utils.ByteArray; import voldemort.utils.ClosableIterator; import voldemort.utils.NetworkClassLoader; import voldemort.utils.RebalanceUtils; +import voldemort.utils.Time; import voldemort.versioning.Versioned; import com.google.protobuf.Message; @@ -39,15 +39,13 @@ public FetchEntriesStreamRequestHandler(FetchPartitionEntriesRequest request, ErrorCodeMapper errorCodeMapper, VoldemortConfig voldemortConfig, StoreRepository storeRepository, - NetworkClassLoader networkClassLoader, - StreamStats stats) { + NetworkClassLoader networkClassLoader) { super(request, metadataStore, errorCodeMapper, voldemortConfig, storeRepository, networkClassLoader, - stats, Operation.FETCH_ENTRIES); this.keyIterator = storageEngine.keys(); logger.info("Starting fetch entries for store '" + storageEngine.getName() @@ -62,6 +60,8 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, long startNs = System.nanoTime(); ByteArray key = keyIterator.next(); + if(streamStats != null) + streamStats.reportStreamingScan(operation); if(RebalanceUtils.checkKeyBelongsToPartition(nodeId, key.get(), @@ -71,12 +71,14 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, && counter % skipRecords == 0) { 
List> values = storageEngine.get(key, null); - stats.recordDiskTime(handle, System.nanoTime() - startNs); + if(streamStats != null) + streamStats.reportStorageTime(operation, System.nanoTime() - startNs); for(Versioned value: values) { throttler.maybeThrottle(key.length()); if(filter.accept(key, value)) { fetched++; - handle.incrementEntriesScanned(); + if(streamStats != null) + streamStats.reportStreamingFetch(operation); VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder(); VAdminProto.PartitionEntry partitionEntry = VAdminProto.PartitionEntry.newBuilder() @@ -89,20 +91,22 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, startNs = System.nanoTime(); ProtoUtils.writeMessage(outputStream, message); - stats.recordNetworkTime(handle, System.nanoTime() - startNs); + if(streamStats != null) + streamStats.reportNetworkTime(operation, System.nanoTime() - startNs); throttler.maybeThrottle(AdminServiceRequestHandler.valueSize(value)); } } } else { - stats.recordDiskTime(handle, System.nanoTime() - startNs); + if(streamStats != null) + streamStats.reportStorageTime(operation, System.nanoTime() - startNs); } // log progress counter++; - if(0 == counter % 100000) { - long totalTime = (System.currentTimeMillis() - startTime) / 1000; + if(0 == counter % STAT_RECORDS_INTERVAL) { + long totalTime = (System.currentTimeMillis() - startTime) / Time.MS_PER_SECOND; logger.info("Fetch entries scanned " + counter + " entries, fetched " + fetched + " entries for store '" + storageEngine.getName() @@ -113,7 +117,6 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, if(keyIterator.hasNext()) return StreamRequestHandlerState.WRITING; else { - stats.closeHandle(handle); return StreamRequestHandlerState.COMPLETE; } } diff --git a/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java 
b/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java index f51e96d8c7..5cc3b9269c 100644 --- a/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java @@ -11,8 +11,7 @@ import voldemort.server.VoldemortConfig; import voldemort.store.ErrorCodeMapper; import voldemort.store.metadata.MetadataStore; -import voldemort.store.stats.StreamStats; -import voldemort.store.stats.StreamStats.Operation; +import voldemort.store.stats.StreamingStats.Operation; import voldemort.utils.ByteArray; import voldemort.utils.ClosableIterator; import voldemort.utils.NetworkClassLoader; @@ -29,15 +28,13 @@ public FetchKeysStreamRequestHandler(FetchPartitionEntriesRequest request, ErrorCodeMapper errorCodeMapper, VoldemortConfig voldemortConfig, StoreRepository storeRepository, - NetworkClassLoader networkClassLoader, - StreamStats stats) { + NetworkClassLoader networkClassLoader) { super(request, metadataStore, errorCodeMapper, voldemortConfig, storeRepository, networkClassLoader, - stats, Operation.FETCH_KEYS); this.keyIterator = storageEngine.keys(); logger.info("Starting fetch keys for store '" + storageEngine.getName() @@ -52,7 +49,10 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, long startNs = System.nanoTime(); ByteArray key = keyIterator.next(); - stats.recordDiskTime(handle, System.nanoTime() - startNs); + if(streamStats != null) { + streamStats.reportStorageTime(operation, System.nanoTime() - startNs); + streamStats.reportStreamingScan(operation); + } throttler.maybeThrottle(key.length()); if(RebalanceUtils.checkKeyBelongsToPartition(nodeId, @@ -66,18 +66,20 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, response.setKey(ProtoUtils.encodeBytes(key)); fetched++; - handle.incrementEntriesScanned(); + if(streamStats != null) + streamStats.reportStreamingFetch(operation); Message message = 
response.build(); startNs = System.nanoTime(); ProtoUtils.writeMessage(outputStream, message); - stats.recordNetworkTime(handle, System.nanoTime() - startNs); + if(streamStats != null) + streamStats.reportNetworkTime(operation, System.nanoTime() - startNs); } // log progress counter++; - if(0 == counter % 100000) { + if(0 == counter % STAT_RECORDS_INTERVAL) { long totalTime = (System.currentTimeMillis() - startTime) / 1000; logger.info("Fetch keys scanned " + counter + " keys, fetched " + fetched @@ -89,7 +91,6 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, if(keyIterator.hasNext()) return StreamRequestHandlerState.WRITING; else { - stats.closeHandle(handle); return StreamRequestHandlerState.COMPLETE; } } diff --git a/src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java index 191fd3e4fd..61bcc24fea 100644 --- a/src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java @@ -31,8 +31,7 @@ import voldemort.server.VoldemortConfig; import voldemort.store.ErrorCodeMapper; import voldemort.store.metadata.MetadataStore; -import voldemort.store.stats.StreamStats; -import voldemort.store.stats.StreamStats.Operation; +import voldemort.store.stats.StreamingStats.Operation; import voldemort.utils.ByteArray; import voldemort.utils.ClosableIterator; import voldemort.utils.NetworkClassLoader; @@ -60,15 +59,13 @@ public FetchPartitionEntriesStreamRequestHandler(FetchPartitionEntriesRequest re ErrorCodeMapper errorCodeMapper, VoldemortConfig voldemortConfig, StoreRepository storeRepository, - NetworkClassLoader networkClassLoader, - StreamStats stats) { + NetworkClassLoader networkClassLoader) { super(request, metadataStore, errorCodeMapper, voldemortConfig, storeRepository, networkClassLoader, - stats, 
Operation.FETCH_ENTRIES); logger.info("Starting fetch entries for store '" + storageEngine.getName() + "' with replica to partition mapping " + replicaToPartitionList); @@ -97,7 +94,6 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, if(entriesPartitionIterator == null) { // we are finally done if(currentIndex == partitionList.size()) { - stats.closeHandle(handle); return StreamRequestHandlerState.COMPLETE; } @@ -123,7 +119,6 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, } currentIndex++; } - } else { long startNs = System.nanoTime(); // do a check before reading in case partition has 0 elements @@ -134,14 +129,18 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, if(counter % skipRecords == 0) { // do the filtering Pair> entry = entriesPartitionIterator.next(); - stats.recordDiskTime(handle, System.nanoTime() - startNs); + if(streamStats != null) { + streamStats.reportStorageTime(operation, System.nanoTime() - startNs); + streamStats.reportStreamingScan(operation); + } ByteArray key = entry.getFirst(); Versioned value = entry.getSecond(); throttler.maybeThrottle(key.length()); if(filter.accept(key, value)) { fetched++; - handle.incrementEntriesScanned(); + if(streamStats != null) + streamStats.reportStreamingFetch(operation); VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder(); VAdminProto.PartitionEntry partitionEntry = VAdminProto.PartitionEntry.newBuilder() @@ -153,15 +152,17 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, startNs = System.nanoTime(); ProtoUtils.writeMessage(outputStream, message); - stats.recordNetworkTime(handle, System.nanoTime() - startNs); + if(streamStats != null) + streamStats.reportNetworkTime(operation, System.nanoTime() - startNs); throttler.maybeThrottle(AdminServiceRequestHandler.valueSize(value)); } } else { - stats.recordDiskTime(handle, 
System.nanoTime() - startNs); + if(streamStats != null) + streamStats.reportStorageTime(operation, System.nanoTime() - startNs); } // log progress - if(0 == counter % 100000) { + if(0 == counter % STAT_RECORDS_INTERVAL) { long totalTime = (System.currentTimeMillis() - startTime) / Time.MS_PER_SECOND; logger.info("Fetch entries scanned " + counter + " entries, fetched " + fetched diff --git a/src/java/voldemort/server/protocol/admin/FetchPartitionFileStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchPartitionFileStreamRequestHandler.java index a8b7b52238..87f74baa70 100644 --- a/src/java/voldemort/server/protocol/admin/FetchPartitionFileStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchPartitionFileStreamRequestHandler.java @@ -27,7 +27,8 @@ import voldemort.store.metadata.MetadataStore; import voldemort.store.readonly.ReadOnlyStorageConfiguration; import voldemort.store.readonly.ReadOnlyStorageEngine; -import voldemort.store.stats.StreamStats; +import voldemort.store.stats.StreamingStats; +import voldemort.store.stats.StreamingStats.Operation; import voldemort.utils.EventThrottler; import voldemort.utils.Pair; import voldemort.utils.RebalanceUtils; @@ -44,9 +45,7 @@ public class FetchPartitionFileStreamRequestHandler implements StreamRequestHand private final long blockSize; - private final StreamStats stats; - - private final StreamStats.Handle handle; + private final StreamingStats streamStats; private final Iterator> partitionIterator; @@ -79,8 +78,7 @@ private enum FetchStatus { protected FetchPartitionFileStreamRequestHandler(VAdminProto.FetchPartitionFilesRequest request, MetadataStore metadataStore, VoldemortConfig voldemortConfig, - StoreRepository storeRepository, - StreamStats stats) { + StoreRepository storeRepository) { this.request = request; StoreDefinition storeDef = metadataStore.getStoreDef(request.getStore()); boolean isReadOnly = 
storeDef.getType().compareTo(ReadOnlyStorageConfiguration.TYPE_NAME) == 0; @@ -100,8 +98,11 @@ protected FetchPartitionFileStreamRequestHandler(VAdminProto.FetchPartitionFiles voldemortConfig.getAdminSocketBufferSize()); this.storeDir = new File(storageEngine.getCurrentDirPath()); this.throttler = new EventThrottler(voldemortConfig.getStreamMaxReadBytesPerSec()); - this.stats = stats; - this.handle = stats.makeHandle(StreamStats.Operation.FETCH_FILE, replicaToPartitionList); + if(voldemortConfig.isJmxEnabled()) { + this.streamStats = storeRepository.getStreamingStats(storageEngine.getName()); + } else { + this.streamStats = null; + } this.partitionIterator = Collections.unmodifiableSet(replicaToPartitionTuples).iterator(); this.fetchStatus = FetchStatus.NEXT_PARTITION; this.currentChunkId = 0; @@ -158,7 +159,8 @@ private void handleSendIndexFile() throws IOException { this.chunkedFileWriter.close(); currentChunkId++; dataFile = indexFile = null; - handle.incrementEntriesScanned(); + if(streamStats != null) + streamStats.reportStreamingFetch(Operation.FETCH_FILE); if(currentChunkId >= numChunks) { fetchStatus = FetchStatus.NEXT_PARTITION; } else { @@ -237,9 +239,7 @@ private StreamRequestHandlerState handleNextPartition() { // partition list logger.info("Finished streaming files for partitions tuples " + replicaToPartitionTuples); - stats.closeHandle(handle); handlerState = StreamRequestHandlerState.COMPLETE; - } return handlerState; diff --git a/src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java index 3a54fd93d1..ad0a0511d5 100644 --- a/src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java @@ -31,8 +31,7 @@ import voldemort.server.VoldemortConfig; import voldemort.store.ErrorCodeMapper; import 
voldemort.store.metadata.MetadataStore; -import voldemort.store.stats.StreamStats; -import voldemort.store.stats.StreamStats.Operation; +import voldemort.store.stats.StreamingStats.Operation; import voldemort.utils.ByteArray; import voldemort.utils.ClosableIterator; import voldemort.utils.NetworkClassLoader; @@ -58,15 +57,13 @@ public FetchPartitionKeysStreamRequestHandler(FetchPartitionEntriesRequest reque ErrorCodeMapper errorCodeMapper, VoldemortConfig voldemortConfig, StoreRepository storeRepository, - NetworkClassLoader networkClassLoader, - StreamStats stats) { + NetworkClassLoader networkClassLoader) { super(request, metadataStore, errorCodeMapper, voldemortConfig, storeRepository, networkClassLoader, - stats, Operation.FETCH_KEYS); logger.info("Starting fetch keys for store '" + storageEngine.getName() + "' with replica to partition mapping " + replicaToPartitionList); @@ -95,7 +92,6 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, if(keysPartitionIterator == null) { if(currentIndex == partitionList.size()) { // we are finally done - stats.closeHandle(handle); return StreamRequestHandlerState.COMPLETE; } @@ -131,7 +127,10 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, if(counter % skipRecords == 0) { // do the filtering ByteArray key = keysPartitionIterator.next(); - stats.recordDiskTime(handle, System.nanoTime() - startNs); + if(streamStats != null) { + streamStats.reportStorageTime(operation, System.nanoTime() - startNs); + streamStats.reportStreamingScan(operation); + } throttler.maybeThrottle(key.length()); if(filter.accept(key, null)) { @@ -139,19 +138,22 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, response.setKey(ProtoUtils.encodeBytes(key)); fetched++; - handle.incrementEntriesScanned(); + if(streamStats != null) + streamStats.reportStreamingFetch(operation); Message message = response.build(); startNs = System.nanoTime(); 
ProtoUtils.writeMessage(outputStream, message); - stats.recordNetworkTime(handle, System.nanoTime() - startNs); + if(streamStats != null) + streamStats.reportNetworkTime(operation, System.nanoTime() - startNs); } } else { - stats.recordDiskTime(handle, System.nanoTime() - startNs); + if(streamStats != null) + streamStats.reportStorageTime(operation, System.nanoTime() - startNs); } // log progress - if(0 == counter % 100000) { + if(0 == counter % STAT_RECORDS_INTERVAL) { long totalTime = (System.currentTimeMillis() - startTime) / Time.MS_PER_SECOND; logger.info("Fetch entries scanned " + counter + " entries, fetched " + fetched diff --git a/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java index 3bd7d3d812..f8fc567008 100644 --- a/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java @@ -21,8 +21,7 @@ import voldemort.store.StorageEngine; import voldemort.store.StoreDefinition; import voldemort.store.metadata.MetadataStore; -import voldemort.store.stats.StreamStats; -import voldemort.store.stats.StreamStats.Handle; +import voldemort.store.stats.StreamingStats; import voldemort.store.system.SystemStoreConstants; import voldemort.utils.ByteArray; import voldemort.utils.EventThrottler; @@ -45,6 +44,12 @@ public abstract class FetchStreamRequestHandler implements StreamRequestHandler protected final StorageEngine storageEngine; + protected final StreamingStats streamStats; + + protected boolean isJmxEnabled; + + protected final StreamingStats.Operation operation; + protected long counter; protected long skipRecords; @@ -53,10 +58,6 @@ public abstract class FetchStreamRequestHandler implements StreamRequestHandler protected final long startTime; - protected final Handle handle; - - protected final StreamStats stats; - protected final Logger logger = Logger.getLogger(getClass()); 
protected int nodeId; @@ -69,16 +70,20 @@ protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest req VoldemortConfig voldemortConfig, StoreRepository storeRepository, NetworkClassLoader networkClassLoader, - StreamStats stats, - StreamStats.Operation operation) { + StreamingStats.Operation operation) { this.nodeId = metadataStore.getNodeId(); this.request = request; this.errorCodeMapper = errorCodeMapper; this.replicaToPartitionList = ProtoUtils.decodePartitionTuple(request.getReplicaToPartitionList()); - this.stats = stats; - this.handle = stats.makeHandle(operation, replicaToPartitionList); this.storageEngine = AdminServiceRequestHandler.getStorageEngine(storeRepository, request.getStore()); + if(voldemortConfig.isJmxEnabled()) { + this.streamStats = storeRepository.getStreamingStats(this.storageEngine.getName()); + } else { + this.streamStats = null; + } + + this.operation = operation; this.storeDef = getStoreDef(request.getStore(), metadataStore); if(request.hasInitialCluster()) { this.initialCluster = new ClusterMapper().readCluster(new StringReader(request.getInitialCluster())); diff --git a/src/java/voldemort/server/protocol/admin/UpdatePartitionEntriesStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/UpdatePartitionEntriesStreamRequestHandler.java index 5dc79b14ff..3ff2e451c6 100644 --- a/src/java/voldemort/server/protocol/admin/UpdatePartitionEntriesStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/UpdatePartitionEntriesStreamRequestHandler.java @@ -4,8 +4,6 @@ import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; -import java.util.HashMap; -import java.util.List; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -21,7 +19,8 @@ import voldemort.server.protocol.StreamRequestHandler; import voldemort.store.ErrorCodeMapper; import voldemort.store.StorageEngine; -import voldemort.store.stats.StreamStats; +import 
voldemort.store.stats.StreamingStats; +import voldemort.store.stats.StreamingStats.Operation; import voldemort.utils.ByteArray; import voldemort.utils.ByteUtils; import voldemort.utils.EventThrottler; @@ -52,9 +51,7 @@ public class UpdatePartitionEntriesStreamRequestHandler implements StreamRequest private final long startTime; - private final StreamStats stats; - - private final StreamStats.Handle handle; + private final StreamingStats streamStats; private final Logger logger = Logger.getLogger(getClass()); @@ -62,8 +59,7 @@ public UpdatePartitionEntriesStreamRequestHandler(UpdatePartitionEntriesRequest ErrorCodeMapper errorCodeMapper, VoldemortConfig voldemortConfig, StoreRepository storeRepository, - NetworkClassLoader networkClassLoader, - StreamStats stats) { + NetworkClassLoader networkClassLoader) { super(); this.request = request; this.errorCodeMapper = errorCodeMapper; @@ -75,9 +71,11 @@ public UpdatePartitionEntriesStreamRequestHandler(UpdatePartitionEntriesRequest networkClassLoader) : new DefaultVoldemortFilter(); startTime = System.currentTimeMillis(); - this.stats = stats; - this.handle = stats.makeHandle(StreamStats.Operation.UPDATE, - new HashMap>()); + if(voldemortConfig.isJmxEnabled()) { + this.streamStats = storeRepository.getStreamingStats(storageEngine.getName()); + } else { + this.streamStats = null; + } } public StreamRequestHandlerState handleRequest(DataInputStream inputStream, @@ -92,7 +90,9 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, } catch(EOFException e) { if(logger.isTraceEnabled()) logger.trace("Incomplete read for message size"); - stats.recordNetworkTime(handle, System.nanoTime() - startNs); + if(streamStats != null) + streamStats.reportNetworkTime(Operation.UPDATE_ENTRIES, System.nanoTime() + - startNs); return StreamRequestHandlerState.INCOMPLETE_READ; } @@ -104,8 +104,9 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, if(logger.isTraceEnabled()) 
logger.trace("Message size -1, completed partition update"); - stats.recordNetworkTime(handle, System.nanoTime() - startNs); - stats.closeHandle(handle); + if(streamStats != null) + streamStats.reportNetworkTime(Operation.UPDATE_ENTRIES, System.nanoTime() + - startNs); return StreamRequestHandlerState.COMPLETE; } @@ -122,7 +123,9 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, return StreamRequestHandlerState.INCOMPLETE_READ; } finally { - stats.recordNetworkTime(handle, System.nanoTime() - startNs); + if(streamStats != null) + streamStats.reportNetworkTime(Operation.UPDATE_ENTRIES, System.nanoTime() + - startNs); } VAdminProto.UpdatePartitionEntriesRequest.Builder builder = VAdminProto.UpdatePartitionEntriesRequest.newBuilder(); @@ -146,7 +149,9 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, if(logger.isDebugEnabled()) logger.debug("updateEntries (Streaming put) threw ObsoleteVersionException, Ignoring."); } finally { - stats.recordDiskTime(handle, System.nanoTime() - startNs); + if(streamStats != null) + streamStats.reportStorageTime(Operation.UPDATE_ENTRIES, System.nanoTime() + - startNs); } throttler.maybeThrottle(key.length() + AdminServiceRequestHandler.valueSize(value)); @@ -154,9 +159,10 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, // log progress counter++; - handle.incrementEntriesScanned(); + if(streamStats != null) + streamStats.reportStreamingPut(Operation.UPDATE_ENTRIES); - if(0 == counter % 100000) { + if(0 == counter % STAT_RECORDS_INTERVAL) { long totalTime = (System.currentTimeMillis() - startTime) / 1000; logger.info("Update entries updated " + counter + " entries for store '" @@ -181,5 +187,4 @@ public void handleError(DataOutputStream outputStream, VoldemortException e) thr if(logger.isEnabledFor(Level.ERROR)) logger.error("handleUpdatePartitionEntries failed for request(" + request + ")", e); } - } diff --git 
a/src/java/voldemort/server/protocol/admin/UpdateSlopEntriesRequestHandler.java b/src/java/voldemort/server/protocol/admin/UpdateSlopEntriesRequestHandler.java index 15054d3da4..fc7fa45ba6 100644 --- a/src/java/voldemort/server/protocol/admin/UpdateSlopEntriesRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/UpdateSlopEntriesRequestHandler.java @@ -4,8 +4,6 @@ import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; -import java.util.HashMap; -import java.util.List; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -15,10 +13,12 @@ import voldemort.client.protocol.pb.VAdminProto; import voldemort.client.protocol.pb.VAdminProto.UpdateSlopEntriesRequest; import voldemort.server.StoreRepository; +import voldemort.server.VoldemortConfig; import voldemort.server.protocol.StreamRequestHandler; import voldemort.store.ErrorCodeMapper; import voldemort.store.StorageEngine; -import voldemort.store.stats.StreamStats; +import voldemort.store.stats.StreamingStats; +import voldemort.store.stats.StreamingStats.Operation; import voldemort.utils.ByteArray; import voldemort.utils.ByteUtils; import voldemort.versioning.ObsoleteVersionException; @@ -37,26 +37,25 @@ public class UpdateSlopEntriesRequestHandler implements StreamRequestHandler { private final long startTime; - private long counter = 0L; + private long networkTimeNs; - private final StreamStats stats; + private boolean isJmxEnabled; - private final StreamStats.Handle handle; + private long counter = 0L; private final Logger logger = Logger.getLogger(getClass()); public UpdateSlopEntriesRequestHandler(UpdateSlopEntriesRequest request, ErrorCodeMapper errorCodeMapper, StoreRepository storeRepository, - StreamStats stats) { + VoldemortConfig voldemortConfig) { super(); this.request = request; this.errorCodeMapper = errorCodeMapper; this.storeRepository = storeRepository; - this.stats = stats; - this.handle = stats.makeHandle(StreamStats.Operation.SLOP, - new 
HashMap>()); startTime = System.currentTimeMillis(); + networkTimeNs = 0; + this.isJmxEnabled = voldemortConfig.isJmxEnabled(); } public StreamRequestDirection getDirection() { @@ -71,7 +70,7 @@ public void handleError(DataOutputStream outputStream, VoldemortException e) thr responseBuilder.setError(ProtoUtils.encodeError(errorCodeMapper, e)); if(logger.isEnabledFor(Level.ERROR)) - logger.error("handleUpdatePartitionEntries failed for request(" + request + ")", e); + logger.error("handleUpdateSlopEntries failed for request(" + request + ")", e); } public StreamRequestHandlerState handleRequest(DataInputStream inputStream, @@ -86,26 +85,24 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, } catch(EOFException e) { if(logger.isTraceEnabled()) logger.trace("Incomplete read for message size"); - stats.recordNetworkTime(handle, System.nanoTime() - startNs); + networkTimeNs += System.nanoTime() - startNs; return StreamRequestHandlerState.INCOMPLETE_READ; } if(size == -1) { if(logger.isTraceEnabled()) - logger.trace("Message size -1, completed partition update"); - stats.recordNetworkTime(handle, System.nanoTime() - startNs); - stats.closeHandle(handle); + logger.trace("Message size -1, completed slop update"); return StreamRequestHandlerState.COMPLETE; } if(logger.isTraceEnabled()) - logger.trace("UpdatePartitionEntriesRequest message size: " + size); + logger.trace("UpdateSlopEntriesRequest message size: " + size); byte[] input = new byte[size]; try { ByteUtils.read(inputStream, input); - stats.recordNetworkTime(handle, System.nanoTime() - startNs); + networkTimeNs += System.nanoTime() - startNs; } catch(EOFException e) { if(logger.isTraceEnabled()) logger.trace("Incomplete read for message"); @@ -120,6 +117,13 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, StorageEngine storageEngine = AdminServiceRequestHandler.getStorageEngine(storeRepository, request.getStore()); + StreamingStats streamStats = null; + 
if(isJmxEnabled) { + streamStats = storeRepository.getStreamingStats(storageEngine.getName()); + streamStats.reportNetworkTime(Operation.SLOP_UPDATE, networkTimeNs); + } + networkTimeNs = 0; + ByteArray key = ProtoUtils.decodeBytes(request.getKey()); VectorClock vectorClock = ProtoUtils.decodeClock(request.getVersion()); @@ -137,7 +141,9 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, byte[] value = ProtoUtils.decodeBytes(request.getValue()).get(); startNs = System.nanoTime(); storageEngine.put(key, Versioned.value(value, vectorClock), transforms); - stats.recordDiskTime(handle, System.nanoTime() - startNs); + if(isJmxEnabled) + streamStats.reportStorageTime(Operation.SLOP_UPDATE, System.nanoTime() + - startNs); if(logger.isTraceEnabled()) logger.trace("updateSlopEntries (Streaming put) successful"); } catch(ObsoleteVersionException e) { @@ -150,7 +156,9 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, try { startNs = System.nanoTime(); storageEngine.delete(key, vectorClock); - stats.recordDiskTime(handle, System.nanoTime() - startNs); + if(isJmxEnabled) + streamStats.reportStorageTime(Operation.SLOP_UPDATE, System.nanoTime() + - startNs); if(logger.isTraceEnabled()) logger.trace("updateSlopEntries (Streaming delete) successful"); @@ -166,7 +174,8 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, // log progress counter++; - handle.incrementEntriesScanned(); + if(isJmxEnabled) + streamStats.reportStreamingPut(Operation.SLOP_UPDATE); if(0 == counter % 100000) { long totalTime = (System.currentTimeMillis() - startTime) / 1000; diff --git a/src/java/voldemort/store/stats/SimpleCounter.java b/src/java/voldemort/store/stats/SimpleCounter.java new file mode 100644 index 0000000000..c169621089 --- /dev/null +++ b/src/java/voldemort/store/stats/SimpleCounter.java @@ -0,0 +1,132 @@ +package voldemort.store.stats; + +import java.util.concurrent.atomic.AtomicLong; + +import 
voldemort.utils.Time; + +/** + * A simple concurrent, non-blocking event counter that resets itself every + * interval, and provides eventRate and average event value metrics over the + * last interval + * + */ +public class SimpleCounter { + + /** + * Count of total number of events in current interval + */ + AtomicLong eventsCounter; + /** + * Sum of all the event values in the current interval + */ + AtomicLong eventsValueCounter; + /** + * Last time when the counter was reset + */ + AtomicLong lastResetTimeMs; + + /** + * Number of events that occurred in the last interval + */ + long numEventsLastInterval; + + /** + * Sum of all the event values in the last interval + */ + long totalEventValueLastInterval; + + /** + * Number of events that occurred in the second last interval + */ + long numEventsLastLastInterval; + + /** + * Sum of all the event values in the second last interval + */ + long totalEventValueLastLastInterval; + + /** + * Interval (in ms) after which the counter is reset + */ + final long resetIntervalMs; + + public SimpleCounter(long resetIntervalMs) { + if(resetIntervalMs < 1) { + throw new IllegalArgumentException("Reset interval must be positive"); + } + this.resetIntervalMs = resetIntervalMs; + this.lastResetTimeMs = new AtomicLong(System.currentTimeMillis()); + this.eventsValueCounter = new AtomicLong(0); + this.eventsCounter = new AtomicLong(0); + this.numEventsLastInterval = 0; + this.totalEventValueLastInterval = 0; + this.numEventsLastLastInterval = 0; + this.totalEventValueLastLastInterval = 0; + } + + public void count() { + this.count(0); + } + + public void count(long eventValue) { + resetIfNeeded(); + eventsCounter.incrementAndGet(); + eventsValueCounter.addAndGet(eventValue); + } + + private void resetIfNeeded() { + long currentLastResetTimeMs = lastResetTimeMs.longValue(); + long now = System.currentTimeMillis(); + + // check if interval might have expired + if((now - currentLastResetTimeMs) >= resetIntervalMs) { + long 
numEvents = eventsCounter.longValue(); + long totalEventValue = eventsValueCounter.longValue(); + // more than one thread can get here concurrently. But exactly one + // will pass the check below + if(lastResetTimeMs.compareAndSet(currentLastResetTimeMs, now)) { + // the synchronization is for any monitoring thread to read a + // consistent state for reporting + synchronized(this) { + // resetting these counters here might be problematic since + // another application thread can go ahead and update the + // counters and we will miss those data points. Instead we + // simply update the delta from the current interval. This + // guarantees correctness + numEventsLastLastInterval = numEventsLastInterval; + totalEventValueLastLastInterval = totalEventValueLastInterval; + numEventsLastInterval = numEvents; + totalEventValueLastInterval = totalEventValue; + } + } + } + } + + /** + * Returns the events per second in the current interval + * + * @return + */ + public Double getEventRate() { + resetIfNeeded(); + synchronized(this) { + return (numEventsLastInterval - numEventsLastLastInterval) + / ((1.0 * resetIntervalMs) / Time.MS_PER_SECOND); + } + } + + /** + * Returns the average event value in the current interval + */ + public Double getAvgEventValue() { + resetIfNeeded(); + synchronized(this) { + long eventsLastInterval = numEventsLastInterval - numEventsLastLastInterval; + if(eventsLastInterval > 0) + return ((totalEventValueLastInterval - totalEventValueLastLastInterval) * 1.0) + / eventsLastInterval; + else + return 0.0; + } + } +} diff --git a/src/java/voldemort/store/stats/StreamStats.java b/src/java/voldemort/store/stats/StreamStats.java deleted file mode 100644 index 436c0993df..0000000000 --- a/src/java/voldemort/store/stats/StreamStats.java +++ /dev/null @@ -1,220 +0,0 @@ -package voldemort.store.stats; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import 
java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; - -import voldemort.utils.Time; - -import com.google.common.collect.ImmutableList; - -public class StreamStats { - - private static final int MAX_ENTRIES = 64; - - private final Map handles; - private final AtomicLong handleIdGenerator; - private final ConcurrentMap networkCounter; - private final ConcurrentMap diskCounter; - - public StreamStats() { - this.handles = Collections.synchronizedMap(new Cache(MAX_ENTRIES)); - this.handleIdGenerator = new AtomicLong(0L); - this.networkCounter = new ConcurrentHashMap(); - this.diskCounter = new ConcurrentHashMap(); - - for(Operation operation: Operation.values()) { - networkCounter.put(operation, new RequestCounter(300000)); - diskCounter.put(operation, new RequestCounter(30000)); - } - } - - public Handle makeHandle(Operation operation, - HashMap> replicaToPartitionList) { - Handle handle = new Handle(handleIdGenerator.getAndIncrement(), - operation, - System.currentTimeMillis(), - replicaToPartitionList); - handles.put(handle.getId(), handle); - return handle; - } - - public void closeHandle(Handle handle) { - handle.setFinished(true); - } - - public void clearFinished() { - for(long handleId: getHandleIds()) { - if(getHandle(handleId).isFinished()) - handles.remove(handleId); - } - } - - protected Handle getHandle(long handleId) { - if(!handles.containsKey(handleId)) - throw new IllegalArgumentException("No handle with id " + handleId); - - return handles.get(handleId); - } - - public Collection getHandleIds() { - return ImmutableList.copyOf(handles.keySet()); - } - - public Collection getHandles() { - return ImmutableList.copyOf(handles.values()); - } - - public void recordNetworkTime(Handle handle, long timeNs) { - networkCounter.get(handle.getOperation()).addRequest(timeNs); - } - - public void recordDiskTime(Handle handle, long timeNs) { - 
diskCounter.get(handle.getOperation()).addRequest(timeNs); - } - - public RequestCounter getNetworkCounter(Operation operation) { - return networkCounter.get(operation); - } - - public RequestCounter getDiskCounter(Operation operation) { - return diskCounter.get(operation); - } - - public enum Operation { - FETCH_KEYS, - FETCH_ENTRIES, - FETCH_FILE, - UPDATE, - SLOP, - DELETE, - } - - private static class Cache extends LinkedHashMap { - - private static final long serialVersionUID = 1L; - - private final int maxEntries; - - public Cache(int maxEntries) { - super(); - this.maxEntries = maxEntries; - } - - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return eldest.getValue().isFinished() && size() > maxEntries; - } - } - - public static class Handle { - - private final long id; - private final Operation operation; - private final long startedMs; - private final HashMap> replicaToPartitionList; - private final AtomicLong entriesScanned; - private final AtomicLong timeNetworkNs; - private final AtomicLong timeDiskNs; - private volatile boolean finished; - - private Handle(long id, - Operation operation, - long startedMs, - HashMap> replicaToPartitionList) { - this.id = id; - this.operation = operation; - this.startedMs = startedMs; - this.replicaToPartitionList = replicaToPartitionList; - this.entriesScanned = new AtomicLong(0L); - this.timeNetworkNs = new AtomicLong(0L); - this.timeDiskNs = new AtomicLong(0L); - this.finished = false; - } - - public long getId() { - return id; - } - - public long getStartedMs() { - return startedMs; - } - - public Operation getOperation() { - return operation; - } - - public long getEntriesScanned() { - return entriesScanned.get(); - } - - public long incrementEntriesScanned() { - return entriesScanned.incrementAndGet(); - } - - public void setEntriesScanned(long newVal) { - entriesScanned.set(newVal); - } - - public long getEntriesPerSecond() { - long elapsedSecs = System.currentTimeMillis() - startedMs; - 
if(elapsedSecs == 0L) - return 0L; - return getEntriesScanned() / elapsedSecs; - } - - public boolean isFinished() { - return finished; - } - - public void setFinished(boolean finished) { - this.finished = finished; - } - - public HashMap> getReplicaToPartitionList() { - return replicaToPartitionList; - } - - public void recordTimeNetwork(long deltaNs) { - timeNetworkNs.addAndGet(deltaNs); - } - - public long getTimeNetworkNs() { - return timeNetworkNs.get(); - } - - public void recordTimeDisk(long deltaNs) { - timeDiskNs.addAndGet(deltaNs); - } - - public long getTimeDiskNs() { - return timeDiskNs.get(); - } - - public double getPercentDisk() { - long timeDiskMs = getTimeDiskNs() / Time.NS_PER_MS; - return (timeDiskMs * 100.0) / (System.currentTimeMillis() - startedMs); - } - - public double getPercentNetwork() { - long timeNetworkMs = getTimeNetworkNs() / Time.NS_PER_MS; - return (timeNetworkMs * 100.0) / (System.currentTimeMillis() - startedMs); - } - - @Override - public String toString() { - return "Handle{" + "id=" + id + ", operation=" + operation + ", startedMs=" + startedMs - + ", replicaToPartitionList=" + getReplicaToPartitionList() - + ", entriesScanned=" + getEntriesScanned() + ", finished=" + finished - + ", entriesPerSecond=" + getEntriesPerSecond() + ", timeDiskNs=" - + getTimeDiskNs() + ", timeNetworkNs=" + getTimeNetworkNs() + ", percentDisk=" - + getPercentDisk() + ", percentNetwork=" + getPercentNetwork() + '}'; - } - } -} diff --git a/src/java/voldemort/store/stats/StreamStatsJmx.java b/src/java/voldemort/store/stats/StreamStatsJmx.java deleted file mode 100644 index b483b476c7..0000000000 --- a/src/java/voldemort/store/stats/StreamStatsJmx.java +++ /dev/null @@ -1,107 +0,0 @@ -package voldemort.store.stats; - -import org.apache.log4j.Logger; -import voldemort.annotations.jmx.JmxGetter; -import voldemort.annotations.jmx.JmxManaged; -import voldemort.annotations.jmx.JmxOperation; - -@JmxManaged(description = "Streaming related statistics") 
-public class StreamStatsJmx { - private final static Logger logger = Logger.getLogger(StreamStatsJmx.class); - - private final StreamStats stats; - - public StreamStatsJmx(StreamStats stats) { - this.stats = stats; - } - - @JmxGetter(name = "streamOperationIds", description = "Get a list of all stream operations.") - public String getStreamOperationIds() { - try { - return stats.getHandleIds().toString(); - } catch(Exception e) { - logger.error("Exception in JMX call", e); - return e.getMessage(); - } - } - - @JmxGetter(name = "allStreamOperations", description = "Get status of all stream operations.") - public String getAllStreamOperations() { - try { - return stats.getHandles().toString(); - } catch(Exception e) { - logger.error("Exception in JMX call", e); - return e.getMessage(); - } - } - - @JmxOperation(description = "Get the status of a stream operation with specified id.") - public String getStreamOperation(long handleId) { - try { - return stats.getHandle(handleId).toString(); - } catch(Exception e) { - logger.error("Exception in JMX call", e); - return e.getMessage(); - } - } - - @JmxOperation(description = "Clear out finished tasks.") - public void clearFinished() { - stats.clearFinished(); - } - - // Disk statistics - - @JmxGetter(name = "averageFetchKeysDiskTimeMs", description = "The avg. disk time in ms per FETCH_KEYS operation.") - public double getAvgFetchKeysDiskTimeMs() { - return stats.getDiskCounter(StreamStats.Operation.FETCH_KEYS).getAverageTimeInMs(); - } - - @JmxGetter(name = "averageFetchEntriesDiskTimeMs", description = "The avg. disk time in ms per FETCH_ENTRIES operation.") - public double getAvgFetchEntriesDiskTimeMs() { - return stats.getDiskCounter(StreamStats.Operation.FETCH_ENTRIES).getAverageTimeInMs(); - } - - @JmxGetter(name = "averageFetchFileDiskTimeMs", description = "The avg. 
disk time in ms per FETCH_FILE operation.") - public double getAvgFetchFileDiskTimeMs() { - return stats.getDiskCounter(StreamStats.Operation.FETCH_FILE).getAverageTimeInMs(); - } - - @JmxGetter(name = "averageUpdateDiskTimeMs", description = "The avg. disk time in ms per UPDATE operation.") - public double getAvgUpdateDiskTimeMs() { - return stats.getDiskCounter(StreamStats.Operation.UPDATE).getAverageTimeInMs(); - } - - @JmxGetter(name = "averageSlopDiskTimeMs", description = "The avg. disk time in ms per UPDATE_SLOP operation.") - public double getAvgSlopDiskTimeMs() { - return stats.getDiskCounter(StreamStats.Operation.SLOP).getAverageTimeInMs(); - } - - - // Network statistics - - @JmxGetter(name = "averageFetchKeysNetworkTimeMs", description = "The avg. network time in ms per FETCH_KEYS operation.") - public double getAvgFetchKeysNetworkTimeMs() { - return stats.getNetworkCounter(StreamStats.Operation.FETCH_KEYS).getAverageTimeInMs(); - } - - @JmxGetter(name = "averageFetchEntriesNetworkTimeMs", description = "The avg. network time in ms per FETCH_ENTRIES operation.") - public double getAvgFetchEntriesNetworkTimeMs() { - return stats.getNetworkCounter(StreamStats.Operation.FETCH_ENTRIES).getAverageTimeInMs(); - } - - @JmxGetter(name = "averageFetchFileNetworkTimeMs", description = "The avg. network time in ms per FETCH_FILE operation.") - public double getAvgFetchFileNetworkTimeMs() { - return stats.getNetworkCounter(StreamStats.Operation.FETCH_FILE).getAverageTimeInMs(); - } - - @JmxGetter(name = "averageUpdateNetworkTimeMs", description = "The avg. network time in ms per UPDATE operation.") - public double getAvgUpdateNetworkTimeMs() { - return stats.getNetworkCounter(StreamStats.Operation.UPDATE).getAverageTimeInMs(); - } - - @JmxGetter(name = "averageSlopNetworkTimeMs", description = "The avg. 
network time in ms per UPDATE_SLOP operation.") - public double getAvgSlopNetworkTimeMs() { - return stats.getNetworkCounter(StreamStats.Operation.SLOP).getAverageTimeInMs(); - } -} diff --git a/src/java/voldemort/store/stats/StreamingStats.java b/src/java/voldemort/store/stats/StreamingStats.java new file mode 100644 index 0000000000..14644ff8c5 --- /dev/null +++ b/src/java/voldemort/store/stats/StreamingStats.java @@ -0,0 +1,188 @@ +package voldemort.store.stats; + +import java.util.HashMap; + +import voldemort.annotations.jmx.JmxGetter; +import voldemort.utils.Time; + +public class StreamingStats { + + public enum Operation { + FETCH_KEYS, + FETCH_ENTRIES, + FETCH_FILE, + UPDATE_ENTRIES, + SLOP_UPDATE + } + + private static final int STREAMING_STATS_RESET_INTERVAL_MS = 1000; + private StreamingStats parent; + private HashMap networkTimeCounterMap; + private HashMap storageTimeCounterMap; + private HashMap streamingPutCounterMap; + private HashMap streamingFetchCounterMap; + private HashMap streamingScanCounterMap; + + public StreamingStats() { + networkTimeCounterMap = new HashMap(); + storageTimeCounterMap = new HashMap(); + streamingPutCounterMap = new HashMap(); + streamingFetchCounterMap = new HashMap(); + streamingScanCounterMap = new HashMap(); + + // create the counters for each operation + networkTimeCounterMap.put(Operation.FETCH_KEYS, + new SimpleCounter(STREAMING_STATS_RESET_INTERVAL_MS)); + networkTimeCounterMap.put(Operation.FETCH_ENTRIES, + new SimpleCounter(STREAMING_STATS_RESET_INTERVAL_MS)); + networkTimeCounterMap.put(Operation.UPDATE_ENTRIES, + new SimpleCounter(STREAMING_STATS_RESET_INTERVAL_MS)); + networkTimeCounterMap.put(Operation.SLOP_UPDATE, + new SimpleCounter(STREAMING_STATS_RESET_INTERVAL_MS)); + + storageTimeCounterMap.put(Operation.FETCH_KEYS, + new SimpleCounter(STREAMING_STATS_RESET_INTERVAL_MS)); + storageTimeCounterMap.put(Operation.FETCH_ENTRIES, + new SimpleCounter(STREAMING_STATS_RESET_INTERVAL_MS)); + 
storageTimeCounterMap.put(Operation.UPDATE_ENTRIES, + new SimpleCounter(STREAMING_STATS_RESET_INTERVAL_MS)); + storageTimeCounterMap.put(Operation.SLOP_UPDATE, + new SimpleCounter(STREAMING_STATS_RESET_INTERVAL_MS)); + + streamingPutCounterMap.put(Operation.SLOP_UPDATE, + new SimpleCounter(STREAMING_STATS_RESET_INTERVAL_MS)); + streamingPutCounterMap.put(Operation.UPDATE_ENTRIES, + new SimpleCounter(STREAMING_STATS_RESET_INTERVAL_MS)); + + streamingFetchCounterMap.put(Operation.FETCH_KEYS, + new SimpleCounter(STREAMING_STATS_RESET_INTERVAL_MS)); + streamingFetchCounterMap.put(Operation.FETCH_ENTRIES, + new SimpleCounter(STREAMING_STATS_RESET_INTERVAL_MS)); + streamingFetchCounterMap.put(Operation.FETCH_FILE, + new SimpleCounter(STREAMING_STATS_RESET_INTERVAL_MS)); + + streamingScanCounterMap.put(Operation.FETCH_KEYS, + new SimpleCounter(STREAMING_STATS_RESET_INTERVAL_MS)); + streamingScanCounterMap.put(Operation.FETCH_ENTRIES, + new SimpleCounter(STREAMING_STATS_RESET_INTERVAL_MS)); + } + + public StreamingStats(StreamingStats parent) { + this(); + this.parent = parent; + } + + public void reportNetworkTime(Operation op, long networkTimeMs) { + networkTimeCounterMap.get(op).count(networkTimeMs); + if(parent != null) + parent.reportNetworkTime(op, networkTimeMs); + } + + public void reportStorageTime(Operation op, long storageTimeMs) { + storageTimeCounterMap.get(op).count(storageTimeMs); + if(parent != null) + parent.reportStorageTime(op, storageTimeMs); + } + + public void reportStreamingFetch(Operation op) { + streamingFetchCounterMap.get(op).count(); + if(parent != null) + parent.reportStreamingFetch(op); + } + + public void reportStreamingScan(Operation op) { + streamingScanCounterMap.get(op).count(); + if(parent != null) + parent.reportStreamingScan(op); + } + + public void reportStreamingPut(Operation op) { + streamingPutCounterMap.get(op).count(); + if(parent != null) + parent.reportStreamingPut(op); + } + + // Mbeans for FETCH_KEYS + @JmxGetter(name = 
"avgFetchKeysNetworkTimeMs", description = "average time spent on network, for fetch keys") + public double getAvgFetchKeysNetworkTimeMs() { + return networkTimeCounterMap.get(Operation.FETCH_KEYS).getAvgEventValue() / Time.NS_PER_MS; + } + + @JmxGetter(name = "avgFetchKeysStorageTimeMs", description = "average time spent on storage, for fetch keys") + public double getAvgFetchKeysStorageTimeMs() { + return storageTimeCounterMap.get(Operation.FETCH_KEYS).getAvgEventValue() / Time.NS_PER_MS; + } + + @JmxGetter(name = "getFetchKeysFetchRate", description = "rate at which keys are fetched") + public double getFetchKeysFetchRate() { + return streamingFetchCounterMap.get(Operation.FETCH_KEYS).getEventRate(); + } + + @JmxGetter(name = "getFetchKeysScanRate", description = "rate at which keys are scanned") + public double getFetchKeysScanRate() { + return streamingScanCounterMap.get(Operation.FETCH_KEYS).getEventRate(); + } + + // Mbeans for FETCH_ENTRIES + @JmxGetter(name = "avgFetchEntriesNetworkTimeMs", description = "average time spent on network, for streaming operations") + public double getAvgFetchEntriesNetworkTimeMs() { + return networkTimeCounterMap.get(Operation.FETCH_ENTRIES).getAvgEventValue() + / Time.NS_PER_MS; + } + + @JmxGetter(name = "avgFetchEntriesStorageTimeMs", description = "average time spent on storage, for streaming operations") + public double getAvgFetchEntriesStorageTimeMs() { + return storageTimeCounterMap.get(Operation.FETCH_ENTRIES).getAvgEventValue() + / Time.NS_PER_MS; + } + + @JmxGetter(name = "getFetchEntriesFetchRate", description = "rate at which entries are fetched") + public double getFetchEntriesFetchRate() { + return streamingFetchCounterMap.get(Operation.FETCH_ENTRIES).getEventRate(); + } + + @JmxGetter(name = "getFetchEntriesScanRate", description = "rate at which entries are scanned") + public double getFetchEntriesScanRate() { + return streamingScanCounterMap.get(Operation.FETCH_ENTRIES).getEventRate(); + } + + // Mbeans for 
FETCH_FILE + @JmxGetter(name = "getFetchFileFetchRate", description = "rate at which RO files are fetched") + public double getFetchFileFetchRate() { + return streamingFetchCounterMap.get(Operation.FETCH_FILE).getEventRate(); + } + + // Mbeans for UPDATE_ENTRIES + @JmxGetter(name = "avgUpdateEntriesNetworkTimeMs", description = "average time spent on network, for streaming operations") + public double getAvgUpdateEntriesNetworkTimeMs() { + return networkTimeCounterMap.get(Operation.UPDATE_ENTRIES).getAvgEventValue() + / Time.NS_PER_MS; + } + + @JmxGetter(name = "avgUpdateEntriesStorageTimeMs", description = "average time spent on storage, for streaming operations") + public double getAvgUpdateEntriesStorageTimeMs() { + return storageTimeCounterMap.get(Operation.UPDATE_ENTRIES).getAvgEventValue() + / Time.NS_PER_MS; + } + + @JmxGetter(name = "getUpdateEntriesPutRate", description = "rate at which entries are streaming in") + public double getUpdateEntriesPutRate() { + return streamingPutCounterMap.get(Operation.UPDATE_ENTRIES).getEventRate(); + } + + // Mbeans for SLOP_UPDATE + @JmxGetter(name = "avgSlopUpdateNetworkTimeMs", description = "average time spent on network, for streaming operations") + public double getAvgSlopUpdateNetworkTimeMs() { + return networkTimeCounterMap.get(Operation.SLOP_UPDATE).getAvgEventValue() / Time.NS_PER_MS; + } + + @JmxGetter(name = "avgSlopUpdateStorageTimeMs", description = "average time spent on storage, for streaming operations") + public double getAvgSlopUpdateStorageTimeMs() { + return storageTimeCounterMap.get(Operation.SLOP_UPDATE).getAvgEventValue() / Time.NS_PER_MS; + } + + @JmxGetter(name = "getSlopUpdatePutRate", description = "Rate at which slop entries are written to the server") + public double getSlopUpdatePutRate() { + return streamingPutCounterMap.get(Operation.SLOP_UPDATE).getEventRate(); + } +} diff --git a/test/unit/voldemort/store/stats/SimpleCounterTest.java 
package voldemort.store.stats;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.Before;
import org.junit.Test;

import voldemort.utils.Time;

/**
 * Tests for {@link SimpleCounter}: verifies that event rates and average event
 * values are reported for the <em>previous</em> interval (not the one being
 * accumulated), both from a single thread and under concurrent updates.
 */
public class SimpleCounterTest {

    // Length of the counter's stats interval, in milliseconds. Kept small so
    // the test can span many intervals quickly; timing-sensitive asserts
    // below depend on sleeping at least this long between phases.
    final private static int COUNTER_RESET_INTERVAL_MS = 50;

    private SimpleCounter simpleCounter;

    @Before
    public void setUp() {
        simpleCounter = new SimpleCounter(COUNTER_RESET_INTERVAL_MS);
    }

    /**
     * Sleeps long enough for the counter to roll over into its next interval.
     */
    private static void sleepForResetInterval() {
        try {
            Thread.sleep(COUNTER_RESET_INTERVAL_MS);
        } catch(InterruptedException e) {
            // Restore the interrupt flag rather than swallowing it, so the
            // test runner (and worker threads) can still observe cancellation.
            Thread.currentThread().interrupt();
        }
    }

    @Test
    public void testSingleThread() {
        // Interval 0: nothing recorded yet, all stats are zero
        assertEquals(0.0, simpleCounter.getAvgEventValue(), 0.0);
        assertEquals(0.0, simpleCounter.getEventRate(), 0.0);

        // Interval 1: add some zero-valued samples
        for(int i = 0; i < 10; i++)
            simpleCounter.count();
        sleepForResetInterval();

        // Interval 2: add valued samples; stats returned now must still
        // describe interval 1
        for(int i = 0; i < 10; i++)
            simpleCounter.count(100);
        assertEquals(0.0, simpleCounter.getAvgEventValue(), 0.0);
        assertEquals(10 / ((COUNTER_RESET_INTERVAL_MS * 1.0) / Time.MS_PER_SECOND),
                     simpleCounter.getEventRate(),
                     0.0);
        sleepForResetInterval();

        // Interval 3: stats now describe interval 2, and repeated reads
        // within the same interval must all return the same values
        for(int i = 0; i < 10; i++) {
            assertEquals(100.0, simpleCounter.getAvgEventValue(), 0.0);
            assertEquals(10 / ((COUNTER_RESET_INTERVAL_MS * 1.0) / Time.MS_PER_SECOND),
                         simpleCounter.getEventRate(),
                         0.0);
        }
        sleepForResetInterval();

        // No activity in the previous interval: stats drop back to zero
        assertEquals(0.0, simpleCounter.getAvgEventValue(), 0.0);
        assertEquals(0.0, simpleCounter.getEventRate(), 0.0);
    }

    @Test
    public void testMultipleThreads() throws InterruptedException {
        ExecutorService executorService = null;
        try {
            final int NUM_THREADS = 5;
            final int NUM_OPS = 10000;
            // Rate expected when NUM_THREADS * NUM_OPS events land in a
            // single interval (events per second).
            final double expectedRate = (NUM_OPS * NUM_THREADS)
                                        / ((COUNTER_RESET_INTERVAL_MS * 1.0) / Time.MS_PER_SECOND);
            executorService = Executors.newFixedThreadPool(NUM_THREADS);

            // Phase 1: thread i records NUM_OPS events of value 100*(i+1);
            // the overall average is therefore (100+...+500)/5 = 300.
            final CountDownLatch latch1 = new CountDownLatch(NUM_THREADS);
            for(int i = 0; i < NUM_THREADS; i++) {
                final int threadId = i;
                executorService.submit(new Runnable() {

                    public void run() {
                        // additional sleep so the threads all start together
                        // and run concurrently
                        try {
                            sleepForResetInterval();
                            for(int j = 0; j < NUM_OPS; j++) {
                                simpleCounter.count(100 * (threadId + 1));
                            }
                        } finally {
                            latch1.countDown();
                        }
                    }
                });
            }
            latch1.await();
            // one more sleep so we expire the interval where all the action
            // happened
            sleepForResetInterval();

            assertEquals(300.0, simpleCounter.getAvgEventValue(), 0.0);
            assertEquals(expectedRate, simpleCounter.getEventRate(), 0.0);
            sleepForResetInterval();

            // Phase 2: run for a long period spanning multiple intervals and
            // check that every thread observes consistent metrics at the end
            final ConcurrentLinkedQueue<Double> observedEventRate = new ConcurrentLinkedQueue<Double>();
            final ConcurrentLinkedQueue<Double> observedEventValueAvg = new ConcurrentLinkedQueue<Double>();
            final int NUM_INTERVALS = 30;
            final CountDownLatch latch2 = new CountDownLatch(NUM_THREADS);

            for(int i = 0; i < NUM_THREADS; i++) {
                executorService.submit(new Runnable() {

                    public void run() {
                        try {
                            for(int interval = 0; interval < NUM_INTERVALS; interval++) {
                                sleepForResetInterval();
                                for(int j = 0; j < NUM_OPS; j++) {
                                    simpleCounter.count(100);
                                }
                            }
                            observedEventRate.add(simpleCounter.getEventRate());
                            observedEventValueAvg.add(simpleCounter.getAvgEventValue());
                        } finally {
                            latch2.countDown();
                        }
                    }
                });
            }
            latch2.await();

            // Every thread wrote the same value at the same per-interval
            // rate, so every observation must match the single expectation.
            Double[] expectedEventRates = new Double[NUM_THREADS];
            Double[] expectedEventValueAvgs = new Double[NUM_THREADS];
            Arrays.fill(expectedEventRates, expectedRate);
            Arrays.fill(expectedEventValueAvgs, 100.0);
            assertArrayEquals(expectedEventRates, observedEventRate.toArray(new Double[0]));
            assertArrayEquals(expectedEventValueAvgs, observedEventValueAvg.toArray(new Double[0]));
        } finally {
            if(executorService != null)
                executorService.shutdown();
        }
    }
}