Skip to content

Commit

Permalink
[FLINK-35552][runtime] ExecutionGraph shouldn't expose CheckpointStat…
Browse files Browse the repository at this point in the history
…sTracker. It's an implementation detail.
  • Loading branch information
dmvk authored and XComp committed Jun 18, 2024
1 parent eb91358 commit 419bfc6
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -559,12 +559,6 @@ public CheckpointCoordinator getCheckpointCoordinator() {
return checkpointCoordinator;
}

@Nullable
@Override
public CheckpointStatsTracker getCheckpointStatsTracker() {
return checkpointStatsTracker;
}

@Override
public KvStateLocationRegistry getKvStateLocationRegistry() {
return kvStateLocationRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@ void enableCheckpointing(
@Nullable
CheckpointCoordinator getCheckpointCoordinator();

@Nullable
CheckpointStatsTracker getCheckpointStatsTracker();

KvStateLocationRegistry getKvStateLocationRegistry();

void setJsonPlan(String jsonPlan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
Expand Down Expand Up @@ -81,23 +82,20 @@ public void reportCheckpointMetrics(
public void reportInitializationMetrics(
ExecutionAttemptID executionAttemptId,
SubTaskInitializationMetrics initializationMetrics) {
if (executionGraph.getCheckpointStatsTracker() == null) {
final CheckpointCoordinatorConfiguration checkpointConfig =
executionGraph.getCheckpointCoordinatorConfiguration();
if (checkpointConfig == null || !checkpointConfig.isCheckpointingEnabled()) {
// TODO: Consider to support reporting initialization stats without checkpointing
log.debug(
"Ignoring reportInitializationMetrics if checkpoint coordinator is not present");
return;
}
ioExecutor.execute(
() -> {
try {
executionGraph
.getCheckpointStatsTracker()
.reportInitializationMetrics(
executionAttemptId, initializationMetrics);
} catch (Exception t) {
log.warn("Error while reportInitializationMetrics", t);
}
});

processCheckpointCoordinatorMessage(
"ReportInitializationMetrics",
coordinator ->
coordinator.reportInitializationMetrics(
executionAttemptId, initializationMetrics));
}

public void acknowledgeCheckpoint(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,6 @@ public CheckpointCoordinator getCheckpointCoordinator() {
throw new UnsupportedOperationException();
}

@Nullable
@Override
public CheckpointStatsTracker getCheckpointStatsTracker() {
throw new UnsupportedOperationException();
}

@Override
public KvStateLocationRegistry getKvStateLocationRegistry() {
throw new UnsupportedOperationException();
Expand Down

0 comments on commit 419bfc6

Please sign in to comment.