Skip to content

Commit

Permalink
Modified some specification issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxiaoshuai123 committed Nov 7, 2023
1 parent 788aba2 commit 8f9451e
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 87 deletions.
1 change: 1 addition & 0 deletions codis/cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ Options:
log.PanicErrorf(err, "load config %s failed", s)
}
config.ConfigFileName = s
log.Warnf("option --config = %s", s)
}
models.SetMaxSlotNum(config.MaxSlotNum)
if s, ok := utils.Argument(d, "--host-admin"); ok {
Expand Down
11 changes: 7 additions & 4 deletions codis/config/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ proxy_datacenter = ""
proxy_max_clients = 1000

# Set max offheap memory size. (0 to disable)
proxy_max_offheap_size = "1gb"
proxy_max_offheap_size = "1024mb"

# Set heap placeholder to reduce GC frequency.
proxy_heap_placeholder = "256mb"
Expand Down Expand Up @@ -99,6 +99,12 @@ session_keepalive_period = "75s"
# Set session to be sensitive to failures. Default is false, instead of closing socket, proxy will send an error response to client.
session_break_on_failure = false

# Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log
slowlog_log_slower_than = 100000

# set the number of slowlog in memory, max len is 10000000. (0 to disable)
slowlog_max_len = 128000

# Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period.
metrics_report_server = ""
metrics_report_period = "1s"
Expand All @@ -115,6 +121,3 @@ metrics_report_statsd_server = ""
metrics_report_statsd_period = "1s"
metrics_report_statsd_prefix = ""

ncpu = 0
slowlog_log_slower_than = 0
slowlog_max_len = 0
34 changes: 14 additions & 20 deletions codis/pkg/proxy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ session_keepalive_period = "75s"
# Set session to be sensitive to failures. Default is false, instead of closing socket, proxy will send an error response to client.
session_break_on_failure = false
# Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log
slowlog_log_slower_than = 100000
# set the number of slowlog in memory, max len is 10000000. (0 to disable)
slowlog_max_len = 128000
# Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period.
metrics_report_server = ""
metrics_report_period = "1s"
Expand All @@ -134,9 +140,6 @@ metrics_report_statsd_prefix = ""

type Config struct {
ConfigFileName string `toml:"-" json:"config_file_name"`
Log string `toml:"log"`
LogLevel string `toml:"log_level"`
Ncpu int `toml:"ncpu"`

ProtoType string `toml:"proto_type" json:"proto_type"`
ProxyAddr string `toml:"proxy_addr" json:"proxy_addr"`
Expand Down Expand Up @@ -181,8 +184,8 @@ type Config struct {
SessionKeepAlivePeriod timesize.Duration `toml:"session_keepalive_period" json:"session_keepalive_period"`
SessionBreakOnFailure bool `toml:"session_break_on_failure" json:"session_break_on_failure"`

//SlowlogLogSlowerThan int64 `toml:"slowlog_log_slower_than" json:"slowlog_log_slower_than"`
//SlowlogMaxLen int64 `toml:"slowlog_max_len" json:"slowlog_max_len"`
SlowlogLogSlowerThan int64 `toml:"slowlog_log_slower_than" json:"slowlog_log_slower_than"`
SlowlogMaxLen int64 `toml:"slowlog_max_len" json:"slowlog_max_len"`

MetricsReportServer string `toml:"metrics_report_server" json:"metrics_report_server"`
MetricsReportPeriod timesize.Duration `toml:"metrics_report_period" json:"metrics_report_period"`
Expand Down Expand Up @@ -310,21 +313,12 @@ func (c *Config) Validate() error {
return errors.New("invalid session_keepalive_period")
}

//if c.SlowlogLogSlowerThan < 0 {
// return errors.New("invalid slowlog_log_slower_than")
//}
//if c.SlowlogMaxLen < 0 {
// return errors.New("invalid slowlog_max_len")
//}
//if c.Log == "" {
// return errors.New("invalid log")
//}
//if c.LogLevel == "" {
// return errors.New("invalid log_level")
//}
//if c.Ncpu <= 0 {
// return errors.New("invalid ncpu")
//}
if c.SlowlogLogSlowerThan < 0 {
return errors.New("invalid slowlog_log_slower_than")
}
if c.SlowlogMaxLen < 0 {
return errors.New("invalid slowlog_max_len")
}

if c.MetricsReportPeriod < 0 {
return errors.New("invalid metrics_report_period")
Expand Down
44 changes: 37 additions & 7 deletions codis/pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,14 @@ func (s *Proxy) ConfigGet(key string) *redis.Resp {
switch key {
case "proxy_max_clients":
return redis.NewBulkBytes([]byte(strconv.Itoa(s.config.ProxyMaxClients)))
case "backend_primary_only":
return redis.NewBulkBytes([]byte(strconv.FormatBool(s.config.BackendPrimaryOnly)))
case "slowlog_log_slower_than":
return redis.NewBulkBytes([]byte(strconv.FormatInt(s.config.SlowlogLogSlowerThan, 10)))
case "slowlog_max_len":
return redis.NewBulkBytes([]byte(strconv.FormatInt(s.config.SlowlogMaxLen, 10)))
default:
return redis.NewErrorf("unsurport key[%s].", key)
return redis.NewErrorf("unsupported key: %s", key)
}
}

Expand All @@ -230,17 +236,41 @@ func (s *Proxy) ConfigSet(key, value string) *redis.Resp {
case "proxy_max_clients":
n, err := strconv.Atoi(value)
if err != nil {
return redis.NewErrorf("err:%s.", err)
return redis.NewErrorf("err:%s", err)
}

if n <= 0 {
return redis.NewErrorf("invalid proxy_max_clients")
} else {
s.config.ProxyMaxClients = n
return redis.NewString([]byte("OK"))
}
s.config.ProxyMaxClients = n
return redis.NewString([]byte("OK"))
case "backend_primary_only":
return redis.NewErrorf("not currently supported")
case "slowlog_log_slower_than":
n, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return redis.NewErrorf("err:%s", err)
}
if n < 0 {
return redis.NewErrorf("invalid slowlog_log_slower_than")
}
s.config.SlowlogLogSlowerThan = n
return redis.NewString([]byte("OK"))
case "slowlog_max_len":
n, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return redis.NewErrorf("err:%s", err)
}

if n < 0 {
return redis.NewErrorf("invalid slowlog_max_len")
}
s.config.SlowlogMaxLen = n
if s.config.SlowlogMaxLen > 0 {
SlowLogSetMaxLen(s.config.SlowlogMaxLen)
}
return redis.NewString([]byte("OK"))
default:
return redis.NewErrorf("unsurport key.")
return redis.NewErrorf("unsupported key: %s", key)
}
}

Expand Down
45 changes: 20 additions & 25 deletions codis/pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,6 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
})
s.flushOpStats(true)
}()

