Skip to content

Commit

Permalink
[FLINK-27609] Tracking flink-version and flink-revision in FlinkDeplo…
Browse files Browse the repository at this point in the history
…ymentStatus
  • Loading branch information
morhidi committed May 16, 2022
1 parent be54f2b commit b403fcd
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/content/docs/custom-resource/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit
| ----------| ---- | ---- |
| jobStatus | org.apache.flink.kubernetes.operator.crd.status.JobStatus | Last observed status of the Flink job on Application/Session cluster. |
| error | java.lang.String | Error information about the FlinkDeployment/FlinkSessionJob. |
| clusterInfo | java.util.Map<java.lang.String,java.lang.String> | Config information from running clusters. |
| jobManagerDeploymentStatus | org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus | Last observed status of the JobManager deployment. |
| reconciliationStatus | org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentReconciliationStatus | Status of the last reconcile operation. |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.util.HashMap;
import java.util.Map;

/** Last observed status of the Flink deployment. */
@Experimental
@Data
Expand All @@ -37,6 +40,9 @@
@SuperBuilder
public class FlinkDeploymentStatus extends CommonStatus<FlinkDeploymentSpec> {

/** Config information from running clusters. */
private Map<String, String> clusterInfo = new HashMap<>();

/** Last observed status of the JobManager deployment. */
private JobManagerDeploymentStatus jobManagerDeploymentStatus =
JobManagerDeploymentStatus.MISSING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/** The base observer. */
Expand Down Expand Up @@ -85,16 +86,29 @@ public void observe(FlinkDeployment flinkApp, Context context) {
if (!isJmDeploymentReady(flinkApp)) {
observeJmDeployment(flinkApp, context, observeConfig);
}

if (isJmDeploymentReady(flinkApp)) {
observeClusterInfo(flinkApp, observeConfig);
if (observeFlinkCluster(flinkApp, context, observeConfig)) {
if (reconciliationStatus.getState() != ReconciliationState.ROLLED_BACK) {
reconciliationStatus.markReconciledSpecAsStable();
}
}
}

clearErrorsIfDeploymentIsHealthy(flinkApp);
}

private void observeClusterInfo(FlinkDeployment flinkApp, Configuration configuration) {
try {
Map<String, String> clusterInfo = flinkService.getClusterinfo(configuration);
flinkApp.getStatus().setClusterInfo(clusterInfo);
logger.debug("ClusterInfo: {}", clusterInfo);
} catch (Exception e) {
logger.warn("Exception while fetching cluster info", e);
}
}

protected void observeJmDeployment(
FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
Expand Down Expand Up @@ -102,6 +104,8 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -643,6 +647,35 @@ public SavepointFetchResult fetchSavepointInfo(
}
}

public Map<String, String> getClusterinfo(Configuration conf) throws Exception {
Map<String, String> runtimeVersion = new HashMap<>();

try (RestClusterClient<String> clusterClient =
(RestClusterClient<String>) getClusterClient(conf)) {

DashboardConfiguration dashboardConfiguration =
clusterClient
.sendRequest(
DashboardConfigurationHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
.get(
configManager
.getOperatorConfiguration()
.getFlinkClientTimeout()
.toSeconds(),
TimeUnit.SECONDS);

runtimeVersion.put(
DashboardConfiguration.FIELD_NAME_FLINK_VERSION,
dashboardConfiguration.getFlinkVersion());
runtimeVersion.put(
DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
dashboardConfiguration.getFlinkRevision());
}
return runtimeVersion;
}

public PodList getJmPodList(FlinkDeployment deployment, Configuration conf) {
final String namespace = conf.getString(KubernetesConfigOptions.NAMESPACE);
final String clusterId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
Expand All @@ -64,6 +65,12 @@
/** Flink service mock for tests. */
public class TestingFlinkService extends FlinkService {

public static final Map<String, String> CLUSTER_INFO =
Map.of(
DashboardConfiguration.FIELD_NAME_FLINK_VERSION, "15.0.0",
DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
"1234567 @ 1970-01-01T00:00:00+00:00");

private int savepointCounter = 0;
private int triggerCounter = 0;

Expand Down Expand Up @@ -326,4 +333,9 @@ public SubmittedJobInfo(
this.effectiveConfig = effectiveConfig;
}
}

@Override
public Map<String, String> getClusterinfo(Configuration conf) throws Exception {
return CLUSTER_INFO;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,4 +270,19 @@ public void observeListJobsError() {
});
assertEquals(podFailedMessage, exception.getMessage());
}

@Test
public void observeClusterInfo() {
TestingFlinkService flinkService = new TestingFlinkService();
ApplicationObserver observer =
new ApplicationObserver(
null, flinkService, configManager, new TestingStatusHelper<>());
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
bringToReadyStatus(deployment);
observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
assertEquals(TestingFlinkService.CLUSTER_INFO, deployment.getStatus().getClusterInfo());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9084,6 +9084,10 @@ spec:
type: object
status:
properties:
clusterInfo:
additionalProperties:
type: string
type: object
jobManagerDeploymentStatus:
enum:
- READY
Expand Down

0 comments on commit b403fcd

Please sign in to comment.