
[FLINK-20944][k8s] Do not resolve the rest endpoint address when the service exposed type is ClusterIP #14692

Closed

Conversation

Contributor

@wangyang0918 wangyang0918 commented Jan 19, 2021

What is the purpose of the change

If the kubernetes.rest-service.exposed.type is ClusterIP, then we do not need to resolve the rest endpoint address (aka the namespaced service name). Otherwise, we will always get an UnknownHostException when deploying a Flink application from outside the K8s cluster.
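
As a rough illustration of the failure mode (not part of the PR; the service name below is hypothetical), resolving a namespaced Kubernetes service name only works through the in-cluster DNS:

import java.net.InetAddress;
import java.net.UnknownHostException;

public class ClusterIpResolutionExample {

    public static void main(String[] args) {
        // Hypothetical namespaced service name in the form "<cluster-id>-rest.<namespace>".
        // It is only resolvable via the in-cluster DNS, so resolving it from a machine
        // outside the Kubernetes cluster fails with an UnknownHostException.
        String namespacedServiceName = "my-flink-cluster-rest.default";
        try {
            InetAddress address = InetAddress.getByName(namespacedServiceName);
            System.out.println("Resolved to " + address.getHostAddress());
        } catch (UnknownHostException e) {
            System.out.println("Cannot resolve from outside the cluster: " + e);
        }
    }
}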

Brief change log

  • Do not resolve the rest endpoint address when the service exposed type is ClusterIP

Verifying this change

  • Added a dedicated unit test testDeployApplicationClusterWithClusterIP, which fails with an UnknownHostException before this change and passes after this PR
  • Manual test on minikube

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 48ef644 (Tue Jan 19 07:33:12 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

Collaborator

flinkbot commented Jan 19, 2021

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

Contributor

@tillrohrmann tillrohrmann left a comment


Thanks for creating this PR @wangyang0918. I had a question concerning the change. How does the client communicate with the cluster if ClusterIP is chosen? I think you mentioned that we are using a namespaced service for it. Can't the RestClusterClient be directly initialized with this service's address instead of trying to resolve the web monitor address?

Comment on lines 124 to 137
private String getWebMonitorAddress(Configuration configuration) throws Exception {
    HighAvailabilityServicesUtils.AddressResolution resolution =
            HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION;
    if (configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE)
            == KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
        resolution = HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION;
    }
    return HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution);
}
Contributor


Instead of suppressing the web monitor address resolution, can't we tell the RestClusterClient the address of the namespaced service? I think you mentioned in the ticket that we will communicate with the cluster through this service if we have chosen ClusterIP. It just feels wrong that we still retrieve the web monitor's address even though we don't need it.

Contributor Author


The Flink client communicates with the cluster via the namespaced service if ClusterIP is chosen.

I assume you mean directly returning a RestClusterClient using the namespaced service (aka restEndpoint.get().getAddress()). Even then, we would still need to check whether SSL is enabled and add the http/https protocol, which is exactly what HighAvailabilityServicesUtils.getWebMonitorAddress does.

Moreover, I do not think we are retrieving the web monitor's address here. It is more about constructing the address in a specific schema (aka protocol://address:port). The retrieval has already been done in flinkKubeClient.getRestEndpoint.
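
As a rough, standalone sketch of the distinction described above (names are my own; it mirrors the idea of HighAvailabilityServicesUtils.getWebMonitorAddress but is not the actual Flink code), the web monitor address is merely assembled from an already-known host and port:

public class WebMonitorAddressSketch {

    // Hypothetical helper: the rest endpoint host/port were already retrieved via
    // flinkKubeClient.getRestEndpoint; here we only pick the scheme and format the URL.
    static String constructWebMonitorAddress(String restAddress, int restPort, boolean sslEnabled) {
        String protocol = sslEnabled ? "https" : "http";
        return protocol + "://" + restAddress + ":" + restPort;
    }

    public static void main(String[] args) {
        // e.g. a namespaced service name returned by the Kubernetes client
        System.out.println(constructWebMonitorAddress("my-flink-cluster-rest.default", 8081, false));
    }
}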

Contributor


In the K8s case, from where exactly do we retrieve the address of the service? If I understood you correctly, then RestOptions.ADDRESS contains some address which is not resolvable from the outside. Hence, I am wondering why we should try to construct the web monitor address from this configuration at all.

Contributor Author

@wangyang0918 wangyang0918 Jan 19, 2021


The Flink application submission could happen inside or outside the K8s cluster. The reason we set RestOptions.ADDRESS to the namespaced service is that it can be used directly from within the K8s cluster. However, when the submission happens outside the K8s cluster, the namespaced service cannot be used to contact the cluster.

In that situation, users usually need to create an ingress for the communication.

Contributor


Assuming the submission happens outside of the K8s cluster with ClusterIP configured, does this mean that the RestClusterClient cannot talk to the cluster? If yes, then there is not really a need for creating it, is there? But then there is the problem that we could also deploy the cluster from within the K8s cluster, where the RestClusterClient can talk to the Flink cluster, right?

Should we maybe say that users have to submit the cluster in detached mode if they cannot connect to it? The problem with the current solution is that we might start an attached per-job cluster with which we cannot talk. Hence, we cannot request the final job result either and, consequently, the cluster will never shut down. So maybe we should fail with a better exception message. What do you think?

@wangyang0918 force-pushed the FLINK-20944-cluster-ip branch 2 times, most recently from f65e390 to 5f74749 on January 19, 2021 11:09
@wangyang0918
Contributor Author

After a little more consideration, maybe we never need to resolve the rest endpoint address.
When the service exposed type is NodePort/LoadBalancer, the rest endpoint address is usually an IP address, not a hostname. For ClusterIP, we also do not need to resolve the address.

I have updated the PR based on this.

@tillrohrmann
Contributor

The idea of the address resolution was to check that the client can actually talk to the cluster and to fail early if this is not the case. Hence, I am not sure whether we should change this behaviour because it can lead to the following problem: Let's assume we deploy a per-job cluster with ClusterIP. Then it is not possible for the client to talk to the cluster and, consequently, we will never fetch the final result of the job which is required for shutting the cluster down. Hence, the user should either deploy the cluster in detached mode or from within the K8s cluster. Everything else should fail.


fsw0422 commented Jan 19, 2021

@tillrohrmann in a lot of cases, organizations are reluctant to open up NodePorts due to security concerns, and instead expose a secured ingress that all external systems talk to. Is it possible to state explicitly in the documentation that if ClusterIP is chosen, the cluster admin should connect to the service through their own ingress?

@wangyang0918
Contributor Author

@tillrohrmann I had the same concern before implementing this PR. But now I think this change makes sense since the Flink client will not retrieve the result in application mode. BTW, per-job mode is not supported for the native K8s integration and we do not have a plan to support it.

The benefit of not resolving the rest endpoint address is that we could deploy a Flink session/application cluster with ClusterIP from outside the K8s cluster.

The limitation is that flink list/cancel cannot be used to contact the cluster. It will fail with a TimeoutException instead of an UnknownHostException.

@tillrohrmann
Contributor

You are right about per-job mode and K8s, @wangyang0918. Hence, this was a bad example ;-) But the example of canceling the job/application is actually a good one. If we know that we cannot talk to the cluster, then we probably shouldn't create a RestClusterClient in the first place because it might fail with some weird exceptions (e.g. TimeoutException) later on. Instead, the user should probably be told to submit the application in detached mode when using the ClusterIP setting.

What do we actually need the RestClusterClient for when using application mode? Maybe @kl0u or @aljoscha could help us answer this question.

Contributor Author

wangyang0918 commented Jan 19, 2021

For both the detached and attached application mode, the submission process does not need to create a RestClusterClient. Currently, we only use the RestClusterClient to print the web monitor address in KubernetesClusterDescriptor.

The problem is that the Flink client does not know whether it is running inside or outside the K8s cluster. So it is hard to not create the RestClusterClient and tell users that they should not use flink cancel/list from outside the K8s cluster when using the ClusterIP setting. Maybe we could add a log message to remind users that the rest endpoint (e.g. web UI, cancel, list, etc.) can only be used from within the K8s cluster when the service is exposed with ClusterIP.

@wangyang0918 wangyang0918 changed the title [FLINK-20944][k8s] Do not resolve the rest endpoint address when the service exposed type is ClusterIP [FLINK-20944][k8s] Do not resolve the rest endpoint address when creating the RestClusterClient Jan 20, 2021
@tillrohrmann
Contributor

Maybe we could add a log message to remind users that the rest endpoint (e.g. web UI, cancel, list, etc.) can only be used from within the K8s cluster when the service is exposed with ClusterIP.

You mean when not using ClusterIP, right?

I think it is ok that Flink does not know where it runs (outside or inside the K8s cluster). What should matter is whether we have to be able to talk to the cluster or not. If we have to talk to the cluster, then we have to be able to create a ClusterClient and should fail if we cannot connect to it (e.g. not being able to resolve the address).

Would it make sense to say that we don't create a ClusterClient if we submit the cluster in detached mode? At the moment, this is not the case but it might make things a bit clearer.

Another question, why does the KubernetesClusterDescriptor.deployApplicationCluster log the web interface URL? This does not look right because I would expect this kind of behaviour to happen for all ClusterDescriptors, if at all. The YarnClusterDescriptor does not do this.

Contributor Author

wangyang0918 commented Jan 20, 2021

You mean when not using ClusterIP, right?

No, I mean to add a log to remind users that the rest endpoint can only be used from within the K8s cluster when using ClusterIP. But let's set that aside and try to get on the same page.

Would it make sense to say that we don't create a ClusterClient if we submit the cluster in detached mode? At the moment, this is not the case but it might make things a bit clearer.

I agree with you that the Flink client should fail if we cannot connect to the cluster (e.g. not being able to resolve the address).
But for both the detached and attached application mode, the Flink client does not need to talk to the cluster, because the submission actually happens on the JobManager side. The Flink client just needs to tell K8s to create the resources.
When running flink list/cancel/savepoint, we need to talk to the cluster. In that case, we should let the Flink client fail if we cannot talk to the cluster.

Another question, why does the KubernetesClusterDescriptor.deployApplicationCluster log the web interface URL?

Printing the web interface URL helps users quickly access the dashboard. I think we have a similar log in the Yarn deployment.

2021-01-20 17:54:31,270 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface host11.xxx:64114 of application 'application_1602580065114_1123'.

All in all, do you think the following suggestion makes sense?

  • We still create the RestClusterClient when deploying an application/session cluster and when retrieving one. For deploying, it is only used to print the web interface URL. For retrieving, it will be used to talk to the cluster.
  • Based on the above, we would use NO_ADDRESS_RESOLUTION for deploying and TRY_ADDRESS_RESOLUTION for retrieving (see the sketch below).

Or, if you insist on not creating the RestClusterClient for deploying and only creating it for retrieving, then we would not print the web interface URL in the logs.
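
A hypothetical sketch of the two bullets above (illustration only, assuming flink-runtime on the classpath; this is not what the PR ultimately implemented), showing how the resolution mode could be chosen per use case:

import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;

public class ResolutionModeSketch {

    enum ClientUsage { DEPLOY, RETRIEVE }

    // Deploying only needs the address for printing the web interface URL, so skip resolution;
    // retrieving needs to actually talk to the cluster, so try to resolve (and fail) early.
    static HighAvailabilityServicesUtils.AddressResolution resolutionFor(ClientUsage usage) {
        return usage == ClientUsage.DEPLOY
                ? HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION
                : HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION;
    }
}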

Contributor

tillrohrmann commented Jan 20, 2021

  • We still create the RestClusterClient when deploying an application/session cluster and when retrieving one. For deploying, it is only used to print the web interface URL. For retrieving, it will be used to talk to the cluster.
  • Based on the above, we would use NO_ADDRESS_RESOLUTION for deploying and TRY_ADDRESS_RESOLUTION for retrieving.

I don't think that this would work w/o other changes, because when you deploy an attached Yarn per-job cluster, the client will be used to query the job result, which requires access to the cluster. What one maybe needs to do is to separate deploy and retrieve better, in the sense that deploy only deploys the cluster and maybe gives a handle to shut it down using a native client (e.g. YarnClient or KubeClient), while retrieve tries to create the RestClusterClient for a cluster.

Don't get me wrong here. I think your solution works as a quick fix and maybe that is what we should do. But I think that this problem shows that there is more of a conceptual problem with the overall design because we need special case logic for ClusterIP.

@tillrohrmann
Contributor

Given that this would be a bigger change, I would be in favour of applying your initial solution: do not resolve the address if ClusterIP is chosen and log a warning that client operations won't work from outside the K8s cluster.

@wangyang0918
Contributor Author

We still create the RestClusterClient when deploying an application/session cluster and when retrieving one. For deploying, it is only used to print the web interface URL. For retrieving, it will be used to talk to the cluster.
Based on the above, we would use NO_ADDRESS_RESOLUTION for deploying and TRY_ADDRESS_RESOLUTION for retrieving.

Hmm. Maybe I did not make myself clear. The above solution is only for Kubernetes. For Yarn, we indeed have this issue for per-job clusters. Just like you said, at a higher level I think we do not have a very good abstraction across all ClusterDescriptors for when we need to create a RestClusterClient or not, as well as when and how to print the web interface URL.

I will update the PR to use the initial solution.

@wangyang0918 wangyang0918 changed the title [FLINK-20944][k8s] Do not resolve the rest endpoint address when creating the RestClusterClient [FLINK-20944][k8s] Do not resolve the rest endpoint address when the service exposed type is ClusterIP Jan 20, 2021
@wangyang0918 force-pushed the FLINK-20944-cluster-ip branch 2 times, most recently from 08992db to df11e96 on January 20, 2021 15:22
@wangyang0918
Contributor Author

cc @tillrohrmann I have updated this PR.

After this change, we can start a Flink application/session cluster with the native K8s integration from inside or outside the K8s cluster when using ClusterIP. And we will have the following logs on the client side.

... ...
2021-01-20 23:10:21,045 WARN  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operation(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster.
2021-01-20 23:10:21,338 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink application cluster k8s-ha-app-1 successfully, JobManager Web Interface: http://k8s-ha-app-1-rest.default:8081

Contributor

@tillrohrmann tillrohrmann left a comment


Thanks for updating this PR @wangyang0918. I had a last comment which would be great to resolve before we merge this PR.

Comment on lines 130 to 131
"Please note that Flink client operation(e.g. cancel, list, stop,"
+ " savepoint, etc.) won't work from outside the Kubernetes cluster.");
Contributor


Let's add that this is due to having chosen ClusterIP for the KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE

Contributor Author


This log only shows up when ClusterIP has been chosen, so I did not add this information here.

But I also think it is harmless to show more information.

@wangyang0918
Contributor Author

Addressed the last comments. Now the log looks like the following.

2021-01-21 11:18:57,847 WARN  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
2021-01-21 11:18:58,131 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink application cluster k8s-ha-app-1 successfully, JobManager Web Interface: http://k8s-ha-app-1-rest.default:8081

tillrohrmann pushed a commit that referenced this pull request Jan 21, 2021
YuvalItzchakov pushed a commit to YuvalItzchakov/flink that referenced this pull request Jan 23, 2021