From dfbec56cd738ce9b2b8a3a489807f165cf670d85 Mon Sep 17 00:00:00 2001 From: Kevin Date: Tue, 12 Dec 2023 11:09:16 -0500 Subject: [PATCH 1/3] WIP --- src/main/java/emissary/core/MobileAgent.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/main/java/emissary/core/MobileAgent.java b/src/main/java/emissary/core/MobileAgent.java index f21793408b..15e540301b 100755 --- a/src/main/java/emissary/core/MobileAgent.java +++ b/src/main/java/emissary/core/MobileAgent.java @@ -264,12 +264,14 @@ protected void agentControl(final IServiceProviderPlace currentPlaceArg) { // One based counter loopCount++; + long timeInPlace = 0L; + // First time in, we just have the pickup place where we started // our mission. We dont process there, just use it to call through // to the directory, so skip the processing. See the difference // between the go() and arrive() methods for details if ((loopCount > 1 || getProcessFirstPlace()) && !controlError) { - atPlace(currentPlace, mypayload); + timeInPlace = atPlace(currentPlace, mypayload); } // Choose next place @@ -336,8 +338,10 @@ protected void agentControl(final IServiceProviderPlace currentPlaceArg) { * * @param place the place we are asking to work for us * @param payloadArg the data for the place to operate on + * @return the time in nanoseconds spent in the place */ - protected void atPlace(final IServiceProviderPlace place, final IBaseDataObject payloadArg) { + protected long atPlace(final IServiceProviderPlace place, final IBaseDataObject payloadArg) { + long timeInPlace = 0L; logger.debug("In atPlace {} with {}", place, payloadArg.shortName()); try (TimedResource timer = resourceWatcherStart(place)) { @@ -348,7 +352,10 @@ protected void atPlace(final IServiceProviderPlace place, final IBaseDataObject payloadArg.setParameter("AGENT_MOVE_ERRORS", Integer.toString(this.moveErrorsOccurred)); } + long startTime = System.nanoTime(); place.agentProcessCall(payloadArg); + long endTime = System.nanoTime(); + timeInPlace = (endTime - startTime); if (this.moveErrorsOccurred > 0) { payloadArg.deleteParameter("AGENT_MOVE_ERRORS"); @@ -366,6 +373,8 @@ protected void atPlace(final IServiceProviderPlace place, final IBaseDataObject } checkInterrupt(place); } + + return timeInPlace; } protected final void checkInterrupt(final IServiceProviderPlace place) { From b104370ad49d41a96d058954382a5331f9b42fa4 Mon Sep 17 00:00:00 2001 From: Kevin Date: Tue, 19 Dec 2023 17:58:42 -0500 Subject: [PATCH 2/3] Time in place added for ibdo and mobile agent --- .../java/emissary/core/BaseDataObject.java | 5 +++ .../java/emissary/core/IBaseDataObject.java | 7 ++++ src/main/java/emissary/core/MobileAgent.java | 32 +++++++++++++++++-- .../java/emissary/core/TransformHistory.java | 16 ++++++++++ src/main/java/emissary/util/PayloadUtil.java | 3 +- 5 files changed, 59 insertions(+), 4 deletions(-) diff --git a/src/main/java/emissary/core/BaseDataObject.java b/src/main/java/emissary/core/BaseDataObject.java index 58b42ad283..dd740c93f5 100755 --- a/src/main/java/emissary/core/BaseDataObject.java +++ b/src/main/java/emissary/core/BaseDataObject.java @@ -773,6 +773,11 @@ public void appendTransformHistory(final String key, boolean coordinated) { this.history.append(key, coordinated); } + @Override + public void addTimeInLastPlace(long timeInPlace) { + this.history.addTimeInPlace(timeInPlace); + } + @Override public void setHistory(TransformHistory newHistory) { this.history.set(newHistory); diff --git a/src/main/java/emissary/core/IBaseDataObject.java b/src/main/java/emissary/core/IBaseDataObject.java index 2e46dfc48b..596a73fdd1 100755 --- a/src/main/java/emissary/core/IBaseDataObject.java +++ b/src/main/java/emissary/core/IBaseDataObject.java @@ -743,6 +743,13 @@ enum MergePolicy { */ void appendTransformHistory(String key, boolean coordinated); + /** + * Adds the given time in place to the last entry in transform history. + * + * @param timeInPlace the time in place in nanoseconds + */ + void addTimeInLastPlace(long timeInPlace); + /** * Return what machine we are located on * diff --git a/src/main/java/emissary/core/MobileAgent.java b/src/main/java/emissary/core/MobileAgent.java index 15e540301b..1dc13199be 100755 --- a/src/main/java/emissary/core/MobileAgent.java +++ b/src/main/java/emissary/core/MobileAgent.java @@ -288,11 +288,12 @@ protected void agentControl(final IServiceProviderPlace currentPlaceArg) { if (loopCount == 1 && !getProcessFirstPlace()) { // Use arrivalPlace for MobileAgent.send() recordHistory(currentPlace, this.payload); - recordHistory(newEntry, this.payload); + recordHistory(newEntry, this.payload, timeInPlace); } else { - recordHistory(newEntry, this.payload); + recordHistory(newEntry, this.payload, timeInPlace); } + // A local place, around the loop and hit it if (newEntry.isLocal()) { logger.debug("Choosing local place {}", newEntry.getFullKey()); @@ -788,7 +789,18 @@ protected void logAgentCompletion(final IBaseDataObject payloadArg) { * @param payloadArg the dataobject that is being processed */ protected void recordHistory(final IServiceProviderPlace place, final IBaseDataObject payloadArg) { - recordHistory(place.getDirectoryEntry(), payloadArg); + recordHistory(place.getDirectoryEntry(), payloadArg, -1L); + } + + /** + * Record the processing history in the data object + * + * @param place where the processing is taking place + * @param payloadArg the dataobject that is being processed + * @param timeInPlace the time in place in nanoseconds + */ + protected void recordHistory(final IServiceProviderPlace place, final IBaseDataObject payloadArg, long timeInPlace) { + recordHistory(place.getDirectoryEntry(), payloadArg, timeInPlace); } /** @@ -798,6 +810,17 @@ protected void recordHistory(final IServiceProviderPlace place, final IBaseDataO * @param payloadArg the dataobject that is being processed */ protected void recordHistory(final DirectoryEntry placeEntry, final IBaseDataObject payloadArg) { + recordHistory(placeEntry, payloadArg, -1L); + } + + /** + * Record the processing history in the data object + * + * @param placeEntry where the processing is taking place + * @param payloadArg the dataobject that is being processed + * @param timeInPlace the time in place in nanoseconds + */ + protected void recordHistory(final DirectoryEntry placeEntry, final IBaseDataObject payloadArg, long timeInPlace) { String placeKey = null; final String cf = payloadArg.currentForm(); @@ -835,6 +858,9 @@ protected void recordHistory(final DirectoryEntry placeEntry, final IBaseDataObj } payloadArg.appendTransformHistory(placeKey); + if (timeInPlace >= 0) { + payloadArg.addTimeInLastPlace(timeInPlace); + } logger.debug("Appended {} to history which now has size {}", placeKey, payloadArg.transformHistory().size()); } diff --git a/src/main/java/emissary/core/TransformHistory.java b/src/main/java/emissary/core/TransformHistory.java index 3b335c5866..0a7e3ccbba 100644 --- a/src/main/java/emissary/core/TransformHistory.java +++ b/src/main/java/emissary/core/TransformHistory.java @@ -167,6 +167,13 @@ public boolean beforeStart() { return s.contains(IServiceProviderPlace.SPROUT_KEY); } + public void addTimeInPlace(long timeInPlace) { + if (history.isEmpty()) { + return; + } + history.get(history.size() - 1).setTimeInPlace(timeInPlace); + } + @Override public String toString() { final StringBuilder myOutput = new StringBuilder(); @@ -179,6 +186,7 @@ public String toString() { public static class History { String key; boolean coordinated; + long timeInPlace; /** * Needed to support Kryo deserialization @@ -194,6 +202,10 @@ public History(String key, boolean coordinated) { this.coordinated = coordinated; } + public void setTimeInPlace(long timeInPlace) { + this.timeInPlace = timeInPlace; + } + public String getKey() { return key; } @@ -202,6 +214,10 @@ public String getKeyNoUrl() { return StringUtils.substringBefore(key, ".http"); } + public long getTimeInPlace() { + return timeInPlace; + } + public boolean wasCoordinated() { return coordinated; } diff --git a/src/main/java/emissary/util/PayloadUtil.java b/src/main/java/emissary/util/PayloadUtil.java index 068a45a6f6..8330d2309f 100755 --- a/src/main/java/emissary/util/PayloadUtil.java +++ b/src/main/java/emissary/util/PayloadUtil.java @@ -136,7 +136,8 @@ public static String getPayloadDisplayString(final IBaseDataObject payload) { sb.append(" "); } // check is NO_URL or not - sb.append(" ").append(historyCase.equals(NO_URL) ? h.getKeyNoUrl() : h.getKey()).append("\n"); + sb.append(" ").append(historyCase.equals(NO_URL) ? h.getKeyNoUrl() : h.getKey()).append(" Time: ").append(h.getTimeInPlace()) + .append("\n"); } } return sb.toString(); From 3b8127cb62d0d87f955e20ab39e1975a7bdb1d08 Mon Sep 17 00:00:00 2001 From: Kevin Date: Wed, 31 Jan 2024 14:07:01 -0500 Subject: [PATCH 3/3] First working commit --- .../java/emissary/core/HDMobileAgent.java | 7 +++ src/main/java/emissary/core/MobileAgent.java | 44 ++++--------------- .../java/emissary/core/TransformHistory.java | 2 +- src/main/java/emissary/util/PayloadUtil.java | 4 +- 4 files changed, 19 insertions(+), 38 deletions(-) diff --git a/src/main/java/emissary/core/HDMobileAgent.java b/src/main/java/emissary/core/HDMobileAgent.java index abf326833e..0c090bc330 100755 --- a/src/main/java/emissary/core/HDMobileAgent.java +++ b/src/main/java/emissary/core/HDMobileAgent.java @@ -403,6 +403,7 @@ protected void switchPrimaryPayload(final int i) { */ protected List atPlaceHD(final IServiceProviderPlace place, final List payloadListArg) { MDC.put(MDCConstants.SERVICE_LOCATION, place.toString()); + long timeInPlace; logger.debug("In atPlaceHD {} with {} payload items", place, payloadListArg.size()); List ret = Collections.emptyList(); @@ -417,7 +418,13 @@ protected List atPlaceHD(final IServiceProviderPlace place, fin addMoveErrorCount(payloadListArg); } + long startTime = System.nanoTime(); ret = place.agentProcessHeavyDuty(payloadListArg); + long endTime = System.nanoTime(); + timeInPlace = (endTime - startTime); + for (IBaseDataObject payload : payloadListArg) { + payload.addTimeInLastPlace(timeInPlace); + } for (Iterator it = ret.iterator(); it.hasNext();) { final IBaseDataObject ibdo = it.next(); diff --git a/src/main/java/emissary/core/MobileAgent.java b/src/main/java/emissary/core/MobileAgent.java index 1dc13199be..ba4af63338 100755 --- a/src/main/java/emissary/core/MobileAgent.java +++ b/src/main/java/emissary/core/MobileAgent.java @@ -264,14 +264,12 @@ protected void agentControl(final IServiceProviderPlace currentPlaceArg) { // One based counter loopCount++; - long timeInPlace = 0L; - // First time in, we just have the pickup place where we started // our mission. We dont process there, just use it to call through // to the directory, so skip the processing. See the difference // between the go() and arrive() methods for details if ((loopCount > 1 || getProcessFirstPlace()) && !controlError) { - timeInPlace = atPlace(currentPlace, mypayload); + atPlace(currentPlace, mypayload); } // Choose next place @@ -288,9 +286,9 @@ protected void agentControl(final IServiceProviderPlace currentPlaceArg) { if (loopCount == 1 && !getProcessFirstPlace()) { // Use arrivalPlace for MobileAgent.send() recordHistory(currentPlace, this.payload); - recordHistory(newEntry, this.payload, timeInPlace); + recordHistory(newEntry, this.payload); } else { - recordHistory(newEntry, this.payload, timeInPlace); + recordHistory(newEntry, this.payload); } @@ -339,10 +337,9 @@ protected void agentControl(final IServiceProviderPlace currentPlaceArg) { * * @param place the place we are asking to work for us * @param payloadArg the data for the place to operate on - * @return the time in nanoseconds spent in the place */ - protected long atPlace(final IServiceProviderPlace place, final IBaseDataObject payloadArg) { - long timeInPlace = 0L; + protected void atPlace(final IServiceProviderPlace place, final IBaseDataObject payloadArg) { + long timeInPlace; logger.debug("In atPlace {} with {}", place, payloadArg.shortName()); try (TimedResource timer = resourceWatcherStart(place)) { @@ -357,6 +354,7 @@ protected long atPlace(final IServiceProviderPlace place, final IBaseDataObject place.agentProcessCall(payloadArg); long endTime = System.nanoTime(); timeInPlace = (endTime - startTime); + payloadArg.addTimeInLastPlace(timeInPlace); if (this.moveErrorsOccurred > 0) { payloadArg.deleteParameter("AGENT_MOVE_ERRORS"); @@ -374,8 +372,6 @@ protected long atPlace(final IServiceProviderPlace place, final IBaseDataObject } checkInterrupt(place); } - - return timeInPlace; } protected final void checkInterrupt(final IServiceProviderPlace place) { @@ -788,19 +784,9 @@ protected void logAgentCompletion(final IBaseDataObject payloadArg) { * @param place where the processing is taking place * @param payloadArg the dataobject that is being processed */ - protected void recordHistory(final IServiceProviderPlace place, final IBaseDataObject payloadArg) { - recordHistory(place.getDirectoryEntry(), payloadArg, -1L); - } - /** - * Record the processing history in the data object - * - * @param place where the processing is taking place - * @param payloadArg the dataobject that is being processed - * @param timeInPlace the time in place in nanoseconds - */ - protected void recordHistory(final IServiceProviderPlace place, final IBaseDataObject payloadArg, long timeInPlace) { - recordHistory(place.getDirectoryEntry(), payloadArg, timeInPlace); + protected void recordHistory(final IServiceProviderPlace place, final IBaseDataObject payloadArg) { + recordHistory(place.getDirectoryEntry(), payloadArg); } /** @@ -810,17 +796,6 @@ protected void recordHistory(final IServiceProviderPlace place, final IBaseDataO * @param payloadArg the dataobject that is being processed */ protected void recordHistory(final DirectoryEntry placeEntry, final IBaseDataObject payloadArg) { - recordHistory(placeEntry, payloadArg, -1L); - } - - /** - * Record the processing history in the data object - * - * @param placeEntry where the processing is taking place - * @param payloadArg the dataobject that is being processed - * @param timeInPlace the time in place in nanoseconds - */ - protected void recordHistory(final DirectoryEntry placeEntry, final IBaseDataObject payloadArg, long timeInPlace) { String placeKey = null; final String cf = payloadArg.currentForm(); @@ -858,9 +833,6 @@ protected void recordHistory(final DirectoryEntry placeEntry, final IBaseDataObj } payloadArg.appendTransformHistory(placeKey); - if (timeInPlace >= 0) { - payloadArg.addTimeInLastPlace(timeInPlace); - } logger.debug("Appended {} to history which now has size {}", placeKey, payloadArg.transformHistory().size()); } diff --git a/src/main/java/emissary/core/TransformHistory.java b/src/main/java/emissary/core/TransformHistory.java index 0a7e3ccbba..455b4be1e6 100644 --- a/src/main/java/emissary/core/TransformHistory.java +++ b/src/main/java/emissary/core/TransformHistory.java @@ -214,7 +214,7 @@ public String getKeyNoUrl() { return StringUtils.substringBefore(key, ".http"); } - public long getTimeInPlace() { + public double getTimeInPlace() { return timeInPlace; } diff --git a/src/main/java/emissary/util/PayloadUtil.java b/src/main/java/emissary/util/PayloadUtil.java index 8330d2309f..d6904b7b29 100755 --- a/src/main/java/emissary/util/PayloadUtil.java +++ b/src/main/java/emissary/util/PayloadUtil.java @@ -131,12 +131,14 @@ public static String getPayloadDisplayString(final IBaseDataObject payload) { .append("\n"); } else { for (final TransformHistory.History h : th) { + double msInPlace = h.getTimeInPlace() / 1_000_000; sb.append(" "); if (h.wasCoordinated()) { sb.append(" "); } // check is NO_URL or not - sb.append(" ").append(historyCase.equals(NO_URL) ? h.getKeyNoUrl() : h.getKey()).append(" Time: ").append(h.getTimeInPlace()) + sb.append(" ").append(historyCase.equals(NO_URL) ? h.getKeyNoUrl() : h.getKey()).append(" Time: ") + .append(msInPlace).append("ms") .append("\n"); } }