Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-10935][kubernetes]Implement KubeClient with Faric8 Kubernetes clients #9965

Closed
wants to merge 3 commits into from

Conversation

wangyang0918
Copy link
Contributor

@wangyang0918 wangyang0918 commented Oct 22, 2019

What is the purpose of the change

This PR is part of FLINK-9953(Active Kubernetes integration). This PR introduces the FlinkKubeClient API and adds fabric8 implementation. The following k8s resource decorators are added to support for basic k8s integration.

  • Service, internal service is used for tm->jm hearbeat, rest service is used for flink client to submit job.
  • ConfigMap, save the flink-conf.yaml and log4j config files. It will be mounted into jm/tm pod.
  • Deployment, for job manager. When it crashes exceptionally, a new one will be started.
  • Pod, for task manager. It will created by KubernetesResourceManager.
  • OwnerReference, all resources are set owner reference to internal service. When the internal service is deleted, they will be deleted by gc.

This PR is based on #9695.

Brief change log

  • Add config options and cli options for kubernetes
  • Add FlinkKubeClient API and basic kubernetes resources
  • Add fabric8 kubeClient implementation and decorators
  • Add tests for fabric8 FlinkKubeClient implementation

Verifying this change

This change added Fabric8ClientTest public api implementation of FlinkKubeClient.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Copy link
Collaborator

flinkbot commented Oct 22, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 685652d (Wed Dec 04 16:43:19 UTC 2019)

Warnings:

  • 1 pom.xml files were touched: Check for build and licensing issues.
  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@wangyang0918 wangyang0918 changed the title Flink 10935 [FLINK-10935][kubernetes]Implement KubeClient with Faric8 Kubernetes clients Oct 22, 2019
@flinkbot
Copy link
Collaborator

flinkbot commented Oct 22, 2019

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build

@wangyang0918
Copy link
Contributor Author

I have addressed @MalcolmSanders 's comments and updated the PR.

Copy link
Contributor

@KarmaGYZ KarmaGYZ left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for open this PR, @wangyang0918 !
Good to see the K8S support in flink. Some comment left.

BTW, I think we could find a better way to deliver relevant configuration. Now, too many classes hold the flinkConfig.

Copy link
Member

@tisonkun tisonkun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your working @wangyang0918 ! I left some comments.

/**
* Create kubernetes config map, include flink-conf.yaml, log4j.properties.
*/
void createConfigMap() throws Exception;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice that some of "create" functions are blocking while others are non-blocking. What is the different between them and how is the decision made?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the create operation is asynchronous, including createConfigMap createInternalService createFlinkMasterDeployment createTaskManagerPod. We just add a CompletableFuture for creating service because we need to make sure the service are created successfully. Then we could get the uuid of internal service and use the rest service to submit job.

*/
public class ActionWatcher<T extends HasMetadata> implements Watcher<T> {
private final CountDownLatch latch = new CountDownLatch(1);
private final AtomicReference<T> reference = new AtomicReference();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any atomic actions like compare-and-set or get-and-set are in used. Why do you instance an AtomicReference here? Is volatile sufficient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use AtomicReference just to make sure the atomic operation of set and get from multiple threads. Also volatile is enough here for object reference. AtomicReference is more clear, if you insist, i will use volatile instead.

private final CountDownLatch latch = new CountDownLatch(1);
private final AtomicReference<T> reference = new AtomicReference();
private final T resource;
private Action expectedAction;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be final IMO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix it.

*/
public abstract class Resource<T> {

protected T internalResource;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given we have getter for internalResource and flinkConfig they could be private final; just notice their setters are not into used at all.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is code

subclass extends Resource {
  super(flinkConfig);
  this.internalResource = internalResource;
}

which could be

