Skip to content

Commit

Permalink
Monitoring for streaming operations
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Jan 11, 2013
1 parent 4ae1336 commit 903e749
Show file tree
Hide file tree
Showing 18 changed files with 677 additions and 469 deletions.
56 changes: 56 additions & 0 deletions src/java/voldemort/server/StoreRepository.java
Expand Up @@ -12,7 +12,9 @@
import voldemort.store.StorageEngine;
import voldemort.store.Store;
import voldemort.store.slop.SlopStorageEngine;
import voldemort.store.stats.StreamingStats;
import voldemort.utils.ByteArray;
import voldemort.utils.JmxUtils;
import voldemort.utils.Pair;

/**
Expand Down Expand Up @@ -59,6 +61,16 @@ public class StoreRepository {
*/
private final ConcurrentMap<String, StorageEngine<ByteArray, byte[], byte[]>> storageEngines;

/**
* Aggregated statistics about streaming operations
*/
private StreamingStats aggregatedStreamStats;
/**
* Maintains statistics about streaming reads/writes performed against all
* the local storage engines in this node
*/
private ConcurrentMap<String, StreamingStats> streamingStatsMap;

/*
* Routed stores that write and read from multiple nodes
*/
Expand All @@ -80,10 +92,21 @@ public class StoreRepository {
*/
private RepairJob repairJob;

/**
* Constructor invoked by tests
*/
public StoreRepository() {
this(true);
}

public StoreRepository(boolean jmxEnabled) {
super();
this.localStores = new ConcurrentHashMap<String, Store<ByteArray, byte[], byte[]>>();
this.storageEngines = new ConcurrentHashMap<String, StorageEngine<ByteArray, byte[], byte[]>>();
if(jmxEnabled) {
this.streamingStatsMap = new ConcurrentHashMap<String, StreamingStats>();
this.aggregatedStreamStats = new StreamingStats();
}
this.routedStores = new ConcurrentHashMap<String, Store<ByteArray, byte[], byte[]>>();
this.nodeStores = new ConcurrentHashMap<Pair<String, Integer>, Store<ByteArray, byte[], byte[]>>();
this.redirectingSocketStores = new ConcurrentHashMap<Pair<String, Integer>, Store<ByteArray, byte[], byte[]>>();
Expand Down Expand Up @@ -127,9 +150,38 @@ public void addStorageEngine(StorageEngine<ByteArray, byte[], byte[]> engine) {
if(found != null)
throw new VoldemortException("Storage Engine '" + engine.getName()
+ "' has already been initialized.");

// register streaming stats object for the store
if(streamingStatsMap != null) {
// lazily register the aggregated mbean
if(storageEngines.size() == 1) {
JmxUtils.registerMbean(aggregatedStreamStats,
JmxUtils.createObjectName(this.getClass().getCanonicalName(),
"aggregated-streaming-stats"));
}

StreamingStats stat = new StreamingStats(aggregatedStreamStats);
JmxUtils.registerMbean(stat, JmxUtils.createObjectName(this.getClass()
.getCanonicalName(),
engine.getName()
+ "-streaming-stats"));
streamingStatsMap.putIfAbsent(engine.getName(), stat);
}
}

public Store<ByteArray, byte[], byte[]> removeStorageEngine(String storeName) {
// register streaming stats object for the store
if(streamingStatsMap != null) {
JmxUtils.unregisterMbean(JmxUtils.createObjectName(this.getClass().getCanonicalName(),
storeName));
streamingStatsMap.remove(storeName);
// lazily unregister the aggregated mbean
if(storageEngines.size() == 1) {
JmxUtils.unregisterMbean(JmxUtils.createObjectName(this.getClass()
.getCanonicalName(),
"aggregated-streaming-stats"));
}
}
return this.storageEngines.remove(storeName);
}

Expand Down Expand Up @@ -234,4 +286,8 @@ public RepairJob getRepairJob() {
public void registerRepairJob(RepairJob job) {
repairJob = job;
}

public StreamingStats getStreamingStats(String store) {
return streamingStatsMap.get(store);
}
}
4 changes: 2 additions & 2 deletions src/java/voldemort/server/VoldemortServer.java
Expand Up @@ -83,7 +83,7 @@ public class VoldemortServer extends AbstractService {
public VoldemortServer(VoldemortConfig config) {
super(ServiceType.VOLDEMORT);
this.voldemortConfig = config;
this.storeRepository = new StoreRepository();
this.storeRepository = new StoreRepository(config.isJmxEnabled());
this.metadata = MetadataStore.readFromDirectory(new File(this.voldemortConfig.getMetadataDirectory()),
voldemortConfig.getNodeId());
this.identityNode = metadata.getCluster().getNodeById(voldemortConfig.getNodeId());
Expand All @@ -97,7 +97,7 @@ public VoldemortServer(VoldemortConfig config, Cluster cluster) {
this.identityNode = cluster.getNodeById(voldemortConfig.getNodeId());

this.checkHostName();
this.storeRepository = new StoreRepository();
this.storeRepository = new StoreRepository(config.isJmxEnabled());
// update cluster details in metaDataStore
ConfigurationStorageEngine metadataInnerEngine = new ConfigurationStorageEngine("metadata-config-store",
voldemortConfig.getMetadataDirectory());
Expand Down
Expand Up @@ -12,9 +12,6 @@
import voldemort.server.storage.StorageService;
import voldemort.store.ErrorCodeMapper;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.stats.StreamStats;
import voldemort.store.stats.StreamStatsJmx;
import voldemort.utils.JmxUtils;

/**
* A factory that gets the appropriate request handler for a given
Expand All @@ -30,7 +27,6 @@ public class SocketRequestHandlerFactory implements RequestHandlerFactory {
private final VoldemortConfig voldemortConfig;
private final AsyncOperationService asyncService;
private final Rebalancer rebalancer;
private final StreamStats stats;

public SocketRequestHandlerFactory(StorageService storageService,
StoreRepository repository,
Expand All @@ -44,17 +40,6 @@ public SocketRequestHandlerFactory(StorageService storageService,
this.voldemortConfig = voldemortConfig;
this.asyncService = asyncService;
this.rebalancer = rebalancer;
this.stats = new StreamStats();
if(null != voldemortConfig && voldemortConfig.isJmxEnabled())
if(this.voldemortConfig.isEnableJmxClusterName())
JmxUtils.registerMbean(new StreamStatsJmx(stats),
JmxUtils.createObjectName(metadata.getCluster().getName()
+ ".voldemort.store.stats",
"admin-streaming"));
else
JmxUtils.registerMbean(new StreamStatsJmx(stats),
JmxUtils.createObjectName("voldemort.store.stats",
"admin-streaming"));
}

public RequestHandler getRequestHandler(RequestFormatType type) {
Expand All @@ -76,8 +61,7 @@ public RequestHandler getRequestHandler(RequestFormatType type) {
metadata,
voldemortConfig,
asyncService,
rebalancer,
stats);
rebalancer);
default:
throw new VoldemortException("Unknown wire format " + type);
}
Expand Down
2 changes: 2 additions & 0 deletions src/java/voldemort/server/protocol/StreamRequestHandler.java
Expand Up @@ -15,6 +15,8 @@

public interface StreamRequestHandler {

public final static int STAT_RECORDS_INTERVAL = 100000;

/**
* Handles a "segment" of a streaming request.
*
Expand Down
Expand Up @@ -60,7 +60,6 @@
import voldemort.store.readonly.ReadOnlyStorageEngine;
import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.store.slop.SlopStorageEngine;
import voldemort.store.stats.StreamStats;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.ClosableIterator;
Expand Down Expand Up @@ -97,7 +96,6 @@ public class AdminServiceRequestHandler implements RequestHandler {
private final VoldemortConfig voldemortConfig;
private final AsyncOperationService asyncService;
private final Rebalancer rebalancer;
private final StreamStats stats;
private FileFetcher fileFetcher;

public AdminServiceRequestHandler(ErrorCodeMapper errorCodeMapper,
Expand All @@ -106,8 +104,7 @@ public AdminServiceRequestHandler(ErrorCodeMapper errorCodeMapper,
MetadataStore metadataStore,
VoldemortConfig voldemortConfig,
AsyncOperationService asyncService,
Rebalancer rebalancer,
StreamStats stats) {
Rebalancer rebalancer) {
this.errorCodeMapper = errorCodeMapper;
this.storageService = storageService;
this.metadataStore = metadataStore;
Expand All @@ -117,7 +114,6 @@ public AdminServiceRequestHandler(ErrorCodeMapper errorCodeMapper,
.getContextClassLoader());
this.asyncService = asyncService;
this.rebalancer = rebalancer;
this.stats = stats;
setFetcherClass(voldemortConfig);
}

Expand Down Expand Up @@ -532,12 +528,14 @@ public StreamRequestHandler handleFetchROPartitionFiles(VAdminProto.FetchPartiti
return new FetchPartitionFileStreamRequestHandler(request,
metadataStore,
voldemortConfig,
storeRepository,
stats);
storeRepository);
}

public StreamRequestHandler handleUpdateSlopEntries(VAdminProto.UpdateSlopEntriesRequest request) {
return new UpdateSlopEntriesRequestHandler(request, errorCodeMapper, storeRepository, stats);
return new UpdateSlopEntriesRequestHandler(request,
errorCodeMapper,
storeRepository,
voldemortConfig);
}

public StreamRequestHandler handleFetchPartitionEntries(VAdminProto.FetchPartitionEntriesRequest request) {
Expand All @@ -552,33 +550,29 @@ public StreamRequestHandler handleFetchPartitionEntries(VAdminProto.FetchPartiti
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader,
stats);
networkClassLoader);
else
return new FetchEntriesStreamRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader,
stats);
networkClassLoader);
} else {
if(storageEngine.isPartitionScanSupported())
return new FetchPartitionKeysStreamRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader,
stats);
networkClassLoader);
else
return new FetchKeysStreamRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader,
stats);
networkClassLoader);
}
}

Expand All @@ -587,8 +581,7 @@ public StreamRequestHandler handleUpdatePartitionEntries(VAdminProto.UpdateParti
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader,
stats);
networkClassLoader);
}

public VAdminProto.AsyncOperationListResponse handleAsyncOperationList(VAdminProto.AsyncOperationListRequest request) {
Expand Down
Expand Up @@ -12,12 +12,12 @@
import voldemort.server.VoldemortConfig;
import voldemort.store.ErrorCodeMapper;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.stats.StreamStats;
import voldemort.store.stats.StreamStats.Operation;
import voldemort.store.stats.StreamingStats.Operation;
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.Time;
import voldemort.versioning.Versioned;

import com.google.protobuf.Message;
Expand All @@ -39,15 +39,13 @@ public FetchEntriesStreamRequestHandler(FetchPartitionEntriesRequest request,
ErrorCodeMapper errorCodeMapper,
VoldemortConfig voldemortConfig,
StoreRepository storeRepository,
NetworkClassLoader networkClassLoader,
StreamStats stats) {
NetworkClassLoader networkClassLoader) {
super(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader,
stats,
Operation.FETCH_ENTRIES);
this.keyIterator = storageEngine.keys();
logger.info("Starting fetch entries for store '" + storageEngine.getName()
Expand All @@ -62,6 +60,8 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,

long startNs = System.nanoTime();
ByteArray key = keyIterator.next();
if(streamStats != null)
streamStats.reportStreamingScan(operation);

if(RebalanceUtils.checkKeyBelongsToPartition(nodeId,
key.get(),
Expand All @@ -71,12 +71,14 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,

&& counter % skipRecords == 0) {
List<Versioned<byte[]>> values = storageEngine.get(key, null);
stats.recordDiskTime(handle, System.nanoTime() - startNs);
if(streamStats != null)
streamStats.reportStorageTime(operation, System.nanoTime() - startNs);
for(Versioned<byte[]> value: values) {
throttler.maybeThrottle(key.length());
if(filter.accept(key, value)) {
fetched++;
handle.incrementEntriesScanned();
if(streamStats != null)
streamStats.reportStreamingFetch(operation);
VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();

VAdminProto.PartitionEntry partitionEntry = VAdminProto.PartitionEntry.newBuilder()
Expand All @@ -89,20 +91,22 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,

startNs = System.nanoTime();
ProtoUtils.writeMessage(outputStream, message);
stats.recordNetworkTime(handle, System.nanoTime() - startNs);
if(streamStats != null)
streamStats.reportNetworkTime(operation, System.nanoTime() - startNs);

throttler.maybeThrottle(AdminServiceRequestHandler.valueSize(value));
}
}
} else {
stats.recordDiskTime(handle, System.nanoTime() - startNs);
if(streamStats != null)
streamStats.reportStorageTime(operation, System.nanoTime() - startNs);
}

// log progress
counter++;

if(0 == counter % 100000) {
long totalTime = (System.currentTimeMillis() - startTime) / 1000;
if(0 == counter % STAT_RECORDS_INTERVAL) {
long totalTime = (System.currentTimeMillis() - startTime) / Time.MS_PER_SECOND;

logger.info("Fetch entries scanned " + counter + " entries, fetched " + fetched
+ " entries for store '" + storageEngine.getName()
Expand All @@ -113,7 +117,6 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
if(keyIterator.hasNext())
return StreamRequestHandlerState.WRITING;
else {
stats.closeHandle(handle);
return StreamRequestHandlerState.COMPLETE;
}
}
Expand Down

0 comments on commit 903e749

Please sign in to comment.