From 08c637f9500a8afff1c059232ffcec9f7458c964 Mon Sep 17 00:00:00 2001 From: Florian Lehner Date: Fri, 25 Oct 2024 11:42:44 +0200 Subject: [PATCH 01/10] apmproxy: do not continue without APMData Signed-off-by: Florian Lehner --- accumulator/batch.go | 4 +++- apmproxy/apmserver.go | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/accumulator/batch.go b/accumulator/batch.go index 1a16de78..2d25247e 100644 --- a/accumulator/batch.go +++ b/accumulator/batch.go @@ -37,6 +37,8 @@ var ( // ErrInvalidEncoding is returned for any APMData that is encoded // with any encoding format ErrInvalidEncoding = errors.New("encoded data not supported") + // ErrNoData indicates that APMData.data is empty + ErrNoData = errors.New("no data") ) var ( @@ -170,7 +172,7 @@ func (b *Batch) OnAgentInit(reqID, contentEncoding string, raw []byte) error { // before adding any events then ErrBatchFull is returned. func (b *Batch) AddAgentData(apmData APMData) error { if len(apmData.Data) == 0 { - return nil + return ErrNoData } raw, err := GetUncompressedBytes(apmData.Data, apmData.ContentEncoding) if err != nil { diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 8a1ee983..4a7bfd9a 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -60,6 +60,10 @@ func (c *Client) ForwardApmData(ctx context.Context) error { return nil case data := <-c.AgentDataChannel: if err := c.forwardAgentData(ctx, data); err != nil { + if errors.Is(err, accumulator.ErrNoData) { + c.logger.Debug("received something from AgentDataChannel without APMData") + continue + } return err } // Wait for metadata to be available, metadata will be available as soon as From 3dd99800a009e4be7a9254cbea8ac76b8c6f1227 Mon Sep 17 00:00:00 2001 From: Florian Lehner Date: Fri, 25 Oct 2024 11:45:11 +0200 Subject: [PATCH 02/10] apmproxy: set channel only once Signed-off-by: Florian Lehner --- apmproxy/apmserver.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 4a7bfd9a..2d95ae23 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -29,6 +29,7 @@ import ( "math/rand" "net/http" "time" + "sync" "github.com/elastic/apm-aws-lambda/accumulator" "github.com/elastic/apm-aws-lambda/version" @@ -52,6 +53,7 @@ func (c *Client) ForwardApmData(ctx context.Context) error { c.logger.Warn("Failed to start APM data forwarder due to client unhealthy") return nil } + var once sync.Once var lambdaDataChan chan []byte for { select { @@ -66,9 +68,11 @@ func (c *Client) ForwardApmData(ctx context.Context) error { } return err } - // Wait for metadata to be available, metadata will be available as soon as - // the first agent data is processed. - lambdaDataChan = c.LambdaDataChannel + once.Do(func() { + // Wait for metadata to be available, metadata will be available as soon as + // the first agent data is processed. + lambdaDataChan = c.LambdaDataChannel + }) case data := <-lambdaDataChan: if err := c.forwardLambdaData(ctx, data); err != nil { return err From 9fd088671c833c4dc0af205a4dca4dcd97b62f27 Mon Sep 17 00:00:00 2001 From: Florian Lehner Date: Fri, 25 Oct 2024 11:54:58 +0200 Subject: [PATCH 03/10] accumulator: add test for call to AddAgentData without data Signed-off-by: Florian Lehner --- accumulator/batch_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/accumulator/batch_test.go b/accumulator/batch_test.go index 244d7e66..b33ce43a 100644 --- a/accumulator/batch_test.go +++ b/accumulator/batch_test.go @@ -48,6 +48,10 @@ func TestAdd(t *testing.T) { assert.ErrorIs(t, ErrBatchFull, b.AddLambdaData([]byte(`{"log":{}}`))) }) + t.Run("empty AddAgentData", func(t *testing.T) { + b := NewBatch(1, time.Hour) + assert.ErrorIs(t, ErrNoData, b.AddAgentData(APMData{})) + }) } func TestReset(t *testing.T) { From b383cda3a8f6bdf6f91f72bb9c4df76cb734242e Mon Sep 17 00:00:00 2001 From: Florian Lehner Date: Fri, 25 Oct 2024 11:57:28 +0200 Subject: [PATCH 04/10] fixup: fix linter Signed-off-by: Florian Lehner --- apmproxy/apmserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 2d95ae23..aa762c16 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -28,8 +28,8 @@ import ( "math" "math/rand" "net/http" - "time" "sync" + "time" "github.com/elastic/apm-aws-lambda/accumulator" "github.com/elastic/apm-aws-lambda/version" From e9cde491e1b5dedd24257a1108f50cc3159e2822 Mon Sep 17 00:00:00 2001 From: Florian Lehner Date: Fri, 25 Oct 2024 12:11:32 +0200 Subject: [PATCH 05/10] fixup: add AgentInfo to debug message Signed-off-by: Florian Lehner --- apmproxy/apmserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index aa762c16..82866761 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -63,7 +63,7 @@ func (c *Client) ForwardApmData(ctx context.Context) error { case data := <-c.AgentDataChannel: if err := c.forwardAgentData(ctx, data); err != nil { if errors.Is(err, accumulator.ErrNoData) { - c.logger.Debug("received something from AgentDataChannel without APMData") + c.logger.Debugf("received something from '%s' without APMData", data.AgentInfo) continue } return err From 4fb0bc58bd2b39381694e8b405972d12a110e311 Mon Sep 17 00:00:00 2001 From: Florian Lehner Date: Fri, 25 Oct 2024 12:14:12 +0200 Subject: [PATCH 06/10] fixup: use capital letter in debug message Signed-off-by: Florian Lehner --- apmproxy/apmserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 82866761..54255f3c 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -63,7 +63,7 @@ func (c *Client) ForwardApmData(ctx context.Context) error { case data := <-c.AgentDataChannel: if err := c.forwardAgentData(ctx, data); err != nil { if errors.Is(err, accumulator.ErrNoData) { - c.logger.Debugf("received something from '%s' without APMData", data.AgentInfo) + c.logger.Debugf("Received something from '%s' without APMData", data.AgentInfo) continue } return err From 18191d73075b793031035997f32cf3afd35bb23f Mon Sep 17 00:00:00 2001 From: Florian Lehner Date: Fri, 25 Oct 2024 12:17:29 +0200 Subject: [PATCH 07/10] receiver: avoid allocation if rawBytes is empty Signed-off-by: Florian Lehner --- apmproxy/receiver.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/apmproxy/receiver.go b/apmproxy/receiver.go index 40c34310..01d834c3 100644 --- a/apmproxy/receiver.go +++ b/apmproxy/receiver.go @@ -136,18 +136,20 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ agentFlushed := r.URL.Query().Get("flushed") == "true" - agentData := accumulator.APMData{ - Data: rawBytes, - ContentEncoding: r.Header.Get("Content-Encoding"), - AgentInfo: r.UserAgent(), - } + if len(rawBytes) != 0 { + agentData := accumulator.APMData{ + Data: rawBytes, + ContentEncoding: r.Header.Get("Content-Encoding"), + AgentInfo: r.UserAgent(), + } - if len(agentData.Data) != 0 { select { case c.AgentDataChannel <- agentData: default: c.logger.Warnf("Channel full: dropping a subset of agent data") } + } else { + c.logger.Debugf("Received empy request from '%s'", r.UserAgent()) } if agentFlushed { From 7330f870e29d12fbaf390fcf2d107225f568cc90 Mon Sep 17 00:00:00 2001 From: Florian Lehner Date: Fri, 25 Oct 2024 12:55:19 +0200 Subject: [PATCH 08/10] fixup: check len() first before calling function Signed-off-by: Florian Lehner --- apmproxy/apmserver.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 54255f3c..da21451a 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -61,11 +61,11 @@ func (c *Client) ForwardApmData(ctx context.Context) error { c.logger.Debug("Invocation context cancelled, not processing any more agent data") return nil case data := <-c.AgentDataChannel: + if len(data.Data) == 0 { + c.logger.Debugf("Received something from '%s' without APMData", data.AgentInfo) + continue + } if err := c.forwardAgentData(ctx, data); err != nil { - if errors.Is(err, accumulator.ErrNoData) { - c.logger.Debugf("Received something from '%s' without APMData", data.AgentInfo) - continue - } return err } once.Do(func() { From bd59d4a68fd63e0c99c4554674ea3bfd6a3f203a Mon Sep 17 00:00:00 2001 From: Florian Lehner Date: Fri, 25 Oct 2024 13:01:08 +0200 Subject: [PATCH 09/10] fixup: update comment Signed-off-by: Florian Lehner --- apmproxy/apmserver.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index da21451a..c283e469 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -69,8 +69,8 @@ func (c *Client) ForwardApmData(ctx context.Context) error { return err } once.Do(func() { - // Wait for metadata to be available, metadata will be available as soon as - // the first agent data is processed. + // With the first successful request to c.forwardAgent Data() metadata should be + // available and processing data from c.LambdaDataChannel can start. lambdaDataChan = c.LambdaDataChannel }) case data := <-lambdaDataChan: From 1763b69bc212ca52f5c8942eb2359b2e15cbbfec Mon Sep 17 00:00:00 2001 From: Florian Lehner Date: Thu, 31 Oct 2024 10:28:03 +0100 Subject: [PATCH 10/10] fixup: replace sync.Once with nil check Signed-off-by: Florian Lehner --- apmproxy/apmserver.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index c283e469..ff030cfa 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -28,7 +28,6 @@ import ( "math" "math/rand" "net/http" - "sync" "time" "github.com/elastic/apm-aws-lambda/accumulator" @@ -53,7 +52,6 @@ func (c *Client) ForwardApmData(ctx context.Context) error { c.logger.Warn("Failed to start APM data forwarder due to client unhealthy") return nil } - var once sync.Once var lambdaDataChan chan []byte for { select { @@ -68,11 +66,12 @@ func (c *Client) ForwardApmData(ctx context.Context) error { if err := c.forwardAgentData(ctx, data); err != nil { return err } - once.Do(func() { + if lambdaDataChan == nil { // With the first successful request to c.forwardAgent Data() metadata should be // available and processing data from c.LambdaDataChannel can start. lambdaDataChan = c.LambdaDataChannel - }) + c.logger.Debug("Assigned Lambda data channel") + } case data := <-lambdaDataChan: if err := c.forwardLambdaData(ctx, data); err != nil { return err