Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ examples/go
!examples/go/main.go

node_modules

.idea/
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ If needed, the `PYROSCOPE_AUTH_TOKEN` can be supplied.
For a complete list of variables check the section below.

## Configuration
| env var | default | description |
| -------------------------- | -------------------------------- | ---------------------------------------------- |
| `PYROSCOPE_REMOTE_ADDRESS` | `https://ingest.pyroscope.cloud` | the pyroscope instance data will be relayed to |
| `PYROSCOPE_AUTH_TOKEN` | `""` | authorization key (token authentication) |
| `PYROSCOPE_SELF_PROFILING` | `false` | whether to profile the extension itself or not |
| `PYROSCOPE_LOG_LEVEL` | `info` | `error` or `info` or `debug` or `trace` |
| `PYROSCOPE_TIMEOUT` | `10s` | http client timeout ([go duration format](https://pkg.go.dev/time#Duration)) |
| `PYROSCOPE_NUM_WORKERS` | `5` | num of relay workers, pick based on the number of profile types |
| env var | default | description |
|------------------------------|----------------------------------|---------------------------------------------------------------------------------|
| `PYROSCOPE_REMOTE_ADDRESS` | `https://ingest.pyroscope.cloud` | the pyroscope instance data will be relayed to |
| `PYROSCOPE_AUTH_TOKEN` | `""` | authorization key (token authentication) |
| `PYROSCOPE_SELF_PROFILING` | `false` | whether to profile the extension itself or not |
| `PYROSCOPE_LOG_LEVEL` | `info` | `error` or `info` or `debug` or `trace` |
| `PYROSCOPE_TIMEOUT` | `10s` | http client timeout ([go duration format](https://pkg.go.dev/time#Duration)) |
| `PYROSCOPE_NUM_WORKERS` | `5` | num of relay workers, pick based on the number of profile types |
| `PYROSCOPE_FLUSH_ON_INVOKE` | `false` | wait for all relay requests to be finished/flushed before next `Invocation` event is allowed |

# How it works
The profiler will run as normal, and periodically will send data to the relay server (the server running at `http://localhost:4040`).
Expand Down
14 changes: 10 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ var (

// profile the extension?
selfProfiling = getEnvBool("PYROSCOPE_SELF_PROFILING")

flushOnInvoke = getEnvBool("PYROSCOPE_FLUSH_ON_INVOKE")
)

func main() {
Expand Down Expand Up @@ -77,7 +79,7 @@ func main() {
runDevMode(ctx, logger, orch)
} else {
// Register extension and start listening for events
runProdMode(ctx, logger, orch)
runProdMode(ctx, logger, orch, queue)
}
}

Expand Down Expand Up @@ -105,17 +107,18 @@ func runDevMode(ctx context.Context, logger *logrus.Entry, orch *relay.Orchestra
}
}

func runProdMode(ctx context.Context, logger *logrus.Entry, orch *relay.Orchestrator) {
func runProdMode(ctx context.Context, logger *logrus.Entry, orch *relay.Orchestrator, queue *relay.RemoteQueue) {
res, err := extensionClient.Register(ctx, extensionName)
if err != nil {
panic(err)
}
logger.Trace("Register response", res)

// Will block until shutdown event is received or cancelled via the context.
processEvents(ctx, logger, orch)
processEvents(ctx, logger, orch, queue)
}
func processEvents(ctx context.Context, log *logrus.Entry, orch *relay.Orchestrator) {

func processEvents(ctx context.Context, log *logrus.Entry, orch *relay.Orchestrator, queue *relay.RemoteQueue) {
log.Debug("Starting processing events")

shutdown := func() {
Expand Down Expand Up @@ -148,6 +151,9 @@ func processEvents(ctx context.Context, log *logrus.Entry, orch *relay.Orchestra
shutdown()
return
}
if res.EventType == extension.Invoke && flushOnInvoke {
queue.Flush()
}
}
}
}
Expand Down
26 changes: 20 additions & 6 deletions relay/remotequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ type RemoteQueueCfg struct {
}

type RemoteQueue struct {
config *RemoteQueueCfg
jobs chan *http.Request
done chan struct{}
wg sync.WaitGroup
log *logrus.Entry
relayer Relayer
config *RemoteQueueCfg
jobs chan *http.Request
done chan struct{}
wg sync.WaitGroup
flushWG sync.WaitGroup
flushGuard sync.Mutex
log *logrus.Entry
relayer Relayer
}

type Relayer interface {
Expand Down Expand Up @@ -66,15 +68,26 @@ func (r *RemoteQueue) Stop(_ context.Context) error {

// Send adds a request to the queue to be processed later
func (r *RemoteQueue) Send(req *http.Request) error {
r.flushGuard.Lock() // block if we are currently trying to Flush
defer r.flushGuard.Unlock()
r.flushWG.Add(1)
select {
case r.jobs <- req:
default:
r.flushWG.Done()
r.log.Error("Request queue is full, dropping a profile job.")
return fmt.Errorf("request queue is full")
}

return nil
}
func (r *RemoteQueue) Flush() {
r.log.Debugf("Flush: Waiting for enqueued jobs to finish")
r.flushGuard.Lock()
defer r.flushGuard.Unlock()
r.flushWG.Wait()
r.log.Debugf("Flush: Done")
}

func (r *RemoteQueue) handleJobs(workerID int) {
for {
Expand All @@ -89,6 +102,7 @@ func (r *RemoteQueue) handleJobs(workerID int) {
r.wg.Add(1)
err := r.relayer.Send(job)
r.wg.Done()
r.flushWG.Done()

if err != nil {
log.Error("Failed to relay request: ", err)
Expand Down
169 changes: 169 additions & 0 deletions relay/remotequeue_flush_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package relay_test

import (
"github.com/pyroscope-io/pyroscope-lambda-extension/relay"
"github.com/sirupsen/logrus"
"net/http"
"sync"
"testing"
"time"
)

type asyncJob struct {
name string
m sync.Mutex
t *testing.T
}

func newAsyncJob(t *testing.T, name string, f func()) *asyncJob {
res := &asyncJob{t: t, name: name}
res.m.Lock()
go func() {
f()
res.m.Unlock()
}()
return res
}

func (j *asyncJob) assertNotFinished() {
locked := j.m.TryLock()
if locked {
j.t.Fatalf("should be still working... " + j.name)
}
}

func (j *asyncJob) assertFinished() {
j.m.Lock()
}

type flushTestHelper struct {
t *testing.T
log *logrus.Entry
responses chan struct{}
requests chan struct{}
req *http.Request
queue *relay.RemoteQueue
}

func newFlushMockRelay(t *testing.T) *flushTestHelper {
req, _ := http.NewRequest(http.MethodPost, "/", nil)
log := logrus.WithFields(logrus.Fields{"svc": "flush-test"})
res := &flushTestHelper{
t: t,
log: log,
responses: make(chan struct{}, 128),
requests: make(chan struct{}, 128),
req: req,
}
res.queue = relay.NewRemoteQueue(log, &relay.RemoteQueueCfg{
NumWorkers: 2,
}, res)
logrus.SetLevel(logrus.DebugLevel)

return res
}

func (h *flushTestHelper) Send(_ *http.Request) error {
//h.log.Debug("flushTestHelper.send 1")
h.requests <- struct{}{}
//h.log.Debug("flushTestHelper.send 2")
<-h.responses
//h.log.Debug("flushTestHelper.send 3")
return nil
}

func (h *flushTestHelper) respond() {
h.responses <- struct{}{}
}

func (h *flushTestHelper) flushAsync() *asyncJob {
return newAsyncJob(h.t, "flush", func() {
h.queue.Flush()
})
}

func (h *flushTestHelper) sendAsync() *asyncJob {
return newAsyncJob(h.t, "send", func() {
_ = h.queue.Send(h.req)
})
}
func (h *flushTestHelper) send() {
_ = h.queue.Send(h.req)
}

func (h *flushTestHelper) step() {
time.Sleep(100 * time.Millisecond)
}

func (h *flushTestHelper) assertRequestsProcessed(n int) {
if n != len(h.requests) {
h.t.Fatalf("expected %d got %d", n, len(h.responses))
}
}

func TestFlushWaitsForAllEnqueuedRequests(t *testing.T) {
n := 3
h := newFlushMockRelay(t)
_ = h.queue.Start()
for i := 0; i < n; i++ {
h.send()
}
f := h.flushAsync()
for i := 0; i < n; i++ {
h.step()
f.assertNotFinished()
h.respond()
}
f.assertFinished()
h.assertRequestsProcessed(n)
}

func TestFlushWaitsForAllEnqueuedRequestsWhenQueueIsFullAndSomeAreDropped(t *testing.T) {
n := 30
h := newFlushMockRelay(t)
//queueSize := cap(h.queue.jobs)
queueSize := 20
for i := 0; i < n; i++ { //send 30, 10 are dropped
h.send()
}
_ = h.queue.Start()
f := h.flushAsync()
for i := 0; i < queueSize; i++ { //20 are processed
h.step()
f.assertNotFinished()
h.respond()
}
f.assertFinished()
h.assertRequestsProcessed(queueSize)
}

func TestFlushWithQueueEmpty(t *testing.T) {
h := newFlushMockRelay(t)
_ = h.queue.Start()
f := h.flushAsync()
f.assertFinished()
h.assertRequestsProcessed(0)
}

func TestFlushSendEventDuringFlushBlocks(t *testing.T) {
n := 3
h := newFlushMockRelay(t)
_ = h.queue.Start()
for i := 0; i < n; i++ {
h.send()
}
f := h.flushAsync()
h.step()
s := h.sendAsync()
for i := 0; i < n; i++ {
h.step()
f.assertNotFinished()
s.assertNotFinished()
}
for i := 0; i < n; i++ {
h.respond()
}
f.assertFinished()
s.assertFinished()

}
5 changes: 2 additions & 3 deletions relay/remotequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package relay_test

import (
"context"
"github.com/pyroscope-io/pyroscope-lambda-extension/relay"
"github.com/stretchr/testify/assert"
"net/http"
"sync"
"testing"

"github.com/pyroscope-io/pyroscope-lambda-extension/relay"
"github.com/stretchr/testify/assert"
)

type mockRelayer struct {
Expand Down