diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 5c0cf185..aa613b50 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -16,6 +16,16 @@ ===== Bug fixes //// +[float] +[[lambda-unreleased]] +=== Unreleased + +https://github.com/elastic/apm-aws-lambda/compare/v1.3.1...main[View commits] + +[float] +===== Features +- experimental:[] Allow metadata in register transaction {lambda-pull}384[384] + [float] [[lambda-1.3.1]] === 1.3.1 - 2023/04/04 diff --git a/accumulator/batch.go b/accumulator/batch.go index 02ddb5a0..61618a44 100644 --- a/accumulator/batch.go +++ b/accumulator/batch.go @@ -43,7 +43,16 @@ var ( maxSizeThreshold = 0.9 zeroTime = time.Time{} newLineSep = []byte("\n") - transactionKey = []byte("transaction") + transactionKey = "transaction" + metadataKey = "metadata" +) + +type eventType int + +const ( + metadataEvent = iota + transactionEvent + otherEvent ) // Batch manages the data that needs to be shipped to APM Server. It holds @@ -112,22 +121,44 @@ func (b *Batch) RegisterInvocation( } // OnAgentInit caches the transaction ID and the payload for the currently -// executing invocation as reported by the agent. The agent payload will be -// used to create a new transaction in an event the actual transaction is -// not reported by the agent due to unexpected termination. -func (b *Batch) OnAgentInit(reqID, txnID string, payload []byte) error { - if !isTransactionEvent(payload) { +// executing invocation as reported by the agent. The payload can contain +// metadata along with partial transaction. Metadata, if available, will +// be cached for all future invocation. The agent payload will be used to +// create a new transaction in an event the actual transaction is not +// reported by the agent due to unexpected termination. +func (b *Batch) OnAgentInit(reqID, contentEncoding string, raw []byte) error { + payload, err := GetUncompressedBytes(raw, contentEncoding) + if err != nil { + return fmt.Errorf("failed to decompress request body: %w", err) + } + + var metadata, txnData []byte + switch findEventType(payload) { + case metadataEvent: + metadata, txnData, _ = bytes.Cut(payload, newLineSep) + case transactionEvent: + txnData = payload + default: return errors.New("invalid payload") } + + txnID := gjson.GetBytes(txnData, "transaction.id").String() + if txnID == "" { + return errors.New("failed to parse transaction id from registration body") + } + b.mu.Lock() defer b.mu.Unlock() + if b.metadataBytes == 0 && len(metadata) > 0 { + b.metadataBytes, _ = b.buf.Write(metadata) + } i, ok := b.invocations[reqID] if !ok { // It is possible that the invocation is registered at a later time i = &Invocation{} b.invocations[reqID] = i } - i.TransactionID, i.AgentPayload = txnID, payload + i.TransactionID, i.AgentPayload = txnID, txnData b.currentlyExecutingRequestID = reqID return nil } @@ -167,7 +198,7 @@ func (b *Batch) AddAgentData(apmData APMData) error { } for { data, after, _ = bytes.Cut(after, newLineSep) - if inc.NeedProxyTransaction() && isTransactionEvent(data) { + if inc.NeedProxyTransaction() && findEventType(data) == transactionEvent { res := gjson.GetBytes(data, "transaction.id") if res.Str != "" && inc.TransactionID == res.Str { inc.TransactionObserved = true @@ -313,21 +344,25 @@ func (b *Batch) addData(data []byte) error { return nil } -func isTransactionEvent(body []byte) bool { +func findEventType(body []byte) eventType { + var quote byte var key []byte for i, r := range body { if r == '"' || r == '\'' { + quote = r key = body[i+1:] break } } - if len(key) < len(transactionKey) { - return false + end := bytes.IndexByte(key, quote) + if end == -1 { + return otherEvent } - for i := 0; i < len(transactionKey); i++ { - if transactionKey[i] != key[i] { - return false - } + switch string(key[:end]) { + case transactionKey: + return transactionEvent + case metadataKey: + return metadataEvent } - return true + return otherEvent } diff --git a/accumulator/batch_test.go b/accumulator/batch_test.go index 2836cc30..244d7e66 100644 --- a/accumulator/batch_test.go +++ b/accumulator/batch_test.go @@ -95,22 +95,26 @@ func TestShouldShip_ReasonAge(t *testing.T) { func TestLifecycle(t *testing.T) { reqID := "test-req-id" fnARN := "test-fn-arn" - txnID := "023d90ff77f13b9f" lambdaData := `{"log":{"message":"this is log"}}` - txnData := fmt.Sprintf(`{"transaction":{"id":"%s"}}`, txnID) + txnData := fmt.Sprintf(`{"transaction":{"id":"%s"}}`, "023d90ff77f13b9f") ts := time.Date(2022, time.October, 1, 1, 1, 1, 0, time.UTC) txnDur := time.Second + type agentInit struct { + init bool + withMetadata bool + } + for _, tc := range []struct { name string - agentInit bool + agentInit agentInit receiveAgentRootTxn bool receiveLambdaLogRuntime bool expected string }{ { name: "without_agent_init_without_root_txn", - agentInit: false, + agentInit: agentInit{init: false, withMetadata: false}, receiveAgentRootTxn: false, receiveLambdaLogRuntime: false, // Without agent init no proxy txn is created if root txn is not reported @@ -122,7 +126,7 @@ func TestLifecycle(t *testing.T) { }, { name: "without_agent_init_with_root_txn", - agentInit: false, + agentInit: agentInit{init: false, withMetadata: false}, receiveAgentRootTxn: true, receiveLambdaLogRuntime: false, expected: fmt.Sprintf( @@ -133,8 +137,8 @@ func TestLifecycle(t *testing.T) { ), }, { - name: "with_agent_init_with_root_txn", - agentInit: true, + name: "with_no_meta_agent_init_with_root_txn", + agentInit: agentInit{init: true, withMetadata: false}, receiveAgentRootTxn: true, receiveLambdaLogRuntime: false, expected: fmt.Sprintf( @@ -145,8 +149,20 @@ func TestLifecycle(t *testing.T) { ), }, { - name: "with_agent_init_without_root_txn_with_runtimeDone", - agentInit: true, + name: "with_meta_agent_init_with_root_txn", + agentInit: agentInit{init: true, withMetadata: true}, + receiveAgentRootTxn: true, + receiveLambdaLogRuntime: false, + expected: fmt.Sprintf( + "%s\n%s\n%s", + metadata, + generateCompleteTxn(t, txnData, "success", "", txnDur), + lambdaData, + ), + }, + { + name: "with_no_meta_agent_init_without_root_txn_with_runtimeDone", + agentInit: agentInit{init: true, withMetadata: false}, receiveAgentRootTxn: false, receiveLambdaLogRuntime: true, // With agent init proxy txn is created if root txn is not reported. @@ -159,8 +175,37 @@ func TestLifecycle(t *testing.T) { ), }, { - name: "with_agent_init_without_root_txn", - agentInit: true, + name: "with_meta_agent_init_without_root_txn_with_runtimeDone", + agentInit: agentInit{init: true, withMetadata: true}, + receiveAgentRootTxn: false, + receiveLambdaLogRuntime: true, + // With agent init proxy txn is created if root txn is not reported. + // Details in runtimeDone event is used to find the result of the txn. + expected: fmt.Sprintf( + "%s\n%s\n%s", + metadata, + lambdaData, + generateCompleteTxn(t, txnData, "failure", "failure", txnDur), + ), + }, + { + name: "with_no_meta_agent_init_without_root_txn", + agentInit: agentInit{init: true, withMetadata: false}, + receiveAgentRootTxn: false, + receiveLambdaLogRuntime: false, + // With agent init proxy txn is created if root txn is not reported. + // If runtimeDone event is not available `timeout` is used as the + // result of the transaction. + expected: fmt.Sprintf( + "%s\n%s\n%s", + metadata, + lambdaData, + generateCompleteTxn(t, txnData, "timeout", "failure", txnDur), + ), + }, + { + name: "with_meta_agent_init_without_root_txn", + agentInit: agentInit{init: true, withMetadata: true}, receiveAgentRootTxn: false, receiveLambdaLogRuntime: false, // With agent init proxy txn is created if root txn is not reported. @@ -179,8 +224,12 @@ func TestLifecycle(t *testing.T) { // NEXT API response creates a new invocation cache b.RegisterInvocation(reqID, fnARN, ts.Add(txnDur).UnixMilli(), ts) // Agent creates and registers a partial transaction in the extn - if tc.agentInit { - require.NoError(t, b.OnAgentInit(reqID, txnID, []byte(txnData))) + if tc.agentInit.init { + initData := txnData + if tc.agentInit.withMetadata { + initData = fmt.Sprintf("%s\n%s", metadata, txnData) + } + require.NoError(t, b.OnAgentInit(reqID, "", []byte(initData))) } // Agent sends a request with metadata require.NoError(t, b.AddAgentData(APMData{ @@ -209,17 +258,18 @@ func TestLifecycle(t *testing.T) { } } -func TestIsTransactionEvent(t *testing.T) { +func TestFindEventType(t *testing.T) { for _, tc := range []struct { body []byte - expected bool + expected eventType }{ - {body: []byte(`{}`), expected: false}, - {body: []byte(`{"tran":{}}`), expected: false}, - {body: []byte(`{"span":{}}`), expected: false}, - {body: []byte(`{"transaction":{}}`), expected: true}, + {body: []byte(`{}`), expected: otherEvent}, + {body: []byte(`{"tran":{}}`), expected: otherEvent}, + {body: []byte(`{"span":{}}`), expected: otherEvent}, + {body: []byte(`{"metadata":{}}\n{"transaction":{}}`), expected: metadataEvent}, + {body: []byte(`{"transaction":{}}`), expected: transactionEvent}, } { - assert.Equal(t, tc.expected, isTransactionEvent(tc.body)) + assert.Equal(t, tc.expected, findEventType(tc.body)) } } diff --git a/apmproxy/receiver.go b/apmproxy/receiver.go index 7cc6e95b..d28d99e8 100644 --- a/apmproxy/receiver.go +++ b/apmproxy/receiver.go @@ -29,10 +29,9 @@ import ( "time" "github.com/elastic/apm-aws-lambda/accumulator" - "github.com/tidwall/gjson" ) -const txnRegistrationContentType = "application/vnd.elastic.apm.transaction+json" +const txnRegistrationContentType = "application/vnd.elastic.apm.transaction+ndjson" // StartReceiver starts the server listening for APM agent data. func (c *Client) StartReceiver() error { @@ -76,12 +75,12 @@ func (c *Client) Shutdown() error { // URL: http://server/ func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Request), error) { // Init reverse proxy - parsedApmServerUrl, err := url.Parse(c.serverURL) + parsedApmServerURL, err := url.Parse(c.serverURL) if err != nil { return nil, fmt.Errorf("could not parse APM server URL: %w", err) } - reverseProxy := httputil.NewSingleHostReverseProxy(parsedApmServerUrl) + reverseProxy := httputil.NewSingleHostReverseProxy(parsedApmServerURL) reverseProxy.Transport = c.client.Transport.(*http.Transport).Clone() @@ -103,10 +102,10 @@ func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Reques r.Header.Del("X-Forwarded-For") // Update headers to allow for SSL redirection - r.URL.Host = parsedApmServerUrl.Host - r.URL.Scheme = parsedApmServerUrl.Scheme + r.URL.Host = parsedApmServerURL.Host + r.URL.Scheme = parsedApmServerURL.Scheme r.Header.Set("X-Forwarded-Host", r.Header.Get("Host")) - r.Host = parsedApmServerUrl.Host + r.Host = parsedApmServerURL.Host // Forward request to the APM server reverseProxy.ServeHTTP(w, r) @@ -185,14 +184,11 @@ func (c *Client) handleTransactionRegistration() func(w http.ResponseWriter, r * w.WriteHeader(http.StatusBadRequest) return } - txnID := gjson.GetBytes(rawBytes, "transaction.id").String() - if txnID == "" { - c.logger.Warn("Could not parse transaction id from transaction registration body") - w.WriteHeader(http.StatusUnprocessableEntity) - return - } - if err := c.batch.OnAgentInit(reqID, txnID, rawBytes); err != nil { - c.logger.Warnf("Failed to update invocation for transaction ID %s: %v", txnID, err) + + if err := c.batch.OnAgentInit( + reqID, r.Header.Get("Content-Encoding"), rawBytes, + ); err != nil { + c.logger.Warnf("Failed to update invocation: %w", err) w.WriteHeader(http.StatusUnprocessableEntity) return }