Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: reimplement cluster connect cmd #180

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 19 additions & 40 deletions cmd/gtctl/cluster_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ import (
"context"
"fmt"

greptimedbclusterv1alpha1 "github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"

"github.com/GreptimeTeam/gtctl/pkg/connector"
"github.com/GreptimeTeam/gtctl/pkg/deployer/k8s"
opt "github.com/GreptimeTeam/gtctl/pkg/cluster"
"github.com/GreptimeTeam/gtctl/pkg/cluster/kubernetes"
"github.com/GreptimeTeam/gtctl/pkg/logger"
)

Expand All @@ -34,11 +31,6 @@ type clusterConnectCliOptions struct {
}

func NewConnectCommand(l logger.Logger) *cobra.Command {
const (
connectionProtocolMySQL = "mysql"
connectionProtocolPostgres = "pg"
)

var options clusterConnectCliOptions

cmd := &cobra.Command{
Expand All @@ -50,50 +42,37 @@ func NewConnectCommand(l logger.Logger) *cobra.Command {
return fmt.Errorf("cluster name should be set")
}

k8sDeployer, err := k8s.NewDeployer(l)
if err != nil {
return err
}

var (
ctx = context.TODO()
clusterName = args[0]
namespace = options.Namespace
protocol opt.ConnectProtocol
)

name := types.NamespacedName{
Namespace: options.Namespace,
Name: clusterName,
}.String()
cluster, err := k8sDeployer.GetGreptimeDBCluster(ctx, name, nil)
if err != nil && errors.IsNotFound(err) {
l.Errorf("cluster %s in %s not found\n", clusterName, namespace)
return nil
}

rawCluster, ok := cluster.Raw.(*greptimedbclusterv1alpha1.GreptimeDBCluster)
if !ok {
return fmt.Errorf("invalid cluster type")
cluster, err := kubernetes.NewCluster(l)
if err != nil {
return err
}

switch options.Protocol {
case connectionProtocolMySQL:
if err = connector.MySQLConnectCommand(rawCluster, l); err != nil {
return fmt.Errorf("error connecting to mysql: %v", err)
}
case connectionProtocolPostgres:
if err = connector.PostgresSQLConnectCommand(rawCluster, l); err != nil {
return fmt.Errorf("error connecting to postgres: %v", err)
}
case "mysql":
protocol = opt.MySQL
case "pg", "psql", "postgres":
protocol = opt.Postgres
default:
return fmt.Errorf("database type not supported: %s", options.Protocol)
return fmt.Errorf("unsupported connection protocol: %s", options.Protocol)
}
return nil
connectOptions := &opt.ConnectOptions{
Namespace: options.Namespace,
Name: clusterName,
Protocol: protocol,
}

return cluster.Connect(ctx, connectOptions)
},
}

cmd.Flags().StringVarP(&options.Namespace, "namespace", "n", "default", "Namespace of GreptimeDB cluster.")
cmd.Flags().StringVarP(&options.Protocol, "protocol", "p", "mysql", "Specify a database, like mysql or pg.")
cmd.Flags().StringVarP(&options.Protocol, "protocol", "p", "mysql", "Specify a database protocol, like mysql or pg.")

return cmd
}
3 changes: 2 additions & 1 deletion cmd/gtctl/cluster_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func NewCluster(args []string, options *clusterCreateCliOptions, l logger.Logger
UseGreptimeCNArtifacts: options.UseGreptimeCNArtifacts,
ValuesFile: options.GreptimeDBClusterValuesFile,
},
Spinner: spinner,
}

var cluster opt.Operations
Expand Down Expand Up @@ -211,7 +212,7 @@ func NewCluster(args []string, options *clusterCreateCliOptions, l logger.Logger
}
}

