Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(otlp): experimental otlp support #2177

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ require (
github.com/xlab/treeprint v1.2.0
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.2.1
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df
golang.org/x/net v0.11.0
golang.org/x/sync v0.3.0
golang.org/x/sys v0.9.0
Expand Down Expand Up @@ -247,7 +247,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
golang.org/x/term v0.9.0 // indirect
golang.org/x/tools v0.8.0 // indirect
Expand Down Expand Up @@ -283,3 +283,10 @@ replace (
// + https://github.com/go-yaml/yaml/pull/876
gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094
)

require (
github.com/jzelinskie/must v0.0.1
go.opentelemetry.io/collector/pdata v1.0.0-rcv0012
)

replace go.opentelemetry.io/collector/pdata => ../opentelemetry-collector/pdata
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be clear, we won't have this dependency once this is ready to merge

10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,8 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/jzelinskie/must v0.0.1 h1:LT7fxrQCW9yWDkjnF75XFvODjKb6Su7qlx2LO8967Y8=
github.com/jzelinskie/must v0.0.1/go.mod h1:BTPQ0S/fIbENkk9zFU4JBtdtB+B4vibfTyeDC48z6zc=
github.com/k0kubun/pp/v3 v3.2.0 h1:h33hNTZ9nVFNP3u2Fsgz8JXiF5JINoZfFq4SvKJwNcs=
github.com/k0kubun/pp/v3 v3.2.0/go.mod h1:ODtJQbQcIRfAD3N+theGCV1m/CBxweERz2dapdz1EwA=
github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
Expand Down Expand Up @@ -1348,8 +1350,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down Expand Up @@ -1378,8 +1380,8 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk=
golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down
1 change: 1 addition & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,7 @@ golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028 h1:4+4C/Iv2U4fMZBiMCc98MG
golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"strings"

"github.com/bufbuild/connect-go"
"github.com/felixge/fgprof"

Check failure on line 16 in pkg/api/api.go

View workflow job for this annotation

GitHub Actions / regular-path

go.opentelemetry.io/collector/pdata@v1.0.0-rcv0012 (replaced by ../opentelemetry-collector/pdata): reading ../opentelemetry-collector/pdata/go.mod: open /home/runner/work/pyroscope/opentelemetry-collector/pdata/go.mod: no such file or directory

Check failure on line 16 in pkg/api/api.go

View workflow job for this annotation

GitHub Actions / base-path

go.opentelemetry.io/collector/pdata@v1.0.0-rcv0012 (replaced by ../opentelemetry-collector/pdata): reading ../opentelemetry-collector/pdata/go.mod: open /home/runner/work/pyroscope/opentelemetry-collector/pdata/go.mod: no such file or directory

Check failure on line 16 in pkg/api/api.go

View workflow job for this annotation

GitHub Actions / build-image

go.opentelemetry.io/collector/pdata@v1.0.0-rcv0012 (replaced by ../opentelemetry-collector/pdata): reading ../opentelemetry-collector/pdata/go.mod: open /home/runner/work/pyroscope/opentelemetry-collector/pdata/go.mod: no such file or directory
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
Expand All @@ -21,6 +21,7 @@
grpcgw "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"go.opentelemetry.io/collector/pdata/pprofile/pprofileotlp"

"github.com/grafana/pyroscope/public"

Expand All @@ -37,6 +38,7 @@
"github.com/grafana/pyroscope/pkg/frontend"
"github.com/grafana/pyroscope/pkg/frontend/frontendpb/frontendpbconnect"
"github.com/grafana/pyroscope/pkg/ingester"
"github.com/grafana/pyroscope/pkg/ingester/otlp"
"github.com/grafana/pyroscope/pkg/ingester/pyroscope"
"github.com/grafana/pyroscope/pkg/querier"
"github.com/grafana/pyroscope/pkg/scheduler"
Expand Down Expand Up @@ -197,13 +199,19 @@
// RegisterDistributor registers the endpoints associated with the distributor.
func (a *API) RegisterDistributor(d *distributor.Distributor) {
pyroscopeHandler := pyroscope.NewPyroscopeIngestHandler(d, a.logger)
otlpHandler := otlp.NewOTLPIngestHandler(d, a.logger)
a.RegisterRoute("/ingest", pyroscopeHandler, true, true, "POST")
a.RegisterRoute("/pyroscope/ingest", pyroscopeHandler, true, true, "POST")
pushv1connect.RegisterPusherServiceHandler(a.server.HTTP, d, a.grpcAuthMiddleware)
a.RegisterRoute("/distributor/ring", d, false, true, "GET", "POST")
a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
{Desc: "Ring status", Path: "/distributor/ring"},
})

