From abd674b8bfe0c22e219340301657c20f5c755ff6 Mon Sep 17 00:00:00 2001 From: Sam Xie Date: Tue, 18 Jun 2024 17:40:39 -0700 Subject: [PATCH] Add new method for otlploggrpc gRPC client --- exporters/otlp/otlplog/otlploggrpc/client.go | 138 ++++++++++- .../otlp/otlplog/otlploggrpc/client_test.go | 227 ++++++++++++++++++ exporters/otlp/otlplog/otlploggrpc/go.mod | 7 +- exporters/otlp/otlplog/otlploggrpc/go.sum | 16 +- 4 files changed, 383 insertions(+), 5 deletions(-) create mode 100644 exporters/otlp/otlplog/otlploggrpc/client_test.go diff --git a/exporters/otlp/otlplog/otlploggrpc/client.go b/exporters/otlp/otlplog/otlploggrpc/client.go index dcefef29c23..339f52e9ccf 100644 --- a/exporters/otlp/otlplog/otlploggrpc/client.go +++ b/exporters/otlp/otlplog/otlploggrpc/client.go @@ -3,12 +3,144 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" +import ( + "time" + + "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/encoding/gzip" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" + collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1" +) + +// The methods of this type are not expected to be called concurrently. type client struct { - // TODO: implement. + metadata metadata.MD + exportTimeout time.Duration + requestFunc retry.RequestFunc + + // ourConn keeps track of where conn was created: true if created here in + // NewClient, or false if passed with an option. This is important on + // Shutdown as conn should only be closed if we created it. Otherwise, + // it is up to the processes that passed conn to close it. + ourConn bool + conn *grpc.ClientConn + lsc collogpb.LogsServiceClient } +// Used for testing. +var newGRPCClient = grpc.NewClient + // newClient creates a new gRPC log client. func newClient(cfg config) (*client, error) { - // TODO: implement. - return &client{}, nil + c := &client{ + exportTimeout: cfg.timeout.Value, + requestFunc: cfg.retryCfg.Value.RequestFunc(retryable), + conn: cfg.gRPCConn.Value, + } + + if len(cfg.headers.Value) > 0 { + c.metadata = metadata.New(cfg.headers.Value) + } + + if c.conn == nil { + // If the caller did not provide a ClientConn when the client was + // created, create one using the configuration they did provide. + dialOpts := newGRPCDialOptions(cfg) + + conn, err := newGRPCClient(cfg.endpoint.Value, dialOpts...) + if err != nil { + return nil, err + } + // Keep track that we own the lifecycle of this conn and need to close + // it on Shutdown. + c.ourConn = true + c.conn = conn + } + + c.lsc = collogpb.NewLogsServiceClient(c.conn) + + return c, nil +} + +func newGRPCDialOptions(cfg config) []grpc.DialOption { + userAgent := "OTel Go OTLP over gRPC logs exporter/" + Version() + dialOpts := []grpc.DialOption{grpc.WithUserAgent(userAgent)} + dialOpts = append(dialOpts, cfg.dialOptions.Value...) + + // Convert other grpc configs to the dial options. + // Service config + if cfg.serviceConfig.Value != "" { + dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(cfg.serviceConfig.Value)) + } + // Prioritize GRPCCredentials over Insecure (passing both is an error). + if cfg.gRPCCredentials.Value != nil { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(cfg.gRPCCredentials.Value)) + } else if cfg.insecure.Value { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } else { + // Default to using the host's root CA. + dialOpts = append(dialOpts, grpc.WithTransportCredentials( + credentials.NewTLS(nil), + )) + } + // Compression + if cfg.compression.Value == GzipCompression { + dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))) + } + // Reconnection period + if cfg.reconnectionPeriod.Value != 0 { + p := grpc.ConnectParams{ + Backoff: backoff.DefaultConfig, + MinConnectTimeout: cfg.reconnectionPeriod.Value, + } + dialOpts = append(dialOpts, grpc.WithConnectParams(p)) + } + + return dialOpts +} + +// retryable returns if err identifies a request that can be retried and a +// duration to wait for if an explicit throttle time is included in err. +func retryable(err error) (bool, time.Duration) { + s := status.Convert(err) + return retryableGRPCStatus(s) +} + +func retryableGRPCStatus(s *status.Status) (bool, time.Duration) { + switch s.Code() { + case codes.Canceled, + codes.DeadlineExceeded, + codes.Aborted, + codes.OutOfRange, + codes.Unavailable, + codes.DataLoss: + // Additionally, handle RetryInfo. + _, d := throttleDelay(s) + return true, d + case codes.ResourceExhausted: + // Retry only if the server signals that the recovery from resource exhaustion is possible. + return throttleDelay(s) + } + + // Not a retry-able error. + return false, 0 +} + +// throttleDelay returns if the status is RetryInfo +// and the duration to wait for if an explicit throttle time is included. +func throttleDelay(s *status.Status) (bool, time.Duration) { + for _, detail := range s.Details() { + if t, ok := detail.(*errdetails.RetryInfo); ok { + return true, t.RetryDelay.AsDuration() + } + } + return false, 0 } diff --git a/exporters/otlp/otlplog/otlploggrpc/client_test.go b/exporters/otlp/otlplog/otlploggrpc/client_test.go new file mode 100644 index 00000000000..811e94060d4 --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/client_test.go @@ -0,0 +1,227 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" + + collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1" + + "github.com/stretchr/testify/assert" +) + +func TestThrottleDelay(t *testing.T) { + c := codes.ResourceExhausted + testcases := []struct { + status *status.Status + wantOK bool + wantDuration time.Duration + }{ + { + status: status.New(c, "NoRetryInfo"), + wantOK: false, + wantDuration: 0, + }, + { + status: func() *status.Status { + s, err := status.New(c, "SingleRetryInfo").WithDetails( + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(15 * time.Millisecond), + }, + ) + require.NoError(t, err) + return s + }(), + wantOK: true, + wantDuration: 15 * time.Millisecond, + }, + { + status: func() *status.Status { + s, err := status.New(c, "ErrorInfo").WithDetails( + &errdetails.ErrorInfo{Reason: "no throttle detail"}, + ) + require.NoError(t, err) + return s + }(), + wantOK: false, + wantDuration: 0, + }, + { + status: func() *status.Status { + s, err := status.New(c, "ErrorAndRetryInfo").WithDetails( + &errdetails.ErrorInfo{Reason: "with throttle detail"}, + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(13 * time.Minute), + }, + ) + require.NoError(t, err) + return s + }(), + wantOK: true, + wantDuration: 13 * time.Minute, + }, + { + status: func() *status.Status { + s, err := status.New(c, "DoubleRetryInfo").WithDetails( + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(13 * time.Minute), + }, + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(15 * time.Minute), + }, + ) + require.NoError(t, err) + return s + }(), + wantOK: true, + wantDuration: 13 * time.Minute, + }, + } + + for _, tc := range testcases { + t.Run(tc.status.Message(), func(t *testing.T) { + ok, d := throttleDelay(tc.status) + assert.Equal(t, tc.wantOK, ok) + assert.Equal(t, tc.wantDuration, d) + }) + } +} + +func TestRetryable(t *testing.T) { + retryableCodes := map[codes.Code]bool{ + codes.OK: false, + codes.Canceled: true, + codes.Unknown: false, + codes.InvalidArgument: false, + codes.DeadlineExceeded: true, + codes.NotFound: false, + codes.AlreadyExists: false, + codes.PermissionDenied: false, + codes.ResourceExhausted: false, + codes.FailedPrecondition: false, + codes.Aborted: true, + codes.OutOfRange: true, + codes.Unimplemented: false, + codes.Internal: false, + codes.Unavailable: true, + codes.DataLoss: true, + codes.Unauthenticated: false, + } + + for c, want := range retryableCodes { + got, _ := retryable(status.Error(c, "")) + assert.Equalf(t, want, got, "evaluate(%s)", c) + } +} + +func TestRetryableGRPCStatusResourceExhaustedWithRetryInfo(t *testing.T) { + delay := 15 * time.Millisecond + s, err := status.New(codes.ResourceExhausted, "WithRetryInfo").WithDetails( + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(delay), + }, + ) + require.NoError(t, err) + + ok, d := retryableGRPCStatus(s) + assert.True(t, ok) + assert.Equal(t, delay, d) +} + +func TestNewClient(t *testing.T) { + newGRPCClientSwap := newGRPCClient + t.Cleanup(func() { + newGRPCClient = newGRPCClientSwap + }) + + // The gRPC connection created by newClient. + conn, err := grpc.NewClient("test", grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + newGRPCClient = func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + return conn, nil + } + + // The gRPC connection created by users. + userConn, err := grpc.NewClient("test 2", grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + + testCases := []struct { + name string + cfg config + cli *client + }{ + { + name: "empty config", + cli: &client{ + ourConn: true, + conn: conn, + lsc: collogpb.NewLogsServiceClient(conn), + }, + }, + { + name: "with headers", + cfg: config{ + headers: newSetting(map[string]string{ + "key": "value", + }), + }, + cli: &client{ + ourConn: true, + conn: conn, + lsc: collogpb.NewLogsServiceClient(conn), + metadata: map[string][]string{"key": {"value"}}, + }, + }, + { + name: "with gRPC connection", + cfg: config{ + gRPCConn: newSetting(userConn), + }, + cli: &client{ + ourConn: false, + conn: userConn, + lsc: collogpb.NewLogsServiceClient(userConn), + }, + }, + { + // It is not possible to compare grpc dial options directly, so we just check that the client is created + // and no panic occurs. + name: "with dial options", + cfg: config{ + serviceConfig: newSetting("service config"), + gRPCCredentials: newSetting(credentials.NewTLS(nil)), + compression: newSetting(GzipCompression), + reconnectionPeriod: newSetting(10 * time.Second), + }, + cli: &client{ + ourConn: true, + conn: conn, + lsc: collogpb.NewLogsServiceClient(conn), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cli, err := newClient(tc.cfg) + require.NoError(t, err) + + assert.Equal(t, tc.cli.metadata, cli.metadata) + assert.Equal(t, tc.cli.exportTimeout, cli.exportTimeout) + assert.Equal(t, tc.cli.ourConn, cli.ourConn) + assert.Equal(t, tc.cli.conn, cli.conn) + assert.Equal(t, tc.cli.lsc, cli.lsc) + }) + } +} diff --git a/exporters/otlp/otlplog/otlploggrpc/go.mod b/exporters/otlp/otlplog/otlploggrpc/go.mod index 498a9e2a20b..6a9b2f17d9c 100644 --- a/exporters/otlp/otlplog/otlploggrpc/go.mod +++ b/exporters/otlp/otlplog/otlploggrpc/go.mod @@ -7,6 +7,8 @@ require ( github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel v1.27.0 go.opentelemetry.io/otel/sdk/log v0.3.0 + go.opentelemetry.io/proto/otlp v1.3.1 + google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 google.golang.org/grpc v1.64.0 ) @@ -14,7 +16,10 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect + github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rogpeppe/go-internal v1.12.0 // indirect go.opentelemetry.io/otel/log v0.3.0 // indirect go.opentelemetry.io/otel/metric v1.27.0 // indirect go.opentelemetry.io/otel/sdk v1.27.0 // indirect @@ -22,7 +27,7 @@ require ( golang.org/x/net v0.26.0 // indirect golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/exporters/otlp/otlplog/otlploggrpc/go.sum b/exporters/otlp/otlplog/otlploggrpc/go.sum index c6bfd58e0a3..483f819cb14 100644 --- a/exporters/otlp/otlplog/otlploggrpc/go.sum +++ b/exporters/otlp/otlplog/otlploggrpc/go.sum @@ -1,5 +1,6 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -9,23 +10,36 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= +go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8 h1:W5Xj/70xIA4x60O/IFyXivR5MGqblAb8R3w26pnD6No= +google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8/go.mod h1:vPrPUTsDCYxXWjP7clS81mZ6/803D8K4iM9Ma27VKas= google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 h1:Di6ANFilr+S60a4S61ZM00vLdw0IrQOSMS2/6mrnOU0= google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=