diff --git a/config/config.go b/config/config.go index 559f092f..8916e48b 100644 --- a/config/config.go +++ b/config/config.go @@ -24,10 +24,13 @@ import ( "fmt" "net" "os" + "strings" + "github.com/go-playground/validator/v10" + + "github.com/apache/kvrocks-controller/logger" "github.com/apache/kvrocks-controller/store/engine/etcd" "github.com/apache/kvrocks-controller/store/engine/zookeeper" - "github.com/go-playground/validator/v10" ) type AdminConfig struct { @@ -35,11 +38,8 @@ type AdminConfig struct { } type FailOverConfig struct { - GCIntervalSeconds int `yaml:"gc_interval_seconds"` - PingIntervalSeconds int `yaml:"ping_interval_seconds"` - MaxPingCount int64 `yaml:"max_ping_count"` - MinAliveSize int `yaml:"min_alive_size"` - MaxFailureRatio float64 `yaml:"max_failure_ratio"` + PingIntervalSeconds int `yaml:"ping_interval_seconds"` + MaxPingCount int64 `yaml:"max_ping_count"` } type ControllerConfig struct { @@ -59,11 +59,8 @@ type Config struct { func DefaultFailOverConfig() *FailOverConfig { return &FailOverConfig{ - GCIntervalSeconds: 3600, PingIntervalSeconds: 3, MaxPingCount: 5, - MinAliveSize: 10, - MaxFailureRatio: 0.6, } } @@ -84,14 +81,12 @@ func (c *Config) Validate() error { if c.Controller.FailOver.MaxPingCount < 3 { return errors.New("max ping count required >= 3") } - if c.Controller.FailOver.GCIntervalSeconds < 60 { - return errors.New("gc interval required >= 1min") - } if c.Controller.FailOver.PingIntervalSeconds < 1 { return errors.New("ping interval required >= 1s") } - if c.Controller.FailOver.MinAliveSize < 2 { - return errors.New("min alive size required >= 2") + hostPort := strings.Split(c.Addr, ":") + if hostPort[0] == "0.0.0.0" || hostPort[0] == "127.0.0.1" { + logger.Get().Warn("Leader forward may not work if the host is " + hostPort[0]) } return nil } diff --git a/config/config_test.go b/config/config_test.go index cd91823f..a2002fe3 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -29,11 +29,8 @@ func TestDefaultControllerConfigSet(t *testing.T) { cfg := Default() expectedControllerConfig := &ControllerConfig{ FailOver: &FailOverConfig{ - GCIntervalSeconds: 3600, PingIntervalSeconds: 3, MaxPingCount: 5, - MinAliveSize: 10, - MaxFailureRatio: 0.6, }, } diff --git a/server/helper/helper.go b/server/helper/helper.go index 22851e24..525d8cb1 100644 --- a/server/helper/helper.go +++ b/server/helper/helper.go @@ -17,15 +17,19 @@ * under the License. * */ + package helper import ( "errors" + "fmt" "net/http" - - "github.com/apache/kvrocks-controller/consts" + "strings" "github.com/gin-gonic/gin" + + "github.com/apache/kvrocks-controller/consts" + "github.com/apache/kvrocks-controller/util" ) type Error struct { @@ -77,3 +81,20 @@ func ResponseError(c *gin.Context, err error) { }) c.Abort() } + +// generateSessionID encodes the addr to a session ID, +// which is used to identify the session. And then can be used to +// parse the leader listening address back. +func GenerateSessionID(addr string) string { + return fmt.Sprintf("%s/%s", util.RandString(8), addr) +} + +// extractAddrFromSessionID decodes the session ID to the addr. +func ExtractAddrFromSessionID(sessionID string) string { + parts := strings.Split(sessionID, "/") + if len(parts) != 2 { + // for the old session ID format, we use the addr as the session ID + return sessionID + } + return parts[1] +} diff --git a/server/helper/helper_test.go b/server/helper/helper_test.go new file mode 100644 index 00000000..f9496ffb --- /dev/null +++ b/server/helper/helper_test.go @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package helper + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGenerateSessionID(t *testing.T) { + testAddr := "127.0.0.1:1234" + sessionID := GenerateSessionID(testAddr) + decodedAddr := ExtractAddrFromSessionID(sessionID) + require.Equal(t, testAddr, decodedAddr) + + // old format + require.Equal(t, testAddr, ExtractAddrFromSessionID(testAddr)) +} diff --git a/server/middleware/middleware.go b/server/middleware/middleware.go index 7955825a..e31766a1 100644 --- a/server/middleware/middleware.go +++ b/server/middleware/middleware.go @@ -17,6 +17,7 @@ * under the License. * */ + package middleware import ( @@ -25,14 +26,13 @@ import ( "strconv" "time" - "github.com/apache/kvrocks-controller/server/helper" + "github.com/gin-gonic/gin" + "github.com/prometheus/client_golang/prometheus" "github.com/apache/kvrocks-controller/consts" "github.com/apache/kvrocks-controller/metrics" + "github.com/apache/kvrocks-controller/server/helper" "github.com/apache/kvrocks-controller/store" - - "github.com/gin-gonic/gin" - "github.com/prometheus/client_golang/prometheus" ) func CollectMetrics(c *gin.Context) { @@ -69,6 +69,8 @@ func RedirectIfNotLeader(c *gin.Context) { if !storage.IsLeader() { if !c.GetBool(consts.HeaderIsRedirect) { c.Set(consts.HeaderIsRedirect, true) + peerAddr := helper.ExtractAddrFromSessionID(storage.Leader()) + c.Redirect(http.StatusTemporaryRedirect, "http://"+peerAddr+c.Request.RequestURI) c.Redirect(http.StatusTemporaryRedirect, "http://"+storage.Leader()+c.Request.RequestURI) } else { c.JSON(http.StatusBadRequest, gin.H{"error": "no leader now, please retry later"}) diff --git a/server/server.go b/server/server.go index 65f5ae9e..b4b8a08e 100644 --- a/server/server.go +++ b/server/server.go @@ -17,6 +17,7 @@ * under the License. * */ + package server import ( @@ -32,6 +33,7 @@ import ( "github.com/apache/kvrocks-controller/config" "github.com/apache/kvrocks-controller/controller" "github.com/apache/kvrocks-controller/logger" + "github.com/apache/kvrocks-controller/server/helper" "github.com/apache/kvrocks-controller/store" "github.com/apache/kvrocks-controller/store/engine" "github.com/apache/kvrocks-controller/store/engine/etcd" @@ -49,16 +51,18 @@ type Server struct { func NewServer(cfg *config.Config) (*Server, error) { var persist engine.Engine var err error + + sessionID := helper.GenerateSessionID(cfg.Addr) switch { case strings.EqualFold(cfg.StorageType, "etcd"): logger.Get().Info("Use Etcd as store") - persist, err = etcd.New(cfg.Addr, cfg.Etcd) + persist, err = etcd.New(sessionID, cfg.Etcd) case strings.EqualFold(cfg.StorageType, "zookeeper"): logger.Get().Info("Use Zookeeper as store") - persist, err = zookeeper.New(cfg.Addr, cfg.Zookeeper) + persist, err = zookeeper.New(sessionID, cfg.Zookeeper) default: logger.Get().Info("Use Etcd as default store") - persist, err = etcd.New(cfg.Addr, cfg.Etcd) + persist, err = etcd.New(sessionID, cfg.Etcd) } if err != nil {