Skip to content

Commit

Permalink
Add new method for otlploggrpc gRPC client
Browse files Browse the repository at this point in the history
  • Loading branch information
XSAM committed Jun 19, 2024
1 parent ffe855d commit abd674b
Show file tree
Hide file tree
Showing 4 changed files with 383 additions and 5 deletions.
138 changes: 135 additions & 3 deletions exporters/otlp/otlplog/otlploggrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,144 @@

package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"

import (
"time"

"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry"
collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
)

// The methods of this type are not expected to be called concurrently.
type client struct {
// TODO: implement.
metadata metadata.MD
exportTimeout time.Duration
requestFunc retry.RequestFunc

// ourConn keeps track of where conn was created: true if created here in
// NewClient, or false if passed with an option. This is important on
// Shutdown as conn should only be closed if we created it. Otherwise,
// it is up to the processes that passed conn to close it.
ourConn bool
conn *grpc.ClientConn
lsc collogpb.LogsServiceClient
}

// Used for testing.
var newGRPCClient = grpc.NewClient

// newClient creates a new gRPC log client.
func newClient(cfg config) (*client, error) {
// TODO: implement.
return &client{}, nil
c := &client{
exportTimeout: cfg.timeout.Value,
requestFunc: cfg.retryCfg.Value.RequestFunc(retryable),
conn: cfg.gRPCConn.Value,
}

if len(cfg.headers.Value) > 0 {
c.metadata = metadata.New(cfg.headers.Value)
}

if c.conn == nil {
// If the caller did not provide a ClientConn when the client was
// created, create one using the configuration they did provide.
dialOpts := newGRPCDialOptions(cfg)

conn, err := newGRPCClient(cfg.endpoint.Value, dialOpts...)
if err != nil {
return nil, err
}
// Keep track that we own the lifecycle of this conn and need to close
// it on Shutdown.
c.ourConn = true
c.conn = conn
}

c.lsc = collogpb.NewLogsServiceClient(c.conn)

return c, nil
}

func newGRPCDialOptions(cfg config) []grpc.DialOption {
userAgent := "OTel Go OTLP over gRPC logs exporter/" + Version()
dialOpts := []grpc.DialOption{grpc.WithUserAgent(userAgent)}
dialOpts = append(dialOpts, cfg.dialOptions.Value...)

// Convert other grpc configs to the dial options.
// Service config
if cfg.serviceConfig.Value != "" {
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(cfg.serviceConfig.Value))
}
// Prioritize GRPCCredentials over Insecure (passing both is an error).
if cfg.gRPCCredentials.Value != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(cfg.gRPCCredentials.Value))
} else if cfg.insecure.Value {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
// Default to using the host's root CA.
dialOpts = append(dialOpts, grpc.WithTransportCredentials(
credentials.NewTLS(nil),
))
}
// Compression
if cfg.compression.Value == GzipCompression {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
}
// Reconnection period
if cfg.reconnectionPeriod.Value != 0 {
p := grpc.ConnectParams{
Backoff: backoff.DefaultConfig,
MinConnectTimeout: cfg.reconnectionPeriod.Value,
}
dialOpts = append(dialOpts, grpc.WithConnectParams(p))
}

return dialOpts
}

// retryable returns if err identifies a request that can be retried and a
// duration to wait for if an explicit throttle time is included in err.
func retryable(err error) (bool, time.Duration) {
s := status.Convert(err)
return retryableGRPCStatus(s)
}

func retryableGRPCStatus(s *status.Status) (bool, time.Duration) {
switch s.Code() {
case codes.Canceled,
codes.DeadlineExceeded,
codes.Aborted,
codes.OutOfRange,
codes.Unavailable,
codes.DataLoss:
// Additionally, handle RetryInfo.
_, d := throttleDelay(s)
return true, d
case codes.ResourceExhausted:
// Retry only if the server signals that the recovery from resource exhaustion is possible.
return throttleDelay(s)
}

// Not a retry-able error.
return false, 0
}

// throttleDelay returns if the status is RetryInfo
// and the duration to wait for if an explicit throttle time is included.
func throttleDelay(s *status.Status) (bool, time.Duration) {
for _, detail := range s.Details() {
if t, ok := detail.(*errdetails.RetryInfo); ok {
return true, t.RetryDelay.AsDuration()
}
}
return false, 0
}
227 changes: 227 additions & 0 deletions exporters/otlp/otlplog/otlploggrpc/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"

collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"

"github.com/stretchr/testify/assert"
)

func TestThrottleDelay(t *testing.T) {
c := codes.ResourceExhausted
testcases := []struct {
status *status.Status
wantOK bool
wantDuration time.Duration
}{
{
status: status.New(c, "NoRetryInfo"),
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
s, err := status.New(c, "SingleRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(15 * time.Millisecond),
},
)
require.NoError(t, err)
return s
}(),
wantOK: true,
wantDuration: 15 * time.Millisecond,
},
{
status: func() *status.Status {
s, err := status.New(c, "ErrorInfo").WithDetails(
&errdetails.ErrorInfo{Reason: "no throttle detail"},
)
require.NoError(t, err)
return s
}(),
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
s, err := status.New(c, "ErrorAndRetryInfo").WithDetails(
&errdetails.ErrorInfo{Reason: "with throttle detail"},
&errdetails.RetryInfo{
RetryDelay: durationpb.New(13 * time.Minute),
},
)
require.NoError(t, err)
return s
}(),
wantOK: true,
wantDuration: 13 * time.Minute,
},
{
status: func() *status.Status {
s, err := status.New(c, "DoubleRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(13 * time.Minute),
},
&errdetails.RetryInfo{
RetryDelay: durationpb.New(15 * time.Minute),
},
)
require.NoError(t, err)
return s
}(),
wantOK: true,
wantDuration: 13 * time.Minute,
},
}

for _, tc := range testcases {
t.Run(tc.status.Message(), func(t *testing.T) {
ok, d := throttleDelay(tc.status)
assert.Equal(t, tc.wantOK, ok)
assert.Equal(t, tc.wantDuration, d)
})
}
}

func TestRetryable(t *testing.T) {
retryableCodes := map[codes.Code]bool{
codes.OK: false,
codes.Canceled: true,
codes.Unknown: false,
codes.InvalidArgument: false,
codes.DeadlineExceeded: true,
codes.NotFound: false,
codes.AlreadyExists: false,
codes.PermissionDenied: false,
codes.ResourceExhausted: false,
codes.FailedPrecondition: false,
codes.Aborted: true,
codes.OutOfRange: true,
codes.Unimplemented: false,
codes.Internal: false,
codes.Unavailable: true,
codes.DataLoss: true,
codes.Unauthenticated: false,
}

for c, want := range retryableCodes {
got, _ := retryable(status.Error(c, ""))
assert.Equalf(t, want, got, "evaluate(%s)", c)
}
}

func TestRetryableGRPCStatusResourceExhaustedWithRetryInfo(t *testing.T) {
delay := 15 * time.Millisecond
s, err := status.New(codes.ResourceExhausted, "WithRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(delay),
},
)
require.NoError(t, err)

ok, d := retryableGRPCStatus(s)
assert.True(t, ok)
assert.Equal(t, delay, d)
}

func TestNewClient(t *testing.T) {
newGRPCClientSwap := newGRPCClient
t.Cleanup(func() {
newGRPCClient = newGRPCClientSwap
})

// The gRPC connection created by newClient.
conn, err := grpc.NewClient("test", grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
newGRPCClient = func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return conn, nil
}

// The gRPC connection created by users.
userConn, err := grpc.NewClient("test 2", grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)

testCases := []struct {
name string
cfg config
cli *client
}{
{
name: "empty config",
cli: &client{
ourConn: true,
conn: conn,
lsc: collogpb.NewLogsServiceClient(conn),
},
},
{
name: "with headers",
cfg: config{
headers: newSetting(map[string]string{
"key": "value",
}),
},
cli: &client{
ourConn: true,
conn: conn,
lsc: collogpb.NewLogsServiceClient(conn),
metadata: map[string][]string{"key": {"value"}},
},
},
{
name: "with gRPC connection",
cfg: config{
gRPCConn: newSetting(userConn),
},
cli: &client{
ourConn: false,
conn: userConn,
lsc: collogpb.NewLogsServiceClient(userConn),
},
},
{
// It is not possible to compare grpc dial options directly, so we just check that the client is created
// and no panic occurs.
name: "with dial options",
cfg: config{
serviceConfig: newSetting("service config"),
gRPCCredentials: newSetting(credentials.NewTLS(nil)),
compression: newSetting(GzipCompression),
reconnectionPeriod: newSetting(10 * time.Second),
},
cli: &client{
ourConn: true,
conn: conn,
lsc: collogpb.NewLogsServiceClient(conn),
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cli, err := newClient(tc.cfg)
require.NoError(t, err)

assert.Equal(t, tc.cli.metadata, cli.metadata)
assert.Equal(t, tc.cli.exportTimeout, cli.exportTimeout)
assert.Equal(t, tc.cli.ourConn, cli.ourConn)
assert.Equal(t, tc.cli.conn, cli.conn)
assert.Equal(t, tc.cli.lsc, cli.lsc)
})
}
}

0 comments on commit abd674b

Please sign in to comment.