From 709a10ada4ad7174dca72b1ce1de3797e6f3ceaf Mon Sep 17 00:00:00 2001 From: Emilio Garcia Date: Mon, 5 Dec 2022 16:01:33 -0500 Subject: [PATCH 1/2] collab changes 12/5 --- .../extensionApi/client.go | 10 +- go-telemetry-api-extension/main.go | 4 +- .../telemetryApi/listener.go | 8 +- .../telemetryApi/send_to_new_relic.go | 280 ++++++++++-------- 4 files changed, 166 insertions(+), 136 deletions(-) diff --git a/go-telemetry-api-extension/extensionApi/client.go b/go-telemetry-api-extension/extensionApi/client.go index 442cfd73..489f82a4 100644 --- a/go-telemetry-api-extension/extensionApi/client.go +++ b/go-telemetry-api-extension/extensionApi/client.go @@ -59,12 +59,16 @@ const ( // Client is a simple client for the Lambda Extensions API type Client struct { - httpClient *http.Client - baseUrl string - ExtensionID string + httpClient *http.Client + baseUrl string + ExtensionID string functionName string } +func (e *Client) GetFunctionName() string { + return e.functionName +} + var l = log.WithFields(log.Fields{"pkg": "extensionApi"}) // Returns a Lambda Extensions API client diff --git a/go-telemetry-api-extension/main.go b/go-telemetry-api-extension/main.go index 92e9899e..132e3ef7 100644 --- a/go-telemetry-api-extension/main.go +++ b/go-telemetry-api-extension/main.go @@ -13,9 +13,9 @@ Notes: package main import ( + "context" "newrelic-lambda-extension/go-telemetry-api-extension/extensionApi" "newrelic-lambda-extension/go-telemetry-api-extension/telemetryApi" - "context" "os" "os/signal" "path" @@ -65,7 +65,7 @@ func main() { panic(err) } l.Info("[main] Subscription success") - dispatcher := telemetryApi.NewDispatcher(extensionApiClient.functionName) + dispatcher := telemetryApi.NewDispatcher(extensionApiClient.GetFunctionName()) // Will block until invoke or shutdown event is received or cancelled via the context. for { diff --git a/go-telemetry-api-extension/telemetryApi/listener.go b/go-telemetry-api-extension/telemetryApi/listener.go index eb60ee9c..edebbcc4 100644 --- a/go-telemetry-api-extension/telemetryApi/listener.go +++ b/go-telemetry-api-extension/telemetryApi/listener.go @@ -75,7 +75,7 @@ func (s *TelemetryApiListener) http_handler(w http.ResponseWriter, r *http.Reque return } // Parse and put the log messages into the queue - var slice []interface{} + var slice []LambdaTelemetryEvent _ = json.Unmarshal(body, &slice) for _, el := range slice { @@ -98,3 +98,9 @@ func (s *TelemetryApiListener) Shutdown() { } } } + +type LambdaTelemetryEvent struct { + Time string + Type string + Record any +} diff --git a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go index 4d5326ce..eef2bc3b 100644 --- a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go +++ b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go @@ -4,60 +4,57 @@ import ( "bytes" "context" "encoding/json" - "fmt" + "net/http" "os" "path" "reflect" "strings" - "net/http" - "strconv" "time" - "github.com/google/uuid" ) const ( - LogEndpointEU string = "https://log-api.eu.newrelic.com/log/v1" - LogEndpointUS string = "https://log-api.newrelic.com/log/v1" + LogEndpointEU string = "https://log-api.eu.newrelic.com/log/v1" + LogEndpointUS string = "https://log-api.newrelic.com/log/v1" - MetricsEndpointEU string = "https://metric-api.eu.newrelic.com/metric/v1" - MetricsEndpointUS string = "https://metric-api.newrelic.com/metric/v1" + MetricsEndpointEU string = "https://metric-api.eu.newrelic.com/metric/v1" + MetricsEndpointUS string = "https://metric-api.newrelic.com/metric/v1" - EventsEndpointEU string = "https://insights-collector.eu01.nr-data.net" - EventsEndpointUS string = "https://insights-collector.newrelic.com" + EventsEndpointEU string = "https://insights-collector.eu01.nr-data.net" + EventsEndpointUS string = "https://insights-collector.newrelic.com" - TracesEndpointEU string = "https://trace-api.eu.newrelic.com/trace/v1" - TracesEndpointUS string = "https://trace-api.newrelic.com/trace/v1" + TracesEndpointEU string = "https://trace-api.eu.newrelic.com/trace/v1" + TracesEndpointUS string = "https://trace-api.newrelic.com/trace/v1" ) func getEndpointURL(licenseKey string, typ string, EndpointOverride string) string { - if EndpointOverride != "" { - return EndpointOverride - } + if EndpointOverride != "" { + return EndpointOverride + } switch typ { - case "logging": - if strings.HasPrefix(licenseKey, "eu") { - return LogEndpointEU - } else { - return LogEndpointUS - } - case "metrics": - if strings.HasPrefix(licenseKey, "eu") { - return MetricsEndpointEU - } else { - return MetricsEndpointUS - } - case "events": - if strings.HasPrefix(licenseKey, "eu") { - return EventsEndpointEU - } else { - return EventsEndpointUS - } - case "traces": - if strings.HasPrefix(licenseKey, "eu") { - return TracesEndpointEU - } else { - return TracesEndpointUS - } + case "logging": + if strings.HasPrefix(licenseKey, "eu") { + return LogEndpointEU + } else { + return LogEndpointUS + } + case "metrics": + if strings.HasPrefix(licenseKey, "eu") { + return MetricsEndpointEU + } else { + return MetricsEndpointUS + } + case "events": + if strings.HasPrefix(licenseKey, "eu") { + return EventsEndpointEU + } else { + return EventsEndpointUS + } + case "traces": + if strings.HasPrefix(licenseKey, "eu") { + return TracesEndpointEU + } else { + return TracesEndpointUS + } } return "" } @@ -67,7 +64,7 @@ func sendBatch(ctx context.Context, d *Dispatcher, uri string, bodyBytes []byte) if err != nil { return err } -// the headers might be different for different endpoints + // the headers might be different for different endpoints req.Header.Set("Content-Type", "application/json") req.Header.Set("Api-Key", d.licenseKey) _, err = d.httpClient.Do(req) @@ -75,29 +72,29 @@ func sendBatch(ctx context.Context, d *Dispatcher, uri string, bodyBytes []byte) return err } -func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) error { +func sendDataToNR(ctx context.Context, logEntries []LambdaTelemetryEvent, d *Dispatcher) error { -// will be replaced later - var lambda_name = "---" -// should be as below -// var lambda_name = d.functionName + // will be replaced later + var lambda_name = "---" + // should be as below + // var lambda_name = d.functionName var agent_name = path.Base(os.Args[0]) // NB "." is not allowed in NR eventType var replacer = strings.NewReplacer(".", "_") - data := make(map[string][]interface{}) - data["events"] := []map[string]interface{}{} - data["traces"] := []map[string]interface{}{} - data["logging"] := []map[string]interface{}{} - data["metrics"] := []map[string]interface{}{} + data := make(map[string][]map[string]interface{}) + data["events"] = []map[string]interface{}{} + data["traces"] = []map[string]interface{}{} + data["logging"] = []map[string]interface{}{} + data["metrics"] = []map[string]interface{}{} var bb map[string]any - var event map[string]any + // var event map[string]any - for _, ev := range logEntries { - json.Unmarshal([]byte(ev), &event) - msInt, err := time.Parse(time.RFC3339, event["time"]) + for _, event := range logEntries { + //json.Unmarshal([]byte(ev), &event) + msInt, err := time.Parse(time.RFC3339, event.Time) if err != nil { return err } @@ -106,87 +103,110 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) "timestamp": msInt.UnixMilli() "eventType": "Lambda_Ext_"+ replacer.Replace(event["type"]) }`), &bb) - data["events"] := append(data["events"], bb) -/* - data["events"] := append(data["events"], map[string]interface{}(`{ - "timestamp": msInt.UnixMilli() - "eventType": "Lambda_Ext_"+ replacer.Replace(event["type"]) - }`)) -*/ - // logging - if val, ok := event["record"]; ok { - if len(val) > 0 { - data["logging"] := append(data["logging"], map[string]interface{}(`{ - "timestamp": msInt.UnixMilli(), - "message": event["record"], - "attributes": { - "aws": { - "event": event["type"], - "lambda": lambda_name - } - } - }`)) - } + data["events"] = append(data["events"], bb) + + data["events"] = append(data["events"], map[string]interface{}{ + "timestamp": msInt.UnixMilli(), + "eventType": "Lambda_Ext_" + replacer.Replace(event.Type), + }) + + switch event.Type { + case "platform.iniStart": + + case "platform.iniRuntimeDone": + + case "platform.iniReport": + + case "platform.start": + + case "platform.runtimeDone": + + case "platform.report": + + case "platform.extension": + + case "platform.telemetrySubscription": + + case "platform.logsDropped": + + } + + if event.Record != nil { + data["logging"] = append(data["logging"], map[string]interface{}{ + "timestamp": msInt.UnixMilli(), + "message": event.Record, + "attributes": map[string]map[string]string{ + "aws": { + "event": event.Type, + "lambda": lambda_name, + }, + }, + }) } + // metrics - if reflect.ValueOf(event["record"]).Kind() == reflect.Map && val, ok := event["record"]["metrics"]; ok { - mts := []map[string]interface{}{} - for key := range val { - mts := appand(mts, map[string]interface{}(`{ - "name": "aws.telemetry.lambda_ext."+lambda_name+"."+key, - "value": event["record"]["metrics"][key] + if event.Record != nil { + if val, ok := event.Record["metrics"]; ok { + mts := []map[string]interface{}{} + for key := range val { + mts := appand(mts, map[string]interface{}(`{ + "name": "aws.telemetry.lambda_ext."+lambda_name+"."+key, + "value": event["record"]["metrics"][key] + }`)) + } + rid := "" + if val, ok := event["record"]["requestId"]; ok { + rid = val + } + data["metrics"] = append(data["metrics"], map[string]interface{}(`{ + "common" : { + "timestamp": msInt.UnixMilli(), + "attributes": { + "event": event["type"], + "requestId": rid, + "extension": agent_name + } + }, + "metrics": mts }`)) } - rid := "" - if val, ok := event["record"]["requestId"]; ok { - rid = val - } - data["metrics"] := append(data["metrics"], map[string]interface{}(`{ - "common" : { - "timestamp": msInt.UnixMilli(), - "attributes": { - "event": event["type"], - "requestId": rid, - "extension": agent_name - } - }, - "metrics": mts - }`)) } // spans - if (reflect.ValueOf(event["record"]).Kind() == reflect.Map) && val, ok := event["record"]["spans"]; ok { - spans := [...]string{} - for span := range val { - el := `{ - "trace.id": event["record"]["requestId"], - "id": uuid.New().String(), - "attributes": { - "event": event["type"], - "service.name": agent_name + if reflect.ValueOf(event["record"]).Kind() == reflect.Map { + if val, ok := event["record"]["spans"]; ok { + spans := [...]string{} + for span := range val { + el := `{ + "trace.id": event["record"]["requestId"], + "id": uuid.New().String(), + "attributes": { + "event": event["type"], + "service.name": agent_name + } + }` + start, err := time.Parse(time.RFC3339, event["time"]) + if err != nil { + return err + } + for key := range span { + if key == "durationMs" { + el["attributes"]["duration.ms"] = span[key] + } else if key == "start" { + el["timestamp"] = start.UnixMilli() + } else { + el["attributes"][key] = span[key] } - }` - start, err := time.Parse(time.RFC3339, event["time"]) - if err != nil { - return err - } - for key := range span { - if (key == "durationMs") { - el["attributes"]["duration.ms"] := span[key] - } else if (key =="start") { - el["timestamp"] := start.UnixMilli() - } else { - el["attributes"][key] := span[key] } + data["traces"] = append(data["traces"], el) } - data["traces"] := append(data["traces"], el) } } } // data ready - if (len(data) > 0) { + if len(data) > 0 { // send logs - if (len(data["logging"]) > 0) { -// bodyBytes, _ := json.Marshal(map[string]interface{}(`{ + if len(data["logging"]) > 0 { + // bodyBytes, _ := json.Marshal(map[string]interface{}(`{ bodyBytes := `{ "common": { "attributes": { @@ -199,23 +219,23 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) }, "logs": data["logging"] }` -// }`)) - err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"logging"), bodyBytes) + // }`)) + err := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "logging"), bodyBytes) } // send metrics - if (len(data["metrics"]) > 0) { + if len(data["metrics"]) > 0 { for payload := range data["metrics"] { bodyBytes, _ := json.Marshal(payload) - err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"metrics"), bodyBytes) + err := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "metrics"), bodyBytes) } } // send events - if (len(data["events"]) > 0) { + if len(data["events"]) > 0 { bodyBytes, _ := json.Marshal(data["events"]) - err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"events"), bodyBytes) + err := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "events"), bodyBytes) } // send traces - if (len(data["traces"]) > 0) { + if len(data["traces"]) > 0 { bodyBytes, _ := json.Marshal(map[string]interface{}(`{ "common": { "attributes": { @@ -225,9 +245,9 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) }, "spans": data["traces"] }`)) - err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"traces"), bodyBytes) + err := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "traces"), bodyBytes) } } - return err // if one of the sents failed, it'd be nice to know which + return err // if one of the sents failed, it'd be nice to know which } From 98f01300ea39a64b53effec1946f2739fd0faded Mon Sep 17 00:00:00 2001 From: Emilio Garcia Date: Mon, 5 Dec 2022 16:07:15 -0500 Subject: [PATCH 2/2] interface fix --- .../telemetryApi/send_to_new_relic.go | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go index eef2bc3b..05c0713a 100644 --- a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go +++ b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go @@ -72,7 +72,7 @@ func sendBatch(ctx context.Context, d *Dispatcher, uri string, bodyBytes []byte) return err } -func sendDataToNR(ctx context.Context, logEntries []LambdaTelemetryEvent, d *Dispatcher) error { +func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) error { // will be replaced later var lambda_name = "---" @@ -94,7 +94,7 @@ func sendDataToNR(ctx context.Context, logEntries []LambdaTelemetryEvent, d *Dis for _, event := range logEntries { //json.Unmarshal([]byte(ev), &event) - msInt, err := time.Parse(time.RFC3339, event.Time) + msInt, err := time.Parse(time.RFC3339, event.(LambdaTelemetryEvent).Time) if err != nil { return err } @@ -107,10 +107,10 @@ func sendDataToNR(ctx context.Context, logEntries []LambdaTelemetryEvent, d *Dis data["events"] = append(data["events"], map[string]interface{}{ "timestamp": msInt.UnixMilli(), - "eventType": "Lambda_Ext_" + replacer.Replace(event.Type), + "eventType": "Lambda_Ext_" + replacer.Replace(event.(LambdaTelemetryEvent).Type), }) - switch event.Type { + switch event.(LambdaTelemetryEvent).Type { case "platform.iniStart": case "platform.iniRuntimeDone": @@ -131,13 +131,13 @@ func sendDataToNR(ctx context.Context, logEntries []LambdaTelemetryEvent, d *Dis } - if event.Record != nil { + if event.(LambdaTelemetryEvent).Record != nil { data["logging"] = append(data["logging"], map[string]interface{}{ "timestamp": msInt.UnixMilli(), - "message": event.Record, + "message": event.(LambdaTelemetryEvent).Record, "attributes": map[string]map[string]string{ "aws": { - "event": event.Type, + "event": event.(LambdaTelemetryEvent).Type, "lambda": lambda_name, }, }, @@ -145,11 +145,11 @@ func sendDataToNR(ctx context.Context, logEntries []LambdaTelemetryEvent, d *Dis } // metrics - if event.Record != nil { - if val, ok := event.Record["metrics"]; ok { + if event.(LambdaTelemetryEvent).Record != nil { + if val, ok := event.(LambdaTelemetryEvent).Record["metrics"]; ok { mts := []map[string]interface{}{} for key := range val { - mts := appand(mts, map[string]interface{}(`{ + mts := append(mts, map[string]interface{}(`{ "name": "aws.telemetry.lambda_ext."+lambda_name+"."+key, "value": event["record"]["metrics"][key] }`))