Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,17 @@ public class FederatedStatistics {
// stats of the federated worker on the coordinator site
private static Set<Pair<String, Integer>> _fedWorkerAddresses = new HashSet<>();
private static final LongAdder readCount = new LongAdder();
private static final LongAdder putScalarCount = new LongAdder();
private static final LongAdder putListCount = new LongAdder();
private static final LongAdder putMatrixCount = new LongAdder();
private static final LongAdder putFrameCount = new LongAdder();
private static final LongAdder putMatCharCount = new LongAdder();
private static final LongAdder putMatrixBytes = new LongAdder();
private static final LongAdder putFrameBytes = new LongAdder();
private static final LongAdder putCount = new LongAdder();
private static final LongAdder getCount = new LongAdder();
private static final LongAdder executeInstructionCount = new LongAdder();
private static final LongAdder executeUDFCount = new LongAdder();
private static final LongAdder transferredScalarCount = new LongAdder();
private static final LongAdder transferredListCount = new LongAdder();
private static final LongAdder transferredMatrixCount = new LongAdder();
private static final LongAdder transferredFrameCount = new LongAdder();
private static final LongAdder transferredMatCharCount = new LongAdder();
private static final LongAdder transferredMatrixBytes = new LongAdder();
private static final LongAdder transferredFrameBytes = new LongAdder();
private static final LongAdder asyncPrefetchCount = new LongAdder();

// stats on the federated worker itself
Expand All @@ -88,20 +89,8 @@ public static synchronized void incFederated(RequestType rqt, List<Object> data)
readCount.increment();
break;
case PUT_VAR:
if(data.get(0) instanceof MatrixBlock) {
putMatrixCount.increment();
putMatrixBytes.add(((MatrixBlock)data.get(0)).getInMemorySize());
}
else if(data.get(0) instanceof FrameBlock) {
putFrameCount.increment();
putFrameBytes.add(((FrameBlock)data.get(0)).getInMemorySize());
}
else if(data.get(0) instanceof ScalarObject)
putScalarCount.increment();
else if(data.get(0) instanceof ListObject)
putListCount.increment();
else if(data.get(0) instanceof MatrixCharacteristics)
putMatCharCount.increment();
putCount.increment();
incFedTransfer(data.get(0));
break;
case GET_VAR:
getCount.increment();
Expand All @@ -111,34 +100,58 @@ else if(data.get(0) instanceof MatrixCharacteristics)
break;
case EXEC_UDF:
executeUDFCount.increment();
incFedTransfer(data);
break;
default:
break;
}
}

private static void incFedTransfer(List<Object> data) {
for(Object dataObj : data)
incFedTransfer(dataObj);
}

private static void incFedTransfer(Object dataObj) {
if(dataObj instanceof MatrixBlock) {
transferredMatrixCount.increment();
transferredMatrixBytes.add(((MatrixBlock)dataObj).getInMemorySize());
}
else if(dataObj instanceof FrameBlock) {
transferredFrameCount.increment();
transferredFrameBytes.add(((FrameBlock)dataObj).getInMemorySize());
}
else if(dataObj instanceof ScalarObject)
transferredScalarCount.increment();
else if(dataObj instanceof ListObject)
transferredListCount.increment();
else if(dataObj instanceof MatrixCharacteristics)
transferredMatCharCount.increment();
}

public static void incAsyncPrefetchCount(long c) {
asyncPrefetchCount.add(c);
}

public static long getTotalPutCount() {
return putScalarCount.longValue() + putListCount.longValue()
+ putMatrixCount.longValue() + putFrameCount.longValue()
+ putMatCharCount.longValue();
public static long getTotalFedTransferCount() {
return transferredScalarCount.longValue() + transferredListCount.longValue()
+ transferredMatrixCount.longValue() + transferredFrameCount.longValue()
+ transferredMatCharCount.longValue();
}

public static void reset() {
readCount.reset();
putScalarCount.reset();
putListCount.reset();
putMatrixCount.reset();
putFrameCount.reset();
putMatCharCount.reset();
putMatrixBytes.reset();
putFrameBytes.reset();
putCount.reset();
getCount.reset();
executeInstructionCount.reset();
executeUDFCount.reset();
transferredScalarCount.reset();
transferredListCount.reset();
transferredMatrixCount.reset();
transferredFrameCount.reset();
transferredMatCharCount.reset();
transferredMatrixBytes.reset();
transferredFrameBytes.reset();
asyncPrefetchCount.reset();
fedLookupTableGetCount.reset();
fedLookupTableGetTime.reset();
Expand All @@ -154,22 +167,22 @@ public static String displayFedIOExecStatistics() {
StringBuilder sb = new StringBuilder();
sb.append("Federated I/O (Read, Put, Get):\t" +
readCount.longValue() + "/" +
getTotalPutCount() + "/" +
putCount.longValue() + "/" +
getCount.longValue() + ".\n");
if(getTotalPutCount() > 0)
sb.append("Fed Put (Sca/Lis/Mat/Fra/MC):\t" +
putScalarCount.longValue() + "/" +
putListCount.longValue() + "/" +
putMatrixCount.longValue() + "/" +
putFrameCount.longValue() + "/" +
putMatCharCount.longValue() + ".\n");
if(putMatrixBytes.longValue() > 0 || putFrameBytes.longValue() > 0)
sb.append("Fed Put Bytes (Mat/Frame):\t" +
putMatrixBytes.longValue() + "/" +
putFrameBytes.longValue() + " Bytes.\n");
sb.append("Federated Execute (Inst, UDF):\t" +
executeInstructionCount.longValue() + "/" +
executeUDFCount.longValue() + ".\n");
if(getTotalFedTransferCount() > 0)
sb.append("Fed Put Count (Sc/Li/Ma/Fr/MC):\t" +
transferredScalarCount.longValue() + "/" +
transferredListCount.longValue() + "/" +
transferredMatrixCount.longValue() + "/" +
transferredFrameCount.longValue() + "/" +
transferredMatCharCount.longValue() + ".\n");
if(transferredMatrixBytes.longValue() > 0 || transferredFrameBytes.longValue() > 0)
sb.append("Fed Put Bytes (Mat/Frame):\t" +
transferredMatrixBytes.longValue() + "/" +
transferredFrameBytes.longValue() + " Bytes.\n");
sb.append("Federated prefetch count:\t" +
asyncPrefetchCount.longValue() + ".\n");
return sb.toString();
Expand Down