-
Notifications
You must be signed in to change notification settings - Fork 837
/
Copy pathtracer.go
127 lines (111 loc) · 3.92 KB
/
tracer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/*
* Copyright 2021 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rpcinfo
import (
"context"
"io"
"runtime/debug"
"github.com/cloudwego/kitex/internal"
"github.com/cloudwego/kitex/internal/stream"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/stats"
)
// StreamEventReporter should be implemented by any tracer that wants to report stream events
type StreamEventReporter interface {
// ReportStreamEvent is for collecting Recv/Send events on stream
// NOTE: The callee should NOT hold references to event, which may be recycled later
ReportStreamEvent(ctx context.Context, ri RPCInfo, event Event)
}
// TraceController controls tracers.
type TraceController struct {
tracers []stats.Tracer
streamEventReporters []StreamEventReporter
}
// Append appends a new tracer to the controller.
func (c *TraceController) Append(col stats.Tracer) {
c.tracers = append(c.tracers, col)
if reporter, ok := col.(StreamEventReporter); ok {
c.streamEventReporters = append(c.streamEventReporters, reporter)
}
}
// DoStart starts the tracers.
func (c *TraceController) DoStart(ctx context.Context, ri RPCInfo) context.Context {
defer c.tryRecover(ctx)
Record(ctx, ri, stats.RPCStart, nil)
for _, col := range c.tracers {
ctx = col.Start(ctx)
}
return ctx
}
// DoFinish calls the tracers in reversed order.
func (c *TraceController) DoFinish(ctx context.Context, ri RPCInfo, err error) {
defer c.tryRecover(ctx)
Record(ctx, ri, stats.RPCFinish, err)
if err != nil {
rpcStats := AsMutableRPCStats(ri.Stats())
rpcStats.SetError(err)
}
// reverse the order
for i := len(c.tracers) - 1; i >= 0; i-- {
c.tracers[i].Finish(ctx)
}
}
func buildStreamingEvent(statsEvent stats.Event, err error) Event {
if err == nil || err == io.EOF {
return NewEvent(statsEvent, stats.StatusInfo, "")
} else {
return NewEvent(statsEvent, stats.StatusError, err.Error())
}
}
// ReportStreamEvent is for collecting Recv/Send events on stream
func (c *TraceController) ReportStreamEvent(ctx context.Context, statsEvent stats.Event, err error) {
if !c.HasStreamEventReporter() {
return
}
defer c.tryRecover(ctx)
event := buildStreamingEvent(statsEvent, err)
defer func() {
if recyclable, ok := event.(internal.Reusable); ok {
recyclable.Recycle()
}
}()
// RPCInfo is likely to be used by each reporter
ri := GetRPCInfo(ctx)
for i := len(c.streamEventReporters) - 1; i >= 0; i-- {
c.streamEventReporters[i].ReportStreamEvent(ctx, ri, event)
}
}
// GetStreamEventHandler returns the stream event handler
// If there's no StreamEventReporter, nil is returned for client/server to skip adding tracing middlewares
func (c *TraceController) GetStreamEventHandler() stream.StreamEventHandler {
if c.HasStreamEventReporter() {
return c.ReportStreamEvent
}
return nil
}
// HasTracer reports whether there exists any tracer.
func (c *TraceController) HasTracer() bool {
return c != nil && len(c.tracers) > 0
}
// HasStreamEventReporter reports whether there exists any StreamEventReporter.
func (c *TraceController) HasStreamEventReporter() bool {
return c != nil && len(c.streamEventReporters) > 0
}
func (c *TraceController) tryRecover(ctx context.Context) {
if err := recover(); err != nil {
klog.CtxWarnf(ctx, "Panic happened during tracer call. This doesn't affect the rpc call, but may lead to lack of monitor data such as metrics and logs: error=%s, stack=%s", err, string(debug.Stack()))
}
}