Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.client.cli;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobID;
Expand Down Expand Up @@ -80,6 +81,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -498,20 +500,7 @@ private <ClusterID> void listJobs(

private static void printJobStatusMessages(List<JobStatusMessage> jobs) {
SimpleDateFormat dateFormat = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
Comparator<JobStatusMessage> startTimeComparator =
(o1, o2) -> (int) (o1.getStartTime() - o2.getStartTime());
Comparator<Map.Entry<JobStatus, List<JobStatusMessage>>> statusComparator =
(o1, o2) ->
String.CASE_INSENSITIVE_ORDER.compare(
o1.getKey().toString(), o2.getKey().toString());

Map<JobStatus, List<JobStatusMessage>> jobsByState =
jobs.stream().collect(Collectors.groupingBy(JobStatusMessage::getJobState));
jobsByState.entrySet().stream()
.sorted(statusComparator)
.map(Map.Entry::getValue)
.flatMap(List::stream)
.sorted(startTimeComparator)
sortJobStatusMessages(jobs)
.forEachOrdered(
job ->
System.out.println(
Expand All @@ -525,6 +514,22 @@ private static void printJobStatusMessages(List<JobStatusMessage> jobs) {
+ ")"));
}

@VisibleForTesting
static Stream<JobStatusMessage> sortJobStatusMessages(List<JobStatusMessage> jobs) {
Comparator<Map.Entry<JobStatus, List<JobStatusMessage>>> statusComparator =
(o1, o2) ->
String.CASE_INSENSITIVE_ORDER.compare(
o1.getKey().toString(), o2.getKey().toString());

Map<JobStatus, List<JobStatusMessage>> jobsByState =
jobs.stream().collect(Collectors.groupingBy(JobStatusMessage::getJobState));
return jobsByState.entrySet().stream()
.sorted(statusComparator)
.map(Map.Entry::getValue)
.flatMap(List::stream)
.sorted(Comparator.comparing(JobStatusMessage::getStartTime));
}

/**
* Executes the STOP action.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,19 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/** Tests for the LIST command. */
Expand Down Expand Up @@ -114,22 +117,31 @@ void testList() throws Exception {
ClusterClient<String> clusterClient = createClusterClient();
MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
testFrontend.list(parameters);
Mockito.verify(clusterClient, times(1)).listJobs();
verify(clusterClient, times(1)).listJobs();
}
}

@Test
void testSorting() {
List<JobStatusMessage> sortedJobs =
CliFrontend.sortJobStatusMessages(getJobStatusMessages())
.collect(Collectors.toList());
assertThat(sortedJobs)
.isSortedAccordingTo(Comparator.comparing(JobStatusMessage::getStartTime));
}

private static ClusterClient<String> createClusterClient() throws Exception {
final ClusterClient<String> clusterClient = mock(ClusterClient.class);
when(clusterClient.listJobs())
.thenReturn(
CompletableFuture.completedFuture(
Arrays.asList(
new JobStatusMessage(
new JobID(), "job1", JobStatus.RUNNING, 1L),
new JobStatusMessage(
new JobID(), "job2", JobStatus.CREATED, 1L),
new JobStatusMessage(
new JobID(), "job3", JobStatus.FINISHED, 3L))));
.thenReturn(CompletableFuture.completedFuture(getJobStatusMessages()));
return clusterClient;
}

private static List<JobStatusMessage> getJobStatusMessages() {
return Arrays.asList(
new JobStatusMessage(new JobID(), "job1", JobStatus.RUNNING, 1665197322962L),
new JobStatusMessage(new JobID(), "job2", JobStatus.CREATED, 1660904115054L),
new JobStatusMessage(new JobID(), "job3", JobStatus.RUNNING, 1664177946934L),
new JobStatusMessage(new JobID(), "job4", JobStatus.FINISHED, 1665738051581L));
}
}