Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow metadata in register transaction #384

Merged
merged 2 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@
===== Bug fixes
////

[float]
[[lambda-unreleased]]
=== Unreleased

https://github.com/elastic/apm-aws-lambda/compare/v1.3.1...main[View commits]

[float]
===== Features
- experimental:[] Allow metadata in register transaction {lambda-pull}384[384]

[float]
[[lambda-1.3.1]]
=== 1.3.1 - 2023/04/04
Expand Down
67 changes: 51 additions & 16 deletions accumulator/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,16 @@ var (
maxSizeThreshold = 0.9
zeroTime = time.Time{}
newLineSep = []byte("\n")
transactionKey = []byte("transaction")
transactionKey = "transaction"
metadataKey = "metadata"
)

type eventType int

const (
metadataEvent = iota
transactionEvent
otherEvent
)

// Batch manages the data that needs to be shipped to APM Server. It holds
Expand Down Expand Up @@ -112,22 +121,44 @@ func (b *Batch) RegisterInvocation(
}

// OnAgentInit caches the transaction ID and the payload for the currently
// executing invocation as reported by the agent. The agent payload will be
// used to create a new transaction in an event the actual transaction is
// not reported by the agent due to unexpected termination.
func (b *Batch) OnAgentInit(reqID, txnID string, payload []byte) error {
if !isTransactionEvent(payload) {
// executing invocation as reported by the agent. The payload can contain
// metadata along with partial transaction. Metadata, if available, will
// be cached for all future invocation. The agent payload will be used to
// create a new transaction in an event the actual transaction is not
// reported by the agent due to unexpected termination.
func (b *Batch) OnAgentInit(reqID, contentEncoding string, raw []byte) error {
payload, err := GetUncompressedBytes(raw, contentEncoding)
if err != nil {
return fmt.Errorf("failed to decompress request body: %w", err)
}

var metadata, txnData []byte
switch findEventType(payload) {
case metadataEvent:
metadata, txnData, _ = bytes.Cut(payload, newLineSep)
case transactionEvent:
txnData = payload
default:
return errors.New("invalid payload")
}

txnID := gjson.GetBytes(txnData, "transaction.id").String()
if txnID == "" {
return errors.New("failed to parse transaction id from registration body")
}

b.mu.Lock()
defer b.mu.Unlock()
if b.metadataBytes == 0 && len(metadata) > 0 {
b.metadataBytes, _ = b.buf.Write(metadata)
}
i, ok := b.invocations[reqID]
if !ok {
// It is possible that the invocation is registered at a later time
i = &Invocation{}
b.invocations[reqID] = i
}
i.TransactionID, i.AgentPayload = txnID, payload
i.TransactionID, i.AgentPayload = txnID, txnData
b.currentlyExecutingRequestID = reqID
return nil
}
Expand Down Expand Up @@ -167,7 +198,7 @@ func (b *Batch) AddAgentData(apmData APMData) error {
}
for {
data, after, _ = bytes.Cut(after, newLineSep)
if inc.NeedProxyTransaction() && isTransactionEvent(data) {
if inc.NeedProxyTransaction() && findEventType(data) == transactionEvent {
res := gjson.GetBytes(data, "transaction.id")
if res.Str != "" && inc.TransactionID == res.Str {
inc.TransactionObserved = true
Expand Down Expand Up @@ -313,21 +344,25 @@ func (b *Batch) addData(data []byte) error {
return nil
}

func isTransactionEvent(body []byte) bool {
func findEventType(body []byte) eventType {
var quote byte
var key []byte
for i, r := range body {
if r == '"' || r == '\'' {
quote = r
key = body[i+1:]
break
}
}
if len(key) < len(transactionKey) {
return false
end := bytes.IndexByte(key, quote)
if end == -1 {
return otherEvent
}
for i := 0; i < len(transactionKey); i++ {
if transactionKey[i] != key[i] {
return false
}
switch string(key[:end]) {
case transactionKey:
return transactionEvent
case metadataKey:
return metadataEvent
}
return true
return otherEvent
}
90 changes: 70 additions & 20 deletions accumulator/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,26 @@ func TestShouldShip_ReasonAge(t *testing.T) {
func TestLifecycle(t *testing.T) {
reqID := "test-req-id"
fnARN := "test-fn-arn"
txnID := "023d90ff77f13b9f"
lambdaData := `{"log":{"message":"this is log"}}`
txnData := fmt.Sprintf(`{"transaction":{"id":"%s"}}`, txnID)
txnData := fmt.Sprintf(`{"transaction":{"id":"%s"}}`, "023d90ff77f13b9f")
ts := time.Date(2022, time.October, 1, 1, 1, 1, 0, time.UTC)
txnDur := time.Second

type agentInit struct {
init bool
withMetadata bool
}

for _, tc := range []struct {
name string
agentInit bool
agentInit agentInit
receiveAgentRootTxn bool
receiveLambdaLogRuntime bool
expected string
}{
{
name: "without_agent_init_without_root_txn",
agentInit: false,
agentInit: agentInit{init: false, withMetadata: false},
receiveAgentRootTxn: false,
receiveLambdaLogRuntime: false,
// Without agent init no proxy txn is created if root txn is not reported
Expand All @@ -122,7 +126,7 @@ func TestLifecycle(t *testing.T) {
},
{
name: "without_agent_init_with_root_txn",
agentInit: false,
agentInit: agentInit{init: false, withMetadata: false},
receiveAgentRootTxn: true,
receiveLambdaLogRuntime: false,
expected: fmt.Sprintf(
Expand All @@ -133,8 +137,8 @@ func TestLifecycle(t *testing.T) {
),
},
{
name: "with_agent_init_with_root_txn",
agentInit: true,
name: "with_no_meta_agent_init_with_root_txn",
agentInit: agentInit{init: true, withMetadata: false},
receiveAgentRootTxn: true,
receiveLambdaLogRuntime: false,
expected: fmt.Sprintf(
Expand All @@ -145,8 +149,20 @@ func TestLifecycle(t *testing.T) {
),
},
{
name: "with_agent_init_without_root_txn_with_runtimeDone",
agentInit: true,
name: "with_meta_agent_init_with_root_txn",
agentInit: agentInit{init: true, withMetadata: true},
receiveAgentRootTxn: true,
receiveLambdaLogRuntime: false,
expected: fmt.Sprintf(
"%s\n%s\n%s",
metadata,
generateCompleteTxn(t, txnData, "success", "", txnDur),
lambdaData,
),
},
{
name: "with_no_meta_agent_init_without_root_txn_with_runtimeDone",
agentInit: agentInit{init: true, withMetadata: false},
receiveAgentRootTxn: false,
receiveLambdaLogRuntime: true,
// With agent init proxy txn is created if root txn is not reported.
Expand All @@ -159,8 +175,37 @@ func TestLifecycle(t *testing.T) {
),
},
{
name: "with_agent_init_without_root_txn",
agentInit: true,
name: "with_meta_agent_init_without_root_txn_with_runtimeDone",
agentInit: agentInit{init: true, withMetadata: true},
receiveAgentRootTxn: false,
receiveLambdaLogRuntime: true,
// With agent init proxy txn is created if root txn is not reported.
// Details in runtimeDone event is used to find the result of the txn.
expected: fmt.Sprintf(
"%s\n%s\n%s",
metadata,
lambdaData,
generateCompleteTxn(t, txnData, "failure", "failure", txnDur),
),
},
{
name: "with_no_meta_agent_init_without_root_txn",
agentInit: agentInit{init: true, withMetadata: false},
receiveAgentRootTxn: false,
receiveLambdaLogRuntime: false,
// With agent init proxy txn is created if root txn is not reported.
// If runtimeDone event is not available `timeout` is used as the
// result of the transaction.
expected: fmt.Sprintf(
"%s\n%s\n%s",
metadata,
lambdaData,
generateCompleteTxn(t, txnData, "timeout", "failure", txnDur),
),
},
{
name: "with_meta_agent_init_without_root_txn",
agentInit: agentInit{init: true, withMetadata: true},
receiveAgentRootTxn: false,
receiveLambdaLogRuntime: false,
// With agent init proxy txn is created if root txn is not reported.
Expand All @@ -179,8 +224,12 @@ func TestLifecycle(t *testing.T) {
// NEXT API response creates a new invocation cache
b.RegisterInvocation(reqID, fnARN, ts.Add(txnDur).UnixMilli(), ts)
// Agent creates and registers a partial transaction in the extn
if tc.agentInit {
require.NoError(t, b.OnAgentInit(reqID, txnID, []byte(txnData)))
if tc.agentInit.init {
initData := txnData
if tc.agentInit.withMetadata {
initData = fmt.Sprintf("%s\n%s", metadata, txnData)
}
require.NoError(t, b.OnAgentInit(reqID, "", []byte(initData)))
}
// Agent sends a request with metadata
require.NoError(t, b.AddAgentData(APMData{
Expand Down Expand Up @@ -209,17 +258,18 @@ func TestLifecycle(t *testing.T) {
}
}

func TestIsTransactionEvent(t *testing.T) {
func TestFindEventType(t *testing.T) {
for _, tc := range []struct {
body []byte
expected bool
expected eventType
}{
{body: []byte(`{}`), expected: false},
{body: []byte(`{"tran":{}}`), expected: false},
{body: []byte(`{"span":{}}`), expected: false},
{body: []byte(`{"transaction":{}}`), expected: true},
{body: []byte(`{}`), expected: otherEvent},
{body: []byte(`{"tran":{}}`), expected: otherEvent},
{body: []byte(`{"span":{}}`), expected: otherEvent},
{body: []byte(`{"metadata":{}}\n{"transaction":{}}`), expected: metadataEvent},
{body: []byte(`{"transaction":{}}`), expected: transactionEvent},
} {
assert.Equal(t, tc.expected, isTransactionEvent(tc.body))
assert.Equal(t, tc.expected, findEventType(tc.body))
}
}

Expand Down
26 changes: 11 additions & 15 deletions apmproxy/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ import (
"time"

"github.com/elastic/apm-aws-lambda/accumulator"
"github.com/tidwall/gjson"
)

const txnRegistrationContentType = "application/vnd.elastic.apm.transaction+json"
const txnRegistrationContentType = "application/vnd.elastic.apm.transaction+ndjson"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it OK to break backwards compatibility here? Is there a concern for using older agents? AFAICS, if older agents attempt to send data to this endpoint, we'll just reject it and lose the partial transaction information.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This hasn't been implemented yet by any agent, in fact python agent is the first one to start implementing it so backward compatibility is not an issue at this point.


// StartReceiver starts the server listening for APM agent data.
func (c *Client) StartReceiver() error {
Expand Down Expand Up @@ -76,12 +75,12 @@ func (c *Client) Shutdown() error {
// URL: http://server/
func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Request), error) {
// Init reverse proxy
parsedApmServerUrl, err := url.Parse(c.serverURL)
parsedApmServerURL, err := url.Parse(c.serverURL)
if err != nil {
return nil, fmt.Errorf("could not parse APM server URL: %w", err)
}

reverseProxy := httputil.NewSingleHostReverseProxy(parsedApmServerUrl)
reverseProxy := httputil.NewSingleHostReverseProxy(parsedApmServerURL)

reverseProxy.Transport = c.client.Transport.(*http.Transport).Clone()

Expand All @@ -103,10 +102,10 @@ func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Reques
r.Header.Del("X-Forwarded-For")

// Update headers to allow for SSL redirection
r.URL.Host = parsedApmServerUrl.Host
r.URL.Scheme = parsedApmServerUrl.Scheme
r.URL.Host = parsedApmServerURL.Host
r.URL.Scheme = parsedApmServerURL.Scheme
r.Header.Set("X-Forwarded-Host", r.Header.Get("Host"))
r.Host = parsedApmServerUrl.Host
r.Host = parsedApmServerURL.Host

// Forward request to the APM server
reverseProxy.ServeHTTP(w, r)
Expand Down Expand Up @@ -185,14 +184,11 @@ func (c *Client) handleTransactionRegistration() func(w http.ResponseWriter, r *
w.WriteHeader(http.StatusBadRequest)
return
}
txnID := gjson.GetBytes(rawBytes, "transaction.id").String()
if txnID == "" {
c.logger.Warn("Could not parse transaction id from transaction registration body")
w.WriteHeader(http.StatusUnprocessableEntity)
return
}
if err := c.batch.OnAgentInit(reqID, txnID, rawBytes); err != nil {
c.logger.Warnf("Failed to update invocation for transaction ID %s: %v", txnID, err)

if err := c.batch.OnAgentInit(
reqID, r.Header.Get("Content-Encoding"), rawBytes,
); err != nil {
c.logger.Warnf("Failed to update invocation: %w", err)
w.WriteHeader(http.StatusUnprocessableEntity)
return
}
Expand Down