Skip to content

Commit

Permalink
*: add rpc error stats and refine log (pingcap#52810)
Browse files Browse the repository at this point in the history
Signed-off-by: crazycs520 <crazycs520@gmail.com>
  • Loading branch information
crazycs520 committed May 15, 2024
1 parent 01e95af commit 1cb8533
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 45 deletions.
6 changes: 3 additions & 3 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond},
totalProcessTime: time.Second,
totalWaitTime: time.Second,
rpcStat: tikv.NewRegionRequestRuntimeStats(),
reqStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: 15,
}
s1.copRespTime.Add(execdetails.Duration(time.Second))
Expand All @@ -131,7 +131,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
// Test for idempotence.
require.Equal(t, expect, stats.String())

s1.rpcStat.Stats[tikvrpc.CmdCop] = &tikv.RPCRuntimeStats{
s1.reqStat.RPCStats[tikvrpc.CmdCop] = &tikv.RPCRuntimeStats{
Count: 1,
Consume: int64(time.Second),
}
Expand All @@ -146,7 +146,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond},
totalProcessTime: time.Second,
totalWaitTime: time.Second,
rpcStat: tikv.NewRegionRequestRuntimeStats(),
reqStat: tikv.NewRegionRequestRuntimeStats(),
}
s1.copRespTime.Add(execdetails.Duration(time.Second))
s1.procKeys.Add(100)
Expand Down
29 changes: 10 additions & 19 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/pingcap/tipb/go-tipb"
tikvmetrics "github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
Expand Down Expand Up @@ -360,7 +359,7 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
if r.stats == nil {
r.stats = &selectResultRuntimeStats{
backoffSleep: make(map[string]time.Duration),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
reqStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: r.distSQLConcurrency,
}
}
Expand Down Expand Up @@ -471,7 +470,7 @@ type selectResultRuntimeStats struct {
backoffSleep map[string]time.Duration
totalProcessTime time.Duration
totalWaitTime time.Duration
rpcStat tikv.RegionRequestRuntimeStats
reqStat *tikv.RegionRequestRuntimeStats
distSQLConcurrency int
CoprCacheHitNum int64
}
Expand All @@ -486,7 +485,7 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntim
maps.Copy(s.backoffSleep, copStats.BackoffSleep)
s.totalProcessTime += copStats.TimeDetail.ProcessTime
s.totalWaitTime += copStats.TimeDetail.WaitTime
s.rpcStat.Merge(copStats.RegionRequestRuntimeStats)
s.reqStat.Merge(copStats.ReqStats)
if copStats.CoprCacheHit {
s.CoprCacheHitNum++
}
Expand All @@ -497,7 +496,7 @@ func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
copRespTime: execdetails.Percentile[execdetails.Duration]{},
procKeys: execdetails.Percentile[execdetails.Int64]{},
backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
reqStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: s.distSQLConcurrency,
CoprCacheHitNum: s.CoprCacheHitNum,
}
Expand All @@ -508,7 +507,7 @@ func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
}
newRs.totalProcessTime += s.totalProcessTime
newRs.totalWaitTime += s.totalWaitTime
maps.Copy(newRs.rpcStat.Stats, s.rpcStat.Stats)
newRs.reqStat = s.reqStat.Clone()
return &newRs
}

Expand All @@ -525,13 +524,13 @@ func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) {
}
s.totalProcessTime += other.totalProcessTime
s.totalWaitTime += other.totalWaitTime
s.rpcStat.Merge(other.rpcStat)
s.reqStat.Merge(other.reqStat)
s.CoprCacheHitNum += other.CoprCacheHitNum
}

