diff --git a/codis/cmd/proxy/main.go b/codis/cmd/proxy/main.go index 4ac88044b..8d7da2381 100644 --- a/codis/cmd/proxy/main.go +++ b/codis/cmd/proxy/main.go @@ -113,6 +113,7 @@ Options: log.PanicErrorf(err, "load config %s failed", s) } config.ConfigFileName = s + log.Warnf("option --config = %s", s) } models.SetMaxSlotNum(config.MaxSlotNum) if s, ok := utils.Argument(d, "--host-admin"); ok { diff --git a/codis/config/proxy.toml b/codis/config/proxy.toml index e679b8fdb..e69bc02f4 100644 --- a/codis/config/proxy.toml +++ b/codis/config/proxy.toml @@ -45,7 +45,7 @@ proxy_datacenter = "" proxy_max_clients = 1000 # Set max offheap memory size. (0 to disable) -proxy_max_offheap_size = "1gb" +proxy_max_offheap_size = "1024mb" # Set heap placeholder to reduce GC frequency. proxy_heap_placeholder = "256mb" @@ -99,6 +99,12 @@ session_keepalive_period = "75s" # Set session to be sensitive to failures. Default is false, instead of closing socket, proxy will send an error response to client. session_break_on_failure = false +# Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log +slowlog_log_slower_than = 100000 + +# set the number of slowlog in memory, max len is 10000000. (0 to disable) +slowlog_max_len = 128000 + # Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period. metrics_report_server = "" metrics_report_period = "1s" @@ -115,6 +121,3 @@ metrics_report_statsd_server = "" metrics_report_statsd_period = "1s" metrics_report_statsd_prefix = "" -ncpu = 0 -slowlog_log_slower_than = 0 -slowlog_max_len = 0 diff --git a/codis/pkg/proxy/config.go b/codis/pkg/proxy/config.go index 2f598e78c..abb0c6c32 100644 --- a/codis/pkg/proxy/config.go +++ b/codis/pkg/proxy/config.go @@ -115,6 +115,12 @@ session_keepalive_period = "75s" # Set session to be sensitive to failures. Default is false, instead of closing socket, proxy will send an error response to client. session_break_on_failure = false +# Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log +slowlog_log_slower_than = 100000 + +# set the number of slowlog in memory, max len is 10000000. (0 to disable) +slowlog_max_len = 128000 + # Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period. metrics_report_server = "" metrics_report_period = "1s" @@ -134,9 +140,6 @@ metrics_report_statsd_prefix = "" type Config struct { ConfigFileName string `toml:"-" json:"config_file_name"` - Log string `toml:"log"` - LogLevel string `toml:"log_level"` - Ncpu int `toml:"ncpu"` ProtoType string `toml:"proto_type" json:"proto_type"` ProxyAddr string `toml:"proxy_addr" json:"proxy_addr"` @@ -181,8 +184,8 @@ type Config struct { SessionKeepAlivePeriod timesize.Duration `toml:"session_keepalive_period" json:"session_keepalive_period"` SessionBreakOnFailure bool `toml:"session_break_on_failure" json:"session_break_on_failure"` - //SlowlogLogSlowerThan int64 `toml:"slowlog_log_slower_than" json:"slowlog_log_slower_than"` - //SlowlogMaxLen int64 `toml:"slowlog_max_len" json:"slowlog_max_len"` + SlowlogLogSlowerThan int64 `toml:"slowlog_log_slower_than" json:"slowlog_log_slower_than"` + SlowlogMaxLen int64 `toml:"slowlog_max_len" json:"slowlog_max_len"` MetricsReportServer string `toml:"metrics_report_server" json:"metrics_report_server"` MetricsReportPeriod timesize.Duration `toml:"metrics_report_period" json:"metrics_report_period"` @@ -310,21 +313,12 @@ func (c *Config) Validate() error { return errors.New("invalid session_keepalive_period") } - //if c.SlowlogLogSlowerThan < 0 { - // return errors.New("invalid slowlog_log_slower_than") - //} - //if c.SlowlogMaxLen < 0 { - // return errors.New("invalid slowlog_max_len") - //} - //if c.Log == "" { - // return errors.New("invalid log") - //} - //if c.LogLevel == "" { - // return errors.New("invalid log_level") - //} - //if c.Ncpu <= 0 { - // return errors.New("invalid ncpu") - //} + if c.SlowlogLogSlowerThan < 0 { + return errors.New("invalid slowlog_log_slower_than") + } + if c.SlowlogMaxLen < 0 { + return errors.New("invalid slowlog_max_len") + } if c.MetricsReportPeriod < 0 { return errors.New("invalid metrics_report_period") diff --git a/codis/pkg/proxy/proxy.go b/codis/pkg/proxy/proxy.go index f04486939..899eafa60 100644 --- a/codis/pkg/proxy/proxy.go +++ b/codis/pkg/proxy/proxy.go @@ -218,8 +218,14 @@ func (s *Proxy) ConfigGet(key string) *redis.Resp { switch key { case "proxy_max_clients": return redis.NewBulkBytes([]byte(strconv.Itoa(s.config.ProxyMaxClients))) + case "backend_primary_only": + return redis.NewBulkBytes([]byte(strconv.FormatBool(s.config.BackendPrimaryOnly))) + case "slowlog_log_slower_than": + return redis.NewBulkBytes([]byte(strconv.FormatInt(s.config.SlowlogLogSlowerThan, 10))) + case "slowlog_max_len": + return redis.NewBulkBytes([]byte(strconv.FormatInt(s.config.SlowlogMaxLen, 10))) default: - return redis.NewErrorf("unsurport key[%s].", key) + return redis.NewErrorf("unsupported key: %s", key) } } @@ -230,17 +236,41 @@ func (s *Proxy) ConfigSet(key, value string) *redis.Resp { case "proxy_max_clients": n, err := strconv.Atoi(value) if err != nil { - return redis.NewErrorf("err:%s.", err) + return redis.NewErrorf("err:%s", err) } - if n <= 0 { return redis.NewErrorf("invalid proxy_max_clients") - } else { - s.config.ProxyMaxClients = n - return redis.NewString([]byte("OK")) } + s.config.ProxyMaxClients = n + return redis.NewString([]byte("OK")) + case "backend_primary_only": + return redis.NewErrorf("not currently supported") + case "slowlog_log_slower_than": + n, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return redis.NewErrorf("err:%s", err) + } + if n < 0 { + return redis.NewErrorf("invalid slowlog_log_slower_than") + } + s.config.SlowlogLogSlowerThan = n + return redis.NewString([]byte("OK")) + case "slowlog_max_len": + n, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return redis.NewErrorf("err:%s", err) + } + + if n < 0 { + return redis.NewErrorf("invalid slowlog_max_len") + } + s.config.SlowlogMaxLen = n + if s.config.SlowlogMaxLen > 0 { + SlowLogSetMaxLen(s.config.SlowlogMaxLen) + } + return redis.NewString([]byte("OK")) default: - return redis.NewErrorf("unsurport key.") + return redis.NewErrorf("unsupported key: %s", key) } } diff --git a/codis/pkg/proxy/session.go b/codis/pkg/proxy/session.go index 12c837c83..4679b6003 100644 --- a/codis/pkg/proxy/session.go +++ b/codis/pkg/proxy/session.go @@ -208,8 +208,6 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { }) s.flushOpStats(true) }() - - //var cmd = make([]byte, 128) var ( breakOnFailure = s.config.SessionBreakOnFailure maxPipelineLen = s.config.SessionMaxPipeline @@ -240,29 +238,26 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { if fflush { s.flushOpStats(false) } - //if s.config.SlowlogLogSlowerThan > 0 { - // nowTime := time.Now().UnixNano() - // duration := int64((nowTime - r.ReceiveTime) / 1e3) - // if duration >= s.config.SlowlogLogSlowerThan { - // var d0, d1, d2 int64 = -1, -1, -1 - // if r.SendToServerTime > 0 { - // d0 = int64((r.SendToServerTime - r.ReceiveTime) / 1e3) - // } - // if r.SendToServerTime > 0 && r.ReceiveFromServerTime > 0 { - // d1 = int64((r.ReceiveFromServerTime - r.SendToServerTime) / 1e3) - // } - // if r.ReceiveFromServerTime > 0 { - // d2 = int64((nowTime - r.ReceiveFromServerTime) / 1e3) - // } - // index := getWholeCmd(r.Multi, cmd) - // cmdLog := fmt.Sprintf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].", - // time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index])) - // log.Warnf("%s", cmdLog) - // if s.config.SlowlogMaxLen > 0 { - // SlowLogPush(&SlowLogEntry{SlowLogGetCurLogId(), r.ReceiveTime / 1e3, duration, cmdLog}) - // } - // } - //} + nowTime := time.Now().UnixNano() + duration := int64((nowTime - r.ReceiveTime) / 1e3) + if duration >= 50000 { + //client -> proxy -> server -> porxy -> client + //Record the waiting time from receiving the request from the client to sending it to the backend server + //the waiting time from sending the request to the backend server to receiving the response from the server + //the waiting time from receiving the server response to sending it to the client + var d0, d1, d2 int64 = -1, -1, -1 + if r.SendToServerTime > 0 { + d0 = int64((r.SendToServerTime - r.ReceiveTime) / 1e3) + } + if r.SendToServerTime > 0 && r.ReceiveFromServerTime > 0 { + d1 = int64((r.ReceiveFromServerTime - r.SendToServerTime) / 1e3) + } + if r.ReceiveFromServerTime > 0 { + d2 = int64((nowTime - r.ReceiveFromServerTime) / 1e3) + } + log.Errorf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d", + time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen) + } return nil }) } diff --git a/codis/pkg/proxy/slowlog.go b/codis/pkg/proxy/slowlog.go index c82dd2d98..e91113e58 100644 --- a/codis/pkg/proxy/slowlog.go +++ b/codis/pkg/proxy/slowlog.go @@ -2,11 +2,12 @@ package proxy import ( "container/list" + "strconv" + "sync" + "pika/codis/v2/pkg/proxy/redis" "pika/codis/v2/pkg/utils/log" "pika/codis/v2/pkg/utils/sync2/atomic2" - "strconv" - "sync" ) const ( diff --git a/codis/pkg/utils/configAux.go b/codis/pkg/utils/configAux.go index a15cfa465..5dfbac18b 100644 --- a/codis/pkg/utils/configAux.go +++ b/codis/pkg/utils/configAux.go @@ -40,7 +40,12 @@ func (c *DeployConfig) Init(path string, sep string) error { if err != nil { return err } - defer f.Close() + defer func(f *os.File) { + err := f.Close() + if err != nil { + log.WarnErrorf(err, "Close %s failed.\n", path) + } + }(f) r := bufio.NewReader(f) for { @@ -52,23 +57,19 @@ func (c *DeployConfig) Init(path string, sep string) error { return err } - // 拿到一行并去除两边空白字符 line := strings.TrimSpace(string(b)) item := &ConfItem{} - //第一个字符为“#”或者空行都认为是注释 if strings.Index(line, "#") == 0 || len(line) == 0 { item.confType = TypeComment item.name = line c.items = append(c.items, item) continue } - //找不到 = ,说明这行配置有问题 index := strings.Index(line, sep) if index <= 0 { continue } - //key不可以为空,value可以为空 key := strings.TrimSpace(line[:index]) value := strings.TrimSpace(line[index+1:]) if len(key) == 0 { @@ -88,7 +89,6 @@ func (c *DeployConfig) Reset(conf interface{}, isWrap bool) { for i := 0; i < obj.NumField(); i++ { fieldInfo := obj.Type().Field(i) name := fieldInfo.Tag.Get("toml") - // 检查字段的toml tag是否合法 if name == "" || name == "-" { continue } @@ -100,32 +100,56 @@ func (c *DeployConfig) Reset(conf interface{}, isWrap bool) { continue } if isWrap { - c.Set(name, "\""+value+"\"") + err := c.Set(name, "\""+value+"\"") + if err != nil { + log.WarnErrorf(err, "Set string with wrap failed!") + } } else { - c.Set(name, value) + err := c.Set(name, value) + if err != nil { + log.WarnErrorf(err, "Set string without wrap failed!") + } } case int: value = strconv.Itoa(v) - c.Set(name, value) + err := c.Set(name, value) + if err != nil { + log.WarnErrorf(err, "Set int failed!") + } case int32: value = strconv.FormatInt(int64(v), 10) - c.Set(name, value) + err := c.Set(name, value) + if err != nil { + log.WarnErrorf(err, "Set int32 failed!") + } case int64: value = strconv.FormatInt(v, 10) - c.Set(name, value) + err := c.Set(name, value) + if err != nil { + log.WarnErrorf(err, "Set int64 failed!") + } case bool: if v { - c.Set(name, "true") + err := c.Set(name, "true") + if err != nil { + log.WarnErrorf(err, "Set bool value failed!") + } } else { - c.Set(name, "false") + err := c.Set(name, "false") + if err != nil { + log.WarnErrorf(err, "Set bool value failed!") + } } case timesize.Duration: if ret, err := v.MarshalText(); err != nil { log.WarnErrorf(err, "config set %s failed.\n", name) } else { value = string(ret[:]) - c.Set(name, "\""+value+"\"") + err := c.Set(name, "\""+value+"\"") + if err != nil { + log.WarnErrorf(err, "Set timesize failed!") + } } case bytesize.Int64: @@ -133,7 +157,10 @@ func (c *DeployConfig) Reset(conf interface{}, isWrap bool) { log.WarnErrorf(err, "config set %s failed.\n", name) } else { value = string(ret[:]) - c.Set(name, "\""+value+"\"") + err := c.Set(name, "\""+value+"\"") + if err != nil { + log.WarnErrorf(err, "Set bytesize failed!") + } } default: @@ -180,10 +207,10 @@ func (c *DeployConfig) Show() { log.Infof("Show config, len = %d\n", len(c.items)) for index, item := range c.items { if item.confType == TypeComment { - // 注释的格式: id: context + // Comment format: id: context log.Infof("%d: %s\n", index, item.name) } else { - // 配置文件的格式: id: key = value 或者 id: key value + // Configuration format: id: key = value or id: key value if len(strings.TrimSpace(c.sep)) > 0 { log.Infof("%d: %s %s %s\n", index, item.name, c.sep, item.value) } else { @@ -194,18 +221,21 @@ func (c *DeployConfig) Show() { } func (c *DeployConfig) ReWrite(confName string) error { - // confName = DefaultName.tmp f, err := os.Create(confName) if err != nil { log.WarnErrorf(err, "create %s failed.\n", confName) return err } - defer f.Close() + defer func(f *os.File) { + err := f.Close() + if err != nil { + log.WarnErrorf(err, "Close %s failed.\n", confName) + } + }(f) w := bufio.NewWriter(f) - + var lineStr string for _, item := range c.items { - var lineStr string if item.confType == TypeComment { lineStr = fmt.Sprintf("%s", item.name) } else { @@ -221,7 +251,7 @@ func (c *DeployConfig) ReWrite(confName string) error { } func RewriteConfig(postConf interface{}, defaultConf string, sep string, isWrap bool) error { - conf := new(DeployConfig) + conf := &DeployConfig{} err := conf.Init(defaultConf, sep) if err != nil { log.WarnErrorf(err, "open %s file failed.\n", defaultConf) @@ -229,15 +259,12 @@ func RewriteConfig(postConf interface{}, defaultConf string, sep string, isWrap } conf.Reset(postConf, isWrap) conf.Show() - // 重写一份config var newConf = defaultConf + ".tmp" if err = conf.ReWrite(newConf); err != nil { return err - } else { - if err = os.Remove(defaultConf); err != nil { - return err - } else { - return os.Rename(newConf, defaultConf) - } } + if err = os.Remove(defaultConf); err != nil { + return err + } + return os.Rename(newConf, defaultConf) }