pprofileotlp.RegisterGRPCServer(a.server.GRPC, otlpHandler)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just so you know I don't think we expose GRPC endpoint anywhere in our infra nor in helm.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cyriltovena yeah we might need a separate grpc server


// TODO(@petethepig): implement http/protobuf and http/json support
// a.RegisterRoute("/v1/profiles", otlpHandler, true, true, "POST")
}

// RegisterMemberlistKV registers the endpoints associated with the memberlist KV store.
Expand Down
119 changes: 119 additions & 0 deletions pkg/ingester/otlp/ingest_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package otlp

import (
"context"
"fmt"
"net/http"

"github.com/bufbuild/connect-go"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"

pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/tenant"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/pprofile/pprofileotlp"
)

type ingestHandler struct {
pprofileotlp.UnimplementedGRPCServer
svc PushService
log log.Logger
}

// TODO(@petethepig): split http and grpc
type Handler interface {
http.Handler
pprofileotlp.GRPCServer
}

type PushService interface {
Push(ctx context.Context, req *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error)
}

func NewOTLPIngestHandler(svc PushService, l log.Logger) Handler {
return &ingestHandler{
svc: svc,
log: level.Error(l),
}
}

// TODO(@petethepig): implement
// TODO(@petethepig): split http and grpc
func (h *ingestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
panic("not implemented")

req := &pushv1.PushRequest{}
_, err := h.svc.Push(r.Context(), connect.NewRequest(req))
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Internal Server Error: " + err.Error()))
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
}

// TODO(@petethepig): split http and grpc
func (h *ingestHandler) Export(ctx context.Context, er pprofileotlp.ExportRequest) (pprofileotlp.ExportResponse, error) {
// TODO(@petethepig): make it tenant-aware
ctx = tenant.InjectTenantID(ctx, tenant.DefaultTenantID)

h.log.Log("msg", "Export called")

rps := er.Profiles().ResourceProfiles()
for i := 0; i < rps.Len(); i++ {
rp := rps.At(i)

labelsDst := []*typesv1.LabelPair{}
// TODO(@petethepig): make labels work
labelsDst = append(labelsDst, &typesv1.LabelPair{
Name: "__name__",
Value: "process_cpu",
})
labelsDst = append(labelsDst, &typesv1.LabelPair{
Name: "service_name",
Value: "otlp_test_app",
})
labelsDst = append(labelsDst, &typesv1.LabelPair{
Name: "__delta__",
Value: "false",
})
labelsDst = append(labelsDst, &typesv1.LabelPair{
Name: "pyroscope_spy",
Value: "unknown",
})

sps := rp.ScopeProfiles()
for j := 0; j < sps.Len(); j++ {
sp := sps.At(j)
for k := 0; k < sp.Profiles().Len(); k++ {
p := sp.Profiles().At(k)

pprofBytes := pprofile.OprofToPprof(p)

req := &pushv1.PushRequest{
Series: []*pushv1.RawProfileSeries{
{
Labels: labelsDst,
Samples: []*pushv1.RawSample{{
RawProfile: pprofBytes,
ID: uuid.New().String(),
}},
},
},
}
_, err := h.svc.Push(ctx, connect.NewRequest(req))
if err != nil {
h.log.Log("msg", "failed to push profile", "err", err)
return pprofileotlp.NewExportResponse(), fmt.Errorf("failed to make a GRPC request: %w", err)
}
}
}

}

return pprofileotlp.NewExportResponse(), nil
}
Loading