diff --git a/src/java/voldemort/server/niosocket/AsyncRequestHandler.java b/src/java/voldemort/server/niosocket/AsyncRequestHandler.java index d4df1254d7..27772e7740 100644 --- a/src/java/voldemort/server/niosocket/AsyncRequestHandler.java +++ b/src/java/voldemort/server/niosocket/AsyncRequestHandler.java @@ -76,6 +76,11 @@ public AsyncRequestHandler(Selector selector, protected void read(SelectionKey selectionKey) throws IOException { int count = 0; + long startNs = -1; + + if(logger.isDebugEnabled()) + startNs = System.nanoTime(); + if((count = socketChannel.read(inputStream.getBuffer())) == -1) throw new EOFException("EOF for " + socketChannel.socket()); @@ -125,6 +130,14 @@ protected void read(SelectionKey selectionKey) throws IOException { streamRequestHandler = requestHandler.handleRequest(new DataInputStream(inputStream), new DataOutputStream(outputStream)); + if(logger.isDebugEnabled()) { + logger.debug("AsyncRequestHandler:read finished request from " + + socketChannel.socket().getRemoteSocketAddress() + " handlerRef: " + + System.identityHashCode(streamRequestHandler) + " at time: " + + System.currentTimeMillis() + " elapsed time: " + + (System.nanoTime() - startNs) + " ns"); + } + if(streamRequestHandler != null) { // In the case of a StreamRequestHandler, we handle that separately // (attempting to process multiple "segments"). @@ -282,8 +295,23 @@ private StreamRequestHandlerState handleStreamRequestInternal(SelectionKey selec if(logger.isTraceEnabled()) traceInputBufferState("Before streaming request handler"); + // this is the lowest level in the NioSocketServer stack at which we + // still have a reference to the client IP address and port + long startNs = -1; + + if(logger.isDebugEnabled()) + startNs = System.nanoTime(); + state = streamRequestHandler.handleRequest(dataInputStream, dataOutputStream); + if(logger.isDebugEnabled()) { + logger.debug("Handled request from " + + socketChannel.socket().getRemoteSocketAddress() + " handlerRef: " + + System.identityHashCode(streamRequestHandler) + " at time: " + + System.currentTimeMillis() + " elapsed time: " + + (System.nanoTime() - startNs) + " ns"); + } + if(logger.isTraceEnabled()) traceInputBufferState("After streaming request handler"); } catch(Exception e) { diff --git a/src/java/voldemort/server/protocol/vold/VoldemortNativeRequestHandler.java b/src/java/voldemort/server/protocol/vold/VoldemortNativeRequestHandler.java index b74b60a920..fa70a4b3eb 100644 --- a/src/java/voldemort/server/protocol/vold/VoldemortNativeRequestHandler.java +++ b/src/java/voldemort/server/protocol/vold/VoldemortNativeRequestHandler.java @@ -101,6 +101,14 @@ private RequestRoutingType getRoutingType(DataInputStream inputStream) throws IO private void handleGetVersion(DataInputStream inputStream, DataOutputStream outputStream, Store store) throws IOException { + long startTimeMs = -1; + long startTimeNs = -1; + + if(logger.isDebugEnabled()) { + startTimeMs = System.currentTimeMillis(); + startTimeNs = System.nanoTime(); + } + ByteArray key = readKey(inputStream); List results = null; try { @@ -112,11 +120,25 @@ private void handleGetVersion(DataInputStream inputStream, return; } outputStream.writeInt(results.size()); + + String clockStr = ""; + for(Version v: results) { byte[] clock = ((VectorClock) v).toBytes(); + + if(logger.isDebugEnabled()) + clockStr += clock + " "; + outputStream.writeInt(clock.length); outputStream.write(clock); } + + if(logger.isDebugEnabled()) { + logger.debug("GETVERSIONS started at: " + startTimeMs + " handlerRef: " + + System.identityHashCode(this) + " key: " + key + " " + + (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length() + + "clocks: " + clockStr); + } } /** @@ -269,6 +291,14 @@ private void writeResults(DataOutputStream outputStream, List> private void handleGet(DataInputStream inputStream, DataOutputStream outputStream, Store store) throws IOException { + long startTimeMs = -1; + long startTimeNs = -1; + + if(logger.isDebugEnabled()) { + startTimeMs = System.currentTimeMillis(); + startTimeNs = System.nanoTime(); + } + ByteArray key = readKey(inputStream); byte[] transforms = null; @@ -286,11 +316,22 @@ private void handleGet(DataInputStream inputStream, return; } writeResults(outputStream, results); + if(logger.isDebugEnabled()) { + debugLogReturnValue(key, results, startTimeMs, startTimeNs, "GET"); + } } private void handleGetAll(DataInputStream inputStream, DataOutputStream outputStream, Store store) throws IOException { + long startTimeMs = -1; + long startTimeNs = -1; + + if(logger.isDebugEnabled()) { + startTimeMs = System.currentTimeMillis(); + startTimeNs = System.nanoTime(); + } + // read keys int numKeys = inputStream.readInt(); List keys = new ArrayList(numKeys); @@ -321,18 +362,69 @@ private void handleGetAll(DataInputStream inputStream, // write back the results outputStream.writeInt(results.size()); + + if(logger.isDebugEnabled()) + logger.debug("GETALL start"); + for(Map.Entry>> entry: results.entrySet()) { // write the key outputStream.writeInt(entry.getKey().length()); outputStream.write(entry.getKey().get()); // write the values writeResults(outputStream, entry.getValue()); + + if(logger.isDebugEnabled()) { + debugLogReturnValue(entry.getKey(), + entry.getValue(), + startTimeMs, + startTimeNs, + "GETALL"); + } } + + if(logger.isDebugEnabled()) + logger.debug("GETALL end"); + } + + private void debugLogReturnValue(ByteArray key, + List> values, + long startTimeMs, + long startTimeNs, + String getType) { + long totalValueSize = 0; + String valueSizeStr = "["; + String valueHashStr = "["; + String versionsStr = "["; + for(Versioned b: values) { + int len = b.getValue().length; + totalValueSize += len; + valueSizeStr += len + ","; + valueHashStr += b.hashCode() + ","; + versionsStr += b.getVersion(); + } + valueSizeStr += "]"; + valueHashStr += "]"; + versionsStr += "]"; + + logger.debug(getType + " handlerRef: " + System.identityHashCode(this) + " start time: " + + startTimeMs + " key: " + key + " elapsed time: " + + (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length() + + " numResults: " + values.size() + " totalResultSize: " + totalValueSize + + " resultSizes: " + valueSizeStr + " resultHashes: " + valueHashStr + + " versions: " + versionsStr + " current time: " + System.currentTimeMillis()); } private void handlePut(DataInputStream inputStream, DataOutputStream outputStream, Store store) throws IOException { + long startTimeMs = -1; + long startTimeNs = -1; + + if(logger.isDebugEnabled()) { + startTimeMs = System.currentTimeMillis(); + startTimeNs = System.nanoTime(); + } + ByteArray key = readKey(inputStream); int valueSize = inputStream.readInt(); byte[] bytes = new byte[valueSize]; @@ -352,11 +444,28 @@ private void handlePut(DataInputStream inputStream, } catch(VoldemortException e) { writeException(outputStream, e); } + + if(logger.isDebugEnabled()) { + logger.debug("PUT started at: " + startTimeMs + " handlerRef: " + + System.identityHashCode(this) + " key: " + key + " " + + (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length() + + " valueHash: " + value.hashCode() + " valueSize: " + valueSize + + " clockSize: " + clock.sizeInBytes() + " time: " + + System.currentTimeMillis()); + } } private void handleDelete(DataInputStream inputStream, DataOutputStream outputStream, Store store) throws IOException { + long startTimeMs = -1; + long startTimeNs = -1; + + if(logger.isDebugEnabled()) { + startTimeMs = System.currentTimeMillis(); + startTimeNs = System.nanoTime(); + } + ByteArray key = readKey(inputStream); int versionSize = inputStream.readShort(); byte[] versionBytes = new byte[versionSize]; @@ -369,6 +478,13 @@ private void handleDelete(DataInputStream inputStream, } catch(VoldemortException e) { writeException(outputStream, e); } + + if(logger.isDebugEnabled()) { + logger.debug("DELETE started at: " + startTimeMs + " key: " + key + " handlerRef: " + + System.identityHashCode(this) + " time: " + + (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length() + + " clockSize: " + version.sizeInBytes()); + } } private void writeException(DataOutputStream stream, VoldemortException e) throws IOException { diff --git a/src/java/voldemort/store/bdb/BdbStorageEngine.java b/src/java/voldemort/store/bdb/BdbStorageEngine.java index 290aac61e7..1b2013ca46 100644 --- a/src/java/voldemort/store/bdb/BdbStorageEngine.java +++ b/src/java/voldemort/store/bdb/BdbStorageEngine.java @@ -211,6 +211,11 @@ private List get(ByteArray key, Serializer serializer) throws PersistenceFailureException { StoreUtils.assertValidKey(key); + long startTimeNs = -1; + + if(logger.isTraceEnabled()) + startTimeNs = System.nanoTime(); + Cursor cursor = null; try { cursor = getBdbDatabase().openCursor(null, null); @@ -227,6 +232,13 @@ private List get(ByteArray key, logger.error(e); throw new PersistenceFailureException(e); } finally { + if(logger.isTraceEnabled()) { + logger.trace("Completed GET from key " + key + " (keyRef: " + + System.identityHashCode(key) + ") in " + + (System.nanoTime() - startTimeNs) + " ns at " + + System.currentTimeMillis()); + } + attemptClose(cursor); } } @@ -252,12 +264,25 @@ private Database getBdbDatabase() { public Map>> getAll(Iterable keys, Map transforms) throws VoldemortException { + + long startTimeNs = -1; + + if(logger.isTraceEnabled()) + startTimeNs = System.nanoTime(); + StoreUtils.assertValidKeys(keys); Map>> result = StoreUtils.newEmptyHashMap(keys); Cursor cursor = null; + + String keyStr = ""; + try { cursor = getBdbDatabase().openCursor(null, null); for(ByteArray key: keys) { + + if(logger.isTraceEnabled()) + keyStr += key + " "; + List> values = get(cursor, key, readLockMode, versionedSerializer); if(!values.isEmpty()) result.put(key, values); @@ -268,6 +293,12 @@ public Map>> getAll(Iterable keys, } finally { attemptClose(cursor); } + + if(logger.isTraceEnabled()) + logger.trace("Completed GETALL from keys " + keyStr + " in " + + (System.nanoTime() - startTimeNs) + " ns at " + + System.currentTimeMillis()); + return result; } @@ -277,6 +308,11 @@ private static List get(Cursor cursor, Serializer serializer) throws DatabaseException { StoreUtils.assertValidKey(key); + long startTimeNs = -1; + + if(logger.isTraceEnabled()) + startTimeNs = System.nanoTime(); + DatabaseEntry keyEntry = new DatabaseEntry(key.get()); DatabaseEntry valueEntry = new DatabaseEntry(); List results = Lists.newArrayList(); @@ -286,6 +322,13 @@ private static List get(Cursor cursor, lockMode)) { results.add(serializer.toObject(valueEntry.getData())); } + + if(logger.isTraceEnabled()) { + logger.trace("Completed GET from key " + key + " in " + + (System.nanoTime() - startTimeNs) + " ns at " + + System.currentTimeMillis()); + } + return results; } @@ -293,6 +336,11 @@ public void put(ByteArray key, Versioned value, byte[] transforms) throws PersistenceFailureException { StoreUtils.assertValidKey(key); + long startTimeNs = -1; + + if(logger.isTraceEnabled()) + startTimeNs = System.nanoTime(); + DatabaseEntry keyEntry = new DatabaseEntry(key.get()); boolean succeeded = false; Transaction transaction = null; @@ -340,10 +388,23 @@ else if(occurred == Occurred.AFTER) else attemptAbort(transaction); } + + if(logger.isTraceEnabled()) { + logger.trace("Completed PUT to key " + key + " (keyRef: " + + System.identityHashCode(key) + " value " + value + " in " + + (System.nanoTime() - startTimeNs) + " ns at " + + System.currentTimeMillis()); + } } public boolean delete(ByteArray key, Version version) throws PersistenceFailureException { StoreUtils.assertValidKey(key); + + long startTimeNs = -1; + + if(logger.isTraceEnabled()) + startTimeNs = System.nanoTime(); + boolean deletedSomething = false; Cursor cursor = null; Transaction transaction = null; @@ -368,6 +429,14 @@ public boolean delete(ByteArray key, Version version) throws PersistenceFailureE logger.error(e); throw new PersistenceFailureException(e); } finally { + + if(logger.isTraceEnabled()) { + logger.trace("Completed DELETE of key " + key + " (keyRef: " + + System.identityHashCode(key) + ") in " + + (System.nanoTime() - startTimeNs) + " ns at " + + System.currentTimeMillis()); + } + try { attemptClose(cursor); } finally { diff --git a/src/java/voldemort/store/routed/PipelineRoutedStore.java b/src/java/voldemort/store/routed/PipelineRoutedStore.java index 38ff90517d..dfde9b350c 100644 --- a/src/java/voldemort/store/routed/PipelineRoutedStore.java +++ b/src/java/voldemort/store/routed/PipelineRoutedStore.java @@ -144,6 +144,14 @@ public PipelineRoutedStore(String name, public List> get(final ByteArray key, final byte[] transforms) { StoreUtils.assertValidKey(key); + long startTimeMs = -1; + long startTimeNs = -1; + + if(logger.isDebugEnabled()) { + startTimeMs = System.currentTimeMillis(); + startTimeNs = System.nanoTime(); + } + BasicPipelineData>> pipelineData = new BasicPipelineData>>(); if(zoneRoutingEnabled) pipelineData.setZonesRequired(storeDef.getZoneCountReads()); @@ -242,14 +250,42 @@ public List> request(Store store) { results.addAll(value); } + if(logger.isDebugEnabled()) { + logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key " + key + + " keyRef: " + System.identityHashCode(key) + "; started at " + + startTimeMs + " took " + (System.nanoTime() - startTimeNs) + " values: " + + formatNodeValuesFromGet(pipelineData.getResponses())); + } + return results; } + private String formatNodeValuesFromGet(List>>> results) { + // log all retrieved values + StringBuilder builder = new StringBuilder(); + builder.append("{"); + for(Response>> r: results) { + builder.append("(nodeId=" + r.getNode().getId() + ", key=" + r.getKey() + + ", retrieved= " + r.getValue() + "), "); + } + builder.append("}"); + + return builder.toString(); + } + public Map>> getAll(Iterable keys, Map transforms) throws VoldemortException { StoreUtils.assertValidKeys(keys); + long startTimeMs = -1; + long startTimeNs = -1; + + if(logger.isDebugEnabled()) { + startTimeMs = System.currentTimeMillis(); + startTimeNs = System.nanoTime(); + } + boolean allowReadRepair = repairReads && (transforms == null || transforms.size() == 0); GetAllPipelineData pipelineData = new GetAllPipelineData(); @@ -318,12 +354,41 @@ public Map>> getAll(Iterable keys, if(pipelineData.getFatalError() != null) throw pipelineData.getFatalError(); + if(logger.isDebugEnabled()) { + logger.debug("Finished " + pipeline.getOperation().getSimpleName() + "for keys " + keys + + " keyRef: " + System.identityHashCode(keys) + "; started at " + + startTimeMs + " took " + (System.nanoTime() - startTimeNs) + " values: " + + formatNodeValuesFromGetAll(pipelineData.getResponses())); + } + return pipelineData.getResult(); } + private String formatNodeValuesFromGetAll(List, Map>>>> list) { + // log all retrieved values + StringBuilder builder = new StringBuilder(); + builder.append("{"); + for(Response, Map>>> r: list) { + builder.append("(nodeId=" + r.getNode().getId() + ", key=" + r.getKey() + + ", retrieved= " + r.getValue() + ")"); + builder.append(", "); + } + builder.append("}"); + + return builder.toString(); + } + public List getVersions(final ByteArray key) { StoreUtils.assertValidKey(key); + long startTimeMs = -1; + long startTimeNs = -1; + + if(logger.isDebugEnabled()) { + startTimeMs = System.currentTimeMillis(); + startTimeNs = System.nanoTime(); + } + BasicPipelineData> pipelineData = new BasicPipelineData>(); if(zoneRoutingEnabled) pipelineData.setZonesRequired(storeDef.getZoneCountReads()); @@ -385,7 +450,7 @@ public List request(Store store) { pipeline.addEvent(Event.STARTED); if(logger.isDebugEnabled()) { - logger.debug("Operation " + pipeline.getOperation().getSimpleName() + "Key " + logger.debug("Operation " + pipeline.getOperation().getSimpleName() + " Key " + ByteUtils.toHexString(key.get())); } try { @@ -403,12 +468,40 @@ public List request(Store store) { for(Response> response: pipelineData.getResponses()) results.addAll(response.getValue()); + if(logger.isDebugEnabled()) { + logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key " + key + + " keyRef: " + System.identityHashCode(key) + "; started at " + + startTimeMs + " took " + (System.nanoTime() - startTimeNs) + " values: " + + formatNodeValuesFromGetVersions(pipelineData.getResponses())); + } + return results; } + private String formatNodeValuesFromGetVersions(List>> results) { + // log all retrieved values + StringBuilder builder = new StringBuilder(); + builder.append("{"); + for(Response> r: results) { + builder.append("(nodeId=" + r.getNode().getId() + ", key=" + r.getKey() + + ", retrieved= " + r.getValue() + "), "); + } + builder.append("}"); + + return builder.toString(); + } + public boolean delete(final ByteArray key, final Version version) throws VoldemortException { StoreUtils.assertValidKey(key); + long startTimeMs = -1; + long startTimeNs = -1; + + if(logger.isDebugEnabled()) { + startTimeMs = System.currentTimeMillis(); + startTimeNs = System.nanoTime(); + } + BasicPipelineData pipelineData = new BasicPipelineData(); if(zoneRoutingEnabled) pipelineData.setZonesRequired(storeDef.getZoneCountWrites()); @@ -480,6 +573,12 @@ public boolean delete(final ByteArray key, final Version version) throws Voldemo throw e; } + if(logger.isDebugEnabled()) { + logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key " + + key.get() + " keyRef: " + System.identityHashCode(key) + "; started at " + + startTimeMs + " took " + (System.nanoTime() - startTimeNs)); + } + if(pipelineData.getFatalError() != null) throw pipelineData.getFatalError(); @@ -497,6 +596,15 @@ public boolean isHintedHandoffEnabled() { public void put(ByteArray key, Versioned versioned, byte[] transforms) throws VoldemortException { + + long startTimeMs = -1; + long startTimeNs = -1; + + if(logger.isDebugEnabled()) { + startTimeMs = System.currentTimeMillis(); + startTimeNs = System.nanoTime(); + } + StoreUtils.assertValidKey(key); PutPipelineData pipelineData = new PutPipelineData(); if(zoneRoutingEnabled) @@ -591,6 +699,13 @@ public void put(ByteArray key, Versioned versioned, byte[] transforms) throw e; } + if(logger.isDebugEnabled()) { + logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key " + key + + " keyRef: " + System.identityHashCode(key) + "; started at " + + startTimeMs + " took " + (System.nanoTime() - startTimeNs) + " value: " + + versioned.getValue() + " (size: " + versioned.getValue().length + ")"); + } + if(pipelineData.getFatalError() != null) throw pipelineData.getFatalError(); } diff --git a/src/java/voldemort/store/routed/action/AbstractReadRepair.java b/src/java/voldemort/store/routed/action/AbstractReadRepair.java index 2588c36df0..6e2ea19d6f 100644 --- a/src/java/voldemort/store/routed/action/AbstractReadRepair.java +++ b/src/java/voldemort/store/routed/action/AbstractReadRepair.java @@ -75,6 +75,11 @@ protected void insertNodeValue(Node node, ByteArray key, List> public void execute(Pipeline pipeline) { insertNodeValues(); + long startTimeNs = -1; + + if(logger.isTraceEnabled()) + startTimeNs = System.nanoTime(); + if(nodeValues.size() > 1 && preferred > 1) { List> toReadRepair = Lists.newArrayList(); @@ -111,6 +116,15 @@ public void execute(Pipeline pipeline) { logger.debug("Read repair failed: ", e); } } + + if(logger.isDebugEnabled()) { + String logStr = "Repaired (node, key, version): ("; + for(NodeValue v: toReadRepair) { + logStr += "(" + v.getNodeId() + ", " + v.getKey() + "," + v.getVersion() + ") "; + } + logStr += "in " + (System.nanoTime() - startTimeNs) + " ns"; + logger.debug(logStr); + } } pipeline.addEvent(completeEvent); diff --git a/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java b/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java index c459057589..a8373418b5 100644 --- a/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java +++ b/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java @@ -129,6 +129,11 @@ public void requestComplete(Object result, long requestTime) { requestTime); responses.put(node.getId(), response); + if(logger.isDebugEnabled()) + logger.debug("Finished secondary PUT for key " + key + " (keyRef: " + + System.identityHashCode(key) + "); took " + requestTime + + " ms on node " + node.getId() + "(" + node.getHost() + ")"); + if(isHintedHandoffEnabled() && pipeline.isFinished()) { if(response.getValue() instanceof UnreachableStoreException) { Slop slop = new Slop(pipelineData.getStoreName(), diff --git a/src/java/voldemort/store/routed/action/PerformParallelRequests.java b/src/java/voldemort/store/routed/action/PerformParallelRequests.java index a5a0ef9aa6..73399d75b0 100644 --- a/src/java/voldemort/store/routed/action/PerformParallelRequests.java +++ b/src/java/voldemort/store/routed/action/PerformParallelRequests.java @@ -95,6 +95,8 @@ public void execute(final Pipeline pipeline) { final Node node = nodes.get(i); pipelineData.incrementNodeIndex(); + final long startMs = logger.isDebugEnabled() ? System.currentTimeMillis() : -1; + NonblockingStoreCallback callback = new NonblockingStoreCallback() { public void requestComplete(Object result, long requestTime) { @@ -107,6 +109,13 @@ public void requestComplete(Object result, long requestTime) { key, result, requestTime); + if(logger.isDebugEnabled()) + logger.debug("Finished " + pipeline.getOperation().getSimpleName() + + " for key " + key + " (keyRef: " + + System.identityHashCode(key) + "); started at " + startMs + + " took " + requestTime + " ms on node " + node.getId() + "(" + + node.getHost() + ")"); + responses.put(node.getId(), response); latch.countDown(); @@ -164,6 +173,11 @@ else if(pipeline.getOperation() == Operation.GET_VERSIONS) } } + if(logger.isDebugEnabled()) + logger.debug("GET for key " + key + " (keyRef: " + System.identityHashCode(key) + + "); successes: " + pipelineData.getSuccesses() + " preferred: " + + preferred + " required: " + required); + if(pipelineData.getSuccesses() < required) { if(insufficientSuccessesEvent != null) { pipeline.addEvent(insufficientSuccessesEvent); diff --git a/src/java/voldemort/store/routed/action/PerformSerialGetAllRequests.java b/src/java/voldemort/store/routed/action/PerformSerialGetAllRequests.java index 2f5d07e7a2..c8135781d4 100644 --- a/src/java/voldemort/store/routed/action/PerformSerialGetAllRequests.java +++ b/src/java/voldemort/store/routed/action/PerformSerialGetAllRequests.java @@ -79,6 +79,11 @@ public void execute(Pipeline pipeline) { boolean zoneRequirement = false; MutableInt successCount = pipelineData.getSuccessCount(key); + if(logger.isDebugEnabled()) + logger.debug("GETALL for key " + key + " (keyRef: " + System.identityHashCode(key) + + ") successes: " + successCount.intValue() + " preferred: " + preferred + + " required: " + required); + if(successCount.intValue() >= preferred) { if(pipelineData.getZonesRequired() != null) { @@ -132,6 +137,13 @@ public void execute(Pipeline pipeline) { pipelineData.getResponses().add(response); failureDetector.recordSuccess(response.getNode(), response.getRequestTime()); + if(logger.isDebugEnabled()) + logger.debug("GET for key " + key + " (keyRef: " + + System.identityHashCode(key) + ") successes: " + + successCount.intValue() + " preferred: " + preferred + + " required: " + required + " new GET success on node " + + node.getId()); + HashSet zoneResponses = null; if(pipelineData.getKeyToZoneResponse().containsKey(key)) { zoneResponses = pipelineData.getKeyToZoneResponse().get(key); diff --git a/src/java/voldemort/store/routed/action/PerformSerialPutRequests.java b/src/java/voldemort/store/routed/action/PerformSerialPutRequests.java index 5d8debe17a..7b7b3d65fd 100644 --- a/src/java/voldemort/store/routed/action/PerformSerialPutRequests.java +++ b/src/java/voldemort/store/routed/action/PerformSerialPutRequests.java @@ -74,11 +74,20 @@ public void execute(Pipeline pipeline) { int currentNode = 0; List nodes = pipelineData.getNodes(); + long startMasterMs = -1; + long startMasterNs = -1; + + if(logger.isDebugEnabled()) { + startMasterMs = System.currentTimeMillis(); + startMasterNs = System.nanoTime(); + } + if(logger.isDebugEnabled()) logger.debug("Performing serial put requests to determine master"); + Node node = null; for(; currentNode < nodes.size(); currentNode++) { - Node node = nodes.get(currentNode); + node = nodes.get(currentNode); pipelineData.incrementNodeIndex(); VectorClock versionedClock = (VectorClock) versioned.getVersion(); @@ -86,8 +95,8 @@ public void execute(Pipeline pipeline) { versionedClock.incremented(node.getId(), time.getMilliseconds())); - if(logger.isTraceEnabled()) - logger.trace("Attempt #" + (currentNode + 1) + " to perform put (node " + if(logger.isDebugEnabled()) + logger.debug("Attempt #" + (currentNode + 1) + " to perform put (node " + node.getId() + ")"); long start = System.nanoTime(); @@ -98,8 +107,8 @@ public void execute(Pipeline pipeline) { pipelineData.incrementSuccesses(); failureDetector.recordSuccess(node, requestTime); - if(logger.isTraceEnabled()) - logger.trace("Put on node " + node.getId() + " succeeded, using as master"); + if(logger.isDebugEnabled()) + logger.debug("Put on node " + node.getId() + " succeeded, using as master"); pipelineData.setMaster(node); pipelineData.setVersionedCopy(versionedCopy); @@ -108,6 +117,12 @@ public void execute(Pipeline pipeline) { } catch(Exception e) { long requestTime = (System.nanoTime() - start) / Time.NS_PER_MS; + if(logger.isDebugEnabled()) + logger.debug("Master PUT at node " + currentNode + "(" + node.getHost() + ")" + + " failed (" + e.getMessage() + ") in " + + (System.nanoTime() - start) + " ns" + " (keyRef: " + + System.identityHashCode(key) + ")"); + if(handleResponseError(e, node, requestTime, pipeline, failureDetector)) return; } @@ -157,10 +172,25 @@ public void execute(Pipeline pipeline) { } } else { + if(logger.isDebugEnabled()) + logger.debug("Finished master PUT for key " + key + " (keyRef: " + + System.identityHashCode(key) + "); started at " + + startMasterMs + " took " + + (System.nanoTime() - startMasterNs) + " ns on node " + + (node == null ? "NULL" : node.getId()) + "(" + + (node == null ? "NULL" : node.getHost()) + "); now complete"); + pipeline.addEvent(completeEvent); } } } else { + if(logger.isDebugEnabled()) + logger.debug("Finished master PUT for key " + key + " (keyRef: " + + System.identityHashCode(key) + "); started at " + startMasterMs + + " took " + (System.nanoTime() - startMasterNs) + " ns on node " + + (node == null ? "NULL" : node.getId()) + "(" + + (node == null ? "NULL" : node.getHost()) + ")"); + pipeline.addEvent(masterDeterminedEvent); } } diff --git a/src/java/voldemort/store/routed/action/PerformSerialRequests.java b/src/java/voldemort/store/routed/action/PerformSerialRequests.java index 70c8fa563a..4c447174d8 100644 --- a/src/java/voldemort/store/routed/action/PerformSerialRequests.java +++ b/src/java/voldemort/store/routed/action/PerformSerialRequests.java @@ -98,6 +98,13 @@ public void execute(Pipeline pipeline) { result, ((System.nanoTime() - start) / Time.NS_PER_MS)); + if(logger.isDebugEnabled()) + logger.debug(pipeline.getOperation().getSimpleName() + " for key " + key + + " successes: " + pipelineData.getSuccesses() + " preferred: " + + preferred + " required: " + required + " new " + + pipeline.getOperation().getSimpleName() + " success on node " + + node.getId()); + pipelineData.incrementSuccesses(); pipelineData.getResponses().add(response); failureDetector.recordSuccess(response.getNode(), response.getRequestTime()); diff --git a/src/java/voldemort/store/slop/HintedHandoff.java b/src/java/voldemort/store/slop/HintedHandoff.java index a4a66217b3..5e506a85e2 100644 --- a/src/java/voldemort/store/slop/HintedHandoff.java +++ b/src/java/voldemort/store/slop/HintedHandoff.java @@ -101,15 +101,17 @@ public void sendHintParallel(final Node failedNode, final Version version, final for(final Node node: handoffStrategy.routeHint(failedNode)) { int nodeId = node.getId(); - if(logger.isTraceEnabled()) - logger.trace("Sending an async hint to " + nodeId); + + if(logger.isDebugEnabled()) + logger.debug("Sending an async hint to " + nodeId); if(!failedNodes.contains(node) && failureDetector.isAvailable(node)) { NonblockingStore nonblockingStore = nonblockingSlopStores.get(nodeId); Utils.notNull(nonblockingStore); final long startNs = System.nanoTime(); - if(logger.isTraceEnabled()) - logger.trace("Attempt to write " + slop.getKey() + " for " + failedNode + + if(logger.isDebugEnabled()) + logger.debug("Slop attempt to write " + slop.getKey() + " for " + failedNode + " to node " + node); NonblockingStoreCallback callback = new NonblockingStoreCallback() { @@ -127,6 +129,13 @@ public void requestComplete(Object result, long requestTime) { failedNodes.add(node); if(response.getValue() instanceof UnreachableStoreException) { UnreachableStoreException use = (UnreachableStoreException) response.getValue(); + + if(logger.isDebugEnabled()) + logger.debug("Write of key " + slop.getKey() + " for " + + failedNode + " to node " + node + + " failed due to unreachable: " + + use.getMessage()); + failureDetector.recordException(node, (System.nanoTime() - startNs) / Time.NS_PER_MS, @@ -136,6 +145,12 @@ public void requestComplete(Object result, long requestTime) { } return; } + + if(logger.isDebugEnabled()) + logger.debug("Slop write of key " + slop.getKey() + " for " + + failedNode + " to node " + node + " succeeded in " + + (System.nanoTime() - startNs) + " ns"); + failureDetector.recordSuccess(node, (System.nanoTime() - startNs) / Time.NS_PER_MS); @@ -151,7 +166,7 @@ public void requestComplete(Object result, long requestTime) { } } } - + /** * Send a hint of a request originally meant for the failed node to another * node in the ring, as selected by the {@link HintedHandoffStrategy} @@ -166,8 +181,8 @@ public boolean sendHintSerial(Node failedNode, Version version, Slop slop) { boolean persisted = false; for(Node node: handoffStrategy.routeHint(failedNode)) { int nodeId = node.getId(); - if(logger.isTraceEnabled()) - logger.trace("Trying to send hint to " + nodeId); + if(logger.isDebugEnabled()) + logger.debug("Trying to send hint to " + nodeId); if(!failedNodes.contains(node) && failureDetector.isAvailable(node)) { Store slopStore = slopStores.get(nodeId); @@ -175,10 +190,10 @@ public boolean sendHintSerial(Node failedNode, Version version, Slop slop) { long startNs = System.nanoTime(); try { - if(logger.isTraceEnabled()) - logger.trace("Attempt to handoff " + slop.getOperation() + " on " - + slop.getKey() + " for " + failedNode - + " to node " + node); + if(logger.isDebugEnabled()) + logger.debug("Slop attempt to write " + slop.getKey() + " (keyRef: " + + System.identityHashCode(slop.getKey()) + ") for " + + failedNode + " to node " + node); // No transform needs to applied to the slop slopStore.put(slop.makeKey(), new Versioned(slop, version), null); @@ -197,6 +212,12 @@ public boolean sendHintSerial(Node failedNode, Version version, Slop slop) { } catch(ObsoleteVersionException e) { logger.debug(e, e); } + + if(logger.isDebugEnabled()) + logger.debug("Slop write of key " + slop.getKey() + " (keyRef: " + + System.identityHashCode(slop.getKey()) + " for " + failedNode + + " to node " + node + " succeeded in " + + (System.nanoTime() - startNs) + " ns"); } } diff --git a/src/java/voldemort/store/socket/SocketStore.java b/src/java/voldemort/store/socket/SocketStore.java index 0886341521..3781a77cbe 100644 --- a/src/java/voldemort/store/socket/SocketStore.java +++ b/src/java/voldemort/store/socket/SocketStore.java @@ -100,6 +100,9 @@ public void submitDeleteRequest(ByteArray key, requestRoutingType, key, version); + if(logger.isDebugEnabled()) + logger.debug("DELETE keyRef: " + System.identityHashCode(key) + " requestRef: " + + System.identityHashCode(clientRequest)); requestAsync(clientRequest, callback, timeoutMs, "delete"); } @@ -113,6 +116,9 @@ public void submitGetRequest(ByteArray key, requestRoutingType, key, transforms); + if(logger.isDebugEnabled()) + logger.debug("GET keyRef: " + System.identityHashCode(key) + " requestRef: " + + System.identityHashCode(clientRequest)); requestAsync(clientRequest, callback, timeoutMs, "get"); } @@ -126,6 +132,9 @@ public void submitGetAllRequest(Iterable keys, requestRoutingType, keys, transforms); + if(logger.isDebugEnabled()) + logger.debug("GETALL keyRef: " + System.identityHashCode(keys) + " requestRef: " + + System.identityHashCode(clientRequest)); requestAsync(clientRequest, callback, timeoutMs, "get all"); } @@ -137,6 +146,9 @@ public void submitGetVersionsRequest(ByteArray key, requestFormat, requestRoutingType, key); + if(logger.isDebugEnabled()) + logger.debug("GETVERSIONS keyRef: " + System.identityHashCode(key) + " requestRef: " + + System.identityHashCode(clientRequest)); requestAsync(clientRequest, callback, timeoutMs, "get versions"); } @@ -152,6 +164,9 @@ public void submitPutRequest(ByteArray key, key, value, transforms); + if(logger.isDebugEnabled()) + logger.debug("PUT keyRef: " + System.identityHashCode(key) + " requestRef: " + + System.identityHashCode(clientRequest)); requestAsync(clientRequest, callback, timeoutMs, "put"); } @@ -162,6 +177,9 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException requestRoutingType, key, version); + if(logger.isDebugEnabled()) + logger.debug("DELETE keyRef: " + System.identityHashCode(key) + " requestRef: " + + System.identityHashCode(clientRequest)); return request(clientRequest, "delete"); } @@ -172,6 +190,9 @@ public List> get(ByteArray key, byte[] transforms) throws Vold requestRoutingType, key, transforms); + if(logger.isDebugEnabled()) + logger.debug("GET keyRef: " + System.identityHashCode(key) + " requestRef: " + + System.identityHashCode(clientRequest)); return request(clientRequest, "get"); } @@ -184,6 +205,9 @@ public Map>> getAll(Iterable keys, requestRoutingType, keys, transforms); + if(logger.isDebugEnabled()) + logger.debug("GETALL keyRef: " + System.identityHashCode(keys) + " requestRef: " + + System.identityHashCode(clientRequest)); return request(clientRequest, "getAll"); } @@ -193,6 +217,9 @@ public List getVersions(ByteArray key) { requestFormat, requestRoutingType, key); + if(logger.isDebugEnabled()) + logger.debug("GETVERSIONS keyRef: " + System.identityHashCode(key) + " requestRef: " + + System.identityHashCode(clientRequest)); return request(clientRequest, "getVersions"); } @@ -205,6 +232,9 @@ public void put(ByteArray key, Versioned versioned, byte[] transforms) key, versioned, transforms); + if(logger.isDebugEnabled()) + logger.debug("PUT keyRef: " + System.identityHashCode(key) + " requestRef: " + + System.identityHashCode(clientRequest)); request(clientRequest, "put"); } @@ -240,17 +270,40 @@ public void close() throws VoldemortException { private T request(ClientRequest delegate, String operationName) { ClientRequestExecutor clientRequestExecutor = pool.checkout(destination); + + long startTimeMs = -1; + long startTimeNs = -1; + + if(logger.isDebugEnabled()) { + startTimeMs = System.currentTimeMillis(); + startTimeNs = System.nanoTime(); + } + + String debugMsgStr = ""; + BlockingClientRequest blockingClientRequest = null; try { blockingClientRequest = new BlockingClientRequest(delegate, timeoutMs); clientRequestExecutor.addClientRequest(blockingClientRequest, timeoutMs); blockingClientRequest.await(); + + if(logger.isDebugEnabled()) + debugMsgStr += "success"; + return blockingClientRequest.getResult(); } catch(InterruptedException e) { + + if(logger.isDebugEnabled()) + debugMsgStr += "unreachable: " + e.getMessage(); + throw new UnreachableStoreException("Failure in " + operationName + " on " + destination + ": " + e.getMessage(), e); } catch(IOException e) { clientRequestExecutor.close(); + + if(logger.isDebugEnabled()) + debugMsgStr += "failure: " + e.getMessage(); + throw new UnreachableStoreException("Failure in " + operationName + " on " + destination + ": " + e.getMessage(), e); } finally { @@ -258,6 +311,29 @@ private T request(ClientRequest delegate, String operationName) { // close the executor if we timed out clientRequestExecutor.close(); } + + if(logger.isDebugEnabled()) { + logger.debug("Sync request end, type: " + + operationName + + " requestRef: " + + System.identityHashCode(delegate) + + " totalTimeNs: " + + (System.nanoTime() - startTimeNs) + + " start time: " + + startTimeMs + + " end time: " + + System.currentTimeMillis() + + " client:" + + clientRequestExecutor.getSocketChannel().socket().getLocalAddress() + + ":" + + clientRequestExecutor.getSocketChannel().socket().getLocalPort() + + " server: " + + clientRequestExecutor.getSocketChannel() + .socket() + .getRemoteSocketAddress() + " outcome: " + + debugMsgStr); + } + pool.checkin(destination, clientRequestExecutor); } } @@ -285,6 +361,23 @@ private void requestAsync(ClientRequest delegate, try { clientRequestExecutor = pool.checkout(destination); + + if(logger.isDebugEnabled()) { + logger.debug("Async request start; type: " + + operationName + + " requestRef: " + + System.identityHashCode(delegate) + + " time: " + + System.currentTimeMillis() + + " server: " + + clientRequestExecutor.getSocketChannel() + .socket() + .getRemoteSocketAddress() + " local socket: " + + clientRequestExecutor.getSocketChannel().socket().getLocalAddress() + + ":" + + clientRequestExecutor.getSocketChannel().socket().getLocalPort()); + } + } catch(Exception e) { // If we can't check out a socket from the pool, we'll usually get // either an IOException (subclass) or an UnreachableStoreException @@ -335,6 +428,26 @@ public NonblockingStoreCallbackClientRequest(ClientRequest clientRequest, private void invokeCallback(Object o, long requestTime) { if(callback != null) { try { + if(logger.isDebugEnabled()) { + logger.debug("Async request end; requestRef: " + + System.identityHashCode(clientRequest) + + " time: " + + System.currentTimeMillis() + + " server: " + + clientRequestExecutor.getSocketChannel() + .socket() + .getRemoteSocketAddress() + + " local socket: " + + clientRequestExecutor.getSocketChannel() + .socket() + .getLocalAddress() + + ":" + + clientRequestExecutor.getSocketChannel() + .socket() + .getLocalPort() + " result: " + + o.toString()); + } + callback.requestComplete(o, requestTime); } catch(Exception e) { if(logger.isEnabledFor(Level.WARN))