Skip to content

Commit

Permalink
get max and min from available history
Browse files Browse the repository at this point in the history
  • Loading branch information
matush-v committed Jul 11, 2017
1 parent 1a14d3d commit 77de0c1
Showing 1 changed file with 23 additions and 29 deletions.
Expand Up @@ -5,7 +5,6 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;


import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -71,7 +70,6 @@ public class SingularityUsagePoller extends SingularityLeaderOnlyPoller {


@Override @Override
public void runActionOnPoll() { public void runActionOnPoll() {
final Optional<Map<String, RequestUtilization>> oldUtilizationPerRequestId = getOldUtilizationPerRequestId();
Map<String, RequestUtilization> utilizationPerRequestId = new HashMap<>(); Map<String, RequestUtilization> utilizationPerRequestId = new HashMap<>();
final long now = System.currentTimeMillis(); final long now = System.currentTimeMillis();


Expand Down Expand Up @@ -117,7 +115,7 @@ public void runActionOnPoll() {
double cpuReservedForTask = maybeTask.get().getTaskRequest().getDeploy().getResources().get().getCpus(); double cpuReservedForTask = maybeTask.get().getTaskRequest().getDeploy().getResources().get().getCpus();
memoryMbReservedOnSlave += memoryMbReservedForTask; memoryMbReservedOnSlave += memoryMbReservedForTask;
cpuReservedOnSlave += cpuReservedForTask; cpuReservedOnSlave += cpuReservedForTask;
updateRequestUtilization(oldUtilizationPerRequestId, utilizationPerRequestId, pastTaskUsages, latestUsage, task, memoryMbReservedForTask, cpuReservedForTask); updateRequestUtilization(utilizationPerRequestId, pastTaskUsages, latestUsage, task, memoryMbReservedForTask, cpuReservedForTask);
} }
memoryBytesUsedOnSlave += latestUsage.getMemoryTotalBytes(); memoryBytesUsedOnSlave += latestUsage.getMemoryTotalBytes();


Expand Down Expand Up @@ -171,15 +169,6 @@ public void runActionOnPoll() {
usageManager.saveClusterUtilization(getClusterUtilization(utilizationPerRequestId, totalMemBytesUsed, totalMemBytesAvailable, totalCpuUsed, totalCpuAvailable, now)); usageManager.saveClusterUtilization(getClusterUtilization(utilizationPerRequestId, totalMemBytesUsed, totalMemBytesAvailable, totalCpuUsed, totalCpuAvailable, now));
} }


private Optional<Map<String, RequestUtilization>> getOldUtilizationPerRequestId() {
final Optional<SingularityClusterUtilization> oldClusterUtilization = usageManager.getClusterUtilization();

return oldClusterUtilization.isPresent() ? Optional.of(oldClusterUtilization.get()
.getRequestUtilizations()
.stream()
.collect(Collectors.toMap(RequestUtilization::getRequestId, r -> r))) : Optional.absent();
}

private SingularityTaskUsage getUsage(MesosTaskMonitorObject taskUsage) { private SingularityTaskUsage getUsage(MesosTaskMonitorObject taskUsage) {
double cpuSeconds = taskUsage.getStatistics().getCpusSystemTimeSecs() + taskUsage.getStatistics().getCpusUserTimeSecs(); double cpuSeconds = taskUsage.getStatistics().getCpusSystemTimeSecs() + taskUsage.getStatistics().getCpusUserTimeSecs();


Expand Down Expand Up @@ -208,42 +197,47 @@ private void updateLongRunningTasksUsage(Map<ResourceUsageType, Number> longRunn
longRunningTasksUsage.compute(ResourceUsageType.CPU_USED, (k, v) -> (v == null) ? cpuUsed : v.doubleValue() + cpuUsed); longRunningTasksUsage.compute(ResourceUsageType.CPU_USED, (k, v) -> (v == null) ? cpuUsed : v.doubleValue() + cpuUsed);
} }


private void updateRequestUtilization(Optional<Map<String, RequestUtilization>> maybeOldUtilizationPerRequestId, Map<String, RequestUtilization> utilizationPerRequestId, List<SingularityTaskUsage> pastTaskUsages, SingularityTaskUsage latestUsage, SingularityTaskId task, double memoryMbReservedForTask, double cpuReservedForTask) { private void updateRequestUtilization(Map<String, RequestUtilization> utilizationPerRequestId, List<SingularityTaskUsage> pastTaskUsages, SingularityTaskUsage latestUsage, SingularityTaskId task, double memoryMbReservedForTask, double cpuReservedForTask) {
List<SingularityTaskUsage> pastTaskUsagesCopy = copyUsages(pastTaskUsages, latestUsage, task);
String requestId = task.getRequestId(); String requestId = task.getRequestId();
RequestUtilization requestUtilization = utilizationPerRequestId.getOrDefault(requestId, new RequestUtilization(requestId, task.getDeployId())); RequestUtilization requestUtilization = utilizationPerRequestId.getOrDefault(requestId, new RequestUtilization(requestId, task.getDeployId()));

long curMaxMemBytesUsed = 0;
long oldMaxMemBytesUsed = 0; long curMinMemBytesUsed = Long.MAX_VALUE;
long oldMinMemBytesUsed = Long.MAX_VALUE; double curMaxCpuUsed = 0;
double oldMaxCpuUsed = 0; double curMinCpuUsed = Double.MAX_VALUE;
double oldMinCpuUsed = Double.MAX_VALUE;
if (maybeOldUtilizationPerRequestId.isPresent() && maybeOldUtilizationPerRequestId.get().get(requestId) != null) { if (utilizationPerRequestId.containsKey(requestId)) {
oldMaxMemBytesUsed = maybeOldUtilizationPerRequestId.get().get(requestId).getMaxMemBytesUsed(); curMaxMemBytesUsed = requestUtilization.getMaxMemBytesUsed();
oldMinMemBytesUsed = maybeOldUtilizationPerRequestId.get().get(requestId).getMinMemBytesUsed(); curMinMemBytesUsed = requestUtilization.getMinMemBytesUsed();
oldMaxCpuUsed = maybeOldUtilizationPerRequestId.get().get(requestId).getMaxCpuUsed(); curMaxCpuUsed = requestUtilization.getMaxCpuUsed();
oldMinCpuUsed = maybeOldUtilizationPerRequestId.get().get(requestId).getMinCpuUsed(); curMinCpuUsed = requestUtilization.getMinCpuUsed();
} }


List<SingularityTaskUsage> pastTaskUsagesCopy = copyUsages(pastTaskUsages, latestUsage, task);
int numTasks = pastTaskUsagesCopy.size() - 1; int numTasks = pastTaskUsagesCopy.size() - 1;


for (int i = 0; i < numTasks; i++) { for (int i = 0; i < numTasks; i++) {
SingularityTaskUsage olderUsage = pastTaskUsagesCopy.get(i); SingularityTaskUsage olderUsage = pastTaskUsagesCopy.get(i);
SingularityTaskUsage newerUsage = pastTaskUsagesCopy.get(i + 1); SingularityTaskUsage newerUsage = pastTaskUsagesCopy.get(i + 1);
double cpusUsed = (newerUsage.getCpuSeconds() - olderUsage.getCpuSeconds()) / (newerUsage.getTimestampSeconds() - olderUsage.getTimestampSeconds()); double cpusUsed = (newerUsage.getCpuSeconds() - olderUsage.getCpuSeconds()) / (newerUsage.getTimestampSeconds() - olderUsage.getTimestampSeconds());


curMaxCpuUsed = Math.max(cpusUsed, curMaxCpuUsed);
curMinCpuUsed = Math.min(cpusUsed, curMinCpuUsed);
curMaxMemBytesUsed = Math.max(newerUsage.getMemoryTotalBytes(), curMaxMemBytesUsed);
curMinMemBytesUsed = Math.min(newerUsage.getMemoryTotalBytes(), curMinMemBytesUsed);

requestUtilization requestUtilization
.addCpuUsed(cpusUsed) .addCpuUsed(cpusUsed)
.setMaxCpuUsed(Math.max(cpusUsed, Math.max(requestUtilization.getMaxCpuUsed(), oldMaxCpuUsed)))
.setMinCpuUsed(Math.min(cpusUsed, Math.min(requestUtilization.getMinCpuUsed(), oldMinCpuUsed)))
.setMaxMemBytesUsed(Math.max(newerUsage.getMemoryTotalBytes(), Math.max(requestUtilization.getMaxMemBytesUsed(), oldMaxMemBytesUsed)))
.setMinMemBytesUsed(Math.min(newerUsage.getMemoryTotalBytes(), Math.min(requestUtilization.getMinMemBytesUsed(), oldMinMemBytesUsed)))
.addMemBytesUsed(newerUsage.getMemoryTotalBytes()) .addMemBytesUsed(newerUsage.getMemoryTotalBytes())
.incrementTaskCount(); .incrementTaskCount();
} }


requestUtilization requestUtilization
.addMemBytesReserved((long) (memoryMbReservedForTask * SingularitySlaveUsage.BYTES_PER_MEGABYTE * numTasks)) .addMemBytesReserved((long) (memoryMbReservedForTask * SingularitySlaveUsage.BYTES_PER_MEGABYTE * numTasks))
.addCpuReserved(cpuReservedForTask * numTasks); .addCpuReserved(cpuReservedForTask * numTasks)
.setMaxCpuUsed(curMaxCpuUsed)
.setMinCpuUsed(curMinCpuUsed)
.setMaxMemBytesUsed(curMaxMemBytesUsed)
.setMinMemBytesUsed(curMinMemBytesUsed);


utilizationPerRequestId.put(requestId, requestUtilization); utilizationPerRequestId.put(requestId, requestUtilization);
} }
Expand Down

0 comments on commit 77de0c1

Please sign in to comment.