Skip to content

Commit

Permalink
Update user resource metric reporting to include both embedded and ag…
Browse files Browse the repository at this point in the history
…ent jobs

For now, limit reporting to only jobs launched via API submission, for consistency with previous behavior

Modify the actual persistence APIs leveraged to be more flexible via parameters for the job statuses to consider and whether the job was submitted via the API or the agent CLI
  • Loading branch information
tgianos committed May 29, 2020
1 parent b40bdf9 commit b36d2f2
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1166,12 +1166,15 @@ void canGetJobMetadata() throws GenieException {
@Test
@DatabaseSetup("persistence/jobs/search.xml")
void canGetUserResourceSummaries() {
final Map<String, UserResourcesSummary> summaries = this.service.getUserResourcesSummaries();
final Map<String, UserResourcesSummary> summaries = this.service.getUserResourcesSummaries(
JobStatus.getActiveStatuses(),
true
);
Assertions.assertThat(summaries.keySet()).contains("tgianos");
final UserResourcesSummary userResourcesSummary = summaries.get("tgianos");
Assertions.assertThat(userResourcesSummary.getUser()).isEqualTo("tgianos");
Assertions.assertThat(userResourcesSummary.getRunningJobsCount()).isEqualTo(1L);
Assertions.assertThat(userResourcesSummary.getUsedMemory()).isEqualTo(2048L);
Assertions.assertThat(userResourcesSummary.getRunningJobsCount()).isEqualTo(2L);
Assertions.assertThat(userResourcesSummary.getUsedMemory()).isEqualTo(4096L);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@
v4="false"
cluster_name="h2query"
command_name="spark"
api="true"
/>
<job_command_arguments
job_id="1"
Expand Down Expand Up @@ -524,6 +525,7 @@
v4="false"
cluster_name="h2query"
command_name="spark"
api="true"
/>
<job_command_arguments
job_id="2"
Expand Down Expand Up @@ -611,6 +613,7 @@
v4="false"
cluster_name="h2query"
command_name="spark"
api="true"
/>
<job_command_arguments
job_id="3"
Expand Down Expand Up @@ -698,6 +701,7 @@
v4="true"
cluster_name="h2query"
command_name="spark"
api="false"
/>
<job_command_arguments
job_id="4"
Expand Down Expand Up @@ -785,6 +789,7 @@
v4="true"
cluster_name="h2query"
command_name="spark"
api="false"
/>
<job_command_arguments
job_id="5"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,11 +1001,13 @@ void updateJobStatus(
long getActiveJobCountForUser(@NotBlank String user);

/**
* Get a map of summaries of resources usage for each user with at least one running job.
* Get a map of summaries of resources usage for each user with at least one active job.
*
* @param statuses The set of {@link JobStatus} a job must be in to be considered in this request
* @param api Whether the job was submitted via the api ({@literal true}) or the agent cli ({@literal false})
* @return a map of user resources summaries, keyed on user name
*/
Map<String, UserResourcesSummary> getUserResourcesSummaries();
Map<String, UserResourcesSummary> getUserResourcesSummaries(Set<JobStatus> statuses, boolean api);

/**
* Get the amount of memory currently used on the given host by Genie jobs in any of the following states.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2227,9 +2227,16 @@ public long getActiveJobCountForUser(@NotBlank final String user) {
*/
@Override
@Transactional(readOnly = true)
public Map<String, UserResourcesSummary> getUserResourcesSummaries() {
log.debug("[getUserResourcesSummaries] Called");
return this.jobRepository.getUserJobResourcesAggregates()
public Map<String, UserResourcesSummary> getUserResourcesSummaries(
final Set<JobStatus> statuses,
final boolean api
) {
log.debug("[getUserResourcesSummaries] Called for statuses {} and api {}", statuses, api);
return this.jobRepository
.getUserJobResourcesAggregates(
statuses.stream().map(JobStatus::name).collect(Collectors.toSet()),
api
)
.stream()
.map(EntityV3DtoConverters::toUserResourceSummaryDto)
.collect(Collectors.toMap(UserResourcesSummary::getUser, userResourcesSummary -> userResourcesSummary));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,20 @@ Set<Long> findJobsCreatedBefore(
* Returns resources usage for each user that has a running job.
* Only jobs running on Genie servers are considered (i.e. no Agent jobs)
*
* @param statuses The set of statuses a job has to be in to be considered
* @param api Whether the job was submitted through the api ({@literal true}) or agent cli ({@literal false})
* @return The user resource aggregates
*/
@Query(
"SELECT j.user AS user, COUNT(j) as runningJobsCount, SUM(j.memoryUsed) as usedMemory"
"SELECT j.user AS user, COUNT(j) as runningJobsCount, COALESCE(SUM(j.memoryUsed), 0) as usedMemory"
+ " FROM JobEntity j"
+ " WHERE j.status = 'RUNNING' AND j.v4 = FALSE"
+ " WHERE j.status IN (:statuses) AND j.api = :isApi"
+ " GROUP BY j.user"
)
Set<UserJobResourcesAggregate> getUserJobResourcesAggregates();
Set<UserJobResourcesAggregate> getUserJobResourcesAggregates(
@Param("statuses") Set<String> statuses,
@Param("isApi") boolean api
);

/**
* Find agent jobs in the given set of states.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AtomicDouble;
import com.netflix.genie.common.dto.UserResourcesSummary;
import com.netflix.genie.common.external.dtos.v4.JobStatus;
import com.netflix.genie.web.data.services.DataServices;
import com.netflix.genie.web.data.services.PersistenceService;
import com.netflix.genie.web.properties.UserMetricsProperties;
Expand Down Expand Up @@ -98,7 +99,13 @@ public long getFixedRate() {
public void run() {
log.debug("Publishing user metrics");

final Map<String, UserResourcesSummary> summaries = this.persistenceService.getUserResourcesSummaries();
// For now, just report the API jobs, as they're the ones using resources on Genie web nodes.
// This unblocks the agent migration for now, but in the future we may want to change this to further split or
// combine reports by CLI vs. API
final Map<String, UserResourcesSummary> summaries = this.persistenceService.getUserResourcesSummaries(
JobStatus.getActiveStatuses(),
true
);

// Update number of active users
log.debug("Number of users with active jobs: {}", summaries.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.netflix.genie.web.tasks.leader

import com.netflix.genie.common.dto.UserResourcesSummary
import com.netflix.genie.common.external.dtos.v4.JobStatus
import com.netflix.genie.web.data.services.DataServices
import com.netflix.genie.web.data.services.PersistenceService
import com.netflix.genie.web.properties.UserMetricsProperties
Expand Down Expand Up @@ -87,7 +88,7 @@ class UserMetricsTaskSpec extends Specification {
this.task.run()

then:
1 * persistenceService.getUserResourcesSummaries() >> fooBarSummariesMap
1 * persistenceService.getUserResourcesSummaries(JobStatus.activeStatuses, true) >> fooBarSummariesMap
4 * registry.gauge(_ as Meter.Id, _, _ as ToDoubleFunction) >> {
args -> return captureGauge(args[0] as Meter.Id, args[1] as Object, args[2] as ToDoubleFunction<Object>)
}
Expand All @@ -102,7 +103,7 @@ class UserMetricsTaskSpec extends Specification {
this.task.run()

then:
1 * persistenceService.getUserResourcesSummaries() >> fooBooSummariesMap
1 * persistenceService.getUserResourcesSummaries(JobStatus.activeStatuses, true) >> fooBooSummariesMap
2 * registry.gauge(_ as Meter.Id, _, _ as ToDoubleFunction) >> {
args -> return captureGauge(args[0] as Meter.Id, args[1] as Object, args[2] as ToDoubleFunction<Object>)
}
Expand All @@ -119,7 +120,7 @@ class UserMetricsTaskSpec extends Specification {
this.task.run()

then:
1 * persistenceService.getUserResourcesSummaries() >> emptySummariesMap
1 * persistenceService.getUserResourcesSummaries(JobStatus.activeStatuses, true) >> emptySummariesMap
measureActiveUsers() == 0
measureJobs("foo") == Double.NaN
measureMemory("foo") == Double.NaN
Expand All @@ -144,7 +145,7 @@ class UserMetricsTaskSpec extends Specification {
this.task.run()

then:
1 * persistenceService.getUserResourcesSummaries() >> fooBarSummariesMap
1 * persistenceService.getUserResourcesSummaries(JobStatus.activeStatuses, true) >> fooBarSummariesMap

4 * registry.gauge(_ as Meter.Id, _, _ as ToDoubleFunction) >> {
args -> return captureGauge(args[0] as Meter.Id, args[1] as Object, args[2] as ToDoubleFunction<Object>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1079,9 +1079,11 @@ void canGetJobHost() throws GenieCheckedException {
@Test
void canGetUserResourceSummariesNoRecords() {
Mockito
.when(this.jobRepository.getUserJobResourcesAggregates())
.when(this.jobRepository.getUserJobResourcesAggregates(JpaPersistenceServiceImpl.ACTIVE_STATUS_SET, true))
.thenReturn(Sets.newHashSet());
Assertions.assertThat(this.persistenceService.getUserResourcesSummaries()).isEmpty();
Assertions
.assertThat(this.persistenceService.getUserResourcesSummaries(JobStatus.getActiveStatuses(), true))
.isEmpty();
}

@Test
Expand All @@ -1097,14 +1099,19 @@ void canGetUserResourceSummaries() {
Mockito.when(p2.getRunningJobsCount()).thenReturn(5L);
Mockito.when(p2.getUsedMemory()).thenReturn(2048L);

final Set<JobStatus> statuses = JobStatus.getResolvableStatuses();
final Set<String> statusStrings = statuses.stream().map(JobStatus::name).collect(Collectors.toSet());

Mockito
.when(this.jobRepository.getUserJobResourcesAggregates())
.when(this.jobRepository.getUserJobResourcesAggregates(statusStrings, false))
.thenReturn(Sets.newHashSet(p1, p2));

final HashMap<String, UserResourcesSummary> expectedMap = Maps.newHashMap();
expectedMap.put("foo", new UserResourcesSummary("foo", 3, 1024));
expectedMap.put("bar", new UserResourcesSummary("bar", 5, 2048));
Assertions.assertThat(this.persistenceService.getUserResourcesSummaries()).isEqualTo(expectedMap);
Assertions
.assertThat(this.persistenceService.getUserResourcesSummaries(statuses, false))
.isEqualTo(expectedMap);
}

@Test
Expand Down

0 comments on commit b36d2f2

Please sign in to comment.