/
reporter_udp.go
108 lines (86 loc) · 2.84 KB
/
reporter_udp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright (C) 2017 Librato, Inc. All rights reserved.
package reporter
import (
"context"
"net"
"github.com/appoptics/appoptics-apm-go/v1/ao/internal/bson"
"github.com/appoptics/appoptics-apm-go/v1/ao/internal/config"
"github.com/appoptics/appoptics-apm-go/v1/ao/internal/log"
"github.com/appoptics/appoptics-apm-go/v1/ao/internal/metrics"
"github.com/pkg/errors"
)
const (
udpAddrDefault = "127.0.0.1:7831"
)
type udpReporter struct {
conn *net.UDPConn
}
func udpNewReporter() reporter {
var conn *net.UDPConn
// collector address override
udpAddress := config.GetCollectorUDP()
if udpAddress == "" {
udpAddress = udpAddrDefault
}
serverAddr, err := net.ResolveUDPAddr("udp4", udpAddress)
if err == nil {
conn, err = net.DialUDP("udp4", nil, serverAddr)
}
if err != nil {
log.Errorf("AppOptics failed to initialize UDP reporter: %v", err)
return &nullReporter{}
}
// add default setting
updateSetting(int32(TYPE_DEFAULT), "",
[]byte("SAMPLE_START,SAMPLE_THROUGH_ALWAYS"),
1000000, 120, argsToMap(16, 8, 16, 8, 16, 8, -1, -1, []byte("")))
return &udpReporter{conn: conn}
}
func (r *udpReporter) report(ctx *oboeContext, e *event) error {
if err := prepareEvent(ctx, e); err != nil {
// don't continue if preparation failed
return err
}
_, err := r.conn.Write((*e).bbuf.GetBuf())
return err
}
// Shutdown closes the UDP reporter TODO: not supported
func (r *udpReporter) Shutdown(ctx context.Context) error {
// return r.conn.Close()
return errors.New("not implemented")
}
// ShutdownNow closes the reporter immediately.
func (r *udpReporter) ShutdownNow() error { return nil }
// Closed returns if the reporter is closed or not TODO: not supported
func (r *udpReporter) Closed() bool {
return false
}
// WaitForReady waits until the reporter becomes ready or the context is canceled.
func (r *udpReporter) WaitForReady(ctx context.Context) bool { return true }
func (r *udpReporter) reportEvent(ctx *oboeContext, e *event) error {
return r.report(ctx, e)
}
func (r *udpReporter) reportStatus(ctx *oboeContext, e *event) error {
return r.report(ctx, e)
}
func (r *udpReporter) reportSpan(span metrics.SpanMessage) error {
s := span.(*metrics.HTTPSpanMessage)
bbuf := bson.NewBuffer()
bbuf.AppendString("transaction", s.Transaction)
bbuf.AppendString("url", s.Path)
bbuf.AppendInt("status", s.Status)
bbuf.AppendString("method", s.Method)
bbuf.AppendBool("hasError", s.HasError)
bbuf.AppendInt64("duration", s.Duration.Nanoseconds())
bbuf.Finish()
_, err := r.conn.Write(bbuf.GetBuf())
return err
}
func (r *udpReporter) CustomSummaryMetric(name string, value float64, opts metrics.MetricOptions) error {
return nil
}
func (r *udpReporter) CustomIncrementMetric(name string, opts metrics.MetricOptions) error {
return nil
}
func (r *udpReporter) Flush() error { return nil }
func (r *udpReporter) SetServiceKey(string) {}