Skip to content

Commit

Permalink
[FLINK-27609][Fix] Optimize REST call frequency
Browse files Browse the repository at this point in the history
  • Loading branch information
morhidi committed May 16, 2022
1 parent aa7bd6b commit da03f8c
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,25 +88,31 @@ public void observe(FlinkDeployment flinkApp, Context context) {
}

if (isJmDeploymentReady(flinkApp)) {
observeClusterInfo(flinkApp, observeConfig);
if (observeFlinkCluster(flinkApp, context, observeConfig)) {
if (reconciliationStatus.getState() != ReconciliationState.ROLLED_BACK) {
reconciliationStatus.markReconciledSpecAsStable();
if (observeClusterInfo(flinkApp, observeConfig)) {
if (observeFlinkCluster(flinkApp, context, observeConfig)) {
if (reconciliationStatus.getState() != ReconciliationState.ROLLED_BACK) {
reconciliationStatus.markReconciledSpecAsStable();
}
}
}
}

clearErrorsIfDeploymentIsHealthy(flinkApp);
}

private void observeClusterInfo(FlinkDeployment flinkApp, Configuration configuration) {
private boolean observeClusterInfo(FlinkDeployment flinkApp, Configuration configuration) {
if (flinkApp.getStatus().getClusterInfo() != null) {
return true;
}
try {
Map<String, String> clusterInfo = flinkService.getClusterinfo(configuration);
Map<String, String> clusterInfo = flinkService.getClusterInfo(configuration);
flinkApp.getStatus().setClusterInfo(clusterInfo);
logger.debug("ClusterInfo: {}", clusterInfo);
} catch (Exception e) {
logger.warn("Exception while fetching cluster info", e);
logger.error("Exception while fetching cluster info", e);
return false;
}
return true;
}

protected void observeJmDeployment(
Expand All @@ -120,6 +126,8 @@ protected void observeJmDeployment(
return;
}

flinkApp.getStatus().setClusterInfo(null);

logger.info(
"Observing JobManager deployment. Previous status: {}", previousJmStatus.name());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ public SavepointFetchResult fetchSavepointInfo(
}
}

public Map<String, String> getClusterinfo(Configuration conf) throws Exception {
public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
Map<String, String> runtimeVersion = new HashMap<>();

try (RestClusterClient<String> clusterClient =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ public SubmittedJobInfo(
}

@Override
public Map<String, String> getClusterinfo(Configuration conf) throws Exception {
public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
return CLUSTER_INFO;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@ public void observeApplicationCluster() throws Exception {
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
deployment.getStatus().getJobManagerDeploymentStatus());
assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
assertNull(deployment.getStatus().getClusterInfo());

// Stable ready
observer.observe(deployment, readyContext);
assertEquals(TestingFlinkService.CLUSTER_INFO, deployment.getStatus().getClusterInfo());
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
Expand Down Expand Up @@ -274,19 +276,4 @@ public void observeListJobsError() {
});
assertEquals(podFailedMessage, exception.getMessage());
}

@Test
public void observeClusterInfo() {
TestingFlinkService flinkService = new TestingFlinkService();
ApplicationObserver observer =
new ApplicationObserver(
null, flinkService, configManager, new TestingStatusHelper<>());
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
bringToReadyStatus(deployment);
observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
assertEquals(TestingFlinkService.CLUSTER_INFO, deployment.getStatus().getClusterInfo());
}
}

0 comments on commit da03f8c

Please sign in to comment.