[FLINK-27609] Tracking flink-version and flink-revision in FlinkDeploymentStatus #217

Merged
merged 1 commit into from May 17, 2022

@morhidi morhidi commented May 16, 2022

This PR makes it easier to identify problematic Flink versions (CVE-affected, deprecated, etc.) in managed environments. The operator now propagates the exact version information in the status:

status:
    clusterInfo:
        flink-revision: 3a4c113 @ 2022-04-20T19:50:32+02:00
        flink-version: 1.15.0

morhidi commented May 16, 2022

cc @wangyang0918 @Aitozi @tweise

if (isJmDeploymentReady(flinkApp)) {
observeClusterInfo(flinkApp, observeConfig);
Contributor

Since the cluster info does not change except on upgrade or rollback, we do not need to fetch it on every observation. The REST HTTP call is a little heavy :)

Contributor Author

Fair point :) Moved it to (hopefully) the right spot. PTAL
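
The caching idea from the review comment above can be sketched roughly as follows. This is only an illustration, not the operator's code: `ClusterInfoCache`, its `observe` method, and the `deploymentChanged` flag are hypothetical names, and the `Supplier` stands in for the actual REST call.

```java
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical helper: avoids hitting the REST endpoint on every observation. */
public class ClusterInfoCache {
    // Stands in for the (comparatively heavy) REST call that returns version info.
    private final Supplier<Map<String, String>> fetcher;
    private Map<String, String> cached = Map.of();
    private int fetchCount = 0;

    public ClusterInfoCache(Supplier<Map<String, String>> fetcher) {
        this.fetcher = fetcher;
    }

    /**
     * Returns the cached info and refetches only when nothing is cached yet
     * or the deployment was upgraded or rolled back.
     */
    public Map<String, String> observe(boolean deploymentChanged) {
        if (cached.isEmpty() || deploymentChanged) {
            cached = fetcher.get();
            fetchCount++;
        }
        return cached;
    }

    /** Number of times the fetcher was actually invoked. */
    public int fetchCount() {
        return fetchCount;
    }
}
```

With this shape, repeated calls to `observe(false)` reuse the first result, and only `observe(true)` triggers another fetch.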

@morhidi morhidi force-pushed the FLINK-27609 branch 2 times, most recently from ccee350 to 5b1f96b Compare May 16, 2022 11:09
@@ -112,6 +125,7 @@ protected void observeJmDeployment(
if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
logger.info("JobManager deployment is ready");
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
observeClusterInfo(flinkApp, effectiveConfig);
Contributor

JobManagerDeploymentStatus.READY does not mean the JobManager REST API is ready to serve requests, so we might fail to get the cluster info.

Contributor Author

Thanks for the suggestion. Updated the logic again, @wangyang0918. PTAL

Contributor

Hmm. Have you verified that this PR works correctly when upgrading to a new image or rolling back to an old image?

Contributor Author

Just noticed the broken e2e tests, looking...
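
One way to cope with the concern raised in this thread (a READY deployment whose REST API is not serving yet) is to treat a failed fetch as "not available yet" and try again on a later observation. The sketch below assumes that approach; `TolerantClusterInfoObserver` and `tryFetch` are hypothetical names, and the `Callable` stands in for the REST call.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

/** Hypothetical helper: a failed fetch is reported as empty instead of failing the observation. */
public class TolerantClusterInfoObserver {

    /** Runs the fetch; on any failure returns empty so the caller can retry later. */
    public static Optional<Map<String, String>> tryFetch(Callable<Map<String, String>> restCall) {
        try {
            return Optional.ofNullable(restCall.call());
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt flag for the calling thread.
                Thread.currentThread().interrupt();
            }
            // The REST endpoint may not be serving yet even though the deployment is READY.
            System.err.println("Cluster info not available yet: " + e.getMessage());
            return Optional.empty();
        }
    }
}
```

A caller would update the status only when the returned `Optional` is present and otherwise leave the previous value untouched.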

morhidi commented May 16, 2022

@wangyang0918 Do you have any idea why the CI test CI / e2e_ci (Default configuration, default, flink:1.13, v1_13) (pull_request) is breaking? I wrote a simple local test to check whether it has anything to do with parsing the response body, without luck so far:

    @Test
    public void testClusterInfoRestCompatibility() throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();

        String flink13Response = "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.13.6\",\"flink-revision\":\"b2ca390 @ 2022-02-03T14:54:22+01:00\",\"features\":{\"web-submit\":false}}";
        String flink14Response = "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609 @ 2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
        String flink15Response = "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.0\",\"flink-revision\":\"3a4c113 @ 2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";

        var dashboardConfiguration = objectMapper.readValue(flink13Response, DashboardConfiguration.class);
        dashboardConfiguration = objectMapper.readValue(flink14Response, DashboardConfiguration.class);
        dashboardConfiguration = objectMapper.readValue(flink15Response, DashboardConfiguration.class);
    }

I finally managed to debug this issue locally on an older Mac (there's no Flink v1.13 for Apple silicon :/). It is indeed a 501 on the first try, then a JSON parsing issue on the retries. (The error handling in the REST client is a mess; client-side errors are swallowed completely.) I'll try to come up with something tomorrow:

org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot map `null` into type `boolean` (set DeserializationConfig.DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES to 'false' to allow)
 at [Source: UNKNOWN; line: -1, column: -1] (through reference chain: org.apache.flink.runtime.rest.messages.DashboardConfiguration["features"]->org.apache.flink.runtime.rest.messages.DashboardConfiguration$Features["web-cancel"])

This makes more sense ^^^ than the ones I saw in the CI logs.
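
The failure mode in this exception can be illustrated without Jackson. In the sample payloads earlier in this thread, the Flink 1.13 response has no "web-cancel" entry under "features", so the value for that key is effectively null, and null cannot become a primitive `boolean`. The plain-Java analogy below uses hypothetical names (`NullToPrimitiveDemo`, `strictWebCancel`, `lenientWebCancel`) and is not how Flink's `DashboardConfiguration` is implemented.

```java
import java.util.Map;

/** Plain-Java analogy for "Cannot map null into type boolean". */
public class NullToPrimitiveDemo {

    /** Unboxes directly: throws NullPointerException when "web-cancel" is absent. */
    public static boolean strictWebCancel(Map<String, Boolean> features) {
        return features.get("web-cancel");
    }

    /** Treats a missing flag as false instead of failing. */
    public static boolean lenientWebCancel(Map<String, Boolean> features) {
        return Boolean.TRUE.equals(features.get("web-cancel"));
    }
}
```

The lenient variant is the kind of behavior a more tolerant response class would need for fields that older Flink versions do not send.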

gyfora commented May 16, 2022

@morhidi very confusing error message indeed. We can easily fix this tomorrow by the copy trick we did in other incompatible cases :)

@wangyang0918

@morhidi You can find the following exception in the e2e tests with the default namespace. We do not print the operator logs when watchNamespaces is enabled; this also needs to be fixed. https://github.com/apache/flink-kubernetes-operator/runs/6454327460?check_suite_focus=true

Error: 2022-05-16 14:47:56,840 o.a.f.r.r.RestClient           [ERROR] Received response was neither of the expected type ([simple type, class org.apache.flink.runtime.rest.messages.DashboardConfiguration]) nor an error. Response=JsonResponse{json={"refresh-interval":3000,"timezone-name":"Coordinated Universal Time","timezone-offset":0,"flink-version":"1.13.6","flink-revision":"b2ca390 @ 2022-02-03T14:54:22+01:00","features":{"web-submit":false}}, httpResponseStatus=200 OK}
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "refresh-interval" (class org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as ignorable (one known property: "errors"])
 at [Source: UNKNOWN; line: -1, column: -1] (through reference chain: org.apache.flink.runtime.rest.messages.ErrorResponseBody["refresh-interval"])
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:987)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1974)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1701)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1650)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:541)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:362)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:195)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4569)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2798)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:3261)
	at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:529)
	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:512)
	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)

@wangyang0918

> We can easily fix this tomorrow by the copy trick we did in other incompatible cases :)

@gyfora Flink now provides an experimental REST API specification that follows the OpenAPI standard. Maybe we could use plain HTTP requests/responses for posting/getting the cluster info, savepoints, checkpoints, etc. TBH, I do not like copying classes from Flink and making tiny changes to them.

I do not mean we need to do this in this PR. It indeed deserves more discussion after release-1.0.0.
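
For illustration only, a plain HTTP request for the cluster info could look like the sketch below, using the JDK's `java.net.http` client (Java 11+). It assumes the dashboard configuration is served at `/config` on the JobManager REST address; the class name `PlainHttpClusterInfo`, the 10-second timeout, and the example base URL are all hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

/** Hypothetical sketch: fetch the dashboard configuration without Flink's REST client classes. */
public class PlainHttpClusterInfo {

    /** Builds a GET request for <restBaseUrl>/config (the path is an assumption). */
    public static HttpRequest configRequest(String restBaseUrl) {
        return HttpRequest.newBuilder(URI.create(restBaseUrl + "/config"))
                .timeout(Duration.ofSeconds(10))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    /** Sends the request and returns the raw JSON body; any non-200 status is an error. */
    public static String fetchConfigJson(String restBaseUrl) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(configRequest(restBaseUrl), HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Unexpected HTTP status " + response.statusCode());
        }
        return response.body();
    }
}
```

For example, `PlainHttpClusterInfo.fetchConfigJson("http://localhost:8081")` would return the raw JSON string, which the caller can then parse however it likes.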

morhidi commented May 17, 2022

This exception is unfortunately not the real root cause, and it is misleading. While debugging, I noticed that when the REST client is handling the original exception, the real response arrives, and the client cannot parse that response into an ErrorResponseBody. We should address this in core Flink. Regardless, I'm planning to use a custom response class containing only the information we need here. Overall, I agree with the approach of having our own request/response classes in the operator that are more resilient to API changes. The current ones are far from ideal; we're going to hit these issues all the time.
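
As a rough sketch of the "only the information we need" idea, the snippet below pulls just `flink-version` and `flink-revision` out of a response body and ignores everything else, so extra or missing fields in other Flink versions do not matter. It is not the class that was merged: `MinimalClusterInfoParser` is a hypothetical name, and the regex extraction is a dependency-free stand-in for a proper JSON library (it does not handle escaped quotes inside values).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: extract only the version fields and ignore the rest of the payload. */
public class MinimalClusterInfoParser {
    private static final String[] KEYS = {"flink-version", "flink-revision"};

    /** Returns the string values found for the known keys; absent keys are simply skipped. */
    public static Map<String, String> parse(String json) {
        Map<String, String> info = new LinkedHashMap<>();
        for (String key : KEYS) {
            // Matches "key" : "value" where the value contains no double quotes.
            Pattern pattern =
                    Pattern.compile("\"" + Pattern.quote(key) + "\"\\s*:\\s*\"([^\"]*)\"");
            Matcher matcher = pattern.matcher(json);
            if (matcher.find()) {
                info.put(key, matcher.group(1));
            }
        }
        return info;
    }
}
```

Applied to the Flink 1.13 payload from the log above, this yields the two version entries regardless of which "features" flags are present.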

@morhidi morhidi force-pushed the FLINK-27609 branch 2 times, most recently from 6c511c0 to 3886e66 Compare May 17, 2022 06:08
@gyfora gyfora merged commit a432dee into apache:main May 17, 2022