From 4b978265d5dd3952d4a464a2c5b46ef24b21fe63 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Mon, 8 Nov 2021 14:21:04 +0100 Subject: [PATCH 01/11] Preserve compressed agent data and compress agent data that is not already compressed --- apm-lambda-extension/extension/apm_server.go | 35 ++-- .../extension/apm_server_test.go | 86 ++++++++ apm-lambda-extension/extension/http_server.go | 43 +--- .../extension/http_server_test.go | 192 ------------------ .../extension/process_events.go | 2 +- .../extension/route_handlers.go | 18 +- 6 files changed, 125 insertions(+), 251 deletions(-) create mode 100644 apm-lambda-extension/extension/apm_server_test.go delete mode 100644 apm-lambda-extension/extension/http_server_test.go diff --git a/apm-lambda-extension/extension/apm_server.go b/apm-lambda-extension/extension/apm_server.go index 2fb68d5c..1127e343 100644 --- a/apm-lambda-extension/extension/apm_server.go +++ b/apm-lambda-extension/extension/apm_server.go @@ -28,28 +28,37 @@ import ( // todo: can this be a streaming or streaming style call that keeps the // connection open across invocations? -func PostToApmServer(postBody []byte, config *extensionConfig) error { +func PostToApmServer(agentData AgentData, config *extensionConfig) error { endpointUri := "intake/v2/events" - var compressedBytes bytes.Buffer - w := gzip.NewWriter(&compressedBytes) - w.Write(postBody) - w.Write([]byte{10}) - w.Close() + var req *http.Request + var err error - client := &http.Client{} - - req, err := http.NewRequest("POST", config.apmServerUrl+endpointUri, bytes.NewReader(compressedBytes.Bytes())) - if err != nil { - return fmt.Errorf("failed to create a new request when posting to APM server: %v", err) + if agentData.ContentEncoding == "" { + var compressedBytes bytes.Buffer + w := gzip.NewWriter(&compressedBytes) + w.Write(agentData.Data) + w.Close() + req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, bytes.NewReader(compressedBytes.Bytes())) + if err != nil { + return fmt.Errorf("failed to create a new request when posting to APM server: %v", err) + } + req.Header.Add("Content-Encoding", "gzip") + } else { + req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, bytes.NewReader(agentData.Data)) + if err != nil { + return fmt.Errorf("failed to create a new request when posting to APM server: %v", err) + } + req.Header.Add("Content-Encoding", agentData.ContentEncoding) } - req.Header.Add("Content-Type", "application/x-ndjson") - req.Header.Add("Content-Encoding", "gzip") + req.Header.Add("Content-Type", "application/x-ndjson") if config.apmServerApiKey != "" { req.Header.Add("Authorization", "ApiKey "+config.apmServerApiKey) } else if config.apmServerSecretToken != "" { req.Header.Add("Authorization", "Bearer "+config.apmServerSecretToken) } + + client := &http.Client{} resp, err := client.Do(req) if err != nil { return fmt.Errorf("failed to post to APM server: %v", err) diff --git a/apm-lambda-extension/extension/apm_server_test.go b/apm-lambda-extension/extension/apm_server_test.go new file mode 100644 index 00000000..df01e4d9 --- /dev/null +++ b/apm-lambda-extension/extension/apm_server_test.go @@ -0,0 +1,86 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package extension + +import ( + "bytes" + "compress/gzip" + "compress/zlib" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "gotest.tools/assert" +) + +func TestPostToApmServerDataCompressed(t *testing.T) { + s := "A long time ago in a galaxy far, far away..." + var b bytes.Buffer + + // Compress the data + w := zlib.NewWriter(&b) + w.Write([]byte(s)) + w.Close() + + agentData := AgentData{Data: b.Bytes(), ContentEncoding: "gzip"} + // Create apm server and handler + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + bytes, _ := ioutil.ReadAll(r.Body) + assert.Equal(t, b.String(), string(bytes)) + assert.Equal(t, "gzip", r.Header.Get("Content-Encoding")) + w.Write([]byte(`{"foo": "bar"}`)) + })) + defer apmServer.Close() + + config := extensionConfig{ + apmServerUrl: apmServer.URL + "/", + } + + err := PostToApmServer(agentData, &config) + assert.Equal(t, nil, err) +} + +func TestPostToApmServerDataNotCompressed(t *testing.T) { + s := "A long time ago in a galaxy far, far away..." + body := []byte(s) + agentData := AgentData{Data: body, ContentEncoding: ""} + + // Compress the data, so it can be compared with what + // the apm server receives + var b bytes.Buffer + w := gzip.NewWriter(&b) + w.Write([]byte(s)) + w.Close() + + // Create apm server and handler + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + bytes, _ := ioutil.ReadAll(r.Body) + assert.Equal(t, b.String(), string(bytes)) + assert.Equal(t, "gzip", r.Header.Get("Content-Encoding")) + w.Write([]byte(`{"foo": "bar"}`)) + })) + defer apmServer.Close() + + config := extensionConfig{ + apmServerUrl: apmServer.URL + "/", + } + + err := PostToApmServer(agentData, &config) + assert.Equal(t, nil, err) +} diff --git a/apm-lambda-extension/extension/http_server.go b/apm-lambda-extension/extension/http_server.go index 1bd6395a..30bd45a8 100644 --- a/apm-lambda-extension/extension/http_server.go +++ b/apm-lambda-extension/extension/http_server.go @@ -18,18 +18,13 @@ package extension import ( - "bytes" - "compress/gzip" - "compress/zlib" - "fmt" - "io/ioutil" "net" "net/http" "time" ) type serverHandler struct { - data chan []byte + data chan AgentData config *extensionConfig } @@ -50,7 +45,7 @@ func (handler *serverHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) } -func NewHttpServer(dataChannel chan []byte, config *extensionConfig) *http.Server { +func NewHttpServer(dataChannel chan AgentData, config *extensionConfig) *http.Server { var handler = serverHandler{data: dataChannel, config: config} timeout := time.Duration(config.dataReceiverTimeoutSeconds) * time.Second s := &http.Server{ @@ -70,37 +65,3 @@ func NewHttpServer(dataChannel chan []byte, config *extensionConfig) *http.Serve return s } - -func getDecompressedBytesFromRequest(req *http.Request) ([]byte, error) { - var rawBytes []byte - if req.Body != nil { - rawBytes, _ = ioutil.ReadAll(req.Body) - } - - switch req.Header.Get("Content-Encoding") { - case "deflate": - reader := bytes.NewReader([]byte(rawBytes)) - zlibreader, err := zlib.NewReader(reader) - if err != nil { - return nil, fmt.Errorf("could not create zlib.NewReader: %v", err) - } - bodyBytes, err := ioutil.ReadAll(zlibreader) - if err != nil { - return nil, fmt.Errorf("could not read from zlib reader using ioutil.ReadAll: %v", err) - } - return bodyBytes, nil - case "gzip": - reader := bytes.NewReader([]byte(rawBytes)) - zlibreader, err := gzip.NewReader(reader) - if err != nil { - return nil, fmt.Errorf("could not create gzip.NewReader: %v", err) - } - bodyBytes, err := ioutil.ReadAll(zlibreader) - if err != nil { - return nil, fmt.Errorf("could not read from gzip reader using ioutil.ReadAll: %v", err) - } - return bodyBytes, nil - default: - return rawBytes, nil - } -} diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go deleted file mode 100644 index dd36c24d..00000000 --- a/apm-lambda-extension/extension/http_server_test.go +++ /dev/null @@ -1,192 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package extension - -import ( - "bytes" - "compress/gzip" - "compress/zlib" - "io/ioutil" - "net/http" - "net/http/httptest" - "strings" - "testing" - - "gotest.tools/assert" -) - -func Test_getDecompressedBytesFromRequestUncompressed(t *testing.T) { - s := "A long time ago in a galaxy far, far away..." - body := strings.NewReader(s) - - // Create the request - req, err := http.NewRequest(http.MethodPost, "example.com", body) - if err != nil { - t.Errorf("Error creating new request: %v", err) - t.Fail() - } - - // Decompress the request's body - got, err1 := getDecompressedBytesFromRequest(req) - if err1 != nil { - t.Errorf("Error decompressing request body: %v", err1) - t.Fail() - } - - if s != string(got) { - t.Errorf("Original string and decompressed data do not match") - t.Fail() - } -} - -func Test_getDecompressedBytesFromRequestGzip(t *testing.T) { - s := "A long time ago in a galaxy far, far away..." - var b bytes.Buffer - - // Compress the data - w := gzip.NewWriter(&b) - w.Write([]byte(s)) - w.Close() - - // Create a reader reading from the bytes on the buffer - body := bytes.NewReader(b.Bytes()) - - // Create the request - req, err := http.NewRequest(http.MethodPost, "example.com", body) - if err != nil { - t.Errorf("Error creating new request: %v", err) - t.Fail() - } - - // Set the encoding to gzip - req.Header.Set("Content-Encoding", "gzip") - - // Decompress the request's body - got, err1 := getDecompressedBytesFromRequest(req) - if err1 != nil { - t.Errorf("Error decompressing request body: %v", err1) - t.Fail() - } - - if s != string(got) { - t.Errorf("Original string and decompressed data do not match") - t.Fail() - } -} - -func Test_getDecompressedBytesFromRequestDeflate(t *testing.T) { - s := "A long time ago in a galaxy far, far away..." - var b bytes.Buffer - - // Compress the data - w := zlib.NewWriter(&b) - w.Write([]byte(s)) - w.Close() - - // Create a reader reading from the bytes on the buffer - body := bytes.NewReader(b.Bytes()) - - // Create the request - req, err := http.NewRequest(http.MethodPost, "example.com", body) - if err != nil { - t.Errorf("Error creating new request: %v", err) - t.Fail() - } - - // Set the encoding to deflate - req.Header.Set("Content-Encoding", "deflate") - - // Decompress the request's body - got, err1 := getDecompressedBytesFromRequest(req) - if err1 != nil { - t.Errorf("Error decompressing request body: %v", err1) - t.Fail() - } - - if s != string(got) { - t.Errorf("Original string and decompressed data do not match") - t.Fail() - } -} - -func Test_getDecompressedBytesFromRequestEmptyBody(t *testing.T) { - // Create the request - req, err := http.NewRequest(http.MethodPost, "example.com", nil) - if err != nil { - t.Errorf("Error creating new request: %v", err) - } - - got, err := getDecompressedBytesFromRequest(req) - if err != nil { - t.Errorf("Error decompressing request body: %v", err) - } - - if len(got) != 0 { - t.Errorf("A non-empty byte slice was returned") - t.Fail() - } -} - -func TestInfoProxy(t *testing.T) { - headers := map[string]string{"Authorization": "test-value"} - wantResp := "{\"foo\": \"bar\"}" - - // Create apm server and handler - apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - for key := range headers { - assert.Equal(t, 1, len(r.Header[key])) - assert.Equal(t, headers[key], r.Header[key][0]) - } - w.Write([]byte(`{"foo": "bar"}`)) - })) - defer apmServer.Close() - - // Create extension config and start the server - dataChannel := make(chan []byte, 100) - config := extensionConfig{ - apmServerUrl: apmServer.URL, - apmServerSecretToken: "foo", - apmServerApiKey: "bar", - dataReceiverServerPort: "127.0.0.1:1234", - dataReceiverTimeoutSeconds: 15, - } - extensionServer := NewHttpServer(dataChannel, &config) - defer extensionServer.Close() - - // Create a request to send to the extension - client := &http.Client{} - url := "http://" + extensionServer.Addr - req, err := http.NewRequest("GET", url, nil) - if err != nil { - t.Logf("Could not create request") - } - for name, value := range headers { - req.Header.Add(name, value) - } - - // Send the request to the extension - resp, err := client.Do(req) - if err != nil { - t.Logf("Error fetching %s, [%v]", extensionServer.Addr, err) - t.Fail() - } else { - body, _ := ioutil.ReadAll(resp.Body) - assert.Equal(t, string(body), wantResp) - resp.Body.Close() - } -} diff --git a/apm-lambda-extension/extension/process_events.go b/apm-lambda-extension/extension/process_events.go index 4ec5a53e..54f479e3 100644 --- a/apm-lambda-extension/extension/process_events.go +++ b/apm-lambda-extension/extension/process_events.go @@ -27,7 +27,7 @@ func ProcessShutdown() { log.Println("Exiting") } -func FlushAPMData(dataChannel chan []byte, config *extensionConfig) { +func FlushAPMData(dataChannel chan AgentData, config *extensionConfig) { log.Println("Checking for agent data") for { select { diff --git a/apm-lambda-extension/extension/route_handlers.go b/apm-lambda-extension/extension/route_handlers.go index a6471f5e..14719284 100644 --- a/apm-lambda-extension/extension/route_handlers.go +++ b/apm-lambda-extension/extension/route_handlers.go @@ -23,6 +23,11 @@ import ( "net/http" ) +type AgentData struct { + Data []byte + ContentEncoding string +} + // URL: http://server/ func handleInfoRequest(handler *serverHandler, w http.ResponseWriter, r *http.Request) { client := &http.Client{} @@ -68,13 +73,18 @@ func handleInfoRequest(handler *serverHandler, w http.ResponseWriter, r *http.Re // URL: http://server/intake/v2/events func handleIntakeV2Events(handler *serverHandler, w http.ResponseWriter, r *http.Request) { - bodyBytes, err := getDecompressedBytesFromRequest(r) - if nil != err { - log.Printf("could not get decompressed bytes from request body: %v", err) + if r.Body == nil { + log.Println("Could not get bytes from agent request body") } else { + rawBytes, _ := ioutil.ReadAll(r.Body) + agentData := AgentData{ + Data: rawBytes, + ContentEncoding: r.Header.Get("Content-Encoding"), + } log.Println("Adding agent data to buffer to be sent to apm server") - handler.data <- bodyBytes + handler.data <- agentData } + w.WriteHeader(http.StatusAccepted) w.Write([]byte("ok")) } From c2dc87996091bac4c3f4584274ef8aa3c82f4c9e Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Mon, 8 Nov 2021 14:27:35 +0100 Subject: [PATCH 02/11] Change name and type of agent data channel --- apm-lambda-extension/main.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/apm-lambda-extension/main.go b/apm-lambda-extension/main.go index b9ec5eb0..c0bbf5f5 100644 --- a/apm-lambda-extension/main.go +++ b/apm-lambda-extension/main.go @@ -61,9 +61,9 @@ func main() { // setup http server to receive data from agent // and get a channel to listen for that data - dataChannel := make(chan []byte, 100) + agentDataChannel := make(chan extension.AgentData, 100) - extension.NewHttpServer(dataChannel, config) + extension.NewHttpServer(agentDataChannel, config) // Make channel for collecting logs and create a HTTP server to listen for them logsChannel := make(chan logsapi.LogEvent) @@ -112,7 +112,7 @@ func main() { // Flush any APM data, in case waiting for the runtimeDone event timed out, // the agent data wasn't available yet, and we got to the next event - extension.FlushAPMData(dataChannel, config) + extension.FlushAPMData(agentDataChannel, config) // Make a channel for signaling that a runtimeDone event has been received runtimeDone := make(chan struct{}) @@ -129,7 +129,7 @@ func main() { case <-funcInvocDone: log.Println("Function invocation is complete, not receiving any more agent data") return - case agentData := <-dataChannel: + case agentData := <-agentDataChannel: err := extension.PostToApmServer(agentData, config) if err != nil { log.Printf("Error sending to APM server, skipping: %v", err) @@ -179,7 +179,7 @@ func main() { } // Flush APM data now that the function invocation has completed - extension.FlushAPMData(dataChannel, config) + extension.FlushAPMData(agentDataChannel, config) // Signal that the function invocation has completed close(funcInvocDone) From 917742a634217d6370f0933a2858a12244fe6042 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Mon, 8 Nov 2021 14:28:23 +0100 Subject: [PATCH 03/11] Accidentally removed a test --- .../extension/http_server_test.go | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 apm-lambda-extension/extension/http_server_test.go diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go new file mode 100644 index 00000000..f765d875 --- /dev/null +++ b/apm-lambda-extension/extension/http_server_test.go @@ -0,0 +1,76 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package extension + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "gotest.tools/assert" +) + +func TestInfoProxy(t *testing.T) { + headers := map[string]string{"Authorization": "test-value"} + wantResp := "{\"foo\": \"bar\"}" + + // Create apm server and handler + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + for key := range headers { + assert.Equal(t, 1, len(r.Header[key])) + assert.Equal(t, headers[key], r.Header[key][0]) + } + w.Write([]byte(`{"foo": "bar"}`)) + })) + defer apmServer.Close() + + // Create extension config and start the server + dataChannel := make(chan AgentData, 100) + config := extensionConfig{ + apmServerUrl: apmServer.URL, + apmServerSecretToken: "foo", + apmServerApiKey: "bar", + dataReceiverServerPort: "127.0.0.1:1234", + dataReceiverTimeoutSeconds: 15, + } + extensionServer := NewHttpServer(dataChannel, &config) + defer extensionServer.Close() + + // Create a request to send to the extension + client := &http.Client{} + url := "http://" + extensionServer.Addr + req, err := http.NewRequest("GET", url, nil) + if err != nil { + t.Logf("Could not create request") + } + for name, value := range headers { + req.Header.Add(name, value) + } + + // Send the request to the extension + resp, err := client.Do(req) + if err != nil { + t.Logf("Error fetching %s, [%v]", extensionServer.Addr, err) + t.Fail() + } else { + body, _ := ioutil.ReadAll(resp.Body) + assert.Equal(t, string(body), wantResp) + resp.Body.Close() + } +} From f3f1bb6b3252a0dc7f7289a65b202f69bb97c895 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Mon, 8 Nov 2021 14:28:39 +0100 Subject: [PATCH 04/11] Use same body variable --- apm-lambda-extension/extension/apm_server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-lambda-extension/extension/apm_server_test.go b/apm-lambda-extension/extension/apm_server_test.go index df01e4d9..d173b021 100644 --- a/apm-lambda-extension/extension/apm_server_test.go +++ b/apm-lambda-extension/extension/apm_server_test.go @@ -65,7 +65,7 @@ func TestPostToApmServerDataNotCompressed(t *testing.T) { // the apm server receives var b bytes.Buffer w := gzip.NewWriter(&b) - w.Write([]byte(s)) + w.Write(body) w.Close() // Create apm server and handler From d5d27158b01ea582b7b56bfe279c35ced071cb5f Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 10 Nov 2021 12:58:26 +0100 Subject: [PATCH 05/11] Use BestSpeed compression when sending to APM server --- apm-lambda-extension/extension/apm_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-lambda-extension/extension/apm_server.go b/apm-lambda-extension/extension/apm_server.go index 1127e343..6220bcb8 100644 --- a/apm-lambda-extension/extension/apm_server.go +++ b/apm-lambda-extension/extension/apm_server.go @@ -35,7 +35,7 @@ func PostToApmServer(agentData AgentData, config *extensionConfig) error { if agentData.ContentEncoding == "" { var compressedBytes bytes.Buffer - w := gzip.NewWriter(&compressedBytes) + w, _ := gzip.NewWriterLevel(&compressedBytes, gzip.BestSpeed) w.Write(agentData.Data) w.Close() req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, bytes.NewReader(compressedBytes.Bytes())) From aff9443e3d3915ffd5f2e16b9db16787ee34a8c4 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 10 Nov 2021 14:21:34 +0100 Subject: [PATCH 06/11] Use io.Pipe instead of separate buffer when compressing data --- apm-lambda-extension/extension/apm_server.go | 19 +++++++--- .../extension/apm_server_test.go | 37 ++++++++++++------- 2 files changed, 37 insertions(+), 19 deletions(-) diff --git a/apm-lambda-extension/extension/apm_server.go b/apm-lambda-extension/extension/apm_server.go index 6220bcb8..9e928f4c 100644 --- a/apm-lambda-extension/extension/apm_server.go +++ b/apm-lambda-extension/extension/apm_server.go @@ -21,6 +21,7 @@ import ( "bytes" "compress/gzip" "fmt" + "io" "io/ioutil" "log" "net/http" @@ -34,11 +35,19 @@ func PostToApmServer(agentData AgentData, config *extensionConfig) error { var err error if agentData.ContentEncoding == "" { - var compressedBytes bytes.Buffer - w, _ := gzip.NewWriterLevel(&compressedBytes, gzip.BestSpeed) - w.Write(agentData.Data) - w.Close() - req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, bytes.NewReader(compressedBytes.Bytes())) + pr, pw := io.Pipe() + gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) + var err error + + go func() { + _, err = io.Copy(gw, bytes.NewReader(agentData.Data)) + gw.Close() + pw.Close() + }() + + // Todo: Handle the err (failure to compress the data) + + req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, pr) if err != nil { return fmt.Errorf("failed to create a new request when posting to APM server: %v", err) } diff --git a/apm-lambda-extension/extension/apm_server_test.go b/apm-lambda-extension/extension/apm_server_test.go index d173b021..884886b3 100644 --- a/apm-lambda-extension/extension/apm_server_test.go +++ b/apm-lambda-extension/extension/apm_server_test.go @@ -18,9 +18,8 @@ package extension import ( - "bytes" "compress/gzip" - "compress/zlib" + "io" "io/ioutil" "net/http" "net/http/httptest" @@ -31,18 +30,24 @@ import ( func TestPostToApmServerDataCompressed(t *testing.T) { s := "A long time ago in a galaxy far, far away..." - var b bytes.Buffer // Compress the data - w := zlib.NewWriter(&b) - w.Write([]byte(s)) - w.Close() + pr, pw := io.Pipe() + gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) + go func() { + gw.Write([]byte(s)) + gw.Close() + pw.Close() + }() + + // Create AgentData struct with compressed data + data, _ := io.ReadAll(pr) + agentData := AgentData{Data: data, ContentEncoding: "gzip"} - agentData := AgentData{Data: b.Bytes(), ContentEncoding: "gzip"} // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { bytes, _ := ioutil.ReadAll(r.Body) - assert.Equal(t, b.String(), string(bytes)) + assert.Equal(t, string(data), string(bytes)) assert.Equal(t, "gzip", r.Header.Get("Content-Encoding")) w.Write([]byte(`{"foo": "bar"}`)) })) @@ -63,15 +68,19 @@ func TestPostToApmServerDataNotCompressed(t *testing.T) { // Compress the data, so it can be compared with what // the apm server receives - var b bytes.Buffer - w := gzip.NewWriter(&b) - w.Write(body) - w.Close() + pr, pw := io.Pipe() + gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) + go func() { + gw.Write(body) + gw.Close() + pw.Close() + }() // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - bytes, _ := ioutil.ReadAll(r.Body) - assert.Equal(t, b.String(), string(bytes)) + request_bytes, _ := ioutil.ReadAll(r.Body) + compressed_bytes, _ := io.ReadAll(pr) + assert.Equal(t, string(compressed_bytes), string(request_bytes)) assert.Equal(t, "gzip", r.Header.Get("Content-Encoding")) w.Write([]byte(`{"foo": "bar"}`)) })) From c2e479dba4ad0795405ce2442b0d145148e699f5 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 10 Nov 2021 14:43:19 +0100 Subject: [PATCH 07/11] Log error if data cannot be compressed --- apm-lambda-extension/extension/apm_server.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/apm-lambda-extension/extension/apm_server.go b/apm-lambda-extension/extension/apm_server.go index 9e928f4c..3d74a4b9 100644 --- a/apm-lambda-extension/extension/apm_server.go +++ b/apm-lambda-extension/extension/apm_server.go @@ -32,28 +32,29 @@ import ( func PostToApmServer(agentData AgentData, config *extensionConfig) error { endpointUri := "intake/v2/events" var req *http.Request - var err error if agentData.ContentEncoding == "" { pr, pw := io.Pipe() gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) - var err error go func() { - _, err = io.Copy(gw, bytes.NewReader(agentData.Data)) + _, err := io.Copy(gw, bytes.NewReader(agentData.Data)) gw.Close() pw.Close() + if err != nil { + log.Printf("Failed to compress data: %v", err) + } }() // Todo: Handle the err (failure to compress the data) - req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, pr) + req, err := http.NewRequest("POST", config.apmServerUrl+endpointUri, pr) if err != nil { return fmt.Errorf("failed to create a new request when posting to APM server: %v", err) } req.Header.Add("Content-Encoding", "gzip") } else { - req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, bytes.NewReader(agentData.Data)) + req, err := http.NewRequest("POST", config.apmServerUrl+endpointUri, bytes.NewReader(agentData.Data)) if err != nil { return fmt.Errorf("failed to create a new request when posting to APM server: %v", err) } From 474410973bbed1eb54c6ffb67796311acd19b6cb Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 10 Nov 2021 14:45:31 +0100 Subject: [PATCH 08/11] Remove Todo note --- apm-lambda-extension/extension/apm_server.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/apm-lambda-extension/extension/apm_server.go b/apm-lambda-extension/extension/apm_server.go index 3d74a4b9..1aa69369 100644 --- a/apm-lambda-extension/extension/apm_server.go +++ b/apm-lambda-extension/extension/apm_server.go @@ -46,8 +46,6 @@ func PostToApmServer(agentData AgentData, config *extensionConfig) error { } }() - // Todo: Handle the err (failure to compress the data) - req, err := http.NewRequest("POST", config.apmServerUrl+endpointUri, pr) if err != nil { return fmt.Errorf("failed to create a new request when posting to APM server: %v", err) From 55bf71759d5d15e8b78ec4fcb7302706f915d289 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 10 Nov 2021 14:49:14 +0100 Subject: [PATCH 09/11] Use err variable --- apm-lambda-extension/extension/apm_server.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/apm-lambda-extension/extension/apm_server.go b/apm-lambda-extension/extension/apm_server.go index 1aa69369..1ca2822c 100644 --- a/apm-lambda-extension/extension/apm_server.go +++ b/apm-lambda-extension/extension/apm_server.go @@ -32,13 +32,14 @@ import ( func PostToApmServer(agentData AgentData, config *extensionConfig) error { endpointUri := "intake/v2/events" var req *http.Request + var err error if agentData.ContentEncoding == "" { pr, pw := io.Pipe() gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) go func() { - _, err := io.Copy(gw, bytes.NewReader(agentData.Data)) + _, err = io.Copy(gw, bytes.NewReader(agentData.Data)) gw.Close() pw.Close() if err != nil { @@ -46,13 +47,13 @@ func PostToApmServer(agentData AgentData, config *extensionConfig) error { } }() - req, err := http.NewRequest("POST", config.apmServerUrl+endpointUri, pr) + req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, pr) if err != nil { return fmt.Errorf("failed to create a new request when posting to APM server: %v", err) } req.Header.Add("Content-Encoding", "gzip") } else { - req, err := http.NewRequest("POST", config.apmServerUrl+endpointUri, bytes.NewReader(agentData.Data)) + req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, bytes.NewReader(agentData.Data)) if err != nil { return fmt.Errorf("failed to create a new request when posting to APM server: %v", err) } From ea0ef6fe9874e06344a7c9552f3f19770788031a Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 10 Nov 2021 15:14:14 +0100 Subject: [PATCH 10/11] Use ioutil.ReadAll instead for go 1.15 --- apm-lambda-extension/extension/apm_server_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apm-lambda-extension/extension/apm_server_test.go b/apm-lambda-extension/extension/apm_server_test.go index 884886b3..4cfd19d5 100644 --- a/apm-lambda-extension/extension/apm_server_test.go +++ b/apm-lambda-extension/extension/apm_server_test.go @@ -41,7 +41,7 @@ func TestPostToApmServerDataCompressed(t *testing.T) { }() // Create AgentData struct with compressed data - data, _ := io.ReadAll(pr) + data, _ := ioutil.ReadAll(pr) agentData := AgentData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler @@ -79,7 +79,7 @@ func TestPostToApmServerDataNotCompressed(t *testing.T) { // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { request_bytes, _ := ioutil.ReadAll(r.Body) - compressed_bytes, _ := io.ReadAll(pr) + compressed_bytes, _ := ioutil.ReadAll(pr) assert.Equal(t, string(compressed_bytes), string(request_bytes)) assert.Equal(t, "gzip", r.Header.Get("Content-Encoding")) w.Write([]byte(`{"foo": "bar"}`)) From 2a93da709ce81ed5fe97b4678d2e421013e52801 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Mon, 15 Nov 2021 10:38:01 +0100 Subject: [PATCH 11/11] Handle all possible errors when reading agent request body --- .../extension/route_handlers.go | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/apm-lambda-extension/extension/route_handlers.go b/apm-lambda-extension/extension/route_handlers.go index 14719284..60d4373c 100644 --- a/apm-lambda-extension/extension/route_handlers.go +++ b/apm-lambda-extension/extension/route_handlers.go @@ -73,18 +73,24 @@ func handleInfoRequest(handler *serverHandler, w http.ResponseWriter, r *http.Re // URL: http://server/intake/v2/events func handleIntakeV2Events(handler *serverHandler, w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("ok")) + if r.Body == nil { log.Println("Could not get bytes from agent request body") - } else { - rawBytes, _ := ioutil.ReadAll(r.Body) - agentData := AgentData{ - Data: rawBytes, - ContentEncoding: r.Header.Get("Content-Encoding"), - } - log.Println("Adding agent data to buffer to be sent to apm server") - handler.data <- agentData + return } - w.WriteHeader(http.StatusAccepted) - w.Write([]byte("ok")) + rawBytes, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Println("Could not read bytes from agent request body") + return + } + + agentData := AgentData{ + Data: rawBytes, + ContentEncoding: r.Header.Get("Content-Encoding"), + } + log.Println("Adding agent data to buffer to be sent to apm server") + handler.data <- agentData }