forked from newrelic/go-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
serverless.go
160 lines (141 loc) · 4.28 KB
/
serverless.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright 2020 New Relic Corporation. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package newrelic
import (
"bytes"
"compress/gzip"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"sync"
"time"
)
const (
// agentLanguage is used in the connect JSON and the Lambda JSON.
agentLanguage = "go"
lambdaMetadataVersion = 2
)
// serverlessHarvest is used to store and log data when the agent is running in
// serverless mode.
type serverlessHarvest struct {
logger Logger
awsExecutionEnv string
// The Lambda handler could be using multiple goroutines so we use a
// mutex to prevent race conditions.
sync.Mutex
harvest *harvest
}
// newServerlessHarvest creates a new serverlessHarvest.
func newServerlessHarvest(logger Logger, getEnv func(string) string) *serverlessHarvest {
return &serverlessHarvest{
logger: logger,
awsExecutionEnv: getEnv("AWS_EXECUTION_ENV"),
// We can use dfltHarvestCfgr because
// serverless mode doesn't have a connect, and therefore won't
// have custom event limits from the server.
harvest: newHarvest(time.Now(), dfltHarvestCfgr),
}
}
// Consume adds data to the harvest.
func (sh *serverlessHarvest) Consume(data harvestable) {
if nil == sh {
return
}
sh.Lock()
defer sh.Unlock()
data.MergeIntoHarvest(sh.harvest)
}
func (sh *serverlessHarvest) swapHarvest() *harvest {
sh.Lock()
defer sh.Unlock()
h := sh.harvest
sh.harvest = newHarvest(time.Now(), dfltHarvestCfgr)
return h
}
// Write logs the data in the format described by:
// https://source.datanerd.us/agents/agent-specs/blob/master/Lambda.md
func (sh *serverlessHarvest) Write(arn string, writer io.Writer) {
if nil == sh {
return
}
harvest := sh.swapHarvest()
payloads := harvest.Payloads(false)
// Note that *json.RawMessage (instead of json.RawMessage) is used to
// support older Go versions: https://go-review.googlesource.com/c/go/+/21811/
harvestPayloads := make(map[string]*json.RawMessage, len(payloads))
for _, p := range payloads {
agentRunID := ""
cmd := p.EndpointMethod()
data, err := p.Data(agentRunID, time.Now())
if err != nil {
sh.logger.Error("error creating payload json", map[string]interface{}{
"command": cmd,
"error": err.Error(),
})
continue
}
if nil == data {
continue
}
// NOTE! This code relies on the fact that each payload is
// using a different endpoint method. Sometimes the transaction
// events payload might be split, but since there is only one
// transaction event per serverless transaction, that's not an
// issue. Likewise, if we ever split normal transaction events
// apart from synthetics events, the transaction will either be
// normal or synthetic, so that won't be an issue. Log an error
// if this happens for future defensiveness.
if _, ok := harvestPayloads[cmd]; ok {
sh.logger.Error("data with duplicate command name lost", map[string]interface{}{
"command": cmd,
})
}
d := json.RawMessage(data)
harvestPayloads[cmd] = &d
}
if len(harvestPayloads) == 0 {
// The harvest may not contain any data if the serverless
// transaction was ignored.
return
}
data, err := json.Marshal(harvestPayloads)
if nil != err {
sh.logger.Error("error creating serverless data json", map[string]interface{}{
"error": err.Error(),
})
return
}
var dataBuf bytes.Buffer
gz := gzip.NewWriter(&dataBuf)
gz.Write(data)
gz.Flush()
gz.Close()
js, err := json.Marshal([]interface{}{
lambdaMetadataVersion,
"NR_LAMBDA_MONITORING",
struct {
MetadataVersion int `json:"metadata_version"`
ARN string `json:"arn,omitempty"`
ProtocolVersion int `json:"protocol_version"`
ExecutionEnvironment string `json:"execution_environment,omitempty"`
AgentVersion string `json:"agent_version"`
AgentLanguage string `json:"agent_language"`
}{
MetadataVersion: lambdaMetadataVersion,
ProtocolVersion: procotolVersion,
AgentVersion: Version,
ExecutionEnvironment: sh.awsExecutionEnv,
ARN: arn,
AgentLanguage: agentLanguage,
},
base64.StdEncoding.EncodeToString(dataBuf.Bytes()),
})
if err != nil {
sh.logger.Error("error creating serverless json", map[string]interface{}{
"error": err.Error(),
})
return
}
fmt.Fprintln(writer, string(js))
}