Skip to content

Commit

Permalink
Feat: add cluster cli
Browse files Browse the repository at this point in the history
  • Loading branch information
Somefive committed Sep 12, 2021
1 parent 8b29fb6 commit 05ace1b
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 55 deletions.
Expand Up @@ -271,7 +271,7 @@ func (r *Reconciler) handleFinalizers(ctx context.Context, envBinding *v1alpha1.
return true, errors.WithMessage(err, "cannot remove finalizer")
}

if err := GarbageCollectionForAllResourceTrackersInSubCluster(ctx, r, envBinding); err != nil {
if err := GarbageCollectionForAllResourceTrackersInSubCluster(ctx, r.Client, envBinding); err != nil {
return true, err
}
meta.RemoveFinalizer(envBinding, resourceTrackerFinalizer)
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/core.oam.dev/v1alpha1/envbinding/gc.go
Expand Up @@ -39,8 +39,8 @@ func isEnvBindingPolicy(policy *unstructured.Unstructured) bool {
return policyKindAPIVersion == v1alpha1.EnvBindingKindAPIVersion
}

// GarbageCollectionForOutdatedResourceInSubClusters run garbage collection in sub clusters and remove outdated ResourceTrackers with their associated resources
func GarbageCollectionForOutdatedResourceInSubClusters(ctx context.Context, c client.Client, policies []*unstructured.Unstructured, gcHandler func(context.Context) error) error {
// GarbageCollectionForOutdatedResourcesInSubClusters run garbage collection in sub clusters and remove outdated ResourceTrackers with their associated resources
func GarbageCollectionForOutdatedResourcesInSubClusters(ctx context.Context, c client.Client, policies []*unstructured.Unstructured, gcHandler func(context.Context) error) error {
subClusters := make(map[string]bool)
for _, raw := range policies {
if !isEnvBindingPolicy(raw) {
Expand Down Expand Up @@ -73,7 +73,7 @@ func GarbageCollectionForOutdatedResourceInSubClusters(ctx context.Context, c cl
func GarbageCollectionForAllResourceTrackersInSubCluster(ctx context.Context, c client.Client, envBinding *v1alpha1.EnvBinding) error {
baseApp, err := util.RawExtension2Application(envBinding.Spec.AppTemplate.RawExtension)
if err != nil {
klog.ErrorS(err, "Failed to parse AppTemplate of EnvBinding")
klog.ErrorS(err, "failed to parse AppTemplate of EnvBinding")
return errors.WithMessage(err, "cannot remove finalizer")
}
// delete subCluster resourceTracker
Expand All @@ -86,12 +86,12 @@ func GarbageCollectionForAllResourceTrackersInSubCluster(ctx context.Context, c
}}
rtList := &v1beta1.ResourceTrackerList{}
if err := c.List(subCtx, rtList, listOpts...); err != nil {
klog.ErrorS(err, "Failed to list resource tracker of app", "name", baseApp.Name, "env", decision.Env)
klog.ErrorS(err, "failed to list resource tracker of app", "name", baseApp.Name, "env", decision.Env)
return errors.WithMessage(err, "cannot remove finalizer")
}
for _, rt := range rtList.Items {
if err := c.Delete(subCtx, rt.DeepCopy()); err != nil && !kerrors.IsNotFound(err) {
klog.ErrorS(err, "Failed to delete resource tracker", "name", rt.Name)
klog.ErrorS(err, "failed to delete resource tracker", "name", rt.Name)
return errors.WithMessage(err, "cannot remove finalizer")
}
}
Expand Down
Expand Up @@ -210,7 +210,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
if !wfStatus.Terminated {
_, err := handler.DispatchAndGC(ctx)
if err == nil {
err = envbinding.GarbageCollectionForOutdatedResourceInSubClusters(ctx, r.Client, policies, func(c context.Context) error {
err = envbinding.GarbageCollectionForOutdatedResourcesInSubClusters(ctx, r.Client, policies, func(c context.Context) error {
_, e := handler.DispatchAndGC(c)
return e
})
Expand Down
239 changes: 191 additions & 48 deletions references/cli/cluster.go
Expand Up @@ -19,24 +19,32 @@ package cli
import (
"context"
"fmt"
"strings"

"github.com/pkg/errors"
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
v13 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
errors2 "k8s.io/apimachinery/pkg/api/errors"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
types2 "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/oam-dev/kubevela/apis/core.oam.dev/v1alpha1"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/multicluster"
"github.com/oam-dev/kubevela/pkg/oam"
"github.com/oam-dev/kubevela/pkg/utils/common"
errors3 "github.com/oam-dev/kubevela/pkg/utils/errors"
)

const (
// ClusterSecretLabelKey identifies the cluster secrets
ClusterSecretLabelKey = "cluster.core.oam.dev/cluster-credential"
// ClusterCredentialLabelKey identifies the cluster secrets
ClusterCredentialLabelKey = "cluster.core.oam.dev/cluster-credential"
// FlagClusterName specifies the cluster name
FlagClusterName = "name"
)

// ClusterCommandGroup create a group of cluster command
Expand All @@ -48,40 +56,52 @@ func ClusterCommandGroup(c common.Args) *cobra.Command {
Annotations: map[string]string{
types.TagCommandType: types.TypeSystem,
},
// check if cluster-gateway is ready
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
if c.Config == nil {
if err := c.SetConfig(); err != nil {
return errors.Wrapf(err, "failed to set config for k8s client")
}
}
c.Config.Wrap(multicluster.NewSecretModeMultiClusterRoundTripper)
c.Client = nil
k8sClient, err := c.GetClient()
if err != nil {
return errors.Wrapf(err, "failed to get k8s client")
}
svc, err := multicluster.GetClusterGatewayService(context.Background(), k8sClient)
if err != nil {
return errors.Wrapf(err, "failed to get cluster secret namespace, please ensure cluster gateway is correctly deployed")
}
multicluster.ClusterGatewaySecretNamespace = svc.Namespace
return nil
},
}
_ = c.SetConfig() // set kubeConfig if possible, otherwise ignore it
cmd.AddCommand(
NewClusterListCommand(c),
NewClusterJoinCommand(c),
NewClusterListCommand(&c),
NewClusterJoinCommand(&c),
NewClusterRenameCommand(&c),
NewClusterDetachCommand(&c),
)
return cmd
}

// NewClusterListCommand create cluster list command
func NewClusterListCommand(c common.Args) *cobra.Command {
func NewClusterListCommand(c *common.Args) *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Aliases: []string{"ls"},
Short: "list managed clusters",
Long: "list child clusters managed by KubeVela",
Args: cobra.ExactValidArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {
k8sClient, err := c.GetClient()
if err != nil {
return errors.Wrapf(err, "failed to get k8s client")
}
svc, err := multicluster.GetClusterGatewayService(context.Background(), k8sClient)
if err != nil {
return errors.Wrapf(err, "failed to get cluster secret namespace, please ensure cluster gateway is correctly deployed")
}
namespace := svc.Namespace
secrets := v1.SecretList{}
if err := k8sClient.List(context.Background(), &secrets, client.HasLabels{ClusterSecretLabelKey}, client.InNamespace(namespace)); err != nil {
if err := c.Client.List(context.Background(), &secrets, client.HasLabels{ClusterCredentialLabelKey}, client.InNamespace(multicluster.ClusterGatewaySecretNamespace)); err != nil {
return errors.Wrapf(err, "failed to get cluster secrets")
}
table := newUITable().AddRow("CLUSTER", "ENDPOINT")
table := newUITable().AddRow("CLUSTER", "TYPE", "ENDPOINT")
for _, secret := range secrets.Items {
table.AddRow(secret.Name, string(secret.Data["endpoint"]))
table.AddRow(secret.Name, secret.GetLabels()[ClusterCredentialLabelKey], string(secret.Data["endpoint"]))
}
if len(table.Rows) == 1 {
cmd.Println("No managed cluster found.")
Expand All @@ -94,25 +114,52 @@ func NewClusterListCommand(c common.Args) *cobra.Command {
return cmd
}

func ensureClusterNotExists(c client.Client, clusterName string) error {
secret := &v1.Secret{}
err := c.Get(context.Background(), types2.NamespacedName{Name: clusterName, Namespace: multicluster.ClusterGatewaySecretNamespace}, secret)
if err == nil {
return fmt.Errorf("cluster %s already exists", clusterName)
}
if !errors2.IsNotFound(err) {
return errors.Wrapf(err, "failed to check duplicate cluster secret")
}
return nil
}

func ensureResourceTrackerCRDInstalled(c client.Client, clusterName string) error {
ctx := context.Background()
remoteCtx := multicluster.ContextWithClusterName(ctx, clusterName)
crdName := types2.NamespacedName{Name: "resourcetrackers." + v1beta1.Group}
if err := c.Get(remoteCtx, crdName, &v13.CustomResourceDefinition{}); err != nil {
if !errors2.IsNotFound(err) {
return errors.Wrapf(err, "failed to check resourcetracker crd in cluster %s", clusterName)
}
crd := &v13.CustomResourceDefinition{}
if err = c.Get(ctx, crdName, crd); err != nil {
return errors.Wrapf(err, "failed to get resourcetracker crd in hub cluster")
}
crd.ObjectMeta = v12.ObjectMeta{
Name: crdName.Name,
Annotations: crd.Annotations,
Labels: crd.Labels,
}
if err = c.Create(remoteCtx, crd); err != nil {
return errors.Wrapf(err, "failed to create resourcetracker crd in cluster %s", clusterName)
}
}
return nil
}

// NewClusterJoinCommand create command to help user join cluster to multicluster management
func NewClusterJoinCommand(c common.Args) *cobra.Command {
func NewClusterJoinCommand(c *common.Args) *cobra.Command {
cmd := &cobra.Command{
Use: "join [KUBECONFIG]",
Short: "join managed cluster",
Long: "join managed cluster by kubeconfig",
Example: "# Join cluster declared in my-child-cluster.kubeconfig\n" +
"> vela cluster join my-child-cluster.kubeconfig",
"> vela cluster join my-child-cluster.kubeconfig --name example-cluster",
Args: cobra.ExactValidArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
k8sClient, err := c.GetClient()
if err != nil {
return errors.Wrapf(err, "failed to get k8s client")
}
svc, err := multicluster.GetClusterGatewayService(context.Background(), k8sClient)
if err != nil {
return errors.Wrapf(err, "failed to get cluster secret namespace, please ensure cluster gateway is correctly deployed")
}
namespace := svc.Namespace
config, err := clientcmd.LoadFromFile(args[0])
if err != nil {
return errors.Wrapf(err, "failed to get kubeconfig")
Expand All @@ -132,32 +179,128 @@ func NewClusterJoinCommand(c common.Args) *cobra.Command {
if !ok {
return fmt.Errorf("authInfo %s not found", ctx.AuthInfo)
}
secret := &v1.Secret{}
err = k8sClient.Get(context.Background(), types2.NamespacedName{Name: ctx.Cluster, Namespace: namespace}, secret)
if err == nil {
return fmt.Errorf("cluster %s already exists", ctx.Cluster)

// get ClusterName from flag or config
clusterName, err := cmd.Flags().GetString(FlagClusterName)
if err != nil {
return errors.Wrapf(err, "failed to get cluster name flag")
}
if clusterName == "" {
clusterName = ctx.Cluster
}
if !errors2.IsNotFound(err) {
return errors.Wrapf(err, "failed to check duplicate cluster secret")

if err := ensureClusterNotExists(c.Client, clusterName); err != nil {
return errors.Wrapf(err, "cannot use cluster name %s", clusterName)
}
credentialType := v1.SecretTypeTLS
data := map[string][]byte{
"endpoint": []byte(cluster.Server),
"ca.crt": cluster.CertificateAuthorityData,
"tls.crt": authInfo.ClientCertificateData,
"tls.key": authInfo.ClientKeyData,
}
secret = &v1.Secret{
secret := &v1.Secret{
ObjectMeta: v12.ObjectMeta{
Name: ctx.Cluster,
Namespace: namespace,
Labels: map[string]string{ClusterSecretLabelKey: "kubeconfig"},
},
Type: v1.SecretTypeTLS,
Data: map[string][]byte{
"endpoint": []byte(cluster.Server),
"ca.crt": cluster.CertificateAuthorityData,
"tls.crt": authInfo.ClientCertificateData,
"tls.key": authInfo.ClientKeyData,
Name: clusterName,
Namespace: multicluster.ClusterGatewaySecretNamespace,
Labels: map[string]string{ClusterCredentialLabelKey: strings.Split(string(credentialType), "/")[1]},
},
Type: credentialType,
Data: data,
}
if err := k8sClient.Create(context.Background(), secret); err != nil {
if err := c.Client.Create(context.Background(), secret); err != nil {
return errors.Wrapf(err, "failed to add cluster to kubernetes")
}
cmd.Printf("Successfully add cluster %s, endpoint: %s.\n", ctx.Cluster, cluster.Server)
if err := ensureResourceTrackerCRDInstalled(c.Client, clusterName); err != nil {
_ = c.Client.Delete(context.Background(), secret)
return errors.Wrapf(err, "failed to ensure resourcetracker crd installed in cluster %s", clusterName)
}
cmd.Printf("Successfully add cluster %s, endpoint: %s.\n", clusterName, cluster.Server)
return nil
},
}
cmd.Flags().StringP(FlagClusterName, "n", "", "Specify the cluster name. If empty, it will use the cluster name in config file. Default to be empty.")
return cmd
}

func getMutableClusterSecret(c client.Client, clusterName string) (*v1.Secret, error) {
clusterSecret := &v1.Secret{}
if err := c.Get(context.Background(), types2.NamespacedName{Namespace: multicluster.ClusterGatewaySecretNamespace, Name: clusterName}, clusterSecret); err != nil {
return nil, errors.Wrapf(err, "failed to find target cluster secret %s", clusterName)
}
labels := clusterSecret.GetLabels()
if labels == nil || labels[ClusterCredentialLabelKey] == "" {
return nil, fmt.Errorf("invalid cluster secret %s: cluster credential type label %s is not set", clusterName, ClusterCredentialLabelKey)
}
ebs := &v1alpha1.EnvBindingList{}
if err := c.List(context.Background(), ebs); err != nil {
return nil, errors.Wrap(err, "failed to find EnvBindings to check clusters")
}
errs := errors3.ErrorList{}
for _, eb := range ebs.Items {
for _, decision := range eb.Status.ClusterDecisions {
if decision.Cluster == clusterName {
errs.Append(fmt.Errorf("application %s/%s (env: %s, envBinding: %s) is currently using cluster %s", eb.Namespace, eb.Labels[oam.LabelAppName], decision.Env, eb.Name, clusterName))
}
}
}
if errs.HasError() {
return nil, errors.Wrapf(errs, "cluster %s is in use now", clusterName)
}
return clusterSecret, nil
}

// NewClusterRenameCommand create command to help user rename cluster
func NewClusterRenameCommand(c *common.Args) *cobra.Command {
cmd := &cobra.Command{
Use: "rename [OLD_NAME] [NEW_NAME]",
Short: "rename managed cluster",
Args: cobra.ExactValidArgs(2),
RunE: func(cmd *cobra.Command, args []string) error {
oldClusterName := args[0]
newClusterName := args[1]
clusterSecret, err := getMutableClusterSecret(c.Client, oldClusterName)
if err != nil {
return errors.Wrapf(err, "cluster %s is not mutable now", oldClusterName)
}
if err := ensureClusterNotExists(c.Client, newClusterName); err != nil {
return errors.Wrapf(err, "cannot set cluster name to %s", newClusterName)
}
if err := c.Client.Delete(context.Background(), clusterSecret); err != nil {
return errors.Wrapf(err, "failed to rename cluster from %s to %s", oldClusterName, newClusterName)
}
clusterSecret.ObjectMeta = v12.ObjectMeta{
Name: newClusterName,
Namespace: multicluster.ClusterGatewaySecretNamespace,
Labels: clusterSecret.Labels,
Annotations: clusterSecret.Annotations,
}
if err := c.Client.Create(context.Background(), clusterSecret); err != nil {
return errors.Wrapf(err, "failed to rename cluster from %s to %s", oldClusterName, newClusterName)
}
cmd.Printf("Rename cluster %s to %s successfully.\n", oldClusterName, newClusterName)
return nil
},
}
return cmd
}

// NewClusterDetachCommand create command to help user detach existing cluster
func NewClusterDetachCommand(c *common.Args) *cobra.Command {
cmd := &cobra.Command{
Use: "detach [CLUSTER_NAME]",
Short: "detach managed cluster",
Args: cobra.ExactValidArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
clusterName := args[0]
clusterSecret, err := getMutableClusterSecret(c.Client, clusterName)
if err != nil {
return errors.Wrapf(err, "cluster %s is not mutable now", clusterName)
}
if err := c.Client.Delete(context.Background(), clusterSecret); err != nil {
return errors.Wrapf(err, "failed to detach cluster %s", clusterName)
}
cmd.Printf("Detach cluster %s successfully.\n", clusterName)
return nil
},
}
Expand Down

0 comments on commit 05ace1b

Please sign in to comment.