From 16f407204d6418a9d31fff4d281498309a5757eb Mon Sep 17 00:00:00 2001 From: ywcb00 Date: Mon, 21 Mar 2022 17:08:59 +0100 Subject: [PATCH] feat(FederatedStatistics.java): extend the count and bytes statistics for put objects to include the inputs of a UDF --- .../federated/FederatedStatistics.java | 101 ++++++++++-------- 1 file changed, 57 insertions(+), 44 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java index 9a9cfa72265..68057e104ae 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java @@ -61,16 +61,17 @@ public class FederatedStatistics { // stats of the federated worker on the coordinator site private static Set> _fedWorkerAddresses = new HashSet<>(); private static final LongAdder readCount = new LongAdder(); - private static final LongAdder putScalarCount = new LongAdder(); - private static final LongAdder putListCount = new LongAdder(); - private static final LongAdder putMatrixCount = new LongAdder(); - private static final LongAdder putFrameCount = new LongAdder(); - private static final LongAdder putMatCharCount = new LongAdder(); - private static final LongAdder putMatrixBytes = new LongAdder(); - private static final LongAdder putFrameBytes = new LongAdder(); + private static final LongAdder putCount = new LongAdder(); private static final LongAdder getCount = new LongAdder(); private static final LongAdder executeInstructionCount = new LongAdder(); private static final LongAdder executeUDFCount = new LongAdder(); + private static final LongAdder transferredScalarCount = new LongAdder(); + private static final LongAdder transferredListCount = new LongAdder(); + private static final LongAdder transferredMatrixCount = new LongAdder(); + private static final LongAdder transferredFrameCount = new LongAdder(); + private static final LongAdder transferredMatCharCount = new LongAdder(); + private static final LongAdder transferredMatrixBytes = new LongAdder(); + private static final LongAdder transferredFrameBytes = new LongAdder(); private static final LongAdder asyncPrefetchCount = new LongAdder(); // stats on the federated worker itself @@ -88,20 +89,8 @@ public static synchronized void incFederated(RequestType rqt, List data) readCount.increment(); break; case PUT_VAR: - if(data.get(0) instanceof MatrixBlock) { - putMatrixCount.increment(); - putMatrixBytes.add(((MatrixBlock)data.get(0)).getInMemorySize()); - } - else if(data.get(0) instanceof FrameBlock) { - putFrameCount.increment(); - putFrameBytes.add(((FrameBlock)data.get(0)).getInMemorySize()); - } - else if(data.get(0) instanceof ScalarObject) - putScalarCount.increment(); - else if(data.get(0) instanceof ListObject) - putListCount.increment(); - else if(data.get(0) instanceof MatrixCharacteristics) - putMatCharCount.increment(); + putCount.increment(); + incFedTransfer(data.get(0)); break; case GET_VAR: getCount.increment(); @@ -111,34 +100,58 @@ else if(data.get(0) instanceof MatrixCharacteristics) break; case EXEC_UDF: executeUDFCount.increment(); + incFedTransfer(data); break; default: break; } } + private static void incFedTransfer(List data) { + for(Object dataObj : data) + incFedTransfer(dataObj); + } + + private static void incFedTransfer(Object dataObj) { + if(dataObj instanceof MatrixBlock) { + transferredMatrixCount.increment(); + transferredMatrixBytes.add(((MatrixBlock)dataObj).getInMemorySize()); + } + else if(dataObj instanceof FrameBlock) { + transferredFrameCount.increment(); + transferredFrameBytes.add(((FrameBlock)dataObj).getInMemorySize()); + } + else if(dataObj instanceof ScalarObject) + transferredScalarCount.increment(); + else if(dataObj instanceof ListObject) + transferredListCount.increment(); + else if(dataObj instanceof MatrixCharacteristics) + transferredMatCharCount.increment(); + } + public static void incAsyncPrefetchCount(long c) { asyncPrefetchCount.add(c); } - public static long getTotalPutCount() { - return putScalarCount.longValue() + putListCount.longValue() - + putMatrixCount.longValue() + putFrameCount.longValue() - + putMatCharCount.longValue(); + public static long getTotalFedTransferCount() { + return transferredScalarCount.longValue() + transferredListCount.longValue() + + transferredMatrixCount.longValue() + transferredFrameCount.longValue() + + transferredMatCharCount.longValue(); } public static void reset() { readCount.reset(); - putScalarCount.reset(); - putListCount.reset(); - putMatrixCount.reset(); - putFrameCount.reset(); - putMatCharCount.reset(); - putMatrixBytes.reset(); - putFrameBytes.reset(); + putCount.reset(); getCount.reset(); executeInstructionCount.reset(); executeUDFCount.reset(); + transferredScalarCount.reset(); + transferredListCount.reset(); + transferredMatrixCount.reset(); + transferredFrameCount.reset(); + transferredMatCharCount.reset(); + transferredMatrixBytes.reset(); + transferredFrameBytes.reset(); asyncPrefetchCount.reset(); fedLookupTableGetCount.reset(); fedLookupTableGetTime.reset(); @@ -154,22 +167,22 @@ public static String displayFedIOExecStatistics() { StringBuilder sb = new StringBuilder(); sb.append("Federated I/O (Read, Put, Get):\t" + readCount.longValue() + "/" + - getTotalPutCount() + "/" + + putCount.longValue() + "/" + getCount.longValue() + ".\n"); - if(getTotalPutCount() > 0) - sb.append("Fed Put (Sca/Lis/Mat/Fra/MC):\t" + - putScalarCount.longValue() + "/" + - putListCount.longValue() + "/" + - putMatrixCount.longValue() + "/" + - putFrameCount.longValue() + "/" + - putMatCharCount.longValue() + ".\n"); - if(putMatrixBytes.longValue() > 0 || putFrameBytes.longValue() > 0) - sb.append("Fed Put Bytes (Mat/Frame):\t" + - putMatrixBytes.longValue() + "/" + - putFrameBytes.longValue() + " Bytes.\n"); sb.append("Federated Execute (Inst, UDF):\t" + executeInstructionCount.longValue() + "/" + executeUDFCount.longValue() + ".\n"); + if(getTotalFedTransferCount() > 0) + sb.append("Fed Put Count (Sc/Li/Ma/Fr/MC):\t" + + transferredScalarCount.longValue() + "/" + + transferredListCount.longValue() + "/" + + transferredMatrixCount.longValue() + "/" + + transferredFrameCount.longValue() + "/" + + transferredMatCharCount.longValue() + ".\n"); + if(transferredMatrixBytes.longValue() > 0 || transferredFrameBytes.longValue() > 0) + sb.append("Fed Put Bytes (Mat/Frame):\t" + + transferredMatrixBytes.longValue() + "/" + + transferredFrameBytes.longValue() + " Bytes.\n"); sb.append("Federated prefetch count:\t" + asyncPrefetchCount.longValue() + ".\n"); return sb.toString();