
[FLINK-29708] Convert CR Error field to JSON with enriched exception … #409

Merged · 1 commit · Oct 27, 2022

Conversation

darenwkt
Contributor

@darenwkt darenwkt commented Oct 23, 2022

What is the purpose of the change

This PR aims to:

  • Store exception metadata in JSON format under existing CR Error field. Metadata includes: exception type, message, stackTrace, list of throwables.
  • A new boolean Spec to enable/disable adding stackTrace to the CR error field.

Brief change log

A new class is introduced to templatize the error field in JSON format:

import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;

@Data
@Jacksonized
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
public class FlinkResourceException {

    /** Exception type. */
    String type;

    /** Exception message. */
    String message;

    /** Exception stack trace. */
    List<StackTraceElement> stackTraceElements;

    /** Custom exception metadata. */
    Map<String, Object> additionalMetadata;

    /** List of throwables converted to the FlinkResourceException type. */
    List<FlinkResourceException> throwableList;
}

Verifying this change

Added FlinkResourceExceptionUtilsTest to test serialization/deserialization of the exception and test the stackTraceEnabled spec.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., are there any changes to the CustomResourceDescriptors: yes
  • Core observer or reconciler logic that is regularly executed: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? JavaDocs

Contributor

@gyfora gyfora left a comment


Overall this looks like a very nice improvement. I added a few smaller commits around size limits and to move the spec field to a config.

Thank you!

@morhidi
Contributor

morhidi commented Oct 24, 2022

I tested it and it almost works great, apart from the minor things Gyula mentioned. There will probably be issues with JSON encoding: I tested it with the following invalid CR (missing volumes):

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /opt/flink/log
              name: flink-logs
            - mountPath: /opt/flink/downloads
              name: downloads
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless

It resulted in a string that might not be valid JSON:

{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException:
  Could not create Kubernetes cluster \"basic-example\".","throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could
  not create Kubernetes cluster \"basic-example\"."},{"type":"io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure
  executing: POST at: https://127.0.0.1:53846/apis/apps/v1/namespaces/default/deployments.
  Message: Deployment.apps \"basic-example\" is invalid: [spec.template.spec.containers[0].volumeMounts[0].name:
  Not found: \"flink-logs\", spec.template.spec.containers[0].volumeMounts[1].name:
  Not found: \"downloads\"]. Received status: Status(apiVersion=v1, code=422,
  details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[0].name,
  message=Not found: \"flink-logs\", reason=FieldValueNotFound, additionalProperties={}),
  StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name, message=Not
  found: \"downloads\", reason=FieldValueNotFound, additionalProperties={})],
  group=apps, kind=Deployment, name=basic-example, retryAfterSeconds=null, uid=null,
  additionalProperties={}), kind=Status, message=Deployment.apps \"basic-example\"
  is invalid: [spec.template.spec.containers[0].volumeMounts[0].name: Not found:
    \"flink-logs\", spec.template.spec.containers[0].volumeMounts[1].name: Not found:
    \"downloads\"], metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null,
    selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={})."}]}

@morhidi
Contributor

morhidi commented Oct 24, 2022

Another thing I found is that the structure of the stack trace is probably too verbose:

{
  "type": "org.apache.flink.kubernetes.operator.exception.ReconciliationException",
  "message": "java.lang.IllegalArgumentException: Only \"local\" is supported as schema for application mode. This assumes that the jar is located in the image, not the Flink client. An example of such path is: local:///opt/flink/examples/streaming/WindowJoin.jar",
  "stackTraceElements": [
    {
      "classLoaderName": "app",
      "moduleName": null,
      "moduleVersion": null,
      "methodName": "reconcile",
      "fileName": "FlinkDeploymentController.java",
      "lineNumber": 133,
      "className": "org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController",
      "nativeMethod": false
    },
    {
      "classLoaderName": "app",
      "moduleName": null,
      "moduleVersion": null,
      "methodName": "reconcile",
      "fileName": "FlinkDeploymentController.java",
      "lineNumber": 54,
      "className": "org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController",
      "nativeMethod": false
    },
    {
      "classLoaderName": "app",
      "moduleName": null,
      "moduleVersion": null,
      "methodName": "execute",
      "fileName": "Controller.java",
      "lineNumber": 130,
      "className": "io.javaoperatorsdk.operator.processing.Controller$1",
      "nativeMethod": false
    },
    {
      "classLoaderName": "app",
      "moduleName": null,
      "moduleVersion": null,
      "methodName": "execute",
      "fileName": "Controller.java",
      "lineNumber": 88,
      "className": "io.javaoperatorsdk.operator.processing.Controller$1",
      "nativeMethod": false
    },
    {
      "classLoaderName": "app",
      "moduleName": null,
      "moduleVersion": null,
      "methodName": "timeControllerExecution",
      "fileName": "Metrics.java",
      "lineNumber": 197,
      "className": "io.javaoperatorsdk.operator.api.monitoring.Metrics",
      "nativeMethod": false
    },
    {
      "classLoaderName": "app",
      "moduleName": null,
      "moduleVersion": null,
      "methodName": "reconcile",
      "fileName": "Controller.java",
      "lineNumber": 87,
      "className": "io.javaoperatorsdk.operator.processing.Controller",
      "nativeMethod": false
    },
    {
      "classLoaderName": "app",
      "moduleName": null,
      "moduleVersion": null,
      "methodName": "reconcileExecution",
      "fileName": "ReconciliationDispatcher.java",
      "lineNumber": 135,
      "className": "io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher",
      "nativeMethod": false
    },
    {
      "classLoaderName": "app",
      "moduleName": null,
      "moduleVersion": null,
      "methodName": "handleReconcile",
      "fileName": "ReconciliationDispatcher.java",
      "lineNumber": 115,
      "className": "io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher",
      "nativeMethod": false
    },
    {
      "classLoaderName": "app",
      "moduleName": null,
      "moduleVersion": null,
      "methodName": "handleDispatch",
      "fileName": "ReconciliationDispatcher.java",
      "lineNumber": 86,
      "className": "io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher",
      "nativeMethod": false
    },
    {
      "classLoaderName": "app",
      "moduleName": null,
      "moduleVersion": null,
      "methodName": "handleExecution",
      "fileName": "ReconciliationDispatcher.java",
      "lineNumber": 59,
      "className": "io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher",
      "nativeMethod": false
    },
    {
      "classLoaderName": "app",
      "moduleName": null,
      "moduleVersion": null,
      "methodName": "run",
      "fileName": "EventProcessor.java",
      "lineNumber": 395,
      "className": "io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor",
      "nativeMethod": false
    },
    {
      "classLoaderName": null,
      "moduleName": "java.base",
      "moduleVersion": "11.0.14",
      "methodName": "runWorker",
      "fileName": "ThreadPoolExecutor.java",
      "lineNumber": 1128,
      "className": "java.util.concurrent.ThreadPoolExecutor",
      "nativeMethod": false
    },
    {
      "classLoaderName": null,
      "moduleName": "java.base",
      "moduleVersion": "11.0.14",
      "methodName": "run",
      "fileName": "ThreadPoolExecutor.java",
      "lineNumber": 628,
      "className": "java.util.concurrent.ThreadPoolExecutor$Worker",
      "nativeMethod": false
    },
    {
      "classLoaderName": null,
      "moduleName": "java.base",
      "moduleVersion": "11.0.14",
      "methodName": "run",
      "fileName": "Thread.java",
      "lineNumber": 829,
      "className": "java.lang.Thread",
      "nativeMethod": false
    }
  ],
  "throwableList": [
    {
      "type": "java.lang.IllegalArgumentException",
      "message": "Only \"local\" is supported as schema for application mode. This assumes that the jar is located in the image, not the Flink client. An example of such path is: local:///opt/flink/examples/streaming/WindowJoin.jar"
    }
  ]
}

@darenwkt
Contributor Author

@morhidi

Thanks for the feedback and for sharing your test results.

> It resulted in a string that might not be valid JSON:

I will do some testing on this and validate the final JSON that's included in the CR field.

> Another thing I found is that the structure of the stack trace is probably too verbose:

Yup, it does contain several metadata fields we might not need, such as "classLoaderName" and "nativeMethod". An alternative I am considering is to store the stackTrace as a more compact List generated from ExceptionUtils.getStackFrames
(https://commons.apache.org/proper/commons-lang/javadocs/api-3.1/org/apache/commons/lang3/exception/ExceptionUtils.html#getStackFrames(java.lang.Throwable))

However, the downside is that each stack frame is a String, so it's a bit more work to figure out which method/file a frame was called from. Any thoughts on which one is more suited for our needs?
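For illustration, the compact alternative discussed above can be sketched with the JDK alone: each StackTraceElement already renders as a single "class.method(File.java:line)" string via toString, so a List of Strings can be produced without commons-lang. The helper name below is hypothetical, not code from this PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: render each stack frame as one compact string,
// similar in spirit to ExceptionUtils.getStackFrames, using only the JDK.
public class StackFrameCompactor {

    public static List<String> toCompactFrames(Throwable t) {
        return Arrays.stream(t.getStackTrace())
                // StackTraceElement.toString gives e.g. "pkg.Cls.method(Cls.java:42)"
                .map(StackTraceElement::toString)
                .collect(Collectors.toList());
    }
}
```

This keeps the JSON small, at the cost of the per-field structure (class name, method, line number) that the structured form preserves.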

@morhidi
Contributor

morhidi commented Oct 24, 2022

> Yup, it does contain several metadata we might not need ... Any thoughts on which one's more suited for our needs?

The error JSON looks good in the logs. Things go wrong when it is saved in the CR status and you dump it as yaml: it breaks the single-line error JSON into a multiline YAML structure.

@morhidi
Contributor

morhidi commented Oct 24, 2022

> An alternative that I am considering is to store the stackTrace as a more compact List generated from ExceptionUtils.getStackFrames ... Any thoughts on which one's more suited for our needs?

We might not need the stack trace in a structured format. Maybe just dump it as a string:

public static ExceptionDetail convertFromThrowable(Throwable t) {
    ExceptionDetail exceptionDetail = new ExceptionDetail();
    exceptionDetail.setMessage(t.toString());
    exceptionDetail.setRootCauseMessage(Throwables.getRootCause(t).toString());
    exceptionDetail.setStacktrace(Throwables.getStackTraceAsString(t));
    return exceptionDetail;
}

Even if the root cause is not the first cause, I guess we would still find it in the stack trace.
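For reference, the two derived fields in the snippet above can also be produced with the JDK alone, without the Guava Throwables dependency. This is a sketch with illustrative names, not code from the PR; printStackTrace into a StringWriter yields the familiar multi-line trace including every "Caused by:" section, which is why the root cause stays findable.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical JDK-only equivalents of Throwables.getStackTraceAsString
// and Throwables.getRootCause.
public class StackTraceStrings {

    public static String stackTraceAsString(Throwable t) {
        StringWriter sw = new StringWriter();
        // Writes the full trace, including all "Caused by:" sections.
        t.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }

    public static String rootCauseMessage(Throwable t) {
        Throwable cause = t;
        while (cause.getCause() != null) {
            cause = cause.getCause();
        }
        return cause.toString();
    }
}
```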

@gyfora
Contributor

gyfora commented Oct 26, 2022

+1 for having a more concise stack trace (as a String).

This will greatly reduce the JSON size and make it much more human-readable and easier to use. If we later feel that this is insufficient, we can change it.

@darenwkt
Contributor Author

Hi @morhidi,

I have updated the stackTrace to be a string and added a ConfigOption to limit the length of the stackTrace string.
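The length cap described above can be sketched roughly as follows. The class name, marker text, and limit value are illustrative only; the PR's actual ConfigOption wiring and defaults are not shown here.

```java
// Illustrative sketch: cap the stack-trace string at a configured maximum
// before it is stored in the CR error field, marking the cut with "...".
public class StackTraceTruncator {

    private static final String TRUNCATION_MARKER = "...";

    // Assumes maxLength >= TRUNCATION_MARKER.length() for a meaningful result.
    public static String truncate(String stackTrace, int maxLength) {
        if (stackTrace == null || stackTrace.length() <= maxLength) {
            return stackTrace;
        }
        return stackTrace.substring(0, Math.max(0, maxLength - TRUNCATION_MARKER.length()))
                + TRUNCATION_MARKER;
    }
}
```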

Regarding the concern about deserializing the error status into a valid JSON field, I have tested that deserialization back into the FlinkResourceException class works. My testing was done as follows:

  1. kubectl get flinkdeployment basic-example -o yaml > test.yaml
  2. Tested deserializing test.yaml back into the FlinkResourceException class using the following code:

    @Test
    public void testYamlDeserialization() throws IOException {

        Yaml yaml = new Yaml();
        InputStream inputStream =
                this.getClass().getClassLoader().getResourceAsStream("test.yaml");
        Map<String, Object> obj = yaml.load(inputStream);
        System.out.println("deserialized yaml: " + obj);

        ObjectMapper mapper = new ObjectMapper();
        FlinkResourceException ex =
                mapper.readValue(
                        (String) ((Map<String, Object>) obj.get("status")).get("error"),
                        FlinkResourceException.class);
        System.out.println("deserialized json: " + ex);
    }

  3. The results of System.out.println are:
deserialized yaml: {apiVersion=flink.apache.org/v1beta1, kind=FlinkDeployment, metadata={annotations={kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"flink.apache.org/v1beta1","kind":"FlinkDeployment","metadata":{"annotations":{},"name":"basic-example","namespace":"default"},"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"flinkVersion":"v1_15","image":"flink:1.15","job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"upgradeMode":"stateless"},"jobManager":{"resource":{"cpu":1,"memory":"2048m"}},"podTemplate":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod-template"},"spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/opt/flink/log","name":"flink-logs"},{"mountPath":"/opt/flink/downloads","name":"downloads"}]}]}},"serviceAccount":"flink","taskManager":{"resource":{"cpu":1,"memory":"2048m"}}}}
deserialized json: FlinkResourceException(type=org.apache.flink.kubernetes.operator.exception.ReconciliationException, message=org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster "basic-example"., stackTraceElements=null, additionalMetadata=null, throwableList=[FlinkResourceException(type=org.apache.flink.client.deployment.ClusterDeploymentException, message=Could not create Kubernetes cluster "basic-example"., stackTraceElements=null, additionalMetadata=null, throwableList=null), FlinkResourceException(type=org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException, message=Failure executing: POST at: https://10.96.0.1/apis/apps/v1/namespaces/default/deployments. Message: Deployment.apps "basic-example" is invalid: [spec.template.spec.containers[0].volumeMounts[0].name: Not found: "flink-logs", spec.template.spec.containers[0].volumeMounts[1].name: Not found: "downloads"]. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[0].name, message=Not found: "flink-logs", reason=FieldValueNotFound, additionalProperties={}), StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name, message=Not found: "downloads", reason=FieldValueNotFound, additionalProperties={})], group=apps, kind=Deployment, name=basic-example, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Deployment.apps "basic-example" is invalid: [spec.template.spec.containers[0].volumeMounts[0].name: Not found: "flink-logs", spec.template.spec.containers[0].volumeMounts[1].name: Not found: "downloads"], metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={})., stackTraceElements=null, additionalMetadata=null, throwableList=null)])

As a result, I can confirm that deserialization of the JSON works. The question now is whether we are OK with the current format of the error field in the CR yaml, which includes the escape characters. I looked into this, and the cause is that we are storing the string with single quotes '' instead of double quotes "" in the yaml (see https://www.baeldung.com/yaml-multi-line#quoting). I have tried looking at openAPIV3Schema, but I don't see a straightforward way to change it to double quotes. I also suspect this could be related to how the K8s API serializes the CR into yaml.

@darenwkt darenwkt force-pushed the FLINK-29708-v2 branch 2 times, most recently from 4d5c68d to 3110ef9 on October 26, 2022 09:19
@morhidi
Contributor

morhidi commented Oct 26, 2022

Hi @darenwkt, ideally we should be able to control the stack trace per CR:

  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    kubernetes.operator.exception.stacktrace.enabled: "true"

With the current approach this is an operator-wide config only. @gyfora wdyt?

@gyfora
Contributor

gyfora commented Oct 26, 2022

> Hi @darenwkt ideally we should be able to control the stack-trace per CR ... With the current approach this is an operator wide config only. @gyfora wdyt?

I am not completely sure about this. It might be better to let operator admins decide on this setting, at least for the size limits.

In any case, there is some inconsistency in the current implementation:
if something is put in the KubernetesOperatorConfigurations, the ConfigOption should have the annotation SECTION_SYSTEM, not dynamic (those are for options that can be dynamically overridden from the user flink config, as Matyas suggested).

@morhidi
Contributor

morhidi commented Oct 26, 2022

> It might be better to let operator admins decide on this setting ... the ConfigOption should have the annotation SECTION_SYSTEM not dynamic ...

Ok, so let's leave it as is and just move the configs under the right section.

@morhidi
Contributor

morhidi commented Oct 26, 2022

> As a result, I can confirm deserialization of the json works. The question now is whether we are ok with the current format the error field is shown in CR yaml ...

@darenwkt did you manage to properly parse the output of kubectl get flinkdeployment basic-example -o yaml > test.yaml back into a FlinkDeployment CR too? It looks like the JSON breaks up the yaml structure, so I'm worried more about parsing the whole CR, not just the JSON within. :)

On my cluster the error/stacktrace appears to be a multiline nested yaml:

[screenshot: error field rendered as a multiline nested yaml in the CR status]

Which is not a big deal if we can parse the CR back properly without losing any content.

@darenwkt
Contributor Author

darenwkt commented Oct 26, 2022

Hi @morhidi,
I have updated the stackTrace to be a string and added a ConfigOption to limit the length of the stackTrace string.
Regarding the concern on deserializing the error status into a valid json field, I have tested that deserialization back into the FlinkResourceException class works. My testing was done as follows:

  1. kubectl get flinkdeployment basic-example -o yaml > test.yaml
  2. Tested deserializing the test.yaml back into FlinkResourceException class using the following code:
    @Test
    public void testYamlDeserialization() throws IOException {

        Yaml yaml = new Yaml();
        InputStream inputStream = this.getClass()
                .getClassLoader()
                .getResourceAsStream("test.yaml");
        Map<String, Object> obj = yaml.load(inputStream);
        System.out.println("deserialized yaml: " + obj);

        ObjectMapper mapper = new ObjectMapper();
        FlinkResourceException ex = mapper.readValue((String) ((Map<String, Object>) obj.get("status")).get("error"), FlinkResourceException.class);
        System.out.println("deserialized json: " + ex);
    }
  1. Results of System.out.println are:
deserialized yaml: {apiVersion=flink.apache.org/v1beta1, kind=FlinkDeployment, metadata={annotations={kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"flink.apache.org/v1beta1","kind":"FlinkDeployment","metadata":{"annotations":{},"name":"basic-example","namespace":"default"},"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"flinkVersion":"v1_15","image":"flink:1.15","job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"upgradeMode":"stateless"},"jobManager":{"resource":{"cpu":1,"memory":"2048m"}},"podTemplate":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod-template"},"spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/opt/flink/log","name":"flink-logs"},{"mountPath":"/opt/flink/downloads","name":"downloads"}]}]}},"serviceAccount":"flink","taskManager":{"resource":{"cpu":1,"memory":"2048m"}}}}
deserialized json: FlinkResourceException(type=org.apache.flink.kubernetes.operator.exception.ReconciliationException, message=org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster "basic-example"., stackTraceElements=null, additionalMetadata=null, throwableList=[FlinkResourceException(type=org.apache.flink.client.deployment.ClusterDeploymentException, message=Could not create Kubernetes cluster "basic-example"., stackTraceElements=null, additionalMetadata=null, throwableList=null), FlinkResourceException(type=org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException, message=Failure executing: POST at: https://10.96.0.1/apis/apps/v1/namespaces/default/deployments. Message: Deployment.apps "basic-example" is invalid: [spec.template.spec.containers[0].volumeMounts[0].name: Not found: "flink-logs", spec.template.spec.containers[0].volumeMounts[1].name: Not found: "downloads"]. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[0].name, message=Not found: "flink-logs", reason=FieldValueNotFound, additionalProperties={}), StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name, message=Not found: "downloads", reason=FieldValueNotFound, additionalProperties={})], group=apps, kind=Deployment, name=basic-example, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Deployment.apps "basic-example" is invalid: [spec.template.spec.containers[0].volumeMounts[0].name: Not found: "flink-logs", spec.template.spec.containers[0].volumeMounts[1].name: Not found: "downloads"], metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={})., stackTraceElements=null, additionalMetadata=null, throwableList=null)])

As a result, I can confirm deserialization of the json works. The question now is whether we are ok with the current format the error field is shown in CR yaml, which includes the escape field. I tried to search this up and the cause of this is we are storing the string with single quote '' instead of double quote "" in the yaml. Referring to https://www.baeldung.com/yaml-multi-line#quoting. I have tried to looking at openAPIV3Schema, but I don't see a straightforward way to change it to a double quote. I also suspect this could be related to how K8 api serializes the CR into yaml.

@darenwkt did you manage to properly parse the output of kubectl get flinkdeployment basic-example -o yaml > test.yaml back into a flinkdeployment CR too? It looks the JSON breaks up the yaml structure, so I'm worried more about the whole CR parsing not just the JSON within. :)

On my cluster the error/stacktrace appears to be a multiline nested yaml:

(screenshot: the error field rendered as a multiline nested yaml block)

Which is not a big deal if we can parse the CR back properly without losing any content.

Hi @morhidi,

No problem, I have tested deserializing test.yaml into FlinkDeployment.class and can confirm that it deserializes correctly. My test is as follows:

        // Requires com.fasterxml.jackson.databind.ObjectMapper,
        // com.fasterxml.jackson.dataformat.yaml.YAMLFactory and java.io.File.
        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
        FlinkDeployment flinkDeployment =
                mapper.readValue(new File("src/test/resources/test.yaml"), FlinkDeployment.class);
        System.out.println("deserialized flinkDeployment: " + flinkDeployment);
        System.out.println("deserialized flinkDeployment error: " + flinkDeployment.getStatus().getError());

The results are:

deserialized flinkDeployment: CustomResource{kind='FlinkDeployment', apiVersion='flink.apache.org/v1beta1', metadata=ObjectMeta(annotations={kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"flink.apache.org/v1beta1","kind":"FlinkDeployment","metadata":{"annotations":{},"name":"basic-example","namespace":"default"},"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"flinkVersion":"v1_15","image":"flink:1.15","job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"upgradeMode":"stateless"},"jobManager":{"resource":{"cpu":1,"memory":"2048m"}},"podTemplate":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod-template"},"spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/opt/flink/log","name":"flink-logs"},{"mountPath":"/opt/flink/downloads","name":"downloads"}]}]}},"serviceAccount":"flink","taskManager":{"resource":{"cpu":1,"memory":"2048m"}}}}
}, clusterName=null, creationTimestamp=2022-10-24T21:50:52Z, deletionGracePeriodSeconds=null, deletionTimestamp=null, finalizers=[flinkdeployments.flink.apache.org/finalizer], generateName=null, generation=2, labels=null, managedFields=[], name=basic-example, namespace=default, ownerReferences=[], resourceVersion=3127914, selfLink=null, uid=200aec43-2bf7-44a7-a020-188ea091ff9f, additionalProperties={}), spec=FlinkDeploymentSpec(super=AbstractFlinkSpec(job=JobSpec(jarURI=local:///opt/flink/examples/streaming/StateMachineExample.jar, parallelism=2, entryClass=null, args=[], state=RUNNING, savepointTriggerNonce=null, initialSavepointPath=null, upgradeMode=STATELESS, allowNonRestoredState=null), restartNonce=null, flinkConfiguration={taskmanager.numberOfTaskSlots=2}), image=flink:1.15, imagePullPolicy=null, serviceAccount=flink, flinkVersion=v1_15, ingress=null, podTemplate=Pod(apiVersion=v1, kind=Pod, metadata=ObjectMeta(annotations=null, clusterName=null, creationTimestamp=null, deletionGracePeriodSeconds=null, deletionTimestamp=null, finalizers=[], generateName=null, generation=null, labels=null, managedFields=[], name=pod-template, namespace=null, ownerReferences=[], resourceVersion=null, selfLink=null, uid=null, additionalProperties={}), spec=PodSpec(activeDeadlineSeconds=null, affinity=null, automountServiceAccountToken=null, containers=[Container(args=[], command=[], env=[], envFrom=[], image=null, imagePullPolicy=null, lifecycle=null, livenessProbe=null, name=flink-main-container, ports=[], readinessProbe=null, resources=null, securityContext=null, startupProbe=null, stdin=null, stdinOnce=null, terminationMessagePath=null, terminationMessagePolicy=null, tty=null, volumeDevices=[], volumeMounts=[VolumeMount(mountPath=/opt/flink/log, mountPropagation=null, name=flink-logs, readOnly=null, subPath=null, subPathExpr=null, additionalProperties={}), VolumeMount(mountPath=/opt/flink/downloads, mountPropagation=null, name=downloads, readOnly=null, subPath=null, 
subPathExpr=null, additionalProperties={})], workingDir=null, additionalProperties={})], dnsConfig=null, dnsPolicy=null, enableServiceLinks=null, ephemeralContainers=[], hostAliases=[], hostIPC=null, hostNetwork=null, hostPID=null, hostname=null, imagePullSecrets=[], initContainers=[], nodeName=null, nodeSelector=null, os=null, overhead=null, preemptionPolicy=null, priority=null, priorityClassName=null, readinessGates=[], restartPolicy=null, runtimeClassName=null, schedulerName=null, securityContext=null, serviceAccount=null, serviceAccountName=null, setHostnameAsFQDN=null, shareProcessNamespace=null, subdomain=null, terminationGracePeriodSeconds=null, tolerations=[], topologySpreadConstraints=[], volumes=[], additionalProperties={}), status=null, additionalProperties={}), jobManager=JobManagerSpec(resource=Resource(cpu=1.0, memory=2048m), replicas=1, podTemplate=null), taskManager=TaskManagerSpec(resource=Resource(cpu=1.0, memory=2048m), replicas=null, podTemplate=null), logConfiguration=null, mode=null), status=FlinkDeploymentStatus(super=CommonStatus(jobStatus=JobStatus(jobName=null, jobId=16ab0d806af3276492caac21c6294d49, state=null, startTime=null, updateTime=null, savepointInfo=SavepointInfo(lastSavepoint=null, triggerId=, triggerTimestamp=0, triggerType=UNKNOWN, formatType=UNKNOWN, savepointHistory=[], lastPeriodicSavepointTimestamp=0)), error={"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster \"basic-example\".","throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could not create Kubernetes cluster \"basic-example\"."},{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure executing: POST at: https://10.96.0.1/apis/apps/v1/namespaces/default/deployments. 
Message: Deployment.apps \"basic-example\" is invalid: [spec.template.spec.containers[0].volumeMounts[0].name: Not found: \"flink-logs\", spec.template.spec.containers[0].volumeMounts[1].name: Not found: \"downloads\"]. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[0].name, message=Not found: \"flink-logs\", reason=FieldValueNotFound, additionalProperties={}), StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name, message=Not found: \"downloads\", reason=FieldValueNotFound, additionalProperties={})], group=apps, kind=Deployment, name=basic-example, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Deployment.apps \"basic-example\" is invalid: [spec.template.spec.containers[0].volumeMounts[0].name: Not found: \"flink-logs\", spec.template.spec.containers[0].volumeMounts[1].name: Not found: \"downloads\"], metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={})."}]}), clusterInfo={}, jobManagerDeploymentStatus=MISSING, reconciliationStatus=FlinkDeploymentReconciliationStatus(super=ReconciliationStatus(reconciliationTimestamp=1666650817435, 
lastReconciledSpec={"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"suspended","savepointTriggerNonce":null,"initialSavepointPath":null,"upgradeMode":"stateless","allowNonRestoredState":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.15","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_15","ingress":null,"podTemplate":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod-template"},"spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/opt/flink/log","name":"flink-logs"},{"mountPath":"/opt/flink/downloads","name":"downloads"}]}]}},"jobManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}, lastStableSpec=null, state=UPGRADING)), taskManager=TaskManagerInfo(labelSelector=, replicas=0))}
deserialized flinkDeployment error: {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster \"basic-example\".","throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could not create Kubernetes cluster \"basic-example\"."},{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure executing: POST at: https://10.96.0.1/apis/apps/v1/namespaces/default/deployments. Message: Deployment.apps \"basic-example\" is invalid: [spec.template.spec.containers[0].volumeMounts[0].name: Not found: \"flink-logs\", spec.template.spec.containers[0].volumeMounts[1].name: Not found: \"downloads\"]. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[0].name, message=Not found: \"flink-logs\", reason=FieldValueNotFound, additionalProperties={}), StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name, message=Not found: \"downloads\", reason=FieldValueNotFound, additionalProperties={})], group=apps, kind=Deployment, name=basic-example, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Deployment.apps \"basic-example\" is invalid: [spec.template.spec.containers[0].volumeMounts[0].name: Not found: \"flink-logs\", spec.template.spec.containers[0].volumeMounts[1].name: Not found: \"downloads\"], metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={})."}]}

I also repeated the same test for the FlinkSessionJob CR.

deserialized flinkSessionJob: CustomResource{kind='FlinkSessionJob', apiVersion='flink.apache.org/v1beta1', metadata=ObjectMeta(annotations={kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"flink.apache.org/v1beta1","kind":"FlinkSessionJob","metadata":{"annotations":{},"name":"basic-session-job-example","namespace":"default"},"spec":{"deploymentName":"basic-session-deployment-example","job":{"jarURI":"https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.15.1/flink-examples-streaming_2.12-1.15.1-TopSpeedWindowing.jar","parallelism":4,"upgradeMode":"stateless"}}}
}, clusterName=null, creationTimestamp=2022-10-26T20:22:06Z, deletionGracePeriodSeconds=null, deletionTimestamp=null, finalizers=[flinksessionjobs.flink.apache.org/finalizer], generateName=null, generation=1, labels={target.session=basic-session-deployment-example}, managedFields=[], name=basic-session-job-example, namespace=default, ownerReferences=[], resourceVersion=3288473, selfLink=null, uid=a48bb464-3a21-4c65-a73c-b368b299d27c, additionalProperties={}), spec=FlinkSessionJobSpec(super=AbstractFlinkSpec(job=JobSpec(jarURI=https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.15.1/flink-examples-streaming_2.12-1.15.1-TopSpeedWindowing.jar, parallelism=4, entryClass=null, args=[], state=RUNNING, savepointTriggerNonce=null, initialSavepointPath=null, upgradeMode=STATELESS, allowNonRestoredState=null), restartNonce=null, flinkConfiguration=null), deploymentName=basic-session-deployment-example), status=FlinkSessionJobStatus(super=CommonStatus(jobStatus=JobStatus(jobName=null, jobId=null, state=null, startTime=null, updateTime=null, savepointInfo=SavepointInfo(lastSavepoint=null, triggerId=, triggerTimestamp=0, triggerType=UNKNOWN, formatType=UNKNOWN, savepointHistory=[], lastPeriodicSavepointTimestamp=0)), error={"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:\njava.lang.RuntimeException: Could not look up the main(String[]) method from the class org.apache.flink.streaming.examples.windowing.TopSpeedWindowing: org/apache/flink/api/connector/sink2/Sink\n\tat org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:315)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:161)\n\tat 
org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)\n\tat org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:182)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:97)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:54)\n\tat org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)\n\tat java.util.Optional.ifPresent(Optional.java:159)\n\tat org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.net","stackTraceElements":"org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:\njava.lang.RuntimeException: Could not look up the main(String[]) method from the class org.apache.flink.streaming.examples.windowing.TopSpeedWindowing: org/apache/flink/api/connector/sink2/Sink\n\tat 
org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:315)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:161)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)\n\tat org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:182)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:97)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:54)\n\tat org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)\n\tat java.util.Optional.ifPresent(Optional.java:159)\n\tat org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelR","throwableList":[{"type":"org.apache.flink.util.FlinkRuntimeException","message":"java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:\njava.lang.RuntimeException: Could not look up the main(String[]) method from the class org.apache.flink.streaming.examples.windowing.TopSpeedWindowing: 
org/apache/flink/api/connector/sink2/Sink\n\tat org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:315)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:161)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)\n\tat org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:182)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:97)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:54)\n\tat org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)\n\tat java.util.Optional.ifPresent(Optional.java:159)\n\tat org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerCo"},{"type":"java.util.concurrent.ExecutionException","message":"org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:\njava.lang.RuntimeException: Could not look up 
the main(String[]) method from the class org.apache.flink.streaming.examples.windowing.TopSpeedWindowing: org/apache/flink/api/connector/sink2/Sink\n\tat org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:315)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:161)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)\n\tat org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:182)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:97)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:54)\n\tat org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)\n\tat java.util.Optional.ifPresent(Optional.java:159)\n\tat org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHa"},{"type":"org.apache.flink.runtime.rest.util.RestClientException","message":"[Internal server error., <Exception on server side:\njava.lang.RuntimeException: Could not look up the main(String[]) method from the class org.apache.flink.streaming.examples.windowing.TopSpeedWindowing: org/apache/flink/api/connector/sink2/Sink\n\tat org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:315)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:161)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)\n\tat org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:182)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:97)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:54)\n\tat org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)\n\tat java.util.Optional.ifPresent(Optional.java:159)\n\tat org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat org.apache.flink.shaded.netty","additionalMetadata":{"httpResponseCode":500}}]}), reconciliationStatus=FlinkSessionJobReconciliationStatus(super=ReconciliationStatus(reconciliationTimestamp=1666815910154, lastReconciledSpec={"spec":{"job":{"jarURI":"https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.15.1/flink-examples-streaming_2.12-1.15.1-TopSpeedWindowing.jar","parallelism":4,"entryClass":null,"args":[],"state":"suspended","savepointTriggerNonce":null,"initialSavepointPath":null,"upgradeMode":"stateless","allowNonRestoredState":null},"restartNonce":null,"flinkConfiguration":null,"deploymentName":"basic-session-deployment-example"},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":1},"firstDeployment":true}}, lastStableSpec=null, state=UPGRADING)))}
deserialized flinkSessionJob error: {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:\njava.lang.RuntimeException: Could not look up the main(String[]) method from the class org.apache.flink.streaming.examples.windowing.TopSpeedWindowing: org/apache/flink/api/connector/sink2/Sink\n\tat org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:315)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:161)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)\n\tat org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:182)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:97)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:54)\n\tat org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)\n\tat java.util.Optional.ifPresent(Optional.java:159)\n\tat org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.net","stackTraceElements":"org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:\njava.lang.RuntimeException: Could not look up the main(String[]) method from the class org.apache.flink.streaming.examples.windowing.TopSpeedWindowing: org/apache/flink/api/connector/sink2/Sink\n\tat org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:315)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:161)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)\n\tat org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:182)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:97)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:54)\n\tat org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)\n\tat java.util.Optional.ifPresent(Optional.java:159)\n\tat 
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelR","throwableList":[{"type":"org.apache.flink.util.FlinkRuntimeException","message":"java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:\njava.lang.RuntimeException: Could not look up the main(String[]) method from the class org.apache.flink.streaming.examples.windowing.TopSpeedWindowing: org/apache/flink/api/connector/sink2/Sink\n\tat org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:315)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:161)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)\n\tat org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:182)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:97)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:54)\n\tat org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)\n\tat 
java.util.Optional.ifPresent(Optional.java:159)\n\tat org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerCo"},{"type":"java.util.concurrent.ExecutionException","message":"org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:\njava.lang.RuntimeException: Could not look up the main(String[]) method from the class org.apache.flink.streaming.examples.windowing.TopSpeedWindowing: org/apache/flink/api/connector/sink2/Sink\n\tat org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:315)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:161)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)\n\tat org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:182)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:97)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:54)\n\tat org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)\n\tat 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)\n\tat java.util.Optional.ifPresent(Optional.java:159)\n\tat org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHa"},{"type":"org.apache.flink.runtime.rest.util.RestClientException","message":"[Internal server error., <Exception on server side:\njava.lang.RuntimeException: Could not look up the main(String[]) method from the class org.apache.flink.streaming.examples.windowing.TopSpeedWindowing: org/apache/flink/api/connector/sink2/Sink\n\tat org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:315)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:161)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)\n\tat org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:182)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:97)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:54)\n\tat 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)\n\tat java.util.Optional.ifPresent(Optional.java:159)\n\tat org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat org.apache.flink.shaded.netty","additionalMetadata":{"httpResponseCode":500}}]}
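The payload above follows the `FlinkResourceException` shape from the PR description (`type`, `message`, `throwableList`, `additionalMetadata`). A minimal sketch of consuming that error field programmatically — Python here for brevity, and the sample payload is hypothetical, not taken from a real CR:

```python
import json

# Hypothetical sample of a serialized FlinkResourceException; field names
# match the class in the PR description, values are illustrative only.
error_field = json.dumps({
    "type": "org.apache.flink.kubernetes.operator.exception.ReconciliationException",
    "message": "org.apache.flink.util.FlinkRuntimeException: Failed to submit job",
    "additionalMetadata": {"httpResponseCode": 500},
    "throwableList": [
        {"type": "org.apache.flink.util.FlinkRuntimeException",
         "message": "Failed to submit job"},
    ],
})

def exception_chain(raw):
    """Flatten the root exception plus its throwableList into (type, message) pairs."""
    root = json.loads(raw)
    chain = [(root["type"], root.get("message"))]
    for t in root.get("throwableList", []):
        chain.append((t["type"], t.get("message")))
    return chain

for exc_type, msg in exception_chain(error_field):
    print(exc_type, "->", msg)
```

Since the field is plain JSON, tooling watching the CR status can extract the root cause or the HTTP response code without regex-scraping a raw stack trace string.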

I think the multiline YAML helps with the readability of the error field, as long as it still deserializes correctly, as you mentioned.

@darenwkt
Contributor Author

Hi @darenwkt, ideally we should be able to control the stack trace per CR:

  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    kubernetes.operator.exception.stacktrace.enabled: "true"

With the current approach this is an operator wide config only. @gyfora wdyt?

I am not completely sure about this. It might be better to let operator admins decide on this setting, at least the size limits for sure.

In any case there is some inconsistency in the current implementation: if something is put into KubernetesOperatorConfigOptions, the ConfigOption should carry the SECTION_SYSTEM annotation, not Dynamic (the dynamic section is for options that can be overridden from the user's Flink config, as Matyas suggested).

@gyfora @morhidi

Thanks for confirming, I will push a new revision with the config placed in SYSTEM section.
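With the option SYSTEM-scoped, the default would live in the operator's own configuration rather than in each CR's `flinkConfiguration`. A hypothetical fragment of the operator Helm chart's `values.yaml` (the `defaultConfiguration` block exists in the chart; the option value shown is just an example):

```yaml
# Hypothetical: operator-wide default set in the operator's Helm values,
# not per-CR. Option name taken from the discussion above.
defaultConfiguration:
  create: true
  flink-conf.yaml: |+
    kubernetes.operator.exception.stacktrace.enabled: "true"
```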

@gyfora
Contributor

gyfora commented Oct 27, 2022

Great work @darenwkt !

@morhidi
Contributor

morhidi commented Oct 27, 2022

Agree @darenwkt, great job! @gyfora I'm not following the CI stability, but when you feel it's safe we can merge this.

@gyfora gyfora merged commit 3c63653 into apache:main Oct 27, 2022