  super(flinkConfig, internalResource);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice suggestion. I will update the Resource and implementation class.

The setInternalResource will be kept. It will be used by Decorator. Extract real resource from resource, decorate and put it back.


private final KubernetesClient internalClient;

private List<Decorator<ConfigMap, FlinkConfigMap>> configMapDecorators;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd tend to mark these lists as final when possible

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Final will be better.

/**
* Get the rest endpoints for access outside cluster.
*/
Endpoint getRestEndpoints(String clusterId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice that it could be Nullable in Faric8 implementation. The fact that it is nullable and the meaning of null should be documented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add return description for getRestEndpoints and other methods may return null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe getRestEndpoint since only one Endpoint would be returned

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense. I will fix it.

/**
* Get the kubernetes internal service of the given flink clusterId.
*/
FlinkService getInternalService(String clusterId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice that it could be Nullable in Faric8 implementation. The fact that it is nullable and the meaning of null should be documented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix it.

try {
config = Config.fromKubeconfig(KubernetesUtils.getContentFromFile(kubeConfigFile));
} catch (IOException e) {
LOG.error("Load kubernetes config failed.", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't throw exception here so new DefaultKubernetesClient(config); could be new DefaultKubernetesClient(null);. What is the semantic there then? It looks weird we log error but control flow continues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An exception should be thrown if we get the content from kube config file error. I will fix it.

@tisonkun
Copy link
Member

also you could rebase the branch on current master especially on merged FLINK-10932

@wangyang0918
Copy link
Contributor Author

@tisonkun Really thanks for your detailed review. I have rebase the master and addressed all your comments. Please take a look.

Copy link
Contributor

@zhuzhurk zhuzhurk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for open this PR @wangyang0918 .
I have a few comments for the configs and will later go through the remaining parts.

/**
* Get the rest endpoints for access outside cluster.
*/
Endpoint getRestEndpoints(String clusterId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe getRestEndpoint since only one Endpoint would be returned

@wangyang0918
Copy link
Contributor Author

@zhuzhurk Thanks a lot for your detailed review. I will address your comments one by one.

@wangyang0918
Copy link
Contributor Author

@tisonkun All the comments from you and @zhuzhurk have been addressed. Could you please take a look again and help to merge.

Copy link
Contributor

@zhuzhurk zhuzhurk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing all the comments @wangyang0918 .
The change looks good to me except for some more minor comments.

@wangyang0918
Copy link
Contributor Author

@zhuzhurk I have addressed all your comments. Please take a look.

@zhuzhurk
Copy link
Contributor

zhuzhurk commented Dec 3, 2019

LGTM. +1 to merge it.

@wangyang0918
Copy link
Contributor Author

@tisonkun Please have a final check and help to merge. Really thanks for your kind help. -:)

"to the number of slots per TaskManager");

public static final ConfigOption<String> CONTAINER_IMAGE_PULL_POLICY =
key("kubernetes.container.image.pullPolicy")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for consistency, "pull-policy"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix it.

…rnetes.

All the basic and necessary config options has been added. Such as, image name, service account, taskmanager resources, etc.
…s resources.

The basic resources for active Kubernetes integration include ConfigMap, Deployment, Service, Pod.
…ecorators.

The Fabric8FlinkKubeClient will be used to interact with Kubernetes API server to create/delete resources and get the status. The decorators will be used to build the complete Kubernetes resource description.
Copy link
Member

@tisonkun tisonkun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution @wangyang0918 . Merging now...I'll fix two typo as listed below. Be care to modify your following PRs.

.desc(KubernetesConfigOptions.CONTAINER_IMAGE.description().toString())
.build();

public static final Option JOB_MANAGER_MEMEORY_OPTION = Option.builder("jm")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static final Option JOB_MANAGER_MEMEORY_OPTION = Option.builder("jm")
public static final Option JOB_MANAGER_MEMORY_OPTION = Option.builder("jm")

typo. I will fix on merging.

.desc("Memory for JobManager Container with optional unit (default: MB)")
.build();

public static final Option TASK_MANAGER_MEMEORY_OPTION = Option.builder("tm")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static final Option TASK_MANAGER_MEMEORY_OPTION = Option.builder("tm")
public static final Option TASK_MANAGER_MEMORY_OPTION = Option.builder("tm")

typo. I will fix on merging.

@tisonkun tisonkun closed this in f49e632 Dec 5, 2019
@wangyang0918 wangyang0918 deleted the FLINK-10935 branch December 5, 2019 05:52
}

@Override
public void onClose(KubernetesClientException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it ok to simply ignore the onClose event? Can't it happen that there is a problem (e.g. e != null)? Shouldn't we then signal this to potential waiters who have called await @wangyang0918?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
8 participants