Skip to content
This repository has been archived by the owner on May 14, 2022. It is now read-only.

Commit

Permalink
Add print/remove unschedulable job commands (#1184)
Browse files Browse the repository at this point in the history
* Add print/remove unschedulable job commands

* Address code review requests
  • Loading branch information
tbak committed Nov 22, 2021
1 parent 46979cd commit 88748f8
Show file tree
Hide file tree
Showing 9 changed files with 477 additions and 0 deletions.
5 changes: 5 additions & 0 deletions titus-cli/src/main/java/com/netflix/titus/cli/CLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import com.netflix.titus.cli.command.job.ObserveJobsCommand;
import com.netflix.titus.cli.command.job.TaskGetCommand;
import com.netflix.titus.cli.command.job.TasksGetCommand;
import com.netflix.titus.cli.command.job.unschedulable.PrintUnschedulableJobsCommand;
import com.netflix.titus.cli.command.job.unschedulable.RemoveUnschedulableJobsCommand;
import com.netflix.titus.cli.command.scheduler.ObserveSchedulingResultCommand;
import com.netflix.titus.cli.command.supervisor.SupervisorObserveEventsCommand;
import io.grpc.StatusRuntimeException;
Expand Down Expand Up @@ -89,6 +91,9 @@ public class CLI {
.put("schedulingResults", new ObserveSchedulingResultCommand())
// Supervisor
.put("supervisorEvents", new SupervisorObserveEventsCommand())
// Unschedulable jobs
.put("printUnschedulableJobs", new PrintUnschedulableJobsCommand())
.put("removeUnschedulableJobs", new RemoveUnschedulableJobsCommand())
.build();

private final boolean helpRequested;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.netflix.titus.common.util.grpc.reactor.GrpcToReactorClientFactory;
import com.netflix.titus.common.util.grpc.reactor.client.ReactorToGrpcClientBuilder;
import com.netflix.titus.grpc.protogen.JobManagementServiceGrpc;
import com.netflix.titus.grpc.protogen.SchedulerServiceGrpc;
import com.netflix.titus.runtime.connector.GrpcRequestConfiguration;
import com.netflix.titus.runtime.connector.jobmanager.JobConnectorConfiguration;
import com.netflix.titus.runtime.connector.jobmanager.ReactorJobManagementServiceStub;
Expand Down Expand Up @@ -144,6 +145,10 @@ public RemoteJobManagementClient getJobManagementClientWithKeepAlive(long keepAl
return new RemoteJobManagementClientWithKeepAlive("cli", configuration, stub, reactorStub, titusRuntime);
}

/**
 * Builds a blocking client stub for the scheduler gRPC service.
 * <p>
 * Each invocation calls {@code createChannel()} and wraps its result in a new stub.
 * NOTE(review): presumably the created channel is tracked and released by {@link #shutdown()};
 * {@code createChannel()} is not visible here - confirm.
 *
 * @return a blocking stub bound to the channel returned by {@code createChannel()}
 */
public SchedulerServiceGrpc.SchedulerServiceBlockingStub getSchedulerServiceBlockingStub() {
return SchedulerServiceGrpc.newBlockingStub(createChannel());
}

/**
 * Invokes {@code shutdown()} on every channel currently held in {@code channels}.
 */
public void shutdown() {
    channels.forEach(channel -> channel.shutdown());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2021 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.titus.cli.command.job;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import com.netflix.titus.api.jobmanager.model.job.Job;
import com.netflix.titus.api.jobmanager.model.job.JobState;
import com.netflix.titus.api.jobmanager.model.job.Task;
import com.netflix.titus.api.jobmanager.model.job.event.JobManagerEvent;
import com.netflix.titus.api.jobmanager.model.job.event.JobUpdateEvent;
import com.netflix.titus.api.jobmanager.model.job.event.TaskUpdateEvent;
import com.netflix.titus.cli.CommandContext;
import com.netflix.titus.common.util.tuple.Pair;

/**
 * Helper routines shared by the job related CLI commands.
 */
public class JobUtil {

    /**
     * Reads the job event stream up to the snapshot marker and collects the jobs found in the
     * {@link JobState#Accepted} state together with their tasks.
     * <p>
     * A task is recorded only if its job was already collected when the task event arrives;
     * tasks of other jobs are ignored. Reading stops at the first event equal to
     * {@link JobManagerEvent#snapshotMarker()}, or when the stream ends.
     *
     * @param context command context providing the job management client
     * @return a pair of (job id to job) and (job id to (task id to task)) maps
     */
    public static Pair<Map<String, Job>, Map<String, Map<String, Task>>> loadActiveJobsAndTasks(CommandContext context) {
        Map<String, Job> jobsById = new HashMap<>();
        Map<String, Map<String, Task>> tasksByJobId = new HashMap<>();

        for (JobManagerEvent<?> event : context.getJobManagementClient().observeJobs(Collections.emptyMap()).toIterable()) {
            if (event instanceof JobUpdateEvent) {
                Job job = ((JobUpdateEvent) event).getCurrent();
                if (job.getStatus().getState() == JobState.Accepted) {
                    jobsById.put(job.getId(), job);
                }
                continue;
            }

            if (event instanceof TaskUpdateEvent) {
                Task task = ((TaskUpdateEvent) event).getCurrent();
                String ownerJobId = task.getJobId();
                // Keep only tasks whose job has been collected above.
                if (jobsById.containsKey(ownerJobId)) {
                    Map<String, Task> jobTasks = tasksByJobId.computeIfAbsent(ownerJobId, id -> new HashMap<>());
                    jobTasks.put(task.getId(), task);
                }
                continue;
            }

            // The snapshot marker ends the initial state dump; nothing after it is read.
            if (event.equals(JobManagerEvent.snapshotMarker())) {
                break;
            }
        }

        return Pair.of(jobsById, tasksByJobId);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2021 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.titus.cli.command.job.unschedulable;

import java.util.Map;

import com.netflix.titus.api.jobmanager.model.job.Job;
import com.netflix.titus.api.jobmanager.model.job.Task;
import com.netflix.titus.cli.CliCommand;
import com.netflix.titus.cli.CommandContext;
import com.netflix.titus.common.util.tuple.Pair;
import com.netflix.titus.common.util.unit.TimeUnitExt;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.netflix.titus.cli.command.job.JobUtil.loadActiveJobsAndTasks;

/**
 * CLI command that logs every job reported by {@link UnschedulableFinder} as unschedulable,
 * together with the reason and the scheduling failures recorded for it.
 */
public class PrintUnschedulableJobsCommand implements CliCommand {

    private static final Logger logger = LoggerFactory.getLogger(PrintUnschedulableJobsCommand.class);

    @Override
    public String getDescription() {
        return "print all unschedulable jobs";
    }

    @Override
    public boolean isRemote() {
        return true;
    }

    /**
     * Declares the single, mandatory {@code -e/--expiry} option.
     */
    @Override
    public Options getOptions() {
        Option expiry = Option.builder("e")
                .longOpt("expiry")
                .hasArg()
                .required()
                .desc("Duration threshold after which a not scheduled task can be regarded as non-schedulable (8h, 5d, etc).")
                .build();
        return new Options().addOption(expiry);
    }

    /**
     * Loads the active jobs and tasks, runs the unschedulable job detection with the requested
     * expiry threshold, and logs the outcome.
     *
     * @throws IllegalArgumentException if the expiry value cannot be converted to milliseconds
     */
    @Override
    public void execute(CommandContext context) throws Exception {
        String expiryValue = context.getCLI().getOptionValue('e');
        long stuckInAcceptedThresholdMs = TimeUnitExt.toMillis(expiryValue)
                .orElseThrow(() -> new IllegalArgumentException("Wrong expiry threshold"));

        Pair<Map<String, Job>, Map<String, Map<String, Task>>> jobsAndTasks = loadActiveJobsAndTasks(context);
        Map<String, UnschedulableJob> found = UnschedulableFinder.findUnschedulableJobs(
                context,
                jobsAndTasks.getLeft(),
                jobsAndTasks.getRight(),
                stuckInAcceptedThresholdMs
        );

        logger.info("Found {} unschedulable jobs", found.size());
        for (Map.Entry<String, UnschedulableJob> entry : found.entrySet()) {
            UnschedulableJob details = entry.getValue();
            logger.info(" {}: reason={}", entry.getKey(), details.getReason());
            logger.info(" failures={}", details.getFailures());
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2021 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.titus.cli.command.job.unschedulable;

import java.time.Duration;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.netflix.titus.api.jobmanager.model.job.Job;
import com.netflix.titus.api.jobmanager.model.job.Task;
import com.netflix.titus.api.model.callmetadata.CallMetadata;
import com.netflix.titus.api.model.callmetadata.Caller;
import com.netflix.titus.api.model.callmetadata.CallerType;
import com.netflix.titus.cli.CliCommand;
import com.netflix.titus.cli.CommandContext;
import com.netflix.titus.common.util.tuple.Pair;
import com.netflix.titus.common.util.unit.TimeUnitExt;
import com.netflix.titus.runtime.connector.jobmanager.RemoteJobManagementClient;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.netflix.titus.cli.command.job.JobUtil.loadActiveJobsAndTasks;

/**
 * CLI command that kills jobs reported by {@link UnschedulableFinder} as unschedulable.
 * <p>
 * Candidates are ordered by their job status timestamp (ascending) and at most {@code --limit}
 * of them are killed, one at a time.
 */
public class RemoveUnschedulableJobsCommand implements CliCommand {

    // Bound to this class (not PrintUnschedulableJobsCommand) so log lines are attributed correctly.
    private static final Logger logger = LoggerFactory.getLogger(RemoveUnschedulableJobsCommand.class);

    /** Maximum time to wait for a single kill request to complete. */
    private static final Duration KILL_TIMEOUT = Duration.ofSeconds(60);

    @Override
    public String getDescription() {
        return "remove unschedulable jobs";
    }

    @Override
    public boolean isRemote() {
        return true;
    }

    /**
     * Declares the mandatory {@code -e/--expiry} and {@code -l/--limit} options.
     */
    @Override
    public Options getOptions() {
        Options options = new Options();
        options.addOption(Option.builder("e").longOpt("expiry").hasArg().required()
                .desc("Duration threshold after which a not scheduled task can be regarded as non-schedulable (8h, 5d, etc).").build());
        options.addOption(Option.builder("l").longOpt("limit").hasArg().required()
                .desc("Maximum number of jobs to remove").build());
        return options;
    }

    /**
     * Finds the unschedulable jobs and kills up to {@code --limit} of them, starting with the
     * ones that have the smallest job status timestamp.
     *
     * @throws IllegalArgumentException if the expiry value cannot be converted to milliseconds,
     *                                  or the limit is not a non-negative integer
     */
    @Override
    public void execute(CommandContext context) throws Exception {
        long stuckInAcceptedThresholdMs = TimeUnitExt.toMillis(context.getCLI().getOptionValue('e'))
                .orElseThrow(() -> new IllegalArgumentException("Wrong expiry threshold"));
        int limit = parseLimit(context.getCLI().getOptionValue('l'));

        Pair<Map<String, Job>, Map<String, Map<String, Task>>> all = loadActiveJobsAndTasks(context);
        Map<String, Job> jobs = all.getLeft();

        Map<String, UnschedulableJob> unschedulable = UnschedulableFinder.findUnschedulableJobs(context, jobs, all.getRight(), stuckInAcceptedThresholdMs);
        logger.info("Found {} unschedulable jobs", unschedulable.size());
        logger.info("Removing the oldest {}...", limit);

        List<Job> orderedJobs = unschedulable.keySet().stream().map(jobs::get)
                .sorted(Comparator.comparingLong(j -> j.getStatus().getTimestamp()))
                .collect(Collectors.toList());

        RemoteJobManagementClient jobClient = context.getJobManagementClient();
        String callerId = resolveCallerId();
        int len = Math.min(orderedJobs.size(), limit);
        for (int i = 0; i < len; i++) {
            Job jobToRemove = orderedJobs.get(i);
            logger.info("Removing job {}...", jobToRemove);
            // The call reason differs per job, so the metadata is rebuilt on every iteration.
            CallMetadata callMetadata = CallMetadata.newBuilder()
                    .withCallReason(unschedulable.get(jobToRemove.getId()).getReason())
                    .withCallers(Collections.singletonList(
                            Caller.newBuilder()
                                    .withId(callerId)
                                    .withCallerType(CallerType.User)
                                    .build()
                    ))
                    .build();
            jobClient.killJob(jobToRemove.getId(), callMetadata).block(KILL_TIMEOUT);
        }
        logger.info("Removed {} unschedulable jobs out of {} (left {})", len, unschedulable.size(), unschedulable.size() - len);
    }

    /**
     * Parses the {@code --limit} argument, rejecting values that are not non-negative integers.
     * Without this check a negative limit would skip the removal loop and produce a summary
     * line with a negative "removed" count.
     */
    private static int parseLimit(String value) {
        int limit;
        try {
            limit = Integer.parseInt(value);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Wrong limit (expected an integer): " + value, e);
        }
        if (limit < 0) {
            throw new IllegalArgumentException("Wrong limit (must not be negative): " + limit);
        }
        return limit;
    }

    /**
     * Returns the identifier recorded as the caller of the kill requests: the {@code USER}
     * environment variable when set, otherwise the JVM {@code user.name} system property.
     */
    private static String resolveCallerId() {
        String user = System.getenv("USER");
        if (user == null || user.isEmpty()) {
            user = System.getProperty("user.name");
        }
        return user;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2021 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.titus.cli.command.job.unschedulable;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.netflix.titus.api.jobmanager.model.job.Job;
import com.netflix.titus.api.jobmanager.model.job.Task;
import com.netflix.titus.api.jobmanager.model.job.TaskState;
import com.netflix.titus.cli.CommandContext;
import com.netflix.titus.common.util.CollectionsExt;
import com.netflix.titus.common.util.DateTimeExt;
import com.netflix.titus.grpc.protogen.SchedulingResultEvent;
import com.netflix.titus.grpc.protogen.SchedulingResultRequest;

/**
 * Detects jobs whose tasks have all been waiting in the {@link TaskState#Accepted} state for
 * longer than a given threshold.
 */
public class UnschedulableFinder {

    /**
     * Evaluates every job that has an entry in {@code tasks} and returns those found to be
     * unschedulable.
     *
     * @param context                    command context used to query the scheduler service
     * @param jobs                       jobs keyed by job id
     * @param tasks                      tasks keyed by job id, then by task id
     * @param stuckInAcceptedThresholdMs minimum time (ms) every task must have spent in the
     *                                   Accepted state for its job to be reported
     * @return unschedulable job descriptions keyed by job id
     */
    static Map<String, UnschedulableJob> findUnschedulableJobs(CommandContext context,
                                                               Map<String, Job> jobs,
                                                               Map<String, Map<String, Task>> tasks,
                                                               long stuckInAcceptedThresholdMs) {
        Map<String, UnschedulableJob> detected = new HashMap<>();
        for (Map.Entry<String, Map<String, Task>> entry : tasks.entrySet()) {
            String jobId = entry.getKey();
            Optional<UnschedulableJob> verdict = processJob(context, jobs.get(jobId), entry.getValue(), stuckInAcceptedThresholdMs);
            if (verdict.isPresent()) {
                detected.put(jobId, verdict.get());
            }
        }
        return detected;
    }

    /**
     * Checks a single job. The result is empty when the job has no tasks, when any task has left
     * the Accepted state, or when any task has been waiting for less than the threshold.
     */
    private static Optional<UnschedulableJob> processJob(CommandContext context, Job job, Map<String, Task> tasks, long stuckInAcceptedThresholdMs) {
        if (tasks.isEmpty()) {
            return Optional.empty();
        }

        // A task outside of the Accepted state means the job made scheduling progress.
        for (Task task : tasks.values()) {
            if (task.getStatus().getState() != TaskState.Accepted) {
                return Optional.empty();
            }
        }

        // All tasks not scheduled yet. Every one of them must have been waiting for at least the
        // threshold; meanwhile track the earliest and the latest status timestamps.
        long earliestTimestamp = Long.MAX_VALUE;
        long latestTimestamp = Long.MIN_VALUE;
        for (Task task : tasks.values()) {
            long acceptedTimestamp = task.getStatus().getTimestamp();
            if (System.currentTimeMillis() - acceptedTimestamp < stuckInAcceptedThresholdMs) {
                return Optional.empty();
            }
            earliestTimestamp = Math.min(earliestTimestamp, acceptedTimestamp);
            latestTimestamp = Math.max(latestTimestamp, acceptedTimestamp);
        }

        // Fetch scheduling result for any task.
        SchedulingResultRequest request = SchedulingResultRequest.newBuilder()
                .setTaskId(CollectionsExt.first(tasks.values()).getId())
                .build();
        SchedulingResultEvent result = context.getSchedulerServiceBlockingStub().getSchedulingResult(request);

        long now = System.currentTimeMillis();
        String failures = renderFailures(result);

        // The shortest wait time (latest timestamp) is reported first, the longest one second.
        String reason = String.format(
                "All tasks are stuck in the 'Accepted' state for too long (between %s and %s). " +
                        "Most likely they do not fit into any available node resources.",
                DateTimeExt.toTimeUnitString(now - latestTimestamp, 2),
                DateTimeExt.toTimeUnitString(now - earliestTimestamp, 2)
        );
        return Optional.of(new UnschedulableJob(job.getId(), reason, failures));
    }

    /**
     * Renders the scheduling result as compact JSON, falling back to {@code toString()} when the
     * JSON printer rejects the message.
     */
    private static String renderFailures(SchedulingResultEvent result) {
        try {
            return JsonFormat.printer().omittingInsignificantWhitespace().print(result);
        } catch (InvalidProtocolBufferException e) {
            return result.toString();
        }
    }
}
Loading

0 comments on commit 88748f8

Please sign in to comment.