diff --git a/go.mod b/go.mod index e1f5b5a2e7..290ebb2ba6 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/klauspost/compress v1.15.7 github.com/oklog/ulid v1.3.1 github.com/olekukonko/tablewriter v0.0.5 + github.com/opentracing-contrib/go-stdlib v1.0.0 github.com/opentracing/opentracing-go v1.2.0 github.com/parca-dev/parca v0.11.1-0.20220628070043-1c1a82d2d7bd github.com/pkg/errors v0.9.1 @@ -196,7 +197,6 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e // indirect - github.com/opentracing-contrib/go-stdlib v1.0.0 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect diff --git a/pkg/fire/fire.go b/pkg/fire/fire.go index e7ebb84907..4af04ea887 100644 --- a/pkg/fire/fire.go +++ b/pkg/fire/fire.go @@ -189,6 +189,7 @@ func New(cfg Config) (*Fire, error) { if err != nil { return nil, err } + pusherHTTPClient.Transport = util.WrapWithInstrumentedHTTPTransport(pusherHTTPClient.Transport) fire.pusherClient = pushv1connect.NewPusherServiceClient(pusherHTTPClient, cfg.AgentConfig.ClientConfig.URL.String()) return fire, nil diff --git a/pkg/ingester/clientpool/ingester_client_pool.go b/pkg/ingester/clientpool/ingester_client_pool.go index 180f3bc4d9..02b80dedb4 100644 --- a/pkg/ingester/clientpool/ingester_client_pool.go +++ b/pkg/ingester/clientpool/ingester_client_pool.go @@ -3,7 +3,6 @@ package clientpool import ( "flag" "io" - "net/http" "time" "github.com/go-kit/log" @@ -14,6 +13,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/fire/pkg/gen/ingester/v1/ingesterv1connect" + "github.com/grafana/fire/pkg/util" ) // PoolConfig is config for creating a Pool. @@ -49,7 +49,7 @@ func PoolFactory(addr string) (ring_client.PoolClient, error) { return nil, err } return &ingesterPoolClient{ - IngesterServiceClient: ingesterv1connect.NewIngesterServiceClient(http.DefaultClient, "http://"+addr), + IngesterServiceClient: ingesterv1connect.NewIngesterServiceClient(util.InstrumentedHTTPClient(), "http://"+addr), HealthClient: grpc_health_v1.NewHealthClient(conn), Closer: conn, }, nil diff --git a/pkg/util/http.go b/pkg/util/http.go new file mode 100644 index 0000000000..a584946337 --- /dev/null +++ b/pkg/util/http.go @@ -0,0 +1,47 @@ +package util + +import ( + "net" + "net/http" + "time" + + "github.com/opentracing-contrib/go-stdlib/nethttp" + "github.com/opentracing/opentracing-go" +) + +var defaultTransport http.RoundTripper = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 200, + MaxIdleConnsPerHost: 200, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, +} + +type RoundTripperFunc func(req *http.Request) (*http.Response, error) + +func (f RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req) +} + +// InstrumentedHTTPClient returns a HTTP client with tracing instrumented default RoundTripper. +func InstrumentedHTTPClient() *http.Client { + return &http.Client{ + Transport: WrapWithInstrumentedHTTPTransport(defaultTransport), + } +} + +// WrapWithInstrumentedHTTPTransport wraps the given RoundTripper with an tracing instrumented one. +func WrapWithInstrumentedHTTPTransport(next http.RoundTripper) http.RoundTripper { + next = &nethttp.Transport{RoundTripper: next} + return RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + req, tr := nethttp.TraceRequest(opentracing.GlobalTracer(), req) + defer tr.Finish() + return next.RoundTrip(req) + }) +}