-
Notifications
You must be signed in to change notification settings - Fork 199
/
telemetry.go
153 lines (142 loc) · 4.83 KB
/
telemetry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package portercontext
import (
"context"
"crypto/tls"
"crypto/x509"
"time"
"get.porter.sh/porter/pkg"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
func FromContext(ctx context.Context) (*Context, bool) {
val := ctx.Value("porter.context")
pc, ok := val.(*Context)
return pc, ok
}
func (c *Context) configureTelemetry(ctx context.Context, logger *zap.Logger, cfg LogConfiguration) error {
// default to noop
c.tracer = trace.NewNoopTracerProvider().Tracer("noop")
c.traceCloser = nil
client, err := c.createTraceClient(cfg)
if err != nil {
return err
}
if client == nil {
logger.Debug("telemetry disabled")
return nil
}
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
exporter, err := otlptrace.New(ctx, client)
if err != nil {
return err
}
if c.traceServiceName == "" {
c.traceServiceName = "porter"
}
serviceVersion := pkg.Version
if serviceVersion == "" {
serviceVersion = "dev"
}
r := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(c.traceServiceName),
semconv.ServiceVersionKey.String(serviceVersion),
)
provider := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(r),
)
c.tracer = provider.Tracer("") // empty tracer name defaults to the underlying trace implementor
c.traceCloser = provider
return nil
}
// createTraceClient from the Porter configuration
// See https://github.com/open-telemetry/opentelemetry-go/tree/main/exporters/otlp/otlptrace
func (c *Context) createTraceClient(cfg LogConfiguration) (otlptrace.Client, error) {
if !cfg.TelemetryEnabled {
return nil, nil
}
switch cfg.TelemetryProtocol {
case "grpc":
opts := []otlptracegrpc.Option{otlptracegrpc.WithDialOption(grpc.WithBlock())}
if cfg.TelemetryEndpoint != "" {
opts = append(opts, otlptracegrpc.WithEndpoint(cfg.TelemetryEndpoint))
}
if cfg.TelemetryInsecure {
opts = append(opts, otlptracegrpc.WithInsecure())
}
if cfg.TelemetryCertificate != "" {
creds, err := credentials.NewClientTLSFromFile(cfg.TelemetryCertificate, "")
if err != nil {
return nil, errors.Wrapf(err, "invalid telemetry certificate in %s", cfg.TelemetryCertificate)
}
opts = append(opts, otlptracegrpc.WithTLSCredentials(creds))
}
if cfg.TelemetryTimeout != "" {
timeout, err := time.ParseDuration(cfg.TelemetryTimeout)
if err != nil {
return nil, errors.Wrapf(err, "invalid telemetry timeout %s", cfg.TelemetryTimeout)
}
opts = append(opts, otlptracegrpc.WithTimeout(timeout))
}
if cfg.TelemetryCompression != "" {
opts = append(opts, otlptracegrpc.WithCompressor(cfg.TelemetryCompression))
}
if len(cfg.TelemetryHeaders) > 0 {
opts = append(opts, otlptracegrpc.WithHeaders(cfg.TelemetryHeaders))
}
return otlptracegrpc.NewClient(opts...), nil
case "http/protobuf", "":
var opts []otlptracehttp.Option
if cfg.TelemetryEndpoint != "" {
opts = append(opts, otlptracehttp.WithEndpoint(cfg.TelemetryEndpoint))
}
if cfg.TelemetryInsecure {
opts = append(opts, otlptracehttp.WithInsecure())
}
if cfg.TelemetryCertificate != "" {
certB, err := c.FileSystem.ReadFile(cfg.TelemetryCertificate)
if err != nil {
return nil, errors.Wrapf(err, "invalid telemetry certificate in %s", cfg.TelemetryCertificate)
}
cp := x509.NewCertPool()
if ok := cp.AppendCertsFromPEM(certB); !ok {
return nil, errors.Errorf("could not use certificate in %s", cfg.TelemetryCertificate)
}
opts = append(opts, otlptracehttp.WithTLSClientConfig(&tls.Config{RootCAs: cp}))
}
if cfg.TelemetryTimeout != "" {
timeout, err := time.ParseDuration(cfg.TelemetryTimeout)
if err != nil {
return nil, errors.Wrapf(err, "invalid telemetry timeout %s. Supported values are durations such as 30s or 1m.", cfg.TelemetryTimeout)
}
opts = append(opts, otlptracehttp.WithTimeout(timeout))
}
if cfg.TelemetryCompression != "" {
var compression otlptracehttp.Compression
switch cfg.TelemetryCompression {
case "gzip":
compression = otlptracehttp.GzipCompression
default:
compression = otlptracehttp.NoCompression
}
opts = append(opts, otlptracehttp.WithCompression(compression))
}
if len(cfg.TelemetryHeaders) > 0 {
opts = append(opts, otlptracehttp.WithHeaders(cfg.TelemetryHeaders))
}
return otlptracehttp.NewClient(opts...), nil
default:
return nil, errors.Errorf("invalid OTEL_EXPORTER_OTLP_PROTOCOL value %s. Only grpc and http/protobuf are supported", cfg.TelemetryProtocol)
}
}