From b630b506f57fc87535e83b71a8a2992711fcbc21 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 4 Oct 2017 14:41:54 +0200 Subject: [PATCH] [FLINK-7707] [flip6] Add TaskCheckpointStatisticDetailsHandler for new REST endpoint --- .../webmonitor/history/HistoryServer.java | 2 +- .../dispatcher/DispatcherRestEndpoint.java | 18 +- .../job/AbstractExecutionGraphHandler.java | 9 + .../AbstractCheckpointHandler.java | 18 +- .../CheckpointStatisticDetailsHandler.java | 5 +- .../CheckpointingStatisticsHandler.java | 7 +- ...TaskCheckpointStatisticDetailsHandler.java | 151 ++++++++++ .../handler/legacy/ClusterConfigHandler.java | 4 +- .../legacy/ClusterOverviewHandler.java | 2 +- .../legacy/DashboardConfigHandler.java | 2 +- .../checkpoints/CheckpointStatsHandler.java | 7 +- .../messages/ClusterConfigurationInfo.java | 3 +- .../ClusterConfigurationInfoEntry.java | 2 +- .../ClusterConfigurationInfoHeaders.java | 1 - .../rest/messages/ClusterOverviewHeaders.java | 1 - .../messages/DashboardConfiguration.java | 3 +- .../DashboardConfigurationHeaders.java | 1 - .../messages/JobVertexIdPathParameter.java | 43 +++ .../messages/StatusOverviewWithVersion.java | 3 +- .../checkpoints/CheckpointStatistics.java | 100 +------ .../checkpoints/CheckpointingStatistics.java | 62 ---- .../checkpoints/MinMaxAvgStatistics.java | 86 ++++++ .../SubtaskCheckpointStatistics.java | 283 ++++++++++++++++++ .../TaskCheckpointMessageParameters.java | 40 +++ .../checkpoints/TaskCheckpointStatistics.java | 151 ++++++++++ .../TaskCheckpointStatisticsHeaders.java | 72 +++++ ...heckpointStatisticsWithSubtaskDetails.java | 278 +++++++++++++++++ .../legacy/DashboardConfigHandlerTest.java | 2 +- .../messages/BlobServerPortResponseTest.java | 2 - .../ClusterConfigurationInfoTest.java | 2 +- .../messages/DashboardConfigurationTest.java | 2 +- .../messages/JobConfigInfoTest.java | 3 +- .../messages/JobSubmitRequestBodyTest.java | 1 - .../messages/JobSubmitResponseBodyTest.java | 1 - .../RestRequestMarshallingTestBase.java | 3 +- .../RestResponseMarshallingTestBase.java | 3 +- .../StatusOverviewWithVersionTest.java | 2 +- .../CheckpointConfigInfoTest.java | 4 +- .../CheckpointingStatisticsTest.java | 21 +- .../TaskCheckpointStatisticsTest.java | 46 +++ ...pointStatisticsWithSubtaskDetailsTest.java | 72 +++++ 41 files changed, 1302 insertions(+), 216 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java rename flink-runtime/src/main/java/org/apache/flink/runtime/rest/{handler/legacy => }/messages/ClusterConfigurationInfo.java (94%) rename flink-runtime/src/main/java/org/apache/flink/runtime/rest/{handler/legacy => }/messages/ClusterConfigurationInfoEntry.java (97%) rename flink-runtime/src/main/java/org/apache/flink/runtime/rest/{handler/legacy => }/messages/DashboardConfiguration.java (97%) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexIdPathParameter.java rename flink-runtime/src/main/java/org/apache/flink/runtime/rest/{handler/legacy => }/messages/StatusOverviewWithVersion.java (97%) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/MinMaxAvgStatistics.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointMessageParameters.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatistics.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsHeaders.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetails.java rename flink-runtime/src/test/java/org/apache/flink/runtime/rest/{handler/legacy => }/messages/ClusterConfigurationInfoTest.java (95%) rename flink-runtime/src/test/java/org/apache/flink/runtime/rest/{handler/legacy => }/messages/DashboardConfigurationTest.java (95%) rename flink-runtime/src/test/java/org/apache/flink/runtime/rest/{handler/legacy => }/messages/JobConfigInfoTest.java (93%) rename flink-runtime/src/test/java/org/apache/flink/runtime/rest/{handler/legacy => }/messages/RestRequestMarshallingTestBase.java (94%) rename flink-runtime/src/test/java/org/apache/flink/runtime/rest/{handler/legacy => }/messages/RestResponseMarshallingTestBase.java (94%) rename flink-runtime/src/test/java/org/apache/flink/runtime/rest/{handler/legacy => }/messages/StatusOverviewWithVersionTest.java (95%) rename flink-runtime/src/test/java/org/apache/flink/runtime/rest/{handler/legacy/messages => messages/checkpoints}/CheckpointConfigInfoTest.java (91%) rename flink-runtime/src/test/java/org/apache/flink/runtime/rest/{handler/legacy/messages => messages/checkpoints}/CheckpointingStatisticsTest.java (82%) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index 4a28182071f64..9c3b51ef8b99d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.history.FsJobArchivist; import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; -import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration; +import org.apache.flink.runtime.rest.messages.DashboardConfiguration; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index 2a2d9be3cad39..34509d9a9286f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler; import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler; import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler; @@ -42,19 +43,20 @@ import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler; import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification; -import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo; -import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration; -import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion; +import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo; import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders; import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders; +import org.apache.flink.runtime.rest.messages.DashboardConfiguration; import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.JobConfigHeaders; import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.StatusOverviewWithVersion; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.FileUtils; @@ -186,6 +188,15 @@ protected Collection> in executor, checkpointStatsCache); + TaskCheckpointStatisticDetailsHandler taskCheckpointStatisticDetailsHandler = new TaskCheckpointStatisticDetailsHandler( + restAddressFuture, + leaderRetriever, + timeout, + TaskCheckpointStatisticsHeaders.getInstance(), + executionGraphCache, + executor, + checkpointStatsCache); + final File tmpDir = restConfiguration.getTmpDir(); Optional> optWebContent; @@ -210,6 +221,7 @@ protected Collection> in handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler)); handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), checkpointStatisticsHandler)); handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), checkpointStatisticDetailsHandler)); + handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler)); BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout); handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java index 5348b55acbaff..63c3e35ba9e7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java @@ -81,5 +81,14 @@ protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, AccessExecutionGraph executionGraph) throws RestHandlerException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java index 62ed1a4e11467..23d09f6562614 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java @@ -45,7 +45,7 @@ * * @param type of the response */ -public abstract class AbstractCheckpointHandler extends AbstractExecutionGraphHandler { +public abstract class AbstractCheckpointHandler extends AbstractExecutionGraphHandler { private final CheckpointStatsCache checkpointStatsCache; @@ -53,7 +53,7 @@ protected AbstractCheckpointHandler( CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, - MessageHeaders messageHeaders, + MessageHeaders messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor, CheckpointStatsCache checkpointStatsCache) { @@ -63,7 +63,7 @@ protected AbstractCheckpointHandler( } @Override - protected R handleRequest(HandlerRequest request, AccessExecutionGraph executionGraph) throws RestHandlerException { + protected R handleRequest(HandlerRequest request, AccessExecutionGraph executionGraph) throws RestHandlerException { final long checkpointId = request.getPathParameter(CheckpointIdPathParameter.class); final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot(); @@ -78,7 +78,7 @@ protected R handleRequest(HandlerRequest request, AbstractCheckpointStats checkpointStats) throws RestHandlerException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java index 2fc3008493ab8..e6b9f6b30d5fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.MessageHeaders; @@ -34,7 +35,7 @@ /** * REST handler which returns the details for a checkpoint. */ -public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler { +public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler { public CheckpointStatisticDetailsHandler( CompletableFuture localRestAddress, @@ -48,7 +49,7 @@ public CheckpointStatisticDetailsHandler( } @Override - protected CheckpointStatistics handleCheckpointRequest(AbstractCheckpointStats checkpointStats) { + protected CheckpointStatistics handleCheckpointRequest(HandlerRequest ignored, AbstractCheckpointStats checkpointStats) { return CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java index 1c5762e647bb4..a8514d501c925 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics; +import org.apache.flink.runtime.rest.messages.checkpoints.MinMaxAvgStatistics; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; @@ -84,15 +85,15 @@ protected CheckpointingStatistics handleRequest(HandlerRequest { + + public TaskCheckpointStatisticDetailsHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + MessageHeaders messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor, + CheckpointStatsCache checkpointStatsCache) { + super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor, checkpointStatsCache); + } + + @Override + protected TaskCheckpointStatisticsWithSubtaskDetails handleCheckpointRequest( + HandlerRequest request, + AbstractCheckpointStats checkpointStats) throws RestHandlerException { + + final JobVertexID jobVertexId = request.getPathParameter(JobVertexIdPathParameter.class); + + final TaskStateStats taskStatistics = checkpointStats.getTaskStateStats(jobVertexId); + + if (taskStatistics != null) { + + final TaskCheckpointStatisticsWithSubtaskDetails.Summary summary = createSummary( + taskStatistics.getSummaryStats(), + checkpointStats.getTriggerTimestamp()); + + final List subtaskCheckpointStatistics = createSubtaskCheckpointStatistics( + taskStatistics.getSubtaskStats(), + checkpointStats.getTriggerTimestamp()); + + return new TaskCheckpointStatisticsWithSubtaskDetails( + checkpointStats.getCheckpointId(), + checkpointStats.getStatus(), + taskStatistics.getLatestAckTimestamp(), + taskStatistics.getStateSize(), + taskStatistics.getEndToEndDuration(checkpointStats.getTriggerTimestamp()), + taskStatistics.getAlignmentBuffered(), + taskStatistics.getNumberOfSubtasks(), + taskStatistics.getNumberOfAcknowledgedSubtasks(), + summary, + subtaskCheckpointStatistics); + } else { + throw new RestHandlerException("There is no checkpoint statistics for task " + jobVertexId + '.', HttpResponseStatus.NOT_FOUND); + } + } + + private static TaskCheckpointStatisticsWithSubtaskDetails.Summary createSummary(TaskStateStats.TaskStateStatsSummary taskStatisticsSummary, long triggerTimestamp) { + final MinMaxAvgStats stateSizeStats = taskStatisticsSummary.getStateSizeStats(); + final MinMaxAvgStats ackTSStats = taskStatisticsSummary.getAckTimestampStats(); + final MinMaxAvgStats syncDurationStats = taskStatisticsSummary.getSyncCheckpointDurationStats(); + final MinMaxAvgStats asyncDurationStats = taskStatisticsSummary.getAsyncCheckpointDurationStats(); + + final TaskCheckpointStatisticsWithSubtaskDetails.CheckpointDuration checkpointDuration = new TaskCheckpointStatisticsWithSubtaskDetails.CheckpointDuration( + new MinMaxAvgStatistics(syncDurationStats.getMinimum(), syncDurationStats.getMaximum(), syncDurationStats.getAverage()), + new MinMaxAvgStatistics(asyncDurationStats.getMinimum(), asyncDurationStats.getMaximum(), asyncDurationStats.getAverage())); + + final MinMaxAvgStats alignmentBufferedStats = taskStatisticsSummary.getAlignmentBufferedStats(); + final MinMaxAvgStats alignmentDurationStats = taskStatisticsSummary.getAlignmentDurationStats(); + + final TaskCheckpointStatisticsWithSubtaskDetails.CheckpointAlignment checkpointAlignment = new TaskCheckpointStatisticsWithSubtaskDetails.CheckpointAlignment( + new MinMaxAvgStatistics(alignmentBufferedStats.getMinimum(), alignmentBufferedStats.getMaximum(), alignmentBufferedStats.getAverage()), + new MinMaxAvgStatistics(alignmentDurationStats.getMinimum(), alignmentDurationStats.getMaximum(), alignmentDurationStats.getAverage())); + + return new TaskCheckpointStatisticsWithSubtaskDetails.Summary( + new MinMaxAvgStatistics(stateSizeStats.getMinimum(), stateSizeStats.getMaximum(), stateSizeStats.getAverage()), + new MinMaxAvgStatistics( + Math.max(0L, ackTSStats.getMinimum() - triggerTimestamp), + Math.max(0L, ackTSStats.getMaximum() - triggerTimestamp), + Math.max(0L, ackTSStats.getAverage() - triggerTimestamp)), + checkpointDuration, + checkpointAlignment); + } + + private static List createSubtaskCheckpointStatistics(SubtaskStateStats[] subtaskStateStats, long triggerTimestamp) { + final List result = new ArrayList<>(subtaskStateStats.length); + + for (int i = 0; i < subtaskStateStats.length; i++) { + final SubtaskStateStats subtask = subtaskStateStats[i]; + + if (subtask == null) { + result.add(new SubtaskCheckpointStatistics.PendingSubtaskCheckpointStatistics(i)); + } else { + result.add(new SubtaskCheckpointStatistics.CompletedSubtaskCheckpointStatistics( + i, + subtask.getAckTimestamp(), + subtask.getEndToEndDuration(triggerTimestamp), + subtask.getStateSize(), + new SubtaskCheckpointStatistics.CompletedSubtaskCheckpointStatistics.CheckpointDuration( + subtask.getSyncCheckpointDuration(), + subtask.getAsyncCheckpointDuration()), + new SubtaskCheckpointStatistics.CompletedSubtaskCheckpointStatistics.CheckpointAlignment( + subtask.getAlignmentBuffered(), + subtask.getAlignmentDuration()) + )); + } + } + + return result; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java index d3ea160aa3815..1de5eca73886f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java @@ -23,8 +23,8 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.LegacyRestHandler; -import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo; -import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfoEntry; +import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo; +import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoEntry; import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java index 2f883abf856c2..ff0508894bd7e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java @@ -26,9 +26,9 @@ import org.apache.flink.runtime.messages.webmonitor.StatusOverview; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.LegacyRestHandler; -import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.StatusOverviewWithVersion; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.util.FlinkException; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java index 0cef5fb22b739..b5aa036148a2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java @@ -22,7 +22,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.LegacyRestHandler; -import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration; +import org.apache.flink.runtime.rest.messages.DashboardConfiguration; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java index b6c86beb39252..38cd0c8363f63 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics; +import org.apache.flink.runtime.rest.messages.checkpoints.MinMaxAvgStatistics; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.util.FlinkException; @@ -158,9 +159,9 @@ private static void writeSummary( } static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException { - gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_MINIMUM, minMaxAvg.getMinimum()); - gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_MAXIMUM, minMaxAvg.getMaximum()); - gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_AVERAGE, minMaxAvg.getAverage()); + gen.writeNumberField(MinMaxAvgStatistics.FIELD_NAME_MINIMUM, minMaxAvg.getMinimum()); + gen.writeNumberField(MinMaxAvgStatistics.FIELD_NAME_MAXIMUM, minMaxAvg.getMaximum()); + gen.writeNumberField(MinMaxAvgStatistics.FIELD_NAME_AVERAGE, minMaxAvg.getAverage()); } private static void writeLatestCheckpoints( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java similarity index 94% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfo.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java index bf527d9d971ec..dba6a4cce4886 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java @@ -16,11 +16,10 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.legacy.messages; +package org.apache.flink.runtime.rest.messages; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; -import org.apache.flink.runtime.rest.messages.ResponseBody; import java.util.ArrayList; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoEntry.java similarity index 97% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoEntry.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoEntry.java index 19e382126fb57..634f72896433a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoEntry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoEntry.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.legacy.messages; +package org.apache.flink.runtime.rest.messages; import org.apache.flink.util.Preconditions; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java index 0ad56b1471c2a..11a750a08afd1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java @@ -20,7 +20,6 @@ import org.apache.flink.runtime.rest.HttpMethodWrapper; import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; -import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java index 887ce2e398396..f8411e478422b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java @@ -20,7 +20,6 @@ import org.apache.flink.runtime.rest.HttpMethodWrapper; import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler; -import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java similarity index 97% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfiguration.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java index cfb3aaad546c6..3e9af68e9ac56 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java @@ -16,10 +16,9 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.legacy.messages; +package org.apache.flink.runtime.rest.messages; import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; -import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.util.Preconditions; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java index cc03b7bf71e16..cb14fc515aacb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java @@ -20,7 +20,6 @@ import org.apache.flink.runtime.rest.HttpMethodWrapper; import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; -import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexIdPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexIdPathParameter.java new file mode 100644 index 0000000000000..0e000f6898bae --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexIdPathParameter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.jobgraph.JobVertexID; + +/** + * Path parameter specifying a {@link JobVertexID}. + */ +public class JobVertexIdPathParameter extends MessagePathParameter { + + private static final String KEY = "vertexid"; + + public JobVertexIdPathParameter() { + super(KEY); + } + + @Override + protected JobVertexID convertFromString(String value) throws ConversionException { + return JobVertexID.fromHexString(value); + } + + @Override + protected String convertToString(JobVertexID value) { + return value.toString(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersion.java similarity index 97% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersion.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersion.java index f001afc8de850..1ee5c8696ae2c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersion.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersion.java @@ -16,11 +16,10 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.legacy.messages; +package org.apache.flink.runtime.rest.messages; import org.apache.flink.runtime.messages.webmonitor.JobsOverview; import org.apache.flink.runtime.messages.webmonitor.StatusOverview; -import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.util.Preconditions; import com.fasterxml.jackson.annotation.JsonCreator; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java index 9fb1094a70664..4144571950218 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java @@ -225,6 +225,8 @@ public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpoi checkpointStatisticsPerTask.put( taskStateStat.getJobVertexId(), new TaskCheckpointStatistics( + checkpointStats.getCheckpointId(), + checkpointStats.getStatus(), taskStateStat.getLatestAckTimestamp(), taskStateStat.getStateSize(), taskStateStat.getEndToEndDuration(checkpointStats.getTriggerTimestamp()), @@ -279,104 +281,6 @@ public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpoi // Static inner classes // --------------------------------------------------------------------- - /** - * Checkpoint statistics for a single task. - */ - public static final class TaskCheckpointStatistics { - - public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp"; - - public static final String FIELD_NAME_STATE_SIZE = "state_size"; - - public static final String FIELD_NAME_DURATION = "end_to_end_duration"; - - public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered"; - - public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks"; - - public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks"; - - @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) - private final long latestAckTimestamp; - - @JsonProperty(FIELD_NAME_STATE_SIZE) - private final long stateSize; - - @JsonProperty(FIELD_NAME_DURATION) - private final long duration; - - @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) - private final long alignmentBuffered; - - @JsonProperty(FIELD_NAME_NUM_SUBTASKS) - private final int numSubtasks; - - @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) - private final int numAckSubtasks; - - @JsonCreator - public TaskCheckpointStatistics( - @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, - @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, - @JsonProperty(FIELD_NAME_DURATION) long duration, - @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, - @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, - @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) { - this.latestAckTimestamp = latestAckTimestamp; - this.stateSize = stateSize; - this.duration = duration; - this.alignmentBuffered = alignmentBuffered; - this.numSubtasks = numSubtasks; - this.numAckSubtasks = numAckSubtasks; - } - - public long getLatestAckTimestamp() { - return latestAckTimestamp; - } - - public long getStateSize() { - return stateSize; - } - - public long getDuration() { - return duration; - } - - public long getAlignmentBuffered() { - return alignmentBuffered; - } - - public int getNumSubtasks() { - return numSubtasks; - } - - public int getNumAckSubtasks() { - return numAckSubtasks; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TaskCheckpointStatistics that = (TaskCheckpointStatistics) o; - return latestAckTimestamp == that.latestAckTimestamp && - stateSize == that.stateSize && - duration == that.duration && - alignmentBuffered == that.alignmentBuffered && - numSubtasks == that.numSubtasks && - numAckSubtasks == that.numAckSubtasks; - } - - @Override - public int hashCode() { - return Objects.hash(latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks); - } - } - /** * Statistics for a completed checkpoint. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java index 1f00fcc7c75ec..96ca2277e9549 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java @@ -256,68 +256,6 @@ public int hashCode() { } } - /** - * Minimum, maximum and average statistics. - */ - public static final class MinMaxAvgStatistics { - - public static final String FIELD_NAME_MINIMUM = "min"; - - public static final String FIELD_NAME_MAXIMUM = "max"; - - public static final String FIELD_NAME_AVERAGE = "avg"; - - @JsonProperty(FIELD_NAME_MINIMUM) - private final long minimum; - - @JsonProperty(FIELD_NAME_MAXIMUM) - private final long maximum; - - @JsonProperty(FIELD_NAME_AVERAGE) - private final long average; - - @JsonCreator - public MinMaxAvgStatistics( - @JsonProperty(FIELD_NAME_MINIMUM) long minimum, - @JsonProperty(FIELD_NAME_MAXIMUM) long maximum, - @JsonProperty(FIELD_NAME_AVERAGE) long average) { - this.minimum = minimum; - this.maximum = maximum; - this.average = average; - } - - public long getMinimum() { - return minimum; - } - - public long getMaximum() { - return maximum; - } - - public long getAverage() { - return average; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - MinMaxAvgStatistics that = (MinMaxAvgStatistics) o; - return minimum == that.minimum && - maximum == that.maximum && - average == that.average; - } - - @Override - public int hashCode() { - return Objects.hash(minimum, maximum, average); - } - } - /** * Statistics about the latest checkpoints. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/MinMaxAvgStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/MinMaxAvgStatistics.java new file mode 100644 index 0000000000000..171dbacf41041 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/MinMaxAvgStatistics.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * Minimum, maximum and average statistics. + */ +public final class MinMaxAvgStatistics { + + public static final String FIELD_NAME_MINIMUM = "min"; + + public static final String FIELD_NAME_MAXIMUM = "max"; + + public static final String FIELD_NAME_AVERAGE = "avg"; + + @JsonProperty(FIELD_NAME_MINIMUM) + private final long minimum; + + @JsonProperty(FIELD_NAME_MAXIMUM) + private final long maximum; + + @JsonProperty(FIELD_NAME_AVERAGE) + private final long average; + + @JsonCreator + public MinMaxAvgStatistics( + @JsonProperty(FIELD_NAME_MINIMUM) long minimum, + @JsonProperty(FIELD_NAME_MAXIMUM) long maximum, + @JsonProperty(FIELD_NAME_AVERAGE) long average) { + this.minimum = minimum; + this.maximum = maximum; + this.average = average; + } + + public long getMinimum() { + return minimum; + } + + public long getMaximum() { + return maximum; + } + + public long getAverage() { + return average; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MinMaxAvgStatistics that = (MinMaxAvgStatistics) o; + return minimum == that.minimum && + maximum == that.maximum && + average == that.average; + } + + @Override + public int hashCode() { + return Objects.hash(minimum, maximum, average); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java new file mode 100644 index 0000000000000..e481da553a2a2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import java.util.Objects; + +/** + * Checkpoint statistics for a subtask. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class") +@JsonSubTypes({ + @JsonSubTypes.Type(value = SubtaskCheckpointStatistics.CompletedSubtaskCheckpointStatistics.class, name = "completed"), + @JsonSubTypes.Type(value = SubtaskCheckpointStatistics.PendingSubtaskCheckpointStatistics.class, name = "pending")}) +public class SubtaskCheckpointStatistics { + + public static final String FIELD_NAME_INDEX = "index"; + + public static final String FIELD_NAME_CHECKPOINT_STATUS = "status"; + + @JsonProperty(FIELD_NAME_INDEX) + private final int index; + + @JsonProperty(FIELD_NAME_CHECKPOINT_STATUS) + private final String checkpointStatus; + + @JsonCreator + private SubtaskCheckpointStatistics( + @JsonProperty(FIELD_NAME_INDEX) int index, + @JsonProperty(FIELD_NAME_CHECKPOINT_STATUS) String checkpointStatus) { + this.index = index; + this.checkpointStatus = checkpointStatus; + } + + public int getIndex() { + return index; + } + + public String getCheckpointStatus() { + return checkpointStatus; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SubtaskCheckpointStatistics that = (SubtaskCheckpointStatistics) o; + return index == that.index && + Objects.equals(checkpointStatus, that.checkpointStatus); + } + + @Override + public int hashCode() { + return Objects.hash(index, checkpointStatus); + } + + // --------------------------------------------------------------------------------- + // Static inner classes + // --------------------------------------------------------------------------------- + + /** + * Checkpoint statistics for a completed subtask checkpoint. + */ + public static final class CompletedSubtaskCheckpointStatistics extends SubtaskCheckpointStatistics { + + public static final String FIELD_NAME_ACK_TIMESTAMP = "ack_timestamp"; + + public static final String FIELD_NAME_DURATION = "end_to_end_duration"; + + public static final String FIELD_NAME_STATE_SIZE = "state_size"; + + public static final String FIELD_NAME_CHECKPOINT_DURATION = "checkpoint"; + + public static final String FIELD_NAME_ALIGNMENT = "alignment"; + + @JsonProperty(FIELD_NAME_ACK_TIMESTAMP) + private final long ackTimestamp; + + @JsonProperty(FIELD_NAME_DURATION) + private final long duration; + + @JsonProperty(FIELD_NAME_STATE_SIZE) + private final long stateSize; + + @JsonProperty(FIELD_NAME_CHECKPOINT_DURATION) + private final CheckpointDuration checkpointDuration; + + @JsonProperty(FIELD_NAME_ALIGNMENT) + private final CheckpointAlignment alignment; + + @JsonCreator + public CompletedSubtaskCheckpointStatistics( + @JsonProperty(FIELD_NAME_INDEX) int index, + @JsonProperty(FIELD_NAME_ACK_TIMESTAMP) long ackTimestamp, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_CHECKPOINT_DURATION) CheckpointDuration checkpointDuration, + @JsonProperty(FIELD_NAME_ALIGNMENT) CheckpointAlignment alignment) { + super(index, "completed"); + this.ackTimestamp = ackTimestamp; + this.duration = duration; + this.stateSize = stateSize; + this.checkpointDuration = checkpointDuration; + this.alignment = alignment; + } + + public long getAckTimestamp() { + return ackTimestamp; + } + + public long getDuration() { + return duration; + } + + public long getStateSize() { + return stateSize; + } + + public CheckpointDuration getCheckpointDuration() { + return checkpointDuration; + } + + public CheckpointAlignment getAlignment() { + return alignment; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompletedSubtaskCheckpointStatistics that = (CompletedSubtaskCheckpointStatistics) o; + return ackTimestamp == that.ackTimestamp && + duration == that.duration && + stateSize == that.stateSize && + Objects.equals(checkpointDuration, that.checkpointDuration) && + Objects.equals(alignment, that.alignment); + } + + @Override + public int hashCode() { + return Objects.hash(ackTimestamp, duration, stateSize, checkpointDuration, alignment); + } + + /** + * Duration of the checkpoint. + */ + public static final class CheckpointDuration { + + public static final String FIELD_NAME_SYNC_DURATION = "sync"; + + public static final String FIELD_NAME_ASYNC_DURATION = "async"; + + @JsonProperty(FIELD_NAME_SYNC_DURATION) + private final long syncDuration; + + @JsonProperty(FIELD_NAME_ASYNC_DURATION) + private final long asyncDuration; + + @JsonCreator + public CheckpointDuration( + @JsonProperty(FIELD_NAME_SYNC_DURATION) long syncDuration, + @JsonProperty(FIELD_NAME_ASYNC_DURATION) long asyncDuration) { + this.syncDuration = syncDuration; + this.asyncDuration = asyncDuration; + } + + public long getSyncDuration() { + return syncDuration; + } + + public long getAsyncDuration() { + return asyncDuration; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CheckpointDuration that = (CheckpointDuration) o; + return syncDuration == that.syncDuration && + asyncDuration == that.asyncDuration; + } + + @Override + public int hashCode() { + return Objects.hash(syncDuration, asyncDuration); + } + } + + /** + * Alignment statistics of the checkpoint. + */ + public static final class CheckpointAlignment { + + public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "buffered"; + + public static final String FIELD_NAME_ALIGNMENT_DURATION = "duration"; + + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) + private final long alignmentBuffered; + + @JsonProperty(FIELD_NAME_ALIGNMENT_DURATION) + private final long alignmentDuration; + + @JsonCreator + public CheckpointAlignment( + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_ALIGNMENT_DURATION) long alignmentDuration) { + this.alignmentBuffered = alignmentBuffered; + this.alignmentDuration = alignmentDuration; + } + + public long getAlignmentBuffered() { + return alignmentBuffered; + } + + public long getAlignmentDuration() { + return alignmentDuration; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CheckpointAlignment that = (CheckpointAlignment) o; + return alignmentBuffered == that.alignmentBuffered && + alignmentDuration == that.alignmentDuration; + } + + @Override + public int hashCode() { + return Objects.hash(alignmentBuffered, alignmentDuration); + } + } + } + + /** + * Checkpoint statistics for a pending subtask checkpoint. + */ + public static final class PendingSubtaskCheckpointStatistics extends SubtaskCheckpointStatistics { + + @JsonCreator + public PendingSubtaskCheckpointStatistics(@JsonProperty(FIELD_NAME_INDEX) int index) { + super(index, "pending_or_failed"); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointMessageParameters.java new file mode 100644 index 0000000000000..ed7620a0085de --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointMessageParameters.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Message parameters for subtask related checkpoint message. + * + *

The message requires a JobID, checkpoint ID and a JobVertexID to be specified. + */ +public class TaskCheckpointMessageParameters extends CheckpointMessageParameters { + + protected final JobVertexIdPathParameter jobVertexIdPathParameter = new JobVertexIdPathParameter(); + + @Override + public Collection> getPathParameters() { + return Arrays.asList(jobPathParameter, checkpointIdPathParameter, jobVertexIdPathParameter); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatistics.java new file mode 100644 index 0000000000000..9836b26871c6b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatistics.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * Checkpoint statistics for a single task. + */ +public class TaskCheckpointStatistics implements ResponseBody { + + public static final String FIELD_NAME_ID = "id"; + + public static final String FIELD_NAME_CHECKPOINT_STATUS = "status"; + + public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp"; + + public static final String FIELD_NAME_STATE_SIZE = "state_size"; + + public static final String FIELD_NAME_DURATION = "end_to_end_duration"; + + public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered"; + + public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks"; + + public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks"; + + @JsonProperty(FIELD_NAME_ID) + private final long checkpointId; + + @JsonProperty(FIELD_NAME_CHECKPOINT_STATUS) + private final CheckpointStatsStatus checkpointStatus; + + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) + private final long latestAckTimestamp; + + @JsonProperty(FIELD_NAME_STATE_SIZE) + private final long stateSize; + + @JsonProperty(FIELD_NAME_DURATION) + private final long duration; + + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) + private final long alignmentBuffered; + + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) + private final int numSubtasks; + + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) + private final int numAckSubtasks; + + @JsonCreator + public TaskCheckpointStatistics( + @JsonProperty(FIELD_NAME_ID) long checkpointId, + @JsonProperty(FIELD_NAME_CHECKPOINT_STATUS) CheckpointStatsStatus checkpointStatus, + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) { + + this.checkpointId = checkpointId; + this.checkpointStatus = Preconditions.checkNotNull(checkpointStatus); + this.latestAckTimestamp = latestAckTimestamp; + this.stateSize = stateSize; + this.duration = duration; + this.alignmentBuffered = alignmentBuffered; + this.numSubtasks = numSubtasks; + this.numAckSubtasks = numAckSubtasks; + } + + public long getLatestAckTimestamp() { + return latestAckTimestamp; + } + + public long getStateSize() { + return stateSize; + } + + public long getDuration() { + return duration; + } + + public long getAlignmentBuffered() { + return alignmentBuffered; + } + + public int getNumSubtasks() { + return numSubtasks; + } + + public int getNumAckSubtasks() { + return numAckSubtasks; + } + + public long getCheckpointId() { + return checkpointId; + } + + public CheckpointStatsStatus getCheckpointStatus() { + return checkpointStatus; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskCheckpointStatistics that = (TaskCheckpointStatistics) o; + return checkpointId == that.checkpointId && + latestAckTimestamp == that.latestAckTimestamp && + stateSize == that.stateSize && + duration == that.duration && + alignmentBuffered == that.alignmentBuffered && + numSubtasks == that.numSubtasks && + numAckSubtasks == that.numAckSubtasks && + checkpointStatus == that.checkpointStatus; + } + + @Override + public int hashCode() { + return Objects.hash(checkpointId, checkpointStatus, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsHeaders.java new file mode 100644 index 0000000000000..3886b1ff5038b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsHeaders.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Headers for the {@link TaskCheckpointStatisticDetailsHandler}. + */ +public class TaskCheckpointStatisticsHeaders implements MessageHeaders { + + private static final TaskCheckpointStatisticsHeaders INSTANCE = new TaskCheckpointStatisticsHeaders(); + + public static final String URL = "/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid"; + + private TaskCheckpointStatisticsHeaders() {} + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class getResponseClass() { + return TaskCheckpointStatisticsWithSubtaskDetails.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public TaskCheckpointMessageParameters getUnresolvedMessageParameters() { + return new TaskCheckpointMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static TaskCheckpointStatisticsHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetails.java new file mode 100644 index 0000000000000..ad2cab8b01518 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetails.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Task checkpoint statistics which also includes information about the sub task + * checkpoint statistics. + */ +public final class TaskCheckpointStatisticsWithSubtaskDetails extends TaskCheckpointStatistics { + + public static final String FIELD_NAME_SUMMARY = "summary"; + + public static final String FIELD_NAME_SUBTASKS_CHECKPOINT_STATISTICS = "subtasks"; + + @JsonProperty(FIELD_NAME_SUMMARY) + private final Summary summary; + + @JsonProperty(FIELD_NAME_SUBTASKS_CHECKPOINT_STATISTICS) + private final List subtaskCheckpointStatistics; + + @JsonCreator + public TaskCheckpointStatisticsWithSubtaskDetails( + @JsonProperty(FIELD_NAME_ID) long checkpointId, + @JsonProperty(FIELD_NAME_CHECKPOINT_STATUS) CheckpointStatsStatus checkpointStatus, + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, + @JsonProperty(FIELD_NAME_SUMMARY) Summary summary, + @JsonProperty(FIELD_NAME_SUBTASKS_CHECKPOINT_STATISTICS) List subtaskCheckpointStatistics) { + super( + checkpointId, + checkpointStatus, + latestAckTimestamp, + stateSize, + duration, + alignmentBuffered, + numSubtasks, + numAckSubtasks); + + this.summary = Preconditions.checkNotNull(summary); + this.subtaskCheckpointStatistics = Preconditions.checkNotNull(subtaskCheckpointStatistics); + } + + public Summary getSummary() { + return summary; + } + + public List getSubtaskCheckpointStatistics() { + return subtaskCheckpointStatistics; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + TaskCheckpointStatisticsWithSubtaskDetails that = (TaskCheckpointStatisticsWithSubtaskDetails) o; + return Objects.equals(summary, that.summary) && + Objects.equals(subtaskCheckpointStatistics, that.subtaskCheckpointStatistics); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), summary, subtaskCheckpointStatistics); + } + + // ----------------------------------------------------------------------------------- + // Static inner classes + // ----------------------------------------------------------------------------------- + + /** + * Summary of the checkpoint statistics for a given task. + */ + public static final class Summary { + + public static final String FIELD_NAME_STATE_SIZE = "state_size"; + + public static final String FIELD_NAME_DURATION = "end_to_end_duration"; + + public static final String FIELD_NAME_CHECKPOINT_DURATION = "checkpoint_duration"; + + public static final String FIELD_NAME_ALIGNMENT = "alignment"; + + @JsonProperty(FIELD_NAME_STATE_SIZE) + private final MinMaxAvgStatistics stateSize; + + @JsonProperty(FIELD_NAME_DURATION) + private final MinMaxAvgStatistics duration; + + @JsonProperty(FIELD_NAME_CHECKPOINT_DURATION) + private final CheckpointDuration checkpointDuration; + + @JsonProperty(FIELD_NAME_ALIGNMENT) + private final CheckpointAlignment checkpointAlignment; + + @JsonCreator + public Summary( + @JsonProperty(FIELD_NAME_STATE_SIZE) MinMaxAvgStatistics stateSize, + @JsonProperty(FIELD_NAME_DURATION) MinMaxAvgStatistics duration, + @JsonProperty(FIELD_NAME_CHECKPOINT_DURATION) CheckpointDuration checkpointDuration, + @JsonProperty(FIELD_NAME_ALIGNMENT) CheckpointAlignment checkpointAlignment) { + this.stateSize = Preconditions.checkNotNull(stateSize); + this.duration = Preconditions.checkNotNull(duration); + this.checkpointDuration = Preconditions.checkNotNull(checkpointDuration); + this.checkpointAlignment = Preconditions.checkNotNull(checkpointAlignment); + } + + public MinMaxAvgStatistics getStateSize() { + return stateSize; + } + + public MinMaxAvgStatistics getDuration() { + return duration; + } + + public CheckpointDuration getCheckpointDuration() { + return checkpointDuration; + } + + public CheckpointAlignment getCheckpointAlignment() { + return checkpointAlignment; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Summary summary = (Summary) o; + return Objects.equals(stateSize, summary.stateSize) && + Objects.equals(duration, summary.duration) && + Objects.equals(checkpointDuration, summary.checkpointDuration) && + Objects.equals(checkpointAlignment, summary.checkpointAlignment); + } + + @Override + public int hashCode() { + return Objects.hash(stateSize, duration, checkpointDuration, checkpointAlignment); + } + } + + /** + * Duration of a checkpoint split up into its synchronous and asynchronous part. + */ + public static final class CheckpointDuration { + + public static final String FIELD_NAME_SYNCHRONOUS_DURATION = "sync"; + + public static final String FIELD_NAME_ASYNCHRONOUS_DURATION = "async"; + + @JsonProperty(FIELD_NAME_SYNCHRONOUS_DURATION) + private final MinMaxAvgStatistics synchronousDuration; + + @JsonProperty(FIELD_NAME_ASYNCHRONOUS_DURATION) + private final MinMaxAvgStatistics asynchronousDuration; + + @JsonCreator + public CheckpointDuration( + @JsonProperty(FIELD_NAME_SYNCHRONOUS_DURATION) MinMaxAvgStatistics synchronousDuration, + @JsonProperty(FIELD_NAME_ASYNCHRONOUS_DURATION) MinMaxAvgStatistics asynchronousDuration) { + this.synchronousDuration = Preconditions.checkNotNull(synchronousDuration); + this.asynchronousDuration = Preconditions.checkNotNull(asynchronousDuration); + } + + public MinMaxAvgStatistics getSynchronousDuration() { + return synchronousDuration; + } + + public MinMaxAvgStatistics getAsynchronousDuration() { + return asynchronousDuration; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CheckpointDuration that = (CheckpointDuration) o; + return Objects.equals(synchronousDuration, that.synchronousDuration) && + Objects.equals(asynchronousDuration, that.asynchronousDuration); + } + + @Override + public int hashCode() { + return Objects.hash(synchronousDuration, asynchronousDuration); + } + } + + /** + * Alignment information for a specific checkpoint at a given task. + */ + public static final class CheckpointAlignment { + + public static final String FIELD_NAME_BUFFERED_DATA = "buffered"; + + public static final String FIELD_NAME_DURATION = "duration"; + + @JsonProperty(FIELD_NAME_BUFFERED_DATA) + private final MinMaxAvgStatistics bufferedData; + + @JsonProperty(FIELD_NAME_DURATION) + private final MinMaxAvgStatistics duration; + + @JsonCreator + public CheckpointAlignment( + @JsonProperty(FIELD_NAME_BUFFERED_DATA) MinMaxAvgStatistics bufferedData, + @JsonProperty(FIELD_NAME_DURATION) MinMaxAvgStatistics duration) { + this.bufferedData = bufferedData; + this.duration = duration; + } + + public MinMaxAvgStatistics getBufferedData() { + return bufferedData; + } + + public MinMaxAvgStatistics getDuration() { + return duration; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CheckpointAlignment alignment = (CheckpointAlignment) o; + return Objects.equals(bufferedData, alignment.bufferedData) && + Objects.equals(duration, alignment.duration); + } + + @Override + public int hashCode() { + return Objects.hash(bufferedData, duration); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java index 73d91575f70a2..09fd310cec15d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.rest.handler.legacy; import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; +import org.apache.flink.runtime.rest.messages.DashboardConfiguration; import org.apache.flink.util.TestLogger; import com.fasterxml.jackson.databind.JsonNode; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java index add4e3beed4f8..7ad72fc481c2b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.rest.messages; -import org.apache.flink.runtime.rest.handler.legacy.messages.RestResponseMarshallingTestBase; - /** * Tests for {@link BlobServerPortResponseBody}. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoTest.java similarity index 95% rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoTest.java index 8e7092b2f4d08..8ab43f26e681f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.legacy.messages; +package org.apache.flink.runtime.rest.messages; /** * Tests for the {@link ClusterConfigurationInfo}. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java similarity index 95% rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java index bb1a6ec9b7bf4..789310e2b338c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.legacy.messages; +package org.apache.flink.runtime.rest.messages; /** * Tests for the {@link DashboardConfiguration}. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobConfigInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobConfigInfoTest.java similarity index 93% rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobConfigInfoTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobConfigInfoTest.java index 2223d3dec94dc..88fadb7dc9336 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobConfigInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobConfigInfoTest.java @@ -16,10 +16,9 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.legacy.messages; +package org.apache.flink.runtime.rest.messages; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.rest.messages.JobConfigInfo; import java.util.HashMap; import java.util.Map; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java index e69913c808673..7627d985610c8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.rest.messages; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.rest.handler.legacy.messages.RestRequestMarshallingTestBase; import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; import java.io.IOException; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java index 9dc832aa3666a..d523716dc79dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.rest.messages; -import org.apache.flink.runtime.rest.handler.legacy.messages.RestResponseMarshallingTestBase; import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestRequestMarshallingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestRequestMarshallingTestBase.java similarity index 94% rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestRequestMarshallingTestBase.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestRequestMarshallingTestBase.java index 2eb37cb044e4b..70a8f7884fc3f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestRequestMarshallingTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestRequestMarshallingTestBase.java @@ -16,9 +16,8 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.legacy.messages; +package org.apache.flink.runtime.rest.messages; -import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.runtime.rest.util.RestMapperUtils; import org.apache.flink.util.TestLogger; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestResponseMarshallingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java similarity index 94% rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestResponseMarshallingTestBase.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java index db44b08ad0c08..82eb43646e6e5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestResponseMarshallingTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java @@ -16,9 +16,8 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.legacy.messages; +package org.apache.flink.runtime.rest.messages; -import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.util.RestMapperUtils; import org.apache.flink.util.TestLogger; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersionTest.java similarity index 95% rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersionTest.java index 6b01dbe952616..b2376c5a94564 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersionTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.legacy.messages; +package org.apache.flink.runtime.rest.messages; /** * Tests for the {@link StatusOverviewWithVersion}. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfoTest.java similarity index 91% rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfoTest.java index deffaaeb82c4c..5259165fd1efd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfoTest.java @@ -16,9 +16,9 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.legacy.messages; +package org.apache.flink.runtime.rest.messages.checkpoints; -import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo; +import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; /** * Tests for the {@link CheckpointConfigInfo}. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java similarity index 82% rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java index 8521d34fe4562..562418e64077c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java @@ -16,12 +16,11 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.legacy.messages; +package org.apache.flink.runtime.rest.messages.checkpoints; import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; -import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics; +import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; import java.util.Arrays; import java.util.Collections; @@ -42,15 +41,17 @@ protected CheckpointingStatistics getTestResponseInstance() throws Exception { final CheckpointingStatistics.Counts counts = new CheckpointingStatistics.Counts(1, 2, 3, 4, 5); final CheckpointingStatistics.Summary summary = new CheckpointingStatistics.Summary( - new CheckpointingStatistics.MinMaxAvgStatistics(1L, 1L, 1L), - new CheckpointingStatistics.MinMaxAvgStatistics(2L, 2L, 2L), - new CheckpointingStatistics.MinMaxAvgStatistics(3L, 3L, 3L)); + new MinMaxAvgStatistics(1L, 1L, 1L), + new MinMaxAvgStatistics(2L, 2L, 2L), + new MinMaxAvgStatistics(3L, 3L, 3L)); - final Map checkpointStatisticsPerTask = new HashMap<>(2); + final Map checkpointStatisticsPerTask = new HashMap<>(2); checkpointStatisticsPerTask.put( new JobVertexID(), - new CheckpointStatistics.TaskCheckpointStatistics( + new TaskCheckpointStatistics( + 1L, + CheckpointStatsStatus.COMPLETED, 1L, 2L, 3L, @@ -60,7 +61,9 @@ protected CheckpointingStatistics getTestResponseInstance() throws Exception { checkpointStatisticsPerTask.put( new JobVertexID(), - new CheckpointStatistics.TaskCheckpointStatistics( + new TaskCheckpointStatistics( + 1L, + CheckpointStatsStatus.COMPLETED, 2L, 3L, 4L, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsTest.java new file mode 100644 index 0000000000000..f51d5b165da5e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; +import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; + +/** + * Tests the (un)marshalling of {@link TaskCheckpointStatistics}. + */ +public class TaskCheckpointStatisticsTest extends RestResponseMarshallingTestBase { + + @Override + protected Class getTestResponseClass() { + return TaskCheckpointStatistics.class; + } + + @Override + protected TaskCheckpointStatistics getTestResponseInstance() throws Exception { + return new TaskCheckpointStatistics( + 1L, + CheckpointStatsStatus.FAILED, + 42L, + 1L, + 23L, + 1337L, + 9, + 8); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java new file mode 100644 index 0000000000000..29e92ee5851c1 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; +import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests (un)marshalling of {@link TaskCheckpointStatisticsWithSubtaskDetails}. + */ +public class TaskCheckpointStatisticsWithSubtaskDetailsTest extends RestResponseMarshallingTestBase { + + @Override + protected Class getTestResponseClass() { + return TaskCheckpointStatisticsWithSubtaskDetails.class; + } + + @Override + protected TaskCheckpointStatisticsWithSubtaskDetails getTestResponseInstance() throws Exception { + final TaskCheckpointStatisticsWithSubtaskDetails.Summary summary = new TaskCheckpointStatisticsWithSubtaskDetails.Summary( + new MinMaxAvgStatistics(1L, 2L, 3L), + new MinMaxAvgStatistics(1L, 2L, 3L), + new TaskCheckpointStatisticsWithSubtaskDetails.CheckpointDuration( + new MinMaxAvgStatistics(1L, 2L, 3L), + new MinMaxAvgStatistics(1L, 2L, 3L)), + new TaskCheckpointStatisticsWithSubtaskDetails.CheckpointAlignment( + new MinMaxAvgStatistics(1L, 2L, 3L), + new MinMaxAvgStatistics(1L, 2L, 3L))); + + List subtaskCheckpointStatistics = new ArrayList<>(2); + + subtaskCheckpointStatistics.add(new SubtaskCheckpointStatistics.PendingSubtaskCheckpointStatistics(0)); + subtaskCheckpointStatistics.add(new SubtaskCheckpointStatistics.CompletedSubtaskCheckpointStatistics( + 1, + 4L, + 13L, + 1337L, + new SubtaskCheckpointStatistics.CompletedSubtaskCheckpointStatistics.CheckpointDuration(1L, 2L), + new SubtaskCheckpointStatistics.CompletedSubtaskCheckpointStatistics.CheckpointAlignment(2L, 3L))); + + return new TaskCheckpointStatisticsWithSubtaskDetails( + 4L, + CheckpointStatsStatus.COMPLETED, + 4L, + 1337L, + 1L, + 2L, + 8, + 9, + summary, + subtaskCheckpointStatistics); + } +}