diff --git a/titus-cli/src/main/java/com/netflix/titus/cli/CLI.java b/titus-cli/src/main/java/com/netflix/titus/cli/CLI.java index 7a9cd0e83f..e8bbe663f0 100644 --- a/titus-cli/src/main/java/com/netflix/titus/cli/CLI.java +++ b/titus-cli/src/main/java/com/netflix/titus/cli/CLI.java @@ -39,6 +39,8 @@ import com.netflix.titus.cli.command.job.ObserveJobsCommand; import com.netflix.titus.cli.command.job.TaskGetCommand; import com.netflix.titus.cli.command.job.TasksGetCommand; +import com.netflix.titus.cli.command.job.unschedulable.PrintUnschedulableJobsCommand; +import com.netflix.titus.cli.command.job.unschedulable.RemoveUnschedulableJobsCommand; import com.netflix.titus.cli.command.scheduler.ObserveSchedulingResultCommand; import com.netflix.titus.cli.command.supervisor.SupervisorObserveEventsCommand; import io.grpc.StatusRuntimeException; @@ -89,6 +91,9 @@ public class CLI { .put("schedulingResults", new ObserveSchedulingResultCommand()) // Supervisor .put("supervisorEvents", new SupervisorObserveEventsCommand()) + // Unschedulable jobs + .put("printUnschedulableJobs", new PrintUnschedulableJobsCommand()) + .put("removeUnschedulableJobs", new RemoveUnschedulableJobsCommand()) .build(); private final boolean helpRequested; diff --git a/titus-cli/src/main/java/com/netflix/titus/cli/CommandContext.java b/titus-cli/src/main/java/com/netflix/titus/cli/CommandContext.java index d394c66ee6..f11aaf9988 100644 --- a/titus-cli/src/main/java/com/netflix/titus/cli/CommandContext.java +++ b/titus-cli/src/main/java/com/netflix/titus/cli/CommandContext.java @@ -29,6 +29,7 @@ import com.netflix.titus.common.util.grpc.reactor.GrpcToReactorClientFactory; import com.netflix.titus.common.util.grpc.reactor.client.ReactorToGrpcClientBuilder; import com.netflix.titus.grpc.protogen.JobManagementServiceGrpc; +import com.netflix.titus.grpc.protogen.SchedulerServiceGrpc; import com.netflix.titus.runtime.connector.GrpcRequestConfiguration; import 
com.netflix.titus.runtime.connector.jobmanager.JobConnectorConfiguration;
 import com.netflix.titus.runtime.connector.jobmanager.ReactorJobManagementServiceStub;
@@ -144,6 +145,10 @@ public RemoteJobManagementClient getJobManagementClientWithKeepAlive(long keepAl
         return new RemoteJobManagementClientWithKeepAlive("cli", configuration, stub, reactorStub, titusRuntime);
     }
 
+    public SchedulerServiceGrpc.SchedulerServiceBlockingStub getSchedulerServiceBlockingStub() { // Returns a blocking scheduler-service stub built over createChannel().
+        return SchedulerServiceGrpc.newBlockingStub(createChannel()); // NOTE(review): createChannel() is not visible in this hunk; presumably every call opens another channel, so callers should hold on to the returned stub -- confirm.
+    }
+
     public void shutdown() {
         channels.forEach(ManagedChannel::shutdown);
     }
diff --git a/titus-cli/src/main/java/com/netflix/titus/cli/command/job/JobUtil.java b/titus-cli/src/main/java/com/netflix/titus/cli/command/job/JobUtil.java
new file mode 100644
index 0000000000..e293fb932b
--- /dev/null
+++ b/titus-cli/src/main/java/com/netflix/titus/cli/command/job/JobUtil.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2021 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netflix.titus.cli.command.job;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.netflix.titus.api.jobmanager.model.job.Job;
+import com.netflix.titus.api.jobmanager.model.job.JobState;
+import com.netflix.titus.api.jobmanager.model.job.Task;
+import com.netflix.titus.api.jobmanager.model.job.event.JobManagerEvent;
+import com.netflix.titus.api.jobmanager.model.job.event.JobUpdateEvent;
+import com.netflix.titus.api.jobmanager.model.job.event.TaskUpdateEvent;
+import com.netflix.titus.cli.CommandContext;
+import com.netflix.titus.common.util.tuple.Pair;
+
+/** Job-related helpers shared by CLI commands. */
+public final class JobUtil {
+
+    private JobUtil() { /* static helpers only */ }
+
+    /**
+     * Consumes the job event stream up to the snapshot marker and returns the jobs found in the {@code Accepted}
+     * state ({@code jobId -> job}) together with the tasks of those jobs ({@code jobId -> taskId -> task}).
+     */
+    public static Pair<Map<String, Job>, Map<String, Map<String, Task>>> loadActiveJobsAndTasks(CommandContext context) {
+        Map<String, Job> activeJobs = new HashMap<>();
+        Map<String, Map<String, Task>> activeTasks = new HashMap<>();
+        for (JobManagerEvent<?> event : context.getJobManagementClient().observeJobs(Collections.emptyMap()).toIterable()) {
+            if (event instanceof JobUpdateEvent) {
+                Job job = ((JobUpdateEvent) event).getCurrent();
+                if (job.getStatus().getState() == JobState.Accepted) {
+                    activeJobs.put(job.getId(), job);
+                }
+            } else if (event instanceof TaskUpdateEvent) {
+                Task task = ((TaskUpdateEvent) event).getCurrent();
+                if (activeJobs.containsKey(task.getJobId())) { // tasks are kept only when their job was already recorded
+                    activeTasks.computeIfAbsent(task.getJobId(), j -> new HashMap<>()).put(task.getId(), task);
+                }
+            } else if (event.equals(JobManagerEvent.snapshotMarker())) {
+                break; // snapshot complete; later events are live updates and are not needed here
+            }
+        }
+        return Pair.of(activeJobs, activeTasks);
+    }
+}
diff --git a/titus-cli/src/main/java/com/netflix/titus/cli/command/job/unschedulable/PrintUnschedulableJobsCommand.java b/titus-cli/src/main/java/com/netflix/titus/cli/command/job/unschedulable/PrintUnschedulableJobsCommand.java
new file mode 100644
index 0000000000..bc8b2b7987
--- /dev/null
+++ 
b/titus-cli/src/main/java/com/netflix/titus/cli/command/job/unschedulable/PrintUnschedulableJobsCommand.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2021 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netflix.titus.cli.command.job.unschedulable;
+
+import java.util.Map;
+
+import com.netflix.titus.api.jobmanager.model.job.Job;
+import com.netflix.titus.api.jobmanager.model.job.Task;
+import com.netflix.titus.cli.CliCommand;
+import com.netflix.titus.cli.CommandContext;
+import com.netflix.titus.common.util.tuple.Pair;
+import com.netflix.titus.common.util.unit.TimeUnitExt;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.netflix.titus.cli.command.job.JobUtil.loadActiveJobsAndTasks;
+
+/**
+ * CLI command that lists jobs whose tasks have all been waiting in the {@code Accepted} state for longer than
+ * the {@code --expiry} threshold, together with the scheduling result fetched for one of their tasks.
+ */
+public class PrintUnschedulableJobsCommand implements CliCommand {
+
+    private static final Logger logger = LoggerFactory.getLogger(PrintUnschedulableJobsCommand.class);
+
+    @Override
+    public String getDescription() {
+        return "print all unschedulable jobs";
+    }
+
+    @Override
+    public boolean isRemote() {
+        return true;
+    }
+
+    @Override
+    public Options getOptions() {
+        Options options = new Options();
+        options.addOption(Option.builder("e").longOpt("expiry").hasArg().required()
+                .desc("Duration threshold after which a not scheduled task can be regarded as non-schedulable (8h, 5d, etc).").build());
+        return options;
+    }
+
+    /**
+     * Loads the active jobs and tasks, finds the unschedulable jobs and logs each one with its reason and failures.
+     *
+     * @throws IllegalArgumentException if {@code TimeUnitExt.toMillis} returns no value for the expiry option
+     */
+    @Override
+    public void execute(CommandContext context) throws Exception {
+        long stuckInAcceptedThresholdMs = TimeUnitExt.toMillis(context.getCLI().getOptionValue('e'))
+                .orElseThrow(() -> new IllegalArgumentException("Wrong expiry threshold"));
+
+        Pair<Map<String, Job>, Map<String, Map<String, Task>>> all = loadActiveJobsAndTasks(context);
+        Map<String, UnschedulableJob> unschedulable = UnschedulableFinder.findUnschedulableJobs(context, all.getLeft(), all.getRight(), stuckInAcceptedThresholdMs);
+        logger.info("Found {} unschedulable jobs", unschedulable.size());
+        unschedulable.forEach((jobId, info) -> {
+            logger.info(" {}: reason={}", jobId, info.getReason());
+            logger.info(" failures={}", info.getFailures());
+        });
+    }
+}
diff --git a/titus-cli/src/main/java/com/netflix/titus/cli/command/job/unschedulable/RemoveUnschedulableJobsCommand.java b/titus-cli/src/main/java/com/netflix/titus/cli/command/job/unschedulable/RemoveUnschedulableJobsCommand.java
new file mode 100644
index 0000000000..4512391854
--- /dev/null
+++ b/titus-cli/src/main/java/com/netflix/titus/cli/command/job/unschedulable/RemoveUnschedulableJobsCommand.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2021 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netflix.titus.cli.command.job.unschedulable;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.netflix.titus.api.jobmanager.model.job.Job;
+import com.netflix.titus.api.jobmanager.model.job.Task;
+import com.netflix.titus.api.model.callmetadata.CallMetadata;
+import com.netflix.titus.api.model.callmetadata.Caller;
+import com.netflix.titus.api.model.callmetadata.CallerType;
+import com.netflix.titus.cli.CliCommand;
+import com.netflix.titus.cli.CommandContext;
+import com.netflix.titus.common.util.tuple.Pair;
+import com.netflix.titus.common.util.unit.TimeUnitExt;
+import com.netflix.titus.runtime.connector.jobmanager.RemoteJobManagementClient;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.netflix.titus.cli.command.job.JobUtil.loadActiveJobsAndTasks;
+
+/** CLI command that kills unschedulable jobs, oldest job status timestamp first, up to a caller-provided limit. */
+public class RemoveUnschedulableJobsCommand implements CliCommand {
+
+    private static final Logger logger = LoggerFactory.getLogger(RemoveUnschedulableJobsCommand.class);
+
+    @Override
+    public String getDescription() {
+        return "remove unschedulable jobs";
+    }
+
+    @Override
+    public boolean isRemote() {
+        return true;
+    }
+
+    @Override
+    public Options getOptions() {
+        Options options = new Options();
+        options.addOption(Option.builder("e").longOpt("expiry").hasArg().required()
+                .desc("Duration threshold after which a not scheduled task can be regarded as non-schedulable (8h, 5d, etc).").build());
+        options.addOption(Option.builder("l").longOpt("limit").hasArg().required()
+                .desc("Maximum number of jobs to remove").build());
+        return options;
+    }
+
+    /** Kills up to {@code --limit} unschedulable jobs, blocking up to 60s per kill; rejects a bad expiry or a negative limit. */
+    @Override
+    public void execute(CommandContext context) throws Exception {
+        long stuckInAcceptedThresholdMs = TimeUnitExt.toMillis(context.getCLI().getOptionValue('e'))
+                .orElseThrow(() -> new IllegalArgumentException("Wrong expiry threshold"));
+        int limit = Integer.parseInt(context.getCLI().getOptionValue('l'));
+        if (limit < 0) {
+            throw new IllegalArgumentException("Limit must not be negative: " + limit);
+        }
+
+        Pair<Map<String, Job>, Map<String, Map<String, Task>>> all = loadActiveJobsAndTasks(context);
+        Map<String, Job> jobs = all.getLeft();
+        Map<String, UnschedulableJob> unschedulable = UnschedulableFinder.findUnschedulableJobs(context, jobs, all.getRight(), stuckInAcceptedThresholdMs);
+        logger.info("Found {} unschedulable jobs", unschedulable.size());
+        logger.info("Removing the oldest {}...", limit);
+
+        List<Job> orderedJobs = unschedulable.keySet().stream().map(jobs::get)
+                .sorted(Comparator.comparingLong(j -> j.getStatus().getTimestamp()))
+                .collect(Collectors.toList());
+        // USER may be undefined (getenv then returns null); fall back to the JVM user so the caller id is always set.
+        String envUser = System.getenv("USER");
+        Caller caller = Caller.newBuilder().withId(envUser != null ? envUser : System.getProperty("user.name"))
+                .withCallerType(CallerType.User).build();
+        RemoteJobManagementClient jobClient = context.getJobManagementClient();
+        int len = Math.min(orderedJobs.size(), limit);
+        for (int i = 0; i < len; i++) {
+            Job jobToRemove = orderedJobs.get(i);
+            logger.info("Removing job {}...", jobToRemove);
+            CallMetadata callMetadata = CallMetadata.newBuilder().withCallReason(unschedulable.get(jobToRemove.getId()).getReason())
+                    .withCallers(Collections.singletonList(caller)).build();
+            jobClient.killJob(jobToRemove.getId(), callMetadata).block(Duration.ofSeconds(60));
+        }
+        logger.info("Removed {} unschedulable jobs out of {} (left {})", len, unschedulable.size(), unschedulable.size() - len);
+    }
+}
diff --git a/titus-cli/src/main/java/com/netflix/titus/cli/command/job/unschedulable/UnschedulableFinder.java b/titus-cli/src/main/java/com/netflix/titus/cli/command/job/unschedulable/UnschedulableFinder.java
new file mode 100644
index 0000000000..dd37bae731
--- /dev/null
+++ b/titus-cli/src/main/java/com/netflix/titus/cli/command/job/unschedulable/UnschedulableFinder.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2021 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netflix.titus.cli.command.job.unschedulable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.JsonFormat;
+import com.netflix.titus.api.jobmanager.model.job.Job;
+import com.netflix.titus.api.jobmanager.model.job.Task;
+import com.netflix.titus.api.jobmanager.model.job.TaskState;
+import com.netflix.titus.cli.CommandContext;
+import com.netflix.titus.common.util.CollectionsExt;
+import com.netflix.titus.common.util.DateTimeExt;
+import com.netflix.titus.grpc.protogen.SchedulerServiceGrpc;
+import com.netflix.titus.grpc.protogen.SchedulingResultEvent;
+import com.netflix.titus.grpc.protogen.SchedulingResultRequest;
+
+/** Finds jobs whose tasks have all been waiting in the {@code Accepted} state for at least a given threshold. */
+public final class UnschedulableFinder {
+
+    private UnschedulableFinder() { /* static helpers only */ }
+
+    /** Returns the suspected unschedulable jobs keyed by job id; only jobs with an entry in {@code tasks} are examined. */
+    static Map<String, UnschedulableJob> findUnschedulableJobs(CommandContext context,
+                                                               Map<String, Job> jobs,
+                                                               Map<String, Map<String, Task>> tasks,
+                                                               long stuckInAcceptedThresholdMs) {
+        Map<String, UnschedulableJob> suspectedJobs = new HashMap<>();
+        // One stub for the whole scan: CommandContext builds a new stub over createChannel() on every call.
+        SchedulerServiceGrpc.SchedulerServiceBlockingStub scheduler = context.getSchedulerServiceBlockingStub();
+        tasks.forEach((jobId, jobTasks) ->
+                processJob(scheduler, jobs.get(jobId), jobTasks, stuckInAcceptedThresholdMs).ifPresent(u -> suspectedJobs.put(jobId, u))
+        );
+        return suspectedJobs;
+    }
+
+    /** Describes {@code job} when every task is still {@code Accepted} and has waited at least the threshold; empty otherwise. */
+    private static Optional<UnschedulableJob> processJob(SchedulerServiceGrpc.SchedulerServiceBlockingStub scheduler, Job job,
+                                                         Map<String, Task> tasks, long stuckInAcceptedThresholdMs) {
+        if (tasks.isEmpty()) {
+            return Optional.empty();
+        }
+        if (tasks.values().stream().anyMatch(t -> t.getStatus().getState() != TaskState.Accepted)) {
+            return Optional.empty(); // at least one task moved past Accepted
+        }
+
+        // Read the clock once so the threshold check and the reported wait times use the same instant.
+        long now = System.currentTimeMillis();
+        long earliestAccepted = Long.MAX_VALUE;
+        long latestAccepted = Long.MIN_VALUE;
+        for (Task task : tasks.values()) {
+            long acceptedTimestamp = task.getStatus().getTimestamp();
+            if (now - acceptedTimestamp < stuckInAcceptedThresholdMs) {
+                return Optional.empty();
+            }
+            earliestAccepted = Math.min(earliestAccepted, acceptedTimestamp);
+            latestAccepted = Math.max(latestAccepted, acceptedTimestamp);
+        }
+
+        // Fetch scheduling result for any task.
+        SchedulingResultEvent result = scheduler.getSchedulingResult(SchedulingResultRequest.newBuilder()
+                .setTaskId(CollectionsExt.first(tasks.values()).getId())
+                .build()
+        );
+        String failures;
+        try {
+            failures = JsonFormat.printer().omittingInsignificantWhitespace().print(result);
+        } catch (InvalidProtocolBufferException e) {
+            failures = result.toString(); // JSON rendering failed; fall back to toString()
+        }
+        String reason = String.format(
+                "All tasks are stuck in the 'Accepted' state for too long (between %s and %s). " +
+                        "Most likely they do not fit into any available node resources.",
+                DateTimeExt.toTimeUnitString(now - latestAccepted, 2), // shortest wait
+                DateTimeExt.toTimeUnitString(now - earliestAccepted, 2) // longest wait
+        );
+        return Optional.of(new UnschedulableJob(job.getId(), reason, failures));
+    }
+}
diff --git a/titus-cli/src/main/java/com/netflix/titus/cli/command/job/unschedulable/UnschedulableJob.java b/titus-cli/src/main/java/com/netflix/titus/cli/command/job/unschedulable/UnschedulableJob.java
new file mode 100644
index 0000000000..5fd01adcde
--- /dev/null
+++ b/titus-cli/src/main/java/com/netflix/titus/cli/command/job/unschedulable/UnschedulableJob.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2021 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netflix.titus.cli.command.job.unschedulable;
+
+/** Immutable holder for a suspected unschedulable job: its id, the reason text and the rendered failures. */
+class UnschedulableJob {
+
+    private final String jobId;
+    private final String reason;
+    private final String failures;
+
+    public UnschedulableJob(String jobId, String reason, String failures) {
+        this.jobId = jobId;
+        this.reason = reason;
+        this.failures = failures;
+    }
+
+    /** Returns the job id given at construction. */
+    public String getJobId() {
+        return jobId;
+    }
+
+    /** Returns the reason text given at construction. */
+    public String getReason() {
+        return reason;
+    }
+
+    /** Returns the failures text given at construction. */
+    public String getFailures() {
+        return failures;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("UnschedulableJob{jobId='%s', reason='%s', failures='%s'}", jobId, reason, failures);
+    }
+}
diff --git a/titus-common/src/main/java/com/netflix/titus/common/util/DateTimeExt.java b/titus-common/src/main/java/com/netflix/titus/common/util/DateTimeExt.java
index 105a9893dd..74dac5ef2e 100644
--- a/titus-common/src/main/java/com/netflix/titus/common/util/DateTimeExt.java
+++ b/titus-common/src/main/java/com/netflix/titus/common/util/DateTimeExt.java
@@ -23,6 +23,7 @@
 import java.time.format.DateTimeFormatter;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.Duration;
 import com.google.protobuf.util.Durations;
 
@@ -129,6 +130,60 @@ public static String toTimeUnitString(long timeMs) {
         return sb.toString();
     }
 
+    /**
+     * Given a duration in milliseconds, format it using time units rounded to the given number of parts.
+     * <p>
+     * Units are examined from days down to milliseconds. Zero-valued units are never printed. Leading zero-valued
+     * units are skipped for free, but once a unit has been printed every following unit uses up one of the
+     * {@code parts}, whether it is printed or zero. Smaller units beyond that are dropped, so the value is
+     * truncated rather than rounded to nearest.
+     * <p>
+     * For example {@code toTimeUnitString(123_123, 2)} gives {@code "2min 3s"} and
+     * {@code toTimeUnitString(3_605_000, 2)} gives {@code "1h"}. A zero or negative duration gives {@code "0ms"}.
+     *
+     * @param timeMs duration in milliseconds
+     * @param parts  number of units to use, counted from the first printed one; must be positive
+     * @return the formatted duration, with units separated by single spaces
+     * @throws IllegalArgumentException if {@code parts} is not positive
+     */
+    public static String toTimeUnitString(long timeMs, int parts) {
+        Preconditions.checkArgument(parts > 0, "At least one unit must be requested");
+
+        long totalSeconds = timeMs / 1000;
+        long totalMinutes = totalSeconds / 60;
+        long totalHours = totalMinutes / 60;
+
+        // Unit values from most to least significant, matched index-by-index with their suffixes.
+        long[] unitValues = {
+                totalHours / 24,
+                totalHours % 24,
+                totalMinutes % 60,
+                totalSeconds % 60,
+                timeMs % 1000
+        };
+        String[] unitSuffixes = {"d", "h", "min", "s", "ms"};
+
+        StringBuilder sb = new StringBuilder();
+        int usedParts = 0;
+        for (int i = 0; i < unitValues.length && usedParts < parts; i++) {
+            if (unitValues[i] > 0) {
+                if (sb.length() > 0) {
+                    sb.append(' ');
+                }
+                sb.append(unitValues[i]).append(unitSuffixes[i]);
+                usedParts++;
+            } else if (usedParts > 0) {
+                // A zero-valued unit below the first printed one still uses up a part.
+                usedParts++;
+            }
+        }
+
+        if (sb.length() == 0) {
+            return "0ms";
+        }
+        return sb.toString();
+    }
+
     /**
      * Given an interval, and a number of items generated per interval, create string representation of the
      * corresponding rate. For example, interval=10sec, itemsPerInterval=5 gives rate="0.5 items/sec".
diff --git a/titus-common/src/test/java/com/netflix/titus/common/util/DateTimeExtTest.java b/titus-common/src/test/java/com/netflix/titus/common/util/DateTimeExtTest.java
index a5ddfd0e08..6628b14bb1 100644
--- a/titus-common/src/test/java/com/netflix/titus/common/util/DateTimeExtTest.java
+++ b/titus-common/src/test/java/com/netflix/titus/common/util/DateTimeExtTest.java
@@ -67,6 +67,50 @@ public void testTime() {
         assertThat(toTimeUnitString(twoDays)).isEqualTo("2d");
     }
 
+    /** Checks that {@code toTimeUnitString(long, int)} limits its output to the requested number of units. */
+    @Test
+    public void testRoundedTime() {
+        long msDuration = 123;
+        assertThat(toTimeUnitString(msDuration, 2)).isEqualTo("123ms");
+
+        long secDuration = 3 * 1000 + 123;
+        assertThat(toTimeUnitString(secDuration, 1)).isEqualTo("3s");
+        assertThat(toTimeUnitString(secDuration, 2)).isEqualTo("3s 123ms");
+
+        long minDuration = (2 * 60 + 3) * 1000 + 123;
+        assertThat(toTimeUnitString(minDuration, 1)).isEqualTo("2min");
+        assertThat(toTimeUnitString(minDuration, 2)).isEqualTo("2min 3s");
+        assertThat(toTimeUnitString(minDuration, 3)).isEqualTo("2min 3s 123ms");
+
+        long hourDuration = ((1 * 60 + 2) * 60 + 3) * 1000 + 123;
+        assertThat(toTimeUnitString(hourDuration, 1)).isEqualTo("1h");
+        assertThat(toTimeUnitString(hourDuration, 2)).isEqualTo("1h 2min");
+        assertThat(toTimeUnitString(hourDuration, 3)).isEqualTo("1h 2min 3s");
+        assertThat(toTimeUnitString(hourDuration, 4)).isEqualTo("1h 2min 3s 123ms");
+
+        long oneSecond = 1 * 1000;
+        assertThat(toTimeUnitString(oneSecond, 2)).isEqualTo("1s");
+
+        long oneMillis = 1;
+        assertThat(toTimeUnitString(oneMillis, 2)).isEqualTo("1ms");
+
+        long zeroMillis = 0;
+        assertThat(toTimeUnitString(zeroMillis, 2)).isEqualTo("0ms");
+
+        long twoDays = TimeUnit.DAYS.toMillis(2);
+        assertThat(toTimeUnitString(twoDays, 2)).isEqualTo("2d");
+
+        // A zero-valued unit after the first printed one still uses up a part.
+        long hourAndSeconds = (1 * 60 * 60 + 5) * 1000;
+        assertThat(toTimeUnitString(hourAndSeconds, 2)).isEqualTo("1h");
+        assertThat(toTimeUnitString(hourAndSeconds, 3)).isEqualTo("1h 5s");
+
+        long daysDuration = TimeUnit.DAYS.toMillis(2) + TimeUnit.HOURS.toMillis(3) + TimeUnit.MINUTES.toMillis(4);
+        assertThat(toTimeUnitString(daysDuration, 1)).isEqualTo("2d");
+        assertThat(toTimeUnitString(daysDuration, 2)).isEqualTo("2d 3h");
+        assertThat(toTimeUnitString(daysDuration, 3)).isEqualTo("2d 3h 4min");
+    }
+
     @Test
     public void testToRateString() {
         assertThat(toRateString(1, 1, TimeUnit.MILLISECONDS, "action")).isEqualTo("1.00 action/ms");