func (s *selectResultRuntimeStats) String() string {
buf := bytes.NewBuffer(nil)
rpcStat := s.rpcStat
reqStat := s.reqStat
if s.copRespTime.Size() > 0 {
size := s.copRespTime.Size()
if size == 1 {
Expand Down Expand Up @@ -562,15 +561,6 @@ func (s *selectResultRuntimeStats) String() string {
buf.WriteString(execdetails.FormatDuration(s.totalWaitTime))
}
}
copRPC := rpcStat.Stats[tikvrpc.CmdCop]
if copRPC != nil && copRPC.Count > 0 {
rpcStat = rpcStat.Clone()
delete(rpcStat.Stats, tikvrpc.CmdCop)
buf.WriteString(", rpc_num: ")
buf.WriteString(strconv.FormatInt(copRPC.Count, 10))
buf.WriteString(", rpc_time: ")
buf.WriteString(execdetails.FormatDuration(time.Duration(copRPC.Consume)))
}
if config.GetGlobalConfig().TiKVClient.CoprCache.CapacityMB > 0 {
buf.WriteString(fmt.Sprintf(", copr_cache_hit_ratio: %v",
strconv.FormatFloat(float64(s.CoprCacheHitNum)/float64(s.copRespTime.Size()), 'f', 2, 64)))
Expand All @@ -584,10 +574,11 @@ func (s *selectResultRuntimeStats) String() string {
buf.WriteString("}")
}

rpcStatsStr := rpcStat.String()
rpcStatsStr := reqStat.String()
if len(rpcStatsStr) > 0 {
buf.WriteString(", ")
buf.WriteString(", rpc_info:{")
buf.WriteString(rpcStatsStr)
buf.WriteString("}")
}

if len(s.backoffSleep) > 0 {
Expand Down
14 changes: 7 additions & 7 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,16 +444,16 @@ func TestCoprocessorPagingSize(t *testing.T) {
// Check 'num_rpc' (reported inside 'rpc_info') in the execution information
//
// mysql> explain analyze select * from t_paging;
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | id |task | execution info |
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | TableReader_5 |root | time:7.27ms, loops:2, cop_task: {num: 10, max: 1.57ms, min: 313.3µs, avg: 675.9µs, p95: 1.57ms, tot_proc: 2ms, rpc_num: 10, rpc_time: 6.69ms, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15} |
// | └─TableFullScan_4 |cop[tikv] | tikv_task:{proc max:1.48ms, min:294µs, avg: 629µs, p80:1.21ms, p95:1.48ms, iters:0, tasks:10} |
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// +--------------------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | id |task | execution info |
// +--------------------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | TableReader_5 |root | time:7.27ms, loops:2, cop_task: {num: 10, max: 1.57ms, min: 313.3µs, avg: 675.9µs, p95: 1.57ms, tot_proc: 2ms, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, rpc_info:{Cop:{num_rpc:10, total_time:6.69ms}} |
// | └─TableFullScan_4 |cop[tikv] | tikv_task:{proc max:1.48ms, min:294µs, avg: 629µs, p80:1.21ms, p95:1.48ms, iters:0, tasks:10} |
// +--------------------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// 2 rows in set (0.01 sec)

getRPCNumFromExplain := func(rows [][]interface{}) (res uint64) {
re := regexp.MustCompile("rpc_num: ([0-9]+)")
re := regexp.MustCompile("num_rpc:([0-9]+)")
for _, row := range rows {
buf := bytes.NewBufferString("")
_, _ = fmt.Fprintf(buf, "%s\n", row)
Expand Down
10 changes: 5 additions & 5 deletions executor/executor_failpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func TestCollectCopRuntimeStats(t *testing.T) {
rows := tk.MustQuery("explain analyze select * from t1").Rows()
require.Len(t, rows, 2)
explain := fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*rpc_num: .*, .*regionMiss:.*", explain)
require.Regexp(t, ".*num_rpc:.*, .*regionMiss:.*", explain)
require.NoError(t, failpoint.Disable("tikvclient/tikvStoreRespResult"))
}

Expand Down Expand Up @@ -586,15 +586,15 @@ func TestTiKVClientReadTimeout(t *testing.T) {
rows = tk.MustQuery("explain analyze select /*+ set_var(tikv_client_read_timeout=1) */ * from t where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:2.*", explain)

// Test for stale read.
tk.MustExec("set @a=now(6);")
tk.MustExec("set @@tidb_replica_read='closest-replicas';")
rows = tk.MustQuery("explain analyze select /*+ set_var(tikv_client_read_timeout=1) */ * from t as of timestamp(@a) where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:2.*", explain)

// Test for tikv_client_read_timeout session variable.
tk.MustExec("set @@tikv_client_read_timeout=1;")
Expand All @@ -614,15 +614,15 @@ func TestTiKVClientReadTimeout(t *testing.T) {
rows = tk.MustQuery("explain analyze select * from t where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:2.*", explain)

// Test for stale read.
tk.MustExec("set @a=now(6);")
tk.MustExec("set @@tidb_replica_read='closest-replicas';")
rows = tk.MustQuery("explain analyze select * from t as of timestamp(@a) where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:2.*", explain)
}

func TestGetMvccByEncodedKeyRegionError(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion executor/explainfor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestExplainFor(t *testing.T) {
buf.WriteString(fmt.Sprintf("%v", v))
}
}
require.Regexp(t, "TableReader_5 10000.00 0 root time:.*, loops:1, cop_task: {num:.*, max:.*, proc_keys:.* rpc_num: 1, rpc_time:.*} data:TableFullScan_4 N/A N/A\n"+
require.Regexp(t, "TableReader_5 10000.00 0 root time:.*, loops:1, cop_task: {num:.*, max:.*, proc_keys:.*num_rpc:1, total_time:.*} data:TableFullScan_4 N/A N/A\n"+
"└─TableFullScan_4 10000.00 0 cop.* table:t1 tikv_task:{time:.*, loops:0} keep order:false, stats:pseudo N/A N/A",
buf.String())
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,5 @@ replace (
github.com/pingcap/tidb/parser => ./parser
go.opencensus.io => go.opencensus.io v0.23.1-0.20220331163232-052120675fac
)

replace github.com/tikv/client-go/v2 => github.com/crazycs520/client-go/v2 v2.0.0-alpha.0.20240515064016-81b7527d2749
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/crazycs520/client-go/v2 v2.0.0-alpha.0.20240515064016-81b7527d2749 h1:O8UlBnmxQK20Qw4oIq8j3ik4fqSKogJglPPG6KNOe6w=
github.com/crazycs520/client-go/v2 v2.0.0-alpha.0.20240515064016-81b7527d2749/go.mod h1:mmVCLP2OqWvQJPOIevQPZvGphzh/oq9vv8J5LDfpadQ=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 h1:iwZdTE0PVqJCos1vaoKsclOGD3ADKpshg3SRtYBbwso=
Expand Down Expand Up @@ -935,8 +937,6 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.4-0.20240514061345-18d0dab321a9 h1:lVzOczZHdE5muOK9ATfi4niV9qwApTKq2caoc90ku/c=
github.com/tikv/client-go/v2 v2.0.4-0.20240514061345-18d0dab321a9/go.mod h1:mmVCLP2OqWvQJPOIevQPZvGphzh/oq9vv8J5LDfpadQ=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down
2 changes: 1 addition & 1 deletion store/copr/batch_request_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *tikv.RP
start := time.Now()
resp, err = ss.GetClient().SendRequest(ctx, rpcCtx.Addr, req, timout)
if ss.Stats != nil && ss.enableCollectExecutionInfo {
tikv.RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start))
ss.Stats.RecordRPCRuntimeStats(req.Type, time.Since(start))
}
if err != nil {
cancel()
Expand Down
12 changes: 9 additions & 3 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
req.StoreTp = getEndPointType(task.storeType)
startTime := time.Now()
if worker.kvclient.Stats == nil {
worker.kvclient.Stats = make(map[tikvrpc.CmdType]*tikv.RPCRuntimeStats)
worker.kvclient.Stats = tikv.NewRegionRequestRuntimeStats()
}
// set ReadReplicaScope and TxnScope so that req.IsStaleRead will be true when it's a global scope stale read.
req.ReadReplicaScope = worker.req.ReadReplicaScope
Expand Down Expand Up @@ -1101,10 +1101,16 @@ const (

func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *copTask, bo *Backoffer, resp *coprocessor.Response) {
logStr := fmt.Sprintf("[TIME_COP_PROCESS] resp_time:%s txnStartTS:%d region_id:%d store_addr:%s", costTime, worker.req.StartTs, task.region.GetID(), task.storeAddr)
if worker.kvclient.Stats != nil {
logStr += fmt.Sprintf(" stats:%s", worker.kvclient.Stats.String())
}
if bo.GetTotalSleep() > minLogBackoffTime {
backoffTypes := strings.Replace(fmt.Sprintf("%v", bo.TiKVBackoffer().GetTypes()), " ", ",", -1)
logStr += fmt.Sprintf(" backoff_ms:%d backoff_types:%s", bo.GetTotalSleep(), backoffTypes)
}
if regionErr := resp.GetRegionError(); regionErr != nil {
logStr += fmt.Sprintf(" region_err:%s", regionErr.String())
}
// resp might be nil, but it is safe to call resp.GetXXX on it, both here and for GetRegionError above.
detailV2 := resp.GetExecDetailsV2()
detail := resp.GetExecDetails()
Expand Down Expand Up @@ -1441,7 +1447,7 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt
if resp.detail == nil {
resp.detail = new(CopRuntimeStats)
}
resp.detail.Stats = worker.kvclient.Stats
resp.detail.ReqStats = worker.kvclient.Stats
backoffTimes := bo.GetBackoffTimes()
resp.detail.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
Expand Down Expand Up @@ -1481,7 +1487,7 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt
// CopRuntimeStats contains execution detail information.
type CopRuntimeStats struct {
execdetails.ExecDetails
tikv.RegionRequestRuntimeStats
ReqStats *tikv.RegionRequestRuntimeStats

CoprCacheHit bool
}
Expand Down
8 changes: 4 additions & 4 deletions tests/realtikvtest/sessiontest/session_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,15 +246,15 @@ func TestTiKVClientReadTimeout(t *testing.T) {
rows = tk.MustQuery("explain analyze select /*+ set_var(tikv_client_read_timeout=1) */ * from t where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 4.*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:4.*", explain)

// Test for stale read.
tk.MustExec("insert into t values (1,1), (2,2);")
tk.MustExec("set @@tidb_replica_read='closest-replicas';")
rows = tk.MustQuery("explain analyze select /*+ set_var(tikv_client_read_timeout=1) */ * from t as of timestamp(@stale_read_ts_var) where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: (3|4).*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:(3|4).*", explain)

// Test for tikv_client_read_timeout session variable.
tk.MustExec("set @@tikv_client_read_timeout=1;")
Expand All @@ -275,12 +275,12 @@ func TestTiKVClientReadTimeout(t *testing.T) {
rows = tk.MustQuery("explain analyze select * from t where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 4.*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:4.*", explain)

// Test for stale read.
tk.MustExec("set @@tidb_replica_read='closest-replicas';")
rows = tk.MustQuery("explain analyze select * from t as of timestamp(@stale_read_ts_var) where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: (3|4).*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, *num_rpc:(3|4).*", explain)
}

0 comments on commit 1cb8533

Please sign in to comment.