From 280360615a8e2e830fd9488a504a973e24b82090 Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Wed, 5 Oct 2022 21:01:09 +0700 Subject: [PATCH 1/6] feat: optionally flush queue before polling for next event --- main.go | 14 ++- relay/remotequeue.go | 26 +++-- relay/remotequeue_flush_test.go | 170 ++++++++++++++++++++++++++++++++ relay/remotequeue_test.go | 5 +- 4 files changed, 202 insertions(+), 13 deletions(-) create mode 100644 relay/remotequeue_flush_test.go diff --git a/main.go b/main.go index e0c9f95..52c04bb 100644 --- a/main.go +++ b/main.go @@ -37,6 +37,8 @@ var ( // profile the extension? selfProfiling = getEnvBool("PYROSCOPE_SELF_PROFILING") + + flushRelayQueue = getEnvBool("PYROSCOPE_FLUSH_RELAY_QUEUE") ) func main() { @@ -77,7 +79,7 @@ func main() { runDevMode(ctx, logger, orch) } else { // Register extension and start listening for events - runProdMode(ctx, logger, orch) + runProdMode(ctx, logger, orch, queue) } } @@ -105,7 +107,7 @@ func runDevMode(ctx context.Context, logger *logrus.Entry, orch *relay.Orchestra } } -func runProdMode(ctx context.Context, logger *logrus.Entry, orch *relay.Orchestrator) { +func runProdMode(ctx context.Context, logger *logrus.Entry, orch *relay.Orchestrator, queue *relay.RemoteQueue) { res, err := extensionClient.Register(ctx, extensionName) if err != nil { panic(err) @@ -113,9 +115,10 @@ func runProdMode(ctx context.Context, logger *logrus.Entry, orch *relay.Orchestr logger.Trace("Register response", res) // Will block until shutdown event is received or cancelled via the context. - processEvents(ctx, logger, orch) + processEvents(ctx, logger, orch, queue) } -func processEvents(ctx context.Context, log *logrus.Entry, orch *relay.Orchestrator) { + +func processEvents(ctx context.Context, log *logrus.Entry, orch *relay.Orchestrator, queue *relay.RemoteQueue) { log.Debug("Starting processing events") shutdown := func() { @@ -148,6 +151,9 @@ func processEvents(ctx context.Context, log *logrus.Entry, orch *relay.Orchestra shutdown() return } + if res.EventType == extension.Invoke && flushRelayQueue { + queue.Flush() + } } } } diff --git a/relay/remotequeue.go b/relay/remotequeue.go index 55c55c1..d840e75 100644 --- a/relay/remotequeue.go +++ b/relay/remotequeue.go @@ -14,12 +14,14 @@ type RemoteQueueCfg struct { } type RemoteQueue struct { - config *RemoteQueueCfg - jobs chan *http.Request - done chan struct{} - wg sync.WaitGroup - log *logrus.Entry - relayer Relayer + config *RemoteQueueCfg + jobs chan *http.Request + done chan struct{} + wg sync.WaitGroup + flushWG sync.WaitGroup + flushGuard sync.Mutex + log *logrus.Entry + relayer Relayer } type Relayer interface { @@ -66,15 +68,26 @@ func (r *RemoteQueue) Stop(_ context.Context) error { // Send adds a request to the queue to be processed later func (r *RemoteQueue) Send(req *http.Request) error { + r.flushGuard.Lock() // block if we are currently trying to Flush + defer r.flushGuard.Unlock() + r.flushWG.Add(1) select { case r.jobs <- req: default: + r.flushWG.Done() r.log.Error("Request queue is full, dropping a profile job.") return fmt.Errorf("request queue is full") } return nil } +func (r *RemoteQueue) Flush() { + r.log.Debugf("Flush: Waiting for enqueued jobs to finish") + r.flushGuard.Lock() + defer r.flushGuard.Unlock() + r.flushWG.Wait() + r.log.Debugf("Flush: Done") +} func (r *RemoteQueue) handleJobs(workerID int) { for { @@ -89,6 +102,7 @@ func (r *RemoteQueue) handleJobs(workerID int) { r.wg.Add(1) err := r.relayer.Send(job) r.wg.Done() + r.flushWG.Done() if err != nil { log.Error("Failed to relay request: ", err) diff --git a/relay/remotequeue_flush_test.go b/relay/remotequeue_flush_test.go new file mode 100644 index 0000000..49304e2 --- /dev/null +++ b/relay/remotequeue_flush_test.go @@ -0,0 +1,170 @@ +package relay_test + +import ( + "github.com/pyroscope-io/pyroscope-lambda-extension/relay" + "github.com/sirupsen/logrus" + "net/http" + "sync" + "testing" + "time" +) + +type asyncJob struct { + name string + m sync.Mutex + t *testing.T +} + +func newAsyncJob(t *testing.T, name string, f func()) *asyncJob { + res := &asyncJob{t: t, name: name} + res.m.Lock() + go func() { + f() + res.m.Unlock() + }() + return res +} + +func (j *asyncJob) assertNotFinished() { + locked := j.m.TryLock() + if locked { + j.t.Fatalf("should be still working... " + j.name) + } +} + +func (j *asyncJob) assertFinished() { + j.m.Lock() +} + +type flushTestHelper struct { + t *testing.T + log *logrus.Entry + responses chan struct{} + requests chan struct{} + req *http.Request + queue *relay.RemoteQueue + flushMutex sync.Mutex +} + +func newFlushMockRelay(t *testing.T) *flushTestHelper { + req, _ := http.NewRequest(http.MethodPost, "/", nil) + log := logrus.WithFields(logrus.Fields{"svc": "flush-test"}) + res := &flushTestHelper{ + t: t, + log: log, + responses: make(chan struct{}, 128), + requests: make(chan struct{}, 128), + req: req, + } + res.queue = relay.NewRemoteQueue(log, &relay.RemoteQueueCfg{ + NumWorkers: 2, + }, res) + logrus.SetLevel(logrus.DebugLevel) + + return res +} + +func (h *flushTestHelper) Send(_ *http.Request) error { + //h.log.Debug("flushTestHelper.send 1") + h.requests <- struct{}{} + //h.log.Debug("flushTestHelper.send 2") + <-h.responses + //h.log.Debug("flushTestHelper.send 3") + return nil +} + +func (h *flushTestHelper) respond() { + h.responses <- struct{}{} +} + +func (h *flushTestHelper) flushAsync() *asyncJob { + return newAsyncJob(h.t, "flush", func() { + h.queue.Flush() + }) +} + +func (h *flushTestHelper) sendAsync() *asyncJob { + return newAsyncJob(h.t, "send", func() { + h.queue.Send(h.req) + }) +} +func (h *flushTestHelper) send() { + h.queue.Send(h.req) +} + +func (h *flushTestHelper) step() { + time.Sleep(100 * time.Millisecond) +} + +func (h *flushTestHelper) assertRequestsProcessed(n int) { + if n != len(h.requests) { + h.t.Fatalf("expected %d got %d", n, len(h.responses)) + } +} + +func TestFlushWaitsForAllEnqueuedRequests(t *testing.T) { + n := 3 + h := newFlushMockRelay(t) + h.queue.Start() + for i := 0; i < n; i++ { + h.send() + } + f := h.flushAsync() + for i := 0; i < n; i++ { + h.step() + f.assertNotFinished() + h.respond() + } + f.assertFinished() + h.assertRequestsProcessed(n) +} + +func TestFlushWaitsForAllEnqueuedRequestsWhenQueueIsFullAndSomeAreDropped(t *testing.T) { + n := 30 + h := newFlushMockRelay(t) + //queueSize := cap(h.queue.jobs) + queueSize := 20 + for i := 0; i < n; i++ { //send 30, 10 are dropped + h.send() + } + h.queue.Start() + f := h.flushAsync() + for i := 0; i < queueSize; i++ { //20 are processed + h.step() + f.assertNotFinished() + h.respond() + } + f.assertFinished() + h.assertRequestsProcessed(queueSize) +} + +func TestFlushWithQueueEmpty(t *testing.T) { + h := newFlushMockRelay(t) + h.queue.Start() + f := h.flushAsync() + f.assertFinished() + h.assertRequestsProcessed(0) +} + +func TestFlushSendEventDuringFlushBlocks(t *testing.T) { + n := 3 + h := newFlushMockRelay(t) + h.queue.Start() + for i := 0; i < n; i++ { + h.send() + } + f := h.flushAsync() + h.step() + s := h.sendAsync() + for i := 0; i < n; i++ { + h.step() + f.assertNotFinished() + s.assertNotFinished() + } + for i := 0; i < n; i++ { + h.respond() + } + f.assertFinished() + s.assertFinished() + +} diff --git a/relay/remotequeue_test.go b/relay/remotequeue_test.go index 7044c6d..818a961 100644 --- a/relay/remotequeue_test.go +++ b/relay/remotequeue_test.go @@ -2,12 +2,11 @@ package relay_test import ( "context" + "github.com/pyroscope-io/pyroscope-lambda-extension/relay" + "github.com/stretchr/testify/assert" "net/http" "sync" "testing" - - "github.com/pyroscope-io/pyroscope-lambda-extension/relay" - "github.com/stretchr/testify/assert" ) type mockRelayer struct { From dd7cd45e80e98856f0cf003efafba5017927b4f2 Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Wed, 5 Oct 2022 21:01:29 +0700 Subject: [PATCH 2/6] ignore .idea --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index b86c5f0..a6403cf 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,5 @@ examples/go !examples/go/main.go node_modules + +.idea/ From 12fb2dba1b02a62dda05354d99e16d13260d72c4 Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Wed, 5 Oct 2022 21:13:25 +0700 Subject: [PATCH 3/6] linter fix --- relay/remotequeue_flush_test.go | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/relay/remotequeue_flush_test.go b/relay/remotequeue_flush_test.go index 49304e2..3b14f4c 100644 --- a/relay/remotequeue_flush_test.go +++ b/relay/remotequeue_flush_test.go @@ -37,13 +37,12 @@ func (j *asyncJob) assertFinished() { } type flushTestHelper struct { - t *testing.T - log *logrus.Entry - responses chan struct{} - requests chan struct{} - req *http.Request - queue *relay.RemoteQueue - flushMutex sync.Mutex + t *testing.T + log *logrus.Entry + responses chan struct{} + requests chan struct{} + req *http.Request + queue *relay.RemoteQueue } func newFlushMockRelay(t *testing.T) *flushTestHelper { @@ -85,11 +84,11 @@ func (h *flushTestHelper) flushAsync() *asyncJob { func (h *flushTestHelper) sendAsync() *asyncJob { return newAsyncJob(h.t, "send", func() { - h.queue.Send(h.req) + _ = h.queue.Send(h.req) }) } func (h *flushTestHelper) send() { - h.queue.Send(h.req) + _ = h.queue.Send(h.req) } func (h *flushTestHelper) step() { @@ -105,7 +104,7 @@ func (h *flushTestHelper) assertRequestsProcessed(n int) { func TestFlushWaitsForAllEnqueuedRequests(t *testing.T) { n := 3 h := newFlushMockRelay(t) - h.queue.Start() + _ = h.queue.Start() for i := 0; i < n; i++ { h.send() } @@ -127,7 +126,7 @@ func TestFlushWaitsForAllEnqueuedRequestsWhenQueueIsFullAndSomeAreDropped(t *tes for i := 0; i < n; i++ { //send 30, 10 are dropped h.send() } - h.queue.Start() + _ = h.queue.Start() f := h.flushAsync() for i := 0; i < queueSize; i++ { //20 are processed h.step() @@ -140,7 +139,7 @@ func TestFlushWaitsForAllEnqueuedRequestsWhenQueueIsFullAndSomeAreDropped(t *tes func TestFlushWithQueueEmpty(t *testing.T) { h := newFlushMockRelay(t) - h.queue.Start() + _ = h.queue.Start() f := h.flushAsync() f.assertFinished() h.assertRequestsProcessed(0) @@ -149,7 +148,7 @@ func TestFlushWithQueueEmpty(t *testing.T) { func TestFlushSendEventDuringFlushBlocks(t *testing.T) { n := 3 h := newFlushMockRelay(t) - h.queue.Start() + _ = h.queue.Start() for i := 0; i < n; i++ { h.send() } From 55f3c647f36d63c831b376c9448cf032cdb67aa9 Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Wed, 5 Oct 2022 21:42:18 +0700 Subject: [PATCH 4/6] rename env var --- main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 52c04bb..7ca10c8 100644 --- a/main.go +++ b/main.go @@ -38,7 +38,7 @@ var ( // profile the extension? selfProfiling = getEnvBool("PYROSCOPE_SELF_PROFILING") - flushRelayQueue = getEnvBool("PYROSCOPE_FLUSH_RELAY_QUEUE") + flushOnInvoke = getEnvBool("PYROSCOPE_FLUSH_ON_INVOKE") ) func main() { @@ -151,7 +151,7 @@ func processEvents(ctx context.Context, log *logrus.Entry, orch *relay.Orchestra shutdown() return } - if res.EventType == extension.Invoke && flushRelayQueue { + if res.EventType == extension.Invoke && flushOnInvoke { queue.Flush() } } From 612b57b8a4955fdac1b61be71b51a32841882912 Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Wed, 5 Oct 2022 22:23:56 +0700 Subject: [PATCH 5/6] readme --- README.md | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 4109e94..b75f00f 100644 --- a/README.md +++ b/README.md @@ -23,14 +23,15 @@ If needed, the `PYROSCOPE_AUTH_TOKEN` can be supplied. For a complete list of variables check the section below. ## Configuration -| env var | default | description | -| -------------------------- | -------------------------------- | ---------------------------------------------- | -| `PYROSCOPE_REMOTE_ADDRESS` | `https://ingest.pyroscope.cloud` | the pyroscope instance data will be relayed to | -| `PYROSCOPE_AUTH_TOKEN` | `""` | authorization key (token authentication) | -| `PYROSCOPE_SELF_PROFILING` | `false` | whether to profile the extension itself or not | -| `PYROSCOPE_LOG_LEVEL` | `info` | `error` or `info` or `debug` or `trace` | -| `PYROSCOPE_TIMEOUT` | `10s` | http client timeout ([go duration format](https://pkg.go.dev/time#Duration)) | -| `PYROSCOPE_NUM_WORKERS` | `5` | num of relay workers, pick based on the number of profile types | +| env var | default | description | +|------------------------------|----------------------------------|---------------------------------------------------------------------------------| +| `PYROSCOPE_REMOTE_ADDRESS` | `https://ingest.pyroscope.cloud` | the pyroscope instance data will be relayed to | +| `PYROSCOPE_AUTH_TOKEN` | `""` | authorization key (token authentication) | +| `PYROSCOPE_SELF_PROFILING` | `false` | whether to profile the extension itself or not | +| `PYROSCOPE_LOG_LEVEL` | `info` | `error` or `info` or `debug` or `trace` | +| `PYROSCOPE_TIMEOUT` | `10s` | http client timeout ([go duration format](https://pkg.go.dev/time#Duration)) | +| `PYROSCOPE_NUM_WORKERS` | `5` | num of relay workers, pick based on the number of profile types | +| `PYROSCOPE_FLUSH_ON_INVOKE` | `false` | wait for all relay requests to be finished the extension got `Invocation` event | # How it works The profiler will run as normal, and periodically will send data to the relay server (the server running at `http://localhost:4040`). From bfc323aed41207a7157f66530b40308a5562541d Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Fri, 7 Oct 2022 19:24:09 +0700 Subject: [PATCH 6/6] Update README.md Co-authored-by: eduardo aleixo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b75f00f..8f83bba 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ For a complete list of variables check the section below. | `PYROSCOPE_LOG_LEVEL` | `info` | `error` or `info` or `debug` or `trace` | | `PYROSCOPE_TIMEOUT` | `10s` | http client timeout ([go duration format](https://pkg.go.dev/time#Duration)) | | `PYROSCOPE_NUM_WORKERS` | `5` | num of relay workers, pick based on the number of profile types | -| `PYROSCOPE_FLUSH_ON_INVOKE` | `false` | wait for all relay requests to be finished the extension got `Invocation` event | +| `PYROSCOPE_FLUSH_ON_INVOKE` | `false` | wait for all relay requests to be finished/flushed before next `Invocation` event is allowed | # How it works The profiler will run as normal, and periodically will send data to the relay server (the server running at `http://localhost:4040`).