if err = cluster.Create(ctx, createOptions, spinner); err != nil {
if err = cluster.Create(ctx, createOptions); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/artifacts/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
// GreptimeChartReleaseDownloadURL is the URL of the Greptime charts that stored in the GitHub release.
GreptimeChartReleaseDownloadURL = "https://github.com/GreptimeTeam/helm-charts/releases/download"

// Greptime release bucket public endpoint in CN region.
// GreptimeReleaseBucketCN releases bucket public endpoint in CN region.
GreptimeReleaseBucketCN = "https://downloads.greptime.cn/releases"

// GreptimeCNCharts is the URL of the Greptime charts that stored in the S3 bucket of the CN region.
Expand Down
19 changes: 14 additions & 5 deletions pkg/cluster/baremetal/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,27 @@ import (
"github.com/GreptimeTeam/gtctl/pkg/artifacts"
opt "github.com/GreptimeTeam/gtctl/pkg/cluster"
"github.com/GreptimeTeam/gtctl/pkg/logger"
"github.com/GreptimeTeam/gtctl/pkg/status"
fileutils "github.com/GreptimeTeam/gtctl/pkg/utils/file"
)

func (c *Cluster) Create(ctx context.Context, options *opt.CreateOptions, spinner *status.Spinner) error {
func (c *Cluster) Create(ctx context.Context, options *opt.CreateOptions) error {
spinner := options.Spinner

withSpinner := func(target string, f func(context.Context, *opt.CreateOptions) error) error {
spinner.Start(fmt.Sprintf("Installing %s...", target))
if spinner != nil {
spinner.Start(fmt.Sprintf("Installing %s...", target))
}

if err := f(ctx, options); err != nil {
spinner.Stop(false, fmt.Sprintf("Installing %s failed", target))
if spinner != nil {
spinner.Stop(false, fmt.Sprintf("Installing %s failed", target))
}
return err
}
spinner.Stop(true, fmt.Sprintf("Installing %s successfully 🎉", target))

if spinner != nil {
spinner.Stop(true, fmt.Sprintf("Installing %s successfully 🎉", target))
}
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/cluster/baremetal/not_implemented.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ func (c *Cluster) List(ctx context.Context, options *opt.ListOptions) error {
func (c *Cluster) Scale(ctx context.Context, options *opt.ScaleOptions) error {
return fmt.Errorf("do not support")
}

func (c *Cluster) Connect(ctx context.Context, options *opt.ConnectOptions) error {
return fmt.Errorf("do not support")
}
61 changes: 61 additions & 0 deletions pkg/cluster/kubernetes/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kubernetes

import (
"context"
"fmt"
"strconv"

greptimedbclusterv1alpha1 "github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"

opt "github.com/GreptimeTeam/gtctl/pkg/cluster"
"github.com/GreptimeTeam/gtctl/pkg/connector"
)

func (c *Cluster) Connect(ctx context.Context, options *opt.ConnectOptions) error {
cluster, err := c.get(ctx, &opt.GetOptions{
Namespace: options.Namespace,
Name: options.Name,
})
if err != nil && errors.IsNotFound(err) {
c.logger.V(0).Infof("cluster %s in %s not found", options.Name, options.Namespace)
return nil
}

switch options.Protocol {
case opt.MySQL:
if err = c.connectMySQL(cluster); err != nil {
return fmt.Errorf("error connecting to mysql: %v", err)
}
case opt.Postgres:
if err = c.connectPostgres(cluster); err != nil {
return fmt.Errorf("error connecting to postgres: %v", err)
}
default:
return fmt.Errorf("unsupported connect protocol type")
}

return nil
}

func (c *Cluster) connectMySQL(cluster *greptimedbclusterv1alpha1.GreptimeDBCluster) error {
return connector.Mysql(strconv.Itoa(int(cluster.Spec.MySQLServicePort)), cluster.Name, c.logger)
}

func (c *Cluster) connectPostgres(cluster *greptimedbclusterv1alpha1.GreptimeDBCluster) error {
return connector.PostgresSQL(strconv.Itoa(int(cluster.Spec.PostgresServicePort)), cluster.Name, c.logger)
}
17 changes: 12 additions & 5 deletions pkg/cluster/kubernetes/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/GreptimeTeam/gtctl/pkg/artifacts"
opt "github.com/GreptimeTeam/gtctl/pkg/cluster"
"github.com/GreptimeTeam/gtctl/pkg/helm"
"github.com/GreptimeTeam/gtctl/pkg/status"
)

const (
Expand All @@ -30,17 +29,25 @@ const (
disableRBACConfig = "auth.rbac.create=false,auth.rbac.token.enabled=false,"
)

func (c *Cluster) Create(ctx context.Context, options *opt.CreateOptions, spinner *status.Spinner) error {
func (c *Cluster) Create(ctx context.Context, options *opt.CreateOptions) error {
spinner := options.Spinner

withSpinner := func(target string, f func(context.Context, *opt.CreateOptions) error) error {
if !c.dryRun {
if !c.dryRun && spinner != nil {
spinner.Start(fmt.Sprintf("Installing %s...", target))
}

if err := f(ctx, options); err != nil {
spinner.Stop(false, fmt.Sprintf("Installing %s failed", target))
if spinner != nil {
spinner.Stop(false, fmt.Sprintf("Installing %s failed", target))
}
return err
}

if !c.dryRun {
spinner.Stop(true, fmt.Sprintf("Installing %s successfully 🎉", target))
if spinner != nil {
spinner.Stop(true, fmt.Sprintf("Installing %s successfully 🎉", target))
}
}
return nil
}
Expand Down
20 changes: 19 additions & 1 deletion pkg/cluster/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ type Operations interface {
Scale(ctx context.Context, options *ScaleOptions) error

// Create creates a new cluster.
Create(ctx context.Context, options *CreateOptions, spinner *status.Spinner) error
Create(ctx context.Context, options *CreateOptions) error

// Delete deletes a specific cluster.
Delete(ctx context.Context, options *DeleteOptions) error

// Connect connects to a specific cluster.
Connect(ctx context.Context, options *ConnectOptions) error
}

type GetOptions struct {
Expand Down Expand Up @@ -74,6 +77,8 @@ type CreateOptions struct {
Cluster *CreateClusterOptions
Operator *CreateOperatorOptions
Etcd *CreateEtcdOptions

Spinner *status.Spinner
}

// CreateClusterOptions is the options to create a GreptimeDB cluster.
Expand Down Expand Up @@ -114,3 +119,16 @@ type CreateEtcdOptions struct {
EtcdStorageSize string `helm:"persistence.size"`
ConfigValues string `helm:"*"`
}

type ConnectProtocol int

const (
MySQL ConnectProtocol = iota
Postgres
)

type ConnectOptions struct {
Namespace string
Name string
Protocol ConnectProtocol
}
13 changes: 3 additions & 10 deletions pkg/connector/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ import (
"net"
"os"
"os/exec"
"strconv"
"sync"
"syscall"

greptimedbclusterv1alpha1 "github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1"
"github.com/go-sql-driver/mysql"

"github.com/GreptimeTeam/gtctl/pkg/logger"
Expand All @@ -43,16 +41,11 @@ const (
portForward = "port-forward"
)

// MySQLConnectCommand connects to a GreptimeDB cluster
func MySQLConnectCommand(rawCluster *greptimedbclusterv1alpha1.GreptimeDBCluster, l logger.Logger) error {
return mysqlConnect(strconv.Itoa(int(rawCluster.Spec.MySQLServicePort)), rawCluster.Name, l)
}

// mysqlConnect connects to a GreptimeDB cluster
func mysqlConnect(port, clusterName string, l logger.Logger) error {
// Mysql connects to a GreptimeDB cluster using mysql protocol.
func Mysql(port, clusterName string, l logger.Logger) error {
waitGroup := sync.WaitGroup{}

// TODO(sh2): is there any elegant way to enable port-forward?
// TODO: is there any elegant way to enable port-forward?
cmd := exec.CommandContext(context.Background(), kubectl, portForward, "-n", "default", "svc/"+clusterName+"-frontend", fmt.Sprintf("%s:%s", port, port))
if err := cmd.Start(); err != nil {
l.Errorf("Error starting port-forwarding: %v", err)
Expand Down
11 changes: 3 additions & 8 deletions pkg/connector/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
"net"
"os"
"os/exec"
"strconv"
"sync"
"syscall"

greptimedbclusterv1alpha1 "github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1"
"github.com/go-pg/pg/v10"

"github.com/GreptimeTeam/gtctl/pkg/logger"
Expand All @@ -41,14 +39,11 @@ const (
postgresSQLDatabaseArg = "-d"
)

func PostgresSQLConnectCommand(rawCluster *greptimedbclusterv1alpha1.GreptimeDBCluster, l logger.Logger) error {
return postgresSQLConnect(strconv.Itoa(int(rawCluster.Spec.PostgresServicePort)), rawCluster.Name, l)
}

func postgresSQLConnect(port, clusterName string, l logger.Logger) error {
// PostgresSQL connects to a GreptimeDB cluster using postgres protocol.
func PostgresSQL(port, clusterName string, l logger.Logger) error {
waitGroup := sync.WaitGroup{}

// TODO(sh2): is there any elegant way to enable port-forward?
// TODO: is there any elegant way to enable port-forward?
cmd := exec.CommandContext(context.Background(), kubectl, portForward, "-n", "default", "svc/"+clusterName+"-frontend", fmt.Sprintf("%s:%s", port, port))
if err := cmd.Start(); err != nil {
l.Errorf("Error starting port-forwarding: %v", err)
Expand Down
Loading
Loading