diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java index 165043799a..a9ab836485 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java @@ -5,7 +5,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +70,6 @@ public class SingularityUsagePoller extends SingularityLeaderOnlyPoller { @Override public void runActionOnPoll() { - final Optional> oldUtilizationPerRequestId = getOldUtilizationPerRequestId(); Map utilizationPerRequestId = new HashMap<>(); final long now = System.currentTimeMillis(); @@ -117,7 +115,7 @@ public void runActionOnPoll() { double cpuReservedForTask = maybeTask.get().getTaskRequest().getDeploy().getResources().get().getCpus(); memoryMbReservedOnSlave += memoryMbReservedForTask; cpuReservedOnSlave += cpuReservedForTask; - updateRequestUtilization(oldUtilizationPerRequestId, utilizationPerRequestId, pastTaskUsages, latestUsage, task, memoryMbReservedForTask, cpuReservedForTask); + updateRequestUtilization(utilizationPerRequestId, pastTaskUsages, latestUsage, task, memoryMbReservedForTask, cpuReservedForTask); } memoryBytesUsedOnSlave += latestUsage.getMemoryTotalBytes(); @@ -171,15 +169,6 @@ public void runActionOnPoll() { usageManager.saveClusterUtilization(getClusterUtilization(utilizationPerRequestId, totalMemBytesUsed, totalMemBytesAvailable, totalCpuUsed, totalCpuAvailable, now)); } - private Optional> getOldUtilizationPerRequestId() { - final Optional oldClusterUtilization = usageManager.getClusterUtilization(); - - return oldClusterUtilization.isPresent() ? Optional.of(oldClusterUtilization.get() - .getRequestUtilizations() - .stream() - .collect(Collectors.toMap(RequestUtilization::getRequestId, r -> r))) : Optional.absent(); - } - private SingularityTaskUsage getUsage(MesosTaskMonitorObject taskUsage) { double cpuSeconds = taskUsage.getStatistics().getCpusSystemTimeSecs() + taskUsage.getStatistics().getCpusUserTimeSecs(); @@ -208,22 +197,22 @@ private void updateLongRunningTasksUsage(Map longRunn longRunningTasksUsage.compute(ResourceUsageType.CPU_USED, (k, v) -> (v == null) ? cpuUsed : v.doubleValue() + cpuUsed); } - private void updateRequestUtilization(Optional> maybeOldUtilizationPerRequestId, Map utilizationPerRequestId, List pastTaskUsages, SingularityTaskUsage latestUsage, SingularityTaskId task, double memoryMbReservedForTask, double cpuReservedForTask) { - List pastTaskUsagesCopy = copyUsages(pastTaskUsages, latestUsage, task); + private void updateRequestUtilization(Map utilizationPerRequestId, List pastTaskUsages, SingularityTaskUsage latestUsage, SingularityTaskId task, double memoryMbReservedForTask, double cpuReservedForTask) { String requestId = task.getRequestId(); RequestUtilization requestUtilization = utilizationPerRequestId.getOrDefault(requestId, new RequestUtilization(requestId, task.getDeployId())); - - long oldMaxMemBytesUsed = 0; - long oldMinMemBytesUsed = Long.MAX_VALUE; - double oldMaxCpuUsed = 0; - double oldMinCpuUsed = Double.MAX_VALUE; - if (maybeOldUtilizationPerRequestId.isPresent() && maybeOldUtilizationPerRequestId.get().get(requestId) != null) { - oldMaxMemBytesUsed = maybeOldUtilizationPerRequestId.get().get(requestId).getMaxMemBytesUsed(); - oldMinMemBytesUsed = maybeOldUtilizationPerRequestId.get().get(requestId).getMinMemBytesUsed(); - oldMaxCpuUsed = maybeOldUtilizationPerRequestId.get().get(requestId).getMaxCpuUsed(); - oldMinCpuUsed = maybeOldUtilizationPerRequestId.get().get(requestId).getMinCpuUsed(); + long curMaxMemBytesUsed = 0; + long curMinMemBytesUsed = Long.MAX_VALUE; + double curMaxCpuUsed = 0; + double curMinCpuUsed = Double.MAX_VALUE; + + if (utilizationPerRequestId.containsKey(requestId)) { + curMaxMemBytesUsed = requestUtilization.getMaxMemBytesUsed(); + curMinMemBytesUsed = requestUtilization.getMinMemBytesUsed(); + curMaxCpuUsed = requestUtilization.getMaxCpuUsed(); + curMinCpuUsed = requestUtilization.getMinCpuUsed(); } + List pastTaskUsagesCopy = copyUsages(pastTaskUsages, latestUsage, task); int numTasks = pastTaskUsagesCopy.size() - 1; for (int i = 0; i < numTasks; i++) { @@ -231,19 +220,24 @@ private void updateRequestUtilization(Optional> SingularityTaskUsage newerUsage = pastTaskUsagesCopy.get(i + 1); double cpusUsed = (newerUsage.getCpuSeconds() - olderUsage.getCpuSeconds()) / (newerUsage.getTimestampSeconds() - olderUsage.getTimestampSeconds()); + curMaxCpuUsed = Math.max(cpusUsed, curMaxCpuUsed); + curMinCpuUsed = Math.min(cpusUsed, curMinCpuUsed); + curMaxMemBytesUsed = Math.max(newerUsage.getMemoryTotalBytes(), curMaxMemBytesUsed); + curMinMemBytesUsed = Math.min(newerUsage.getMemoryTotalBytes(), curMinMemBytesUsed); + requestUtilization .addCpuUsed(cpusUsed) - .setMaxCpuUsed(Math.max(cpusUsed, Math.max(requestUtilization.getMaxCpuUsed(), oldMaxCpuUsed))) - .setMinCpuUsed(Math.min(cpusUsed, Math.min(requestUtilization.getMinCpuUsed(), oldMinCpuUsed))) - .setMaxMemBytesUsed(Math.max(newerUsage.getMemoryTotalBytes(), Math.max(requestUtilization.getMaxMemBytesUsed(), oldMaxMemBytesUsed))) - .setMinMemBytesUsed(Math.min(newerUsage.getMemoryTotalBytes(), Math.min(requestUtilization.getMinMemBytesUsed(), oldMinMemBytesUsed))) .addMemBytesUsed(newerUsage.getMemoryTotalBytes()) .incrementTaskCount(); } requestUtilization .addMemBytesReserved((long) (memoryMbReservedForTask * SingularitySlaveUsage.BYTES_PER_MEGABYTE * numTasks)) - .addCpuReserved(cpuReservedForTask * numTasks); + .addCpuReserved(cpuReservedForTask * numTasks) + .setMaxCpuUsed(curMaxCpuUsed) + .setMinCpuUsed(curMinCpuUsed) + .setMaxMemBytesUsed(curMaxMemBytesUsed) + .setMinMemBytesUsed(curMinMemBytesUsed); utilizationPerRequestId.put(requestId, requestUtilization); }