-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathconnection.go
158 lines (130 loc) · 4.78 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package sdp
import (
"context"
"fmt"
reflect "reflect"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
// EncodedConnection is the set of messaging operations this package needs from
// a NATS connection. In this file it is implemented by *EncodedConnectionImpl,
// which wraps a *nats.Conn; tests can substitute a mock that does nothing.
type EncodedConnection interface {
	// Publishing and request/reply. These take a context so that an
	// implementation can use it for logging and tracing.
	Publish(ctx context.Context, subj string, m proto.Message) error
	PublishMsg(ctx context.Context, msg *nats.Msg) error
	RequestMsg(ctx context.Context, msg *nats.Msg) (*nats.Msg, error)

	// Subscriptions.
	Subscribe(subj string, cb nats.MsgHandler) (*nats.Subscription, error)
	QueueSubscribe(subj, queue string, cb nats.MsgHandler) (*nats.Subscription, error)

	// Connection state.
	Status() nats.Status
	Stats() nats.Statistics
	LastError() error

	// Lifecycle.
	Drain() error
	Close()
	Drop()

	// Underlying returns the wrapped *nats.Conn.
	Underlying() *nats.Conn
}
// EncodedConnectionImpl is the EncodedConnection implementation backed by a
// NATS connection. The *nats.Conn is embedded, and the methods defined on this
// type delegate to it through ec.Conn.
type EncodedConnectionImpl struct {
	*nats.Conn
}

// Compile-time assertion that *EncodedConnectionImpl satisfies
// EncodedConnection.
var _ EncodedConnection = (*EncodedConnectionImpl)(nil)
// recordMessage writes a trace-level log entry called name that carries the
// subject, the type description and the message text as fields. When the
// global log level is exactly TraceLevel it additionally adds an event called
// name, with the subject and message as attributes, to the span found in ctx.
func recordMessage(ctx context.Context, name, subj, typ, msg string) {
	fields := log.Fields{
		"msg":  msg,
		"subj": subj,
		"typ":  typ,
	}
	log.WithContext(ctx).WithFields(fields).Trace(name)

	// Span events are only emitted while tracing at the most verbose level,
	// to avoid spamming honeycomb.
	if log.GetLevel() != log.TraceLevel {
		return
	}

	current := trace.SpanFromContext(ctx)
	current.AddEvent(name, trace.WithAttributes(
		attribute.String("ovm.sdp.subject", subj),
		attribute.String("ovm.sdp.message", msg),
	))
}
// Publish encodes m with proto.Marshal and publishes the result on subj. The
// outgoing message is passed through InjectOtelTraceContext together with ctx
// before it is handed to the underlying connection.
//
// It returns the proto.Marshal error unchanged if m cannot be encoded,
// otherwise whatever the underlying connection's PublishMsg returns.
func (ec *EncodedConnectionImpl) Publish(ctx context.Context, subj string, m proto.Message) error {
	// recordMessage only emits anything when trace logging is enabled, and
	// rendering the message with protojson.Format is expensive, so skip the
	// rendering entirely at every other log level.
	// TODO: replace protojson.Format with summarized data
	if log.IsLevelEnabled(log.TraceLevel) {
		recordMessage(ctx, "Publish", subj, fmt.Sprint(reflect.TypeOf(m)), protojson.Format(m))
	}

	data, err := proto.Marshal(m)
	if err != nil {
		return err
	}

	msg := &nats.Msg{
		Subject: subj,
		Data:    data,
	}
	InjectOtelTraceContext(ctx, msg)
	return ec.Conn.PublishMsg(msg)
}
// PublishMsg publishes an already-encoded message. The message is recorded
// via recordMessage (subject only; the payload is logged as "binary"), passed
// through InjectOtelTraceContext together with ctx, and then handed to the
// underlying connection.
//
// A nil msg yields nats.ErrInvalidMsg instead of a nil-pointer panic when the
// subject is read for logging.
func (ec *EncodedConnectionImpl) PublishMsg(ctx context.Context, msg *nats.Msg) error {
	if msg == nil {
		return nats.ErrInvalidMsg
	}

	recordMessage(ctx, "Publish", msg.Subject, "[]byte", "binary")
	InjectOtelTraceContext(ctx, msg)
	return ec.Conn.PublishMsg(msg)
}
// Subscribe registers cb for subj on the underlying connection and returns
// the resulting subscription and error as-is. Use NewMsgHandler to get a
// nats.MsgHandler with otel propagation and protobuf marshaling.
func (ec *EncodedConnectionImpl) Subscribe(subj string, cb nats.MsgHandler) (*nats.Subscription, error) {
	sub, err := ec.Conn.Subscribe(subj, cb)
	return sub, err
}
// QueueSubscribe registers cb for subj in the given queue group on the
// underlying connection and returns the resulting subscription and error
// as-is. Use NewMsgHandler to get a nats.MsgHandler with otel propagation and
// protobuf marshaling.
func (ec *EncodedConnectionImpl) QueueSubscribe(subj, queue string, cb nats.MsgHandler) (*nats.Subscription, error) {
	sub, err := ec.Conn.QueueSubscribe(subj, queue, cb)
	return sub, err
}
// RequestMsg sends msg as a request using the underlying connection's
// RequestMsgWithContext and returns the reply and error it produces. The
// request, and then either the error or the arrival of a reply, are recorded
// via recordMessage; payloads are logged as "binary". msg is passed through
// InjectOtelTraceContext together with ctx before it is sent.
//
// A nil msg yields nats.ErrInvalidMsg instead of a nil-pointer panic when the
// subject is read for logging.
func (ec *EncodedConnectionImpl) RequestMsg(ctx context.Context, msg *nats.Msg) (*nats.Msg, error) {
	if msg == nil {
		return nil, nats.ErrInvalidMsg
	}

	recordMessage(ctx, "RequestMsg", msg.Subject, "[]byte", "binary")
	InjectOtelTraceContext(ctx, msg)

	reply, err := ec.Conn.RequestMsgWithContext(ctx, msg)
	if err != nil {
		recordMessage(ctx, "RequestMsg Error", msg.Subject, fmt.Sprint(reflect.TypeOf(err)), err.Error())
		return reply, err
	}

	recordMessage(ctx, "RequestMsg Reply", msg.Subject, "[]byte", "binary")
	return reply, err
}
// Drain calls Drain on the underlying connection and returns its error.
func (ec *EncodedConnectionImpl) Drain() error {
	err := ec.Conn.Drain()
	return err
}
// Close calls Close on the underlying connection.
func (ec *EncodedConnectionImpl) Close() {
	conn := ec.Conn
	conn.Close()
}
// Status returns the status reported by the underlying connection.
func (ec *EncodedConnectionImpl) Status() nats.Status {
	status := ec.Conn.Status()
	return status
}
// Stats returns the statistics reported by the underlying connection.
func (ec *EncodedConnectionImpl) Stats() nats.Statistics {
	stats := ec.Conn.Stats()
	return stats
}
// LastError returns the value of LastError on the underlying connection.
func (ec *EncodedConnectionImpl) LastError() error {
	lastErr := ec.Conn.LastError()
	return lastErr
}
// Underlying returns the embedded *nats.Conn, which is nil after Drop has
// been called.
func (ec *EncodedConnectionImpl) Underlying() *nats.Conn {
	conn := ec.Conn
	return conn
}
// Drop discards this wrapper's reference to the underlying connection by
// setting the embedded *nats.Conn to nil. The connection itself is neither
// closed nor drained here.
func (ec *EncodedConnectionImpl) Drop() {
	var none *nats.Conn
	ec.Conn = none
}
// Unmarshal does a proto.Unmarshal of b into m and logs errors in a
// consistent way. The user should still validate that the message is valid,
// as it's possible to unmarshal data from one message format into another
// without an error. Validation should be based on the type that the data is
// being unmarshaled into.
//
// On failure the error is recorded via recordMessage, logged at error level,
// set as the status of the span found in ctx, and returned unchanged. On
// success nil is returned.
func Unmarshal(ctx context.Context, b []byte, m proto.Message) error {
	if err := proto.Unmarshal(b, m); err != nil {
		recordMessage(ctx, "Unmarshal err", "unknown", fmt.Sprint(reflect.TypeOf(err)), err.Error())
		log.WithContext(ctx).Errorf("Error parsing message: %v", err)
		trace.SpanFromContext(ctx).SetStatus(codes.Error, fmt.Sprintf("Error parsing message: %v", err))
		return err
	}

	// recordMessage only emits anything when trace logging is enabled, and
	// rendering the message with protojson.Format is expensive, so skip the
	// rendering entirely at every other log level.
	if log.IsLevelEnabled(log.TraceLevel) {
		recordMessage(ctx, "Unmarshal", "unknown", fmt.Sprint(reflect.TypeOf(m)), protojson.Format(m))
	}
	return nil
}
//go:generate go run genhandler.go Query
//go:generate go run genhandler.go QueryResponse
//go:generate go run genhandler.go CancelQuery
//go:generate go run genhandler.go GatewayResponse