-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
orca.go
186 lines (165 loc) · 6.9 KB
/
orca.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package orca implements Open Request Cost Aggregation, which is an open
// standard for request cost aggregation and reporting by backends and the
// corresponding aggregation of such reports by L7 load balancers (such as
// Envoy) on the data plane. In a proxyless world with gRPC enabled
// applications, aggregation of such reports will be done by the gRPC client.
//
// # Experimental
//
// Notice: All APIs in this package are EXPERIMENTAL and may be changed or
// removed in a later release.
package orca
import (
"context"
"errors"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancerload"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
)
// logger is the grpclog component logger used throughout this file. It is
// registered under the component name "orca-backend-metrics".
var logger = grpclog.Component("orca-backend-metrics")

// joinServerOptions takes any number of server options and returns a single
// server option; CallMetricsServerOption uses it to bundle its unary and
// streaming interceptors. It is obtained from the grpc internal package via a
// type assertion, which panics during package initialization if the stored
// value does not have the expected function type.
var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption)
// trailerMetadataKey is the trailer metadata key under which the
// binary-encoded ORCA load report is written by setTrailerMetadata and looked
// up by ToLoadReport.
const trailerMetadataKey = "endpoint-load-metrics-bin"
// CallMetricsServerOption returns a server option that turns on reporting of
// per-RPC custom backend metrics, for both unary and streaming RPCs.
//
// Server applications that want to inject custom backend metrics should pass
// the returned server option as the first argument to grpc.NewServer().
//
// RPC handlers can then obtain the RPC specific custom metrics recorder
// [CallMetricRecorder] by calling CallMetricRecorderFromContext(), and use it
// to inject custom metrics at any point during the RPC lifecycle.
//
// Injected metrics are sent in trailer metadata as a binary-encoded
// [ORCA LoadReport] protobuf message, with the metadata key set to
// "endpoint-load-metrics-bin".
//
// [ORCA LoadReport]: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15
func CallMetricsServerOption() grpc.ServerOption {
	unaryOpt := grpc.ChainUnaryInterceptor(unaryInt)
	streamOpt := grpc.ChainStreamInterceptor(streamInt)
	return joinServerOptions(unaryOpt, streamOpt)
}
// unaryInt is the unary server interceptor installed by
// CallMetricsServerOption. It invokes the handler with a context carrying a
// recorderWrapper and, after the handler returns, publishes any recorded
// metrics as trailer metadata. The handler's response and error are returned
// unchanged.
func unaryInt(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	// Only the empty wrapper is created up front. The metric recorder itself
	// is allocated lazily, the first time the handler calls
	// CallMetricRecorderFromContext().
	wrapper := &recorderWrapper{}
	resp, err := handler(newContextWithRecorderWrapper(ctx, wrapper), req)

	// The handler is done executing, so CallMetricRecorderFromContext() (the
	// place where the recorder is lazily allocated) will not be called again
	// and the wrapper can be read safely here. A nil recorder means there is
	// nothing to report.
	if wrapper.r != nil {
		setTrailerMetadata(ctx, wrapper.r)
	}
	return resp, err
}
// streamInt is the streaming server interceptor installed by
// CallMetricsServerOption. It invokes the handler with a wrapped stream whose
// context carries a recorderWrapper and, after the handler returns, publishes
// any recorded metrics as trailer metadata. The handler's error is returned
// unchanged.
func streamInt(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	// Only the empty wrapper is created up front. The metric recorder itself
	// is allocated lazily, the first time the handler calls
	// CallMetricRecorderFromContext().
	wrapper := &recorderWrapper{}
	streamCtx := newContextWithRecorderWrapper(ss.Context(), wrapper)
	err := handler(srv, &wrappedStream{ServerStream: ss, ctx: streamCtx})

	// The handler is done executing, so CallMetricRecorderFromContext() (the
	// place where the recorder is lazily allocated) will not be called again
	// and the wrapper can be read safely here. A nil recorder means there is
	// nothing to report.
	if wrapper.r != nil {
		setTrailerMetadata(ss.Context(), wrapper.r)
	}
	return err
}
// setTrailerMetadata marshals the load report held by r and attaches it to the
// RPC's trailer metadata under `trailerMetadataKey`, the value being the
// binary-encoded orca.OrcaLoadReport protobuf message.
//
// It is called by the unary and streaming interceptors defined above, neither
// of which acts on a failure here. Errors are therefore not returned; they are
// logged at warning level instead.
func setTrailerMetadata(ctx context.Context, r *CallMetricRecorder) {
	encoded, err := proto.Marshal(r.toLoadReportProto())
	if err != nil {
		logger.Warningf("failed to marshal load report: %v", err)
		return
	}

	trailer := metadata.Pairs(trailerMetadataKey, string(encoded))
	err = grpc.SetTrailer(ctx, trailer)
	if err != nil {
		logger.Warningf("failed to set trailer metadata: %v", err)
	}
}
// wrappedStream wraps the grpc.ServerStream received by the streaming
// interceptor. It overrides only the Context() method, so that it returns a
// context which contains a reference to the CallMetricRecorder corresponding
// to this stream. All other methods come from the embedded stream.
type wrappedStream struct {
// ServerStream is the stream being wrapped.
grpc.ServerStream
// ctx is the context returned by Context(); streamInt builds it with
// newContextWithRecorderWrapper.
ctx context.Context
}
// Context returns the context stored in the wrapper, taking the place of the
// embedded stream's Context() method.
func (ws *wrappedStream) Context() context.Context {
	return ws.ctx
}
// ErrLoadReportMissing indicates no ORCA load report was found in trailers. It
// is returned by ToLoadReport when the provided metadata has no value for the
// "endpoint-load-metrics-bin" key.
var ErrLoadReportMissing = errors.New("orca load report missing in provided metadata")
// ToLoadReport looks up the binary-encoded [ORCA LoadReport] protobuf message
// stored in md under the key "endpoint-load-metrics-bin" and returns it as an
// unmarshaled struct. Only the first value for the key is considered.
//
// ErrLoadReportMissing is returned when md has no value for that key. A
// different, non-nil error is returned when the value cannot be unmarshaled.
//
// [ORCA LoadReport]: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15
func ToLoadReport(md metadata.MD) (*v3orcapb.OrcaLoadReport, error) {
	values := md.Get(trailerMetadataKey)
	if len(values) == 0 {
		return nil, ErrLoadReportMissing
	}

	report := &v3orcapb.OrcaLoadReport{}
	err := proto.Unmarshal([]byte(values[0]), report)
	if err != nil {
		return nil, fmt.Errorf("failed to unmarshal load report found in metadata: %v", err)
	}
	return report, nil
}
// loadParser implements the Parser interface defined in the
// `internal/balancerload` package. This interface is used by the client stream
// to parse load reports sent by the server in trailer metadata. The parsed
// loads are then sent to balancers via balancer.DoneInfo.
//
// The grpc package cannot directly call orca.ToLoadReport() as that would cause
// an import cycle. Hence this roundabout method is used.
type loadParser struct{}

// Parse extracts the ORCA load report from md using ToLoadReport and returns
// it as an interface value.
//
// Failures (including a missing load report) are logged and not returned. In
// that case the returned interface wraps a nil *v3orcapb.OrcaLoadReport rather
// than being a nil interface, so callers should check the pointer obtained
// from a type assertion instead of comparing the result against nil.
func (loadParser) Parse(md metadata.MD) interface{} {
	lr, err := ToLoadReport(md)
	if err != nil {
		// The format string carries exactly one verb for the single operand.
		// The metadata is intentionally not logged: the load report value it
		// may hold is binary-encoded.
		logger.Errorf("Parse failed: %v", err)
	}
	return lr
}
// init registers a loadParser with the balancerload package when this package
// is initialized, so that load reports found in trailer metadata are parsed
// through ToLoadReport.
func init() {
balancerload.SetParser(loadParser{})
}