/
utils.go
130 lines (110 loc) · 2.8 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package bus
import (
"context"
"fmt"
"maps"
"reflect"
"runtime"
"strings"
"time"
"unsafe"
"github.com/goccy/go-json"
"github.com/redis/go-redis/v9"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
// instrumName is a fixed identifier string for this package.
// NOTE(review): presumably the instrumentation scope name handed to the
// OpenTelemetry tracer/meter — confirm at the call sites.
const instrumName = "github.com/gowool/bus"
// clientWrapper embeds redis.UniversalClient and overrides XReadGroup so that
// the call returns once its context is done.
//
// Workaround for https://github.com/redis/go-redis/issues/2276;
// remove this wrapper after issue 2276 is fixed.
type clientWrapper struct {
redis.UniversalClient
}
// XReadGroup runs the embedded client's XReadGroup in a separate goroutine and
// returns as soon as either the command completes or ctx is done, whichever
// happens first.
//
// When ctx finishes first, the returned command carries ctx.Err() as its
// error. The in-flight call is left to finish in the background; its result is
// parked in the buffered channel and dropped.
func (c *clientWrapper) XReadGroup(ctx context.Context, a *redis.XReadGroupArgs) *redis.XStreamSliceCmd {
	// Capacity 1 lets the goroutine deliver its result without a receiver, so
	// it can never block on the send after this method has returned. The
	// channel is deliberately not closed: closing it here would race with that
	// late send and turn it into a "send on closed channel" panic.
	ch := make(chan *redis.XStreamSliceCmd, 1)

	go func() {
		// A panic raised by the underlying client is swallowed so it cannot
		// take the process down. Nothing is sent in that case, so the caller
		// keeps waiting on ctx.
		defer func() {
			_ = recover()
		}()
		ch <- c.UniversalClient.XReadGroup(ctx, a)
	}()

	select {
	case cmd := <-ch:
		return cmd
	case <-ctx.Done():
		var cmd redis.XStreamSliceCmd
		cmd.SetErr(ctx.Err())
		return &cmd
	}
}
// addArgs holds the parameters for adding an entry to a stream.
// toXAddArgs copies every field into the identically named exported field of
// redis.XAddArgs.
type addArgs struct {
stream string
noMkStream bool
maxLen int64 // MAXLEN N
minID string
// Approx causes MaxLen and MinID to use "~" matcher (instead of "=").
approx bool
limit int64
id string
// values is the only reference-typed field; clone gives the copy its own map.
values map[string]any
}
// clone returns a copy of the receiver. Scalar fields are copied by value and
// the values map is shallow-cloned, so adding or removing keys on the copy
// leaves the original's map untouched.
func (a *addArgs) clone() *addArgs {
	dup := *a
	dup.values = maps.Clone(a.values)
	return &dup
}
// toXAddArgs builds a redis.XAddArgs from the receiver, copying each field to
// its exported counterpart. The Values map is shared with the receiver, not
// copied.
func (a *addArgs) toXAddArgs() *redis.XAddArgs {
	out := new(redis.XAddArgs)
	out.Stream = a.stream
	out.NoMkStream = a.noMkStream
	out.MaxLen = a.maxLen
	out.MinID = a.minID
	out.Approx = a.approx
	out.Limit = a.limit
	out.ID = a.id
	out.Values = a.values
	return out
}
// handlerName derives a name for h with every "." replaced by ":".
//
// For a function value the name is the function's runtime symbol name. For any
// other value it is the string form of the value's type, after stripping one
// level of pointer indirection.
func handlerName(h any) string {
	var name string

	switch typ := reflect.TypeOf(h); typ.Kind() {
	case reflect.Func:
		pc := reflect.ValueOf(h).Pointer()
		name = runtime.FuncForPC(pc).Name()
	case reflect.Pointer:
		name = typ.Elem().String()
	default:
		name = typ.String()
	}

	return strings.ReplaceAll(name, ".", ":")
}
// toEvent converts a Redis stream message into an Event plus the message's
// remaining fields.
//
// The entry stored under dataKey is JSON-decoded when it is present and is a
// string; any other case leaves the decoded message at its zero value. The
// second result is a copy of xMessage.Values without the dataKey entry.
//
// Errors:
//   - a JSON decoding failure returns (nil, nil, err);
//   - a validation failure still returns the event and the additional fields,
//     together with the wrapped validation error.
func toEvent(xMessage redis.XMessage) (Event, map[string]any, error) {
	var message eventMessage

	// A comma-ok assertion on the map lookup covers both "key missing" and
	// "value is not a string" in one step.
	if raw, isString := xMessage.Values[dataKey].(string); isString {
		// View the string's bytes without copying them.
		payload := unsafe.Slice(unsafe.StringData(raw), len(raw))
		if err := json.Unmarshal(payload, &message); err != nil {
			return nil, nil, fmt.Errorf("event data is of incorrect type %T: %w", raw, err)
		}
	}

	// Everything except the payload itself is handed back to the caller.
	additional := maps.Clone(xMessage.Values)
	delete(additional, dataKey)

	event := newEventData(message)
	if err := message.validate(); err != nil {
		return event, additional, fmt.Errorf("event data is not valid: %w", err)
	}
	return event, additional, nil
}
// milliseconds expresses d as a floating-point number of milliseconds,
// keeping the sub-millisecond fraction.
func milliseconds(d time.Duration) float64 {
	const nanosPerMilli = float64(time.Millisecond)
	return float64(d) / nanosPerMilli
}
// statusAttr returns a "status" attribute whose value is "error" when err is
// non-nil and "ok" otherwise.
func statusAttr(err error) attribute.KeyValue {
	status := "ok"
	if err != nil {
		status = "error"
	}
	return attribute.String("status", status)
}
// recordError records err on span and sets the span status to codes.Error
// with err's message. It does nothing when err is nil.
func recordError(span trace.Span, err error) {
	if err == nil {
		return
	}
	span.RecordError(err)
	span.SetStatus(codes.Error, err.Error())
}