//var cmd = make([]byte, 128)
var (
breakOnFailure = s.config.SessionBreakOnFailure
maxPipelineLen = s.config.SessionMaxPipeline
Expand Down Expand Up @@ -240,29 +238,26 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
if fflush {
s.flushOpStats(false)
}
//if s.config.SlowlogLogSlowerThan > 0 {
// nowTime := time.Now().UnixNano()
// duration := int64((nowTime - r.ReceiveTime) / 1e3)
// if duration >= s.config.SlowlogLogSlowerThan {
// var d0, d1, d2 int64 = -1, -1, -1
// if r.SendToServerTime > 0 {
// d0 = int64((r.SendToServerTime - r.ReceiveTime) / 1e3)
// }
// if r.SendToServerTime > 0 && r.ReceiveFromServerTime > 0 {
// d1 = int64((r.ReceiveFromServerTime - r.SendToServerTime) / 1e3)
// }
// if r.ReceiveFromServerTime > 0 {
// d2 = int64((nowTime - r.ReceiveFromServerTime) / 1e3)
// }
// index := getWholeCmd(r.Multi, cmd)
// cmdLog := fmt.Sprintf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].",
// time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index]))
// log.Warnf("%s", cmdLog)
// if s.config.SlowlogMaxLen > 0 {
// SlowLogPush(&SlowLogEntry{SlowLogGetCurLogId(), r.ReceiveTime / 1e3, duration, cmdLog})
// }
// }
//}
nowTime := time.Now().UnixNano()
duration := int64((nowTime - r.ReceiveTime) / 1e3)
if duration >= 50000 {
//client -> proxy -> server -> porxy -> client
//Record the waiting time from receiving the request from the client to sending it to the backend server
//the waiting time from sending the request to the backend server to receiving the response from the server
//the waiting time from receiving the server response to sending it to the client
var d0, d1, d2 int64 = -1, -1, -1
if r.SendToServerTime > 0 {
d0 = int64((r.SendToServerTime - r.ReceiveTime) / 1e3)
}
if r.SendToServerTime > 0 && r.ReceiveFromServerTime > 0 {
d1 = int64((r.ReceiveFromServerTime - r.SendToServerTime) / 1e3)
}
if r.ReceiveFromServerTime > 0 {
d2 = int64((nowTime - r.ReceiveFromServerTime) / 1e3)
}
log.Errorf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d",
time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen)
}
return nil
})
}
Expand Down
5 changes: 3 additions & 2 deletions codis/pkg/proxy/slowlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package proxy

import (
"container/list"
"strconv"
"sync"

"pika/codis/v2/pkg/proxy/redis"
"pika/codis/v2/pkg/utils/log"
"pika/codis/v2/pkg/utils/sync2/atomic2"
"strconv"
"sync"
)

const (
Expand Down
Loading

0 comments on commit 8f9451e

Please sign in to comment.