/
client_observed.go
101 lines (83 loc) · 2.42 KB
/
client_observed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package client
import (
"context"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/extensions"
"github.com/cloudevents/sdk-go/v2/observability"
"github.com/cloudevents/sdk-go/v2/protocol"
"go.opencensus.io/trace"
)
// NewObserved builds a client with New from the given protocol object and
// options, then wraps it in an obsClient so that calls made through the
// returned Client go through the reporting and tracing wrappers defined on
// obsClient.
//
// The same opts are applied twice: once by New to the inner client and once to
// the wrapper itself. The first error from either step is returned together
// with a nil Client.
func NewObserved(protocol interface{}, opts ...Option) (Client, error) {
	inner, err := New(protocol, opts...)
	if err != nil {
		return nil, err
	}

	// Run the options against the wrapper as well, so that any option which
	// targets *obsClient can configure it.
	observed := &obsClient{client: inner}
	err = observed.applyOptions(opts...)
	if err != nil {
		return nil, err
	}

	return observed, nil
}
// obsClient wraps another Client and adds reporting and tracing around the
// calls it forwards to that client.
type obsClient struct {
// client is the wrapped Client that Send, Request and StartReceiver delegate to.
client Client
// addTracing, when true, makes Send clone the event context and add tracing
// attributes derived from the current span to the outgoing event.
addTracing bool
}
// applyOptions calls every option in order with c as its argument. It stops at
// the first option that fails and returns that error; it returns nil when all
// options succeed or when opts is empty.
func (c *obsClient) applyOptions(opts ...Option) error {
	for i := range opts {
		err := opts[i](c)
		if err != nil {
			return err
		}
	}
	return nil
}
// Send forwards the event to the wrapped client's Send and returns its result
// unchanged, recording the call along the way.
//
// A reporter for the send operation and a client-kind trace span are started
// before the call; the span is ended when Send returns. If the span is
// recording events, the event's trace attributes are attached to it. The
// reporter is marked OK when the result is an ACK and Error otherwise.
//
// When addTracing is set, the event context is replaced by a clone and tracing
// attributes built from the span context are added to the event before it is
// sent.
func (c *obsClient) Send(ctx context.Context, e event.Event) protocol.Result {
	ctx, reporter := observability.NewReporter(ctx, reportSend)
	ctx, span := trace.StartSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindClient))
	defer span.End()

	if span.IsRecordingEvents() {
		attrs := EventTraceAttributes(&e)
		span.AddAttributes(attrs...)
	}

	if c.addTracing {
		// Work on a cloned context before writing tracing attributes;
		// presumably this keeps the caller's context untouched — TODO confirm.
		e.Context = e.Context.Clone()
		tracing := extensions.FromSpanContext(span.SpanContext())
		tracing.AddTracingAttributes(&e)
	}

	result := c.client.Send(ctx, e)
	if !protocol.IsACK(result) {
		reporter.Error()
		return result
	}
	reporter.OK()
	return result
}
// Request forwards the event to the wrapped client's Request and returns the
// response event and result unchanged, recording the call along the way.
//
// A reporter for the request operation and a client-kind trace span are
// started before the call; the span is ended when Request returns. If the span
// is recording events, the event's trace attributes are attached to it. The
// reporter is marked OK when the result is an ACK and Error otherwise.
//
// NOTE(review): unlike Send, this method does not consult addTracing, so no
// tracing attributes are added to the outgoing event here — confirm whether
// that is intentional.
func (c *obsClient) Request(ctx context.Context, e event.Event) (*event.Event, protocol.Result) {
	ctx, reporter := observability.NewReporter(ctx, reportRequest)
	ctx, span := trace.StartSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindClient))
	defer span.End()

	if span.IsRecordingEvents() {
		attrs := EventTraceAttributes(&e)
		span.AddAttributes(attrs...)
	}

	resp, result := c.client.Request(ctx, e)
	if !protocol.IsACK(result) {
		reporter.Error()
		return resp, result
	}
	reporter.OK()
	return resp, result
}
// StartReceiver hands fn to the wrapped client's StartReceiver and reports the
// outcome: OK when it returns a nil error, Error otherwise. The error from the
// wrapped client is returned as is.
// See Client.StartReceiver for details. This call does not return until the
// wrapped client's StartReceiver returns.
func (c *obsClient) StartReceiver(ctx context.Context, fn interface{}) error {
	ctx, reporter := observability.NewReporter(ctx, reportStartReceiver)

	err := c.client.StartReceiver(ctx, fn)
	if err == nil {
		reporter.OK()
		return nil
	}

	reporter.Error()
	return err
}