From dace7b6eaff2ae39c177eccb430d92729a4159b3 Mon Sep 17 00:00:00 2001 From: Sam Xie Date: Thu, 13 Jun 2024 09:13:08 -0700 Subject: [PATCH] Add implementation of otlploggrpc configuration (#5383) part of #5056 Most of the codes are copied from `otlploghttp`. I will try to make `internal/conf` as a shared go template file so `otlploghttp` can use a shared setting struct with `otlploggrpc` in the following PRs. --- exporters/otlp/otlplog/otlploggrpc/config.go | 455 ++++++++++++++++-- .../otlp/otlplog/otlploggrpc/config_test.go | 389 +++++++++++++++ exporters/otlp/otlplog/otlploggrpc/go.mod | 2 +- 3 files changed, 812 insertions(+), 34 deletions(-) create mode 100644 exporters/otlp/otlplog/otlploggrpc/config_test.go diff --git a/exporters/otlp/otlplog/otlploggrpc/config.go b/exporters/otlp/otlplog/otlploggrpc/config.go index 25635aabdaa..37220acabfe 100644 --- a/exporters/otlp/otlplog/otlploggrpc/config.go +++ b/exporters/otlp/otlplog/otlploggrpc/config.go @@ -4,28 +4,130 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" import ( + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "net/url" + "os" + "strconv" + "strings" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" + "go.opentelemetry.io/otel/internal/global" ) +// Default values. +var ( + defaultEndpoint = "localhost:4317" + defaultTimeout = 10 * time.Second + defaultRetryCfg = retry.DefaultConfig +) + +// Environment variable keys. +var ( + envEndpoint = []string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", + "OTEL_EXPORTER_OTLP_ENDPOINT", + } + envInsecure = envEndpoint + + envHeaders = []string{ + "OTEL_EXPORTER_OTLP_LOGS_HEADERS", + "OTEL_EXPORTER_OTLP_HEADERS", + } + + envCompression = []string{ + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION", + "OTEL_EXPORTER_OTLP_COMPRESSION", + } + + envTimeout = []string{ + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT", + "OTEL_EXPORTER_OTLP_TIMEOUT", + } + + envTLSCert = []string{ + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE", + "OTEL_EXPORTER_OTLP_CERTIFICATE", + } + envTLSClient = []struct { + Certificate string + Key string + }{ + { + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY", + }, + { + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE", + "OTEL_EXPORTER_OTLP_CLIENT_KEY", + }, + } +) + +type fnOpt func(config) config + +func (f fnOpt) applyOption(c config) config { return f(c) } + // Option applies an option to the Exporter. type Option interface { - applyHTTPOption(config) config + applyOption(config) config } type config struct { - // TODO: implement. + endpoint setting[string] + insecure setting[bool] + tlsCfg setting[*tls.Config] + headers setting[map[string]string] + compression setting[Compression] + timeout setting[time.Duration] + retryCfg setting[retry.Config] + + // gRPC configurations + gRPCCredentials setting[credentials.TransportCredentials] + serviceConfig setting[string] + reconnectionPeriod setting[time.Duration] + dialOptions setting[[]grpc.DialOption] + gRPCConn setting[*grpc.ClientConn] } func newConfig(options []Option) config { var c config for _, opt := range options { - c = opt.applyHTTPOption(c) + c = opt.applyOption(c) } + + // Apply environment value and default value + c.endpoint = c.endpoint.Resolve( + getEnv[string](envEndpoint, convEndpoint), + fallback[string](defaultEndpoint), + ) + c.insecure = c.insecure.Resolve( + getEnv[bool](envInsecure, convInsecure), + ) + c.tlsCfg = c.tlsCfg.Resolve( + loadEnvTLS[*tls.Config](), + ) + c.headers = c.headers.Resolve( + getEnv[map[string]string](envHeaders, convHeaders), + ) + c.compression = c.compression.Resolve( + getEnv[Compression](envCompression, convCompression), + ) + c.timeout = c.timeout.Resolve( + getEnv[time.Duration](envTimeout, convDuration), + fallback[time.Duration](defaultTimeout), + ) + c.retryCfg = c.retryCfg.Resolve( + fallback[retry.Config](defaultRetryCfg), + ) + return c } @@ -51,8 +153,10 @@ type RetryConfig retry.Config // // This option has no effect if WithGRPCConn is used. func WithInsecure() Option { - // TODO: implement. - return nil + return fnOpt(func(c config) config { + c.insecure = newSetting(true) + return c + }) } // WithEndpoint sets the target endpoint the Exporter will connect to. @@ -70,8 +174,10 @@ func WithInsecure() Option { // // This option has no effect if WithGRPCConn is used. func WithEndpoint(endpoint string) Option { - // TODO: implement. - return nil + return fnOpt(func(c config) config { + c.endpoint = newSetting(endpoint) + return c + }) } // WithEndpointURL sets the target endpoint URL the Exporter will connect to. @@ -90,9 +196,21 @@ func WithEndpoint(endpoint string) Option { // passed, "localhost:4317" will be used. // // This option has no effect if WithGRPCConn is used. -func WithEndpointURL(u string) Option { - // TODO: implement. - return nil +func WithEndpointURL(rawURL string) Option { + u, err := url.Parse(rawURL) + if err != nil { + global.Error(err, "otlplog: parse endpoint url", "url", rawURL) + return fnOpt(func(c config) config { return c }) + } + return fnOpt(func(c config) config { + c.endpoint = newSetting(u.Host) + if u.Scheme != "https" { + c.insecure = newSetting(true) + } else { + c.insecure = newSetting(false) + } + return c + }) } // WithReconnectionPeriod set the minimum amount of time between connection @@ -100,10 +218,22 @@ func WithEndpointURL(u string) Option { // // This option has no effect if WithGRPCConn is used. func WithReconnectionPeriod(rp time.Duration) Option { - // TODO: implement. - return nil + return fnOpt(func(c config) config { + c.reconnectionPeriod = newSetting(rp) + return c + }) } +// Compression describes the compression used for exported payloads. +type Compression int + +const ( + // NoCompression represents that no compression should be used. + NoCompression Compression = iota + // GzipCompression represents that gzip compression should be used. + GzipCompression +) + // WithCompressor sets the compressor the gRPC client uses. // Supported compressor values: "gzip". // @@ -114,12 +244,14 @@ func WithReconnectionPeriod(rp time.Duration) Option { // OTEL_EXPORTER_OTLP_LOGS_COMPRESSION will take precedence. // // By default, if an environment variable is not set, and this option is not -// passed, no compressor will be used. +// passed, no compression strategy will be used. // // This option has no effect if WithGRPCConn is used. func WithCompressor(compressor string) Option { - // TODO: implement. - return nil + return fnOpt(func(c config) config { + c.compression = newSetting(compressorToCompression(compressor)) + return c + }) } // WithHeaders will send the provided headers with each gRPC requests. @@ -134,8 +266,10 @@ func WithCompressor(compressor string) Option { // By default, if an environment variable is not set, and this option is not // passed, no user headers will be set. func WithHeaders(headers map[string]string) Option { - // TODO: implement. - return nil + return fnOpt(func(c config) config { + c.headers = newSetting(headers) + return c + }) } // WithTLSCredentials sets the gRPC connection to use creds. @@ -150,17 +284,21 @@ func WithHeaders(headers map[string]string) Option { // passed, no TLS credentials will be used. // // This option has no effect if WithGRPCConn is used. -func WithTLSCredentials(_ credentials.TransportCredentials) Option { - // TODO: implement. - return nil +func WithTLSCredentials(credential credentials.TransportCredentials) Option { + return fnOpt(func(c config) config { + c.gRPCCredentials = newSetting(credential) + return c + }) } // WithServiceConfig defines the default gRPC service config used. // // This option has no effect if WithGRPCConn is used. func WithServiceConfig(serviceConfig string) Option { - // TODO: implement. - return nil + return fnOpt(func(c config) config { + c.serviceConfig = newSetting(serviceConfig) + return c + }) } // WithDialOption sets explicit grpc.DialOptions to use when establishing a @@ -171,9 +309,11 @@ func WithServiceConfig(serviceConfig string) Option { // grpc.DialOptions are ignored. // // This option has no effect if WithGRPCConn is used. -func WithDialOption(_ ...grpc.DialOption) Option { - // TODO: implement. - return nil +func WithDialOption(opts ...grpc.DialOption) Option { + return fnOpt(func(c config) config { + c.dialOptions = newSetting(opts) + return c + }) } // WithGRPCConn sets conn as the gRPC ClientConn used for all communication. @@ -184,9 +324,11 @@ func WithDialOption(_ ...grpc.DialOption) Option { // // It is the callers responsibility to close the passed conn. The Exporter // Shutdown method will not close this connection. -func WithGRPCConn(_ *grpc.ClientConn) Option { - // TODO: implement. - return nil +func WithGRPCConn(conn *grpc.ClientConn) Option { + return fnOpt(func(c config) config { + c.gRPCConn = newSetting(conn) + return c + }) } // WithTimeout sets the max amount of time an Exporter will attempt an export. @@ -204,8 +346,10 @@ func WithGRPCConn(_ *grpc.ClientConn) Option { // By default, if an environment variable is not set, and this option is not // passed, a timeout of 10 seconds will be used. func WithTimeout(duration time.Duration) Option { - // TODO: implement. - return nil + return fnOpt(func(c config) config { + c.timeout = newSetting(duration) + return c + }) } // WithRetry sets the retry policy for transient retryable errors that are @@ -221,7 +365,252 @@ func WithTimeout(duration time.Duration) Option { // If unset, the default retry policy will be used. It will retry the export // 5 seconds after receiving a retryable error and increase exponentially // after each error for no more than a total time of 1 minute. -func WithRetry(settings RetryConfig) Option { - // TODO: implement. - return nil +func WithRetry(rc RetryConfig) Option { + return fnOpt(func(c config) config { + c.retryCfg = newSetting(retry.Config(rc)) + return c + }) +} + +// convCompression returns the parsed compression encoded in s. NoCompression +// and an errors are returned if s is unknown. +func convCompression(s string) (Compression, error) { + switch s { + case "gzip": + return GzipCompression, nil + case "none", "": + return NoCompression, nil + } + return NoCompression, fmt.Errorf("unknown compression: %s", s) +} + +// convEndpoint converts s from a URL string to an endpoint if s is a valid +// URL. Otherwise, "" and an error are returned. +func convEndpoint(s string) (string, error) { + u, err := url.Parse(s) + if err != nil { + return "", err + } + return u.Host, nil +} + +// convInsecure parses s as a URL string and returns if the connection should +// use client transport security or not. If s is an invalid URL, false and an +// error are returned. +func convInsecure(s string) (bool, error) { + u, err := url.Parse(s) + if err != nil { + return false, err + } + return u.Scheme != "https", nil +} + +// convHeaders converts the OTel environment variable header value s into a +// mapping of header key to value. If s is invalid a partial result and error +// are returned. +func convHeaders(s string) (map[string]string, error) { + out := make(map[string]string) + var err error + for _, header := range strings.Split(s, ",") { + rawKey, rawVal, found := strings.Cut(header, "=") + if !found { + err = errors.Join(err, fmt.Errorf("invalid header: %s", header)) + continue + } + + escKey, e := url.PathUnescape(rawKey) + if e != nil { + err = errors.Join(err, fmt.Errorf("invalid header key: %s", rawKey)) + continue + } + key := strings.TrimSpace(escKey) + + escVal, e := url.PathUnescape(rawVal) + if e != nil { + err = errors.Join(err, fmt.Errorf("invalid header value: %s", rawVal)) + continue + } + val := strings.TrimSpace(escVal) + + out[key] = val + } + return out, err +} + +// convDuration converts s into a duration of milliseconds. If s does not +// contain an integer, 0 and an error are returned. +func convDuration(s string) (time.Duration, error) { + d, err := strconv.Atoi(s) + if err != nil { + return 0, err + } + // OTel durations are defined in milliseconds. + return time.Duration(d) * time.Millisecond, nil +} + +// loadEnvTLS returns a resolver that loads a *tls.Config from files defeind by +// the OTLP TLS environment variables. This will load both the rootCAs and +// certificates used for mTLS. +// +// If the filepath defined is invalid or does not contain valid TLS files, an +// error is passed to the OTel ErrorHandler and no TLS configuration is +// provided. +func loadEnvTLS[T *tls.Config]() resolver[T] { + return func(s setting[T]) setting[T] { + if s.Set { + // Passed, valid, options have precedence. + return s + } + + var rootCAs *x509.CertPool + var err error + for _, key := range envTLSCert { + if v := os.Getenv(key); v != "" { + rootCAs, err = loadCertPool(v) + break + } + } + + var certs []tls.Certificate + for _, pair := range envTLSClient { + cert := os.Getenv(pair.Certificate) + key := os.Getenv(pair.Key) + if cert != "" && key != "" { + var e error + certs, e = loadCertificates(cert, key) + err = errors.Join(err, e) + break + } + } + + if err != nil { + err = fmt.Errorf("failed to load TLS: %w", err) + otel.Handle(err) + } else if rootCAs != nil || certs != nil { + s.Set = true + s.Value = &tls.Config{RootCAs: rootCAs, Certificates: certs} + } + return s + } +} + +// readFile is used for testing. +var readFile = os.ReadFile + +// loadCertPool loads and returns the *x509.CertPool found at path if it exists +// and is valid. Otherwise, nil and an error is returned. +func loadCertPool(path string) (*x509.CertPool, error) { + b, err := readFile(path) + if err != nil { + return nil, err + } + cp := x509.NewCertPool() + if ok := cp.AppendCertsFromPEM(b); !ok { + return nil, errors.New("certificate not added") + } + return cp, nil +} + +// loadCertificates loads and returns the tls.Certificate found at path if it +// exists and is valid. Otherwise, nil and an error is returned. +func loadCertificates(certPath, keyPath string) ([]tls.Certificate, error) { + cert, err := readFile(certPath) + if err != nil { + return nil, err + } + key, err := readFile(keyPath) + if err != nil { + return nil, err + } + crt, err := tls.X509KeyPair(cert, key) + if err != nil { + return nil, err + } + return []tls.Certificate{crt}, nil +} + +func compressorToCompression(compressor string) Compression { + c, err := convCompression(compressor) + if err != nil { + otel.Handle(fmt.Errorf("%s, using no compression as default", err)) + return NoCompression + } + + return c +} + +// setting is a configuration setting value. +type setting[T any] struct { + Value T + Set bool +} + +// newSetting returns a new setting with the value set. +func newSetting[T any](value T) setting[T] { + return setting[T]{Value: value, Set: true} +} + +// resolver returns an updated setting after applying an resolution operation. +type resolver[T any] func(setting[T]) setting[T] + +// Resolve returns a resolved version of s. +// +// It will apply all the passed fn in the order provided, chaining together the +// return setting to the next input. The setting s is used as the initial +// argument to the first fn. +// +// Each fn needs to validate if it should apply given the Set state of the +// setting. This will not perform any checks on the set state when chaining +// function. +func (s setting[T]) Resolve(fn ...resolver[T]) setting[T] { + for _, f := range fn { + s = f(s) + } + return s +} + +// getEnv returns a resolver that will apply an environment variable value +// associated with the first set key to a setting value. The conv function is +// used to convert between the environment variable value and the setting type. +// +// If the input setting to the resolver is set, the environment variable will +// not be applied. +// +// Any error returned from conv is sent to the OTel ErrorHandler and the +// setting will not be updated. +func getEnv[T any](keys []string, conv func(string) (T, error)) resolver[T] { + return func(s setting[T]) setting[T] { + if s.Set { + // Passed, valid, options have precedence. + return s + } + + for _, key := range keys { + if vStr := os.Getenv(key); vStr != "" { + v, err := conv(vStr) + if err == nil { + s.Value = v + s.Set = true + break + } + otel.Handle(fmt.Errorf("invalid %s value %s: %w", key, vStr, err)) + } + } + return s + } +} + +// fallback returns a resolve that will set a setting value to val if it is not +// already set. +// +// This is usually passed at the end of a resolver chain to ensure a default is +// applied if the setting has not already been set. +func fallback[T any](val T) resolver[T] { + return func(s setting[T]) setting[T] { + if !s.Set { + s.Value = val + s.Set = true + } + return s + } } diff --git a/exporters/otlp/otlplog/otlploggrpc/config_test.go b/exporters/otlp/otlplog/otlploggrpc/config_test.go new file mode 100644 index 00000000000..02817476f5c --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/config_test.go @@ -0,0 +1,389 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlploggrpc + +import ( + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" +) + +const ( + weakCertificate = ` +-----BEGIN CERTIFICATE----- +MIIBhzCCASygAwIBAgIRANHpHgAWeTnLZpTSxCKs0ggwCgYIKoZIzj0EAwIwEjEQ +MA4GA1UEChMHb3RlbC1nbzAeFw0yMTA0MDExMzU5MDNaFw0yMTA0MDExNDU5MDNa +MBIxEDAOBgNVBAoTB290ZWwtZ28wWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAS9 +nWSkmPCxShxnp43F+PrOtbGV7sNfkbQ/kxzi9Ego0ZJdiXxkmv/C05QFddCW7Y0Z +sJCLHGogQsYnWJBXUZOVo2MwYTAOBgNVHQ8BAf8EBAMCB4AwEwYDVR0lBAwwCgYI +KwYBBQUHAwEwDAYDVR0TAQH/BAIwADAsBgNVHREEJTAjgglsb2NhbGhvc3SHEAAA +AAAAAAAAAAAAAAAAAAGHBH8AAAEwCgYIKoZIzj0EAwIDSQAwRgIhANwZVVKvfvQ/ +1HXsTvgH+xTQswOwSSKYJ1cVHQhqK7ZbAiEAus8NxpTRnp5DiTMuyVmhVNPB+bVH +Lhnm4N/QDk5rek0= +-----END CERTIFICATE----- +` + weakPrivateKey = ` +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgN8HEXiXhvByrJ1zK +SFT6Y2l2KqDWwWzKf+t4CyWrNKehRANCAAS9nWSkmPCxShxnp43F+PrOtbGV7sNf +kbQ/kxzi9Ego0ZJdiXxkmv/C05QFddCW7Y0ZsJCLHGogQsYnWJBXUZOV +-----END PRIVATE KEY----- +` +) + +func newTLSConf(cert, key []byte) (*tls.Config, error) { + cp := x509.NewCertPool() + if ok := cp.AppendCertsFromPEM(cert); !ok { + return nil, errors.New("failed to append certificate to the cert pool") + } + crt, err := tls.X509KeyPair(cert, key) + if err != nil { + return nil, err + } + crts := []tls.Certificate{crt} + return &tls.Config{RootCAs: cp, Certificates: crts}, nil +} + +func TestNewConfig(t *testing.T) { + orig := readFile + readFile = func() func(name string) ([]byte, error) { + index := map[string][]byte{ + "cert_path": []byte(weakCertificate), + "key_path": []byte(weakPrivateKey), + "invalid_cert": []byte("invalid certificate file."), + "invalid_key": []byte("invalid key file."), + } + return func(name string) ([]byte, error) { + b, ok := index[name] + if !ok { + err := fmt.Errorf("file does not exist: %s", name) + return nil, err + } + return b, nil + } + }() + t.Cleanup(func() { readFile = orig }) + + tlsCfg, err := newTLSConf([]byte(weakCertificate), []byte(weakPrivateKey)) + require.NoError(t, err, "testing TLS config") + + headers := map[string]string{"a": "A"} + rc := retry.Config{} + + dialOptions := []grpc.DialOption{grpc.WithUserAgent("test-agent")} + + testcases := []struct { + name string + options []Option + envars map[string]string + want config + errs []string + }{ + { + name: "Defaults", + want: config{ + endpoint: newSetting(defaultEndpoint), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "Options", + options: []Option{ + WithInsecure(), + WithEndpoint("test"), + WithEndpointURL("http://test:8080/path"), + WithReconnectionPeriod(time.Second), + WithCompressor("gzip"), + WithHeaders(headers), + WithTLSCredentials(credentials.NewTLS(tlsCfg)), + WithServiceConfig("{}"), + WithDialOption(dialOptions...), + WithGRPCConn(&grpc.ClientConn{}), + WithTimeout(2 * time.Second), + WithRetry(RetryConfig(rc)), + }, + want: config{ + endpoint: newSetting("test:8080"), + insecure: newSetting(true), + headers: newSetting(headers), + compression: newSetting(GzipCompression), + timeout: newSetting(2 * time.Second), + retryCfg: newSetting(rc), + gRPCCredentials: newSetting(credentials.NewTLS(tlsCfg)), + serviceConfig: newSetting("{}"), + reconnectionPeriod: newSetting(time.Second), + gRPCConn: newSetting(&grpc.ClientConn{}), + dialOptions: newSetting(dialOptions), + }, + }, + { + name: "WithEndpointURL", + options: []Option{ + WithEndpointURL("http://test:8080/path"), + }, + want: config{ + endpoint: newSetting("test:8080"), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "EndpointPrecedence", + options: []Option{ + WithEndpointURL("https://test:8080/path"), + WithEndpoint("not-test:9090"), + WithInsecure(), + }, + want: config{ + endpoint: newSetting("not-test:9090"), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "EndpointURLPrecedence", + options: []Option{ + WithEndpoint("not-test:9090"), + WithInsecure(), + WithEndpointURL("https://test:8080/path"), + }, + want: config{ + endpoint: newSetting("test:8080"), + insecure: newSetting(false), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "LogEnvironmentVariables", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "https://env.endpoint:8080/prefix", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a=A", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "gzip", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", + }, + want: config{ + endpoint: newSetting("env.endpoint:8080"), + insecure: newSetting(false), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(GzipCompression), + timeout: newSetting(15 * time.Second), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "LogEnpointEnvironmentVariablesDefaultPath", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "http://env.endpoint", + }, + want: config{ + endpoint: newSetting("env.endpoint"), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "OTLPEnvironmentVariables", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://env.endpoint:8080/prefix", + "OTEL_EXPORTER_OTLP_HEADERS": "a=A", + "OTEL_EXPORTER_OTLP_COMPRESSION": "none", + "OTEL_EXPORTER_OTLP_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_CLIENT_KEY": "key_path", + }, + want: config{ + endpoint: newSetting("env.endpoint:8080"), + insecure: newSetting(true), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(NoCompression), + timeout: newSetting(15 * time.Second), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "OTLPEnpointEnvironmentVariablesDefaultPath", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://env.endpoint", + }, + want: config{ + endpoint: newSetting("env.endpoint"), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "EnvironmentVariablesPrecedence", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://ignored:9090/alt", + "OTEL_EXPORTER_OTLP_HEADERS": "b=B", + "OTEL_EXPORTER_OTLP_COMPRESSION": "none", + "OTEL_EXPORTER_OTLP_TIMEOUT": "30000", + "OTEL_EXPORTER_OTLP_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_KEY": "invalid_key", + + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "https://env.endpoint:8080/path", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a=A", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "gzip", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", + }, + want: config{ + endpoint: newSetting("env.endpoint:8080"), + insecure: newSetting(false), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(GzipCompression), + timeout: newSetting(15 * time.Second), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "OptionsPrecedence", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://ignored:9090/alt", + "OTEL_EXPORTER_OTLP_HEADERS": "b=B", + "OTEL_EXPORTER_OTLP_COMPRESSION": "none", + "OTEL_EXPORTER_OTLP_TIMEOUT": "30000", + "OTEL_EXPORTER_OTLP_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_KEY": "invalid_key", + + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "https://env.endpoint:8080/prefix", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a=A", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "gzip", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", + }, + options: []Option{ + WithEndpoint("foo"), + WithEndpointURL("https://test/path"), + WithInsecure(), + WithTLSCredentials(credentials.NewTLS(tlsCfg)), + WithCompressor("gzip"), + WithHeaders(headers), + WithTimeout(time.Second), + WithRetry(RetryConfig(rc)), + }, + want: config{ + endpoint: newSetting("test"), + insecure: newSetting(true), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(GzipCompression), + timeout: newSetting(time.Second), + retryCfg: newSetting(rc), + gRPCCredentials: newSetting(credentials.NewTLS(tlsCfg)), + }, + }, + { + name: "InvalidEnvironmentVariables", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "%invalid", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a,%ZZ=valid,key=%ZZ", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "xz", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "100 seconds", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "invalid_key", + }, + want: config{ + endpoint: newSetting(defaultEndpoint), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + }, + errs: []string{ + `invalid OTEL_EXPORTER_OTLP_LOGS_ENDPOINT value %invalid: parse "%invalid": invalid URL escape "%in"`, + `failed to load TLS:`, + `certificate not added`, + `tls: failed to find any PEM data in certificate input`, + `invalid OTEL_EXPORTER_OTLP_LOGS_HEADERS value a,%ZZ=valid,key=%ZZ:`, + `invalid header: a`, + `invalid header key: %ZZ`, + `invalid header value: %ZZ`, + `invalid OTEL_EXPORTER_OTLP_LOGS_COMPRESSION value xz: unknown compression: xz`, + `invalid OTEL_EXPORTER_OTLP_LOGS_TIMEOUT value 100 seconds: strconv.Atoi: parsing "100 seconds": invalid syntax`, + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + for key, value := range tc.envars { + t.Setenv(key, value) + } + + var err error + t.Cleanup(func(orig otel.ErrorHandler) func() { + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(e error) { + err = errors.Join(err, e) + })) + return func() { otel.SetErrorHandler(orig) } + }(otel.GetErrorHandler())) + c := newConfig(tc.options) + + // Do not compare pointer values. + assertTLSConfig(t, tc.want.tlsCfg, c.tlsCfg) + var emptyTLS setting[*tls.Config] + c.tlsCfg, tc.want.tlsCfg = emptyTLS, emptyTLS + + assert.Equal(t, tc.want, c) + + for _, errMsg := range tc.errs { + assert.ErrorContains(t, err, errMsg) + } + }) + } +} + +func assertTLSConfig(t *testing.T, want, got setting[*tls.Config]) { + t.Helper() + + assert.Equal(t, want.Set, got.Set, "setting Set") + if !want.Set { + return + } + + if want.Value == nil { + assert.Nil(t, got.Value, "*tls.Config") + return + } + require.NotNil(t, got.Value, "*tls.Config") + + if want.Value.RootCAs == nil { + assert.Nil(t, got.Value.RootCAs, "*tls.Config.RootCAs") + } else { + if assert.NotNil(t, got.Value.RootCAs, "RootCAs") { + assert.True(t, want.Value.RootCAs.Equal(got.Value.RootCAs), "RootCAs equal") + } + } + assert.Equal(t, want.Value.Certificates, got.Value.Certificates, "Certificates") +} diff --git a/exporters/otlp/otlplog/otlploggrpc/go.mod b/exporters/otlp/otlplog/otlploggrpc/go.mod index b43eee4db8a..679c9136d38 100644 --- a/exporters/otlp/otlplog/otlploggrpc/go.mod +++ b/exporters/otlp/otlplog/otlploggrpc/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/cenkalti/backoff/v4 v4.3.0 github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/otel v1.27.0 go.opentelemetry.io/otel/sdk/log v0.3.0 google.golang.org/grpc v1.64.0 ) @@ -14,7 +15,6 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/log v0.3.0 // indirect go.opentelemetry.io/otel/metric v1.27.0 // indirect go.opentelemetry.io/otel/sdk v1.27.0 // indirect