Skip to content
42 changes: 30 additions & 12 deletions apm-lambda-extension/extension/apm_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,53 @@ import (
"bytes"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
)

// todo: can this be a streaming or streaming style call that keeps the
// connection open across invocations?
func PostToApmServer(postBody []byte, config *extensionConfig) error {
func PostToApmServer(agentData AgentData, config *extensionConfig) error {
endpointUri := "intake/v2/events"
var compressedBytes bytes.Buffer
w := gzip.NewWriter(&compressedBytes)
w.Write(postBody)
w.Write([]byte{10})
w.Close()
var req *http.Request
var err error

client := &http.Client{}
if agentData.ContentEncoding == "" {
pr, pw := io.Pipe()
gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed)

req, err := http.NewRequest("POST", config.apmServerUrl+endpointUri, bytes.NewReader(compressedBytes.Bytes()))
if err != nil {
return fmt.Errorf("failed to create a new request when posting to APM server: %v", err)
go func() {
_, err = io.Copy(gw, bytes.NewReader(agentData.Data))
gw.Close()
pw.Close()
if err != nil {
log.Printf("Failed to compress data: %v", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor suggestion]
IIUC, this function will only be executed once the request has been established and is then streamed (via chunked-encoding) to the server.
If that's correct, the message may be a bit misleading as the more common source of failure is that there's a network issue when streaming the data to APM Server.

Suggested change
log.Printf("Failed to compress data: %v", err)
log.Printf("Failed to send compressed data: %v", err)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are actually not streaming data yet, but rather sending in batches. At this point in the code, the actual failure is that the data could not be compressed in a go routine. If compression failed in that go routine, we will also get an error that is logged on line 52 (failed to create a new request when posting to APM server) and the function will return. So I think the error message here is accurate and should stay as-is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right that the actual send could be chunked by the go internal transport code, I thought you were referring to the way that we send the data as it's received from the agent. But at this point in the code, I'm calling io.Copy, which ends up calling Write on the gzipWriter before it gets to the network code. So any compression failures would be returned by io.Copy. Network errors would be returned by resp, err := client.Do(req).

But that actually brings up another point: the error returned from client.Do(req) actually wraps the compression error, which means we don't really need to do anything with the error returned from io.Copy. In other words, we could removing the logging of the error and just rely on the error returned in this line

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm calling io.Copy, which ends up calling Write on the gzipWriter before it gets to the network code.

What I'm understanding from the blog that I linked is that io.Copy blocks until the PipeReader is being read from. This happens when client.Do(req) is invoked and the data from the PipeReader is written into the HTTP request body via chunked encoding. It's because io.Copy blocks that we have to execute it in a concurrently running goroutine. If we didn't do that, we'd never get to the client.Do(req) part.

See also the docs for io.Pipe

each Write to the PipeWriter blocks until it has satisfied one or more Reads from the PipeReader that fully consume the written data. The data is copied directly from the Write to the corresponding Read (or Reads); there is no internal buffering.

}
}()

req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, pr)
if err != nil {
return fmt.Errorf("failed to create a new request when posting to APM server: %v", err)
}
req.Header.Add("Content-Encoding", "gzip")
} else {
req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, bytes.NewReader(agentData.Data))
if err != nil {
return fmt.Errorf("failed to create a new request when posting to APM server: %v", err)
}
req.Header.Add("Content-Encoding", agentData.ContentEncoding)
}
req.Header.Add("Content-Type", "application/x-ndjson")
req.Header.Add("Content-Encoding", "gzip")

req.Header.Add("Content-Type", "application/x-ndjson")
if config.apmServerApiKey != "" {
req.Header.Add("Authorization", "ApiKey "+config.apmServerApiKey)
} else if config.apmServerSecretToken != "" {
req.Header.Add("Authorization", "Bearer "+config.apmServerSecretToken)
}

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to post to APM server: %v", err)
Expand Down
95 changes: 95 additions & 0 deletions apm-lambda-extension/extension/apm_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package extension

import (
"compress/gzip"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"

"gotest.tools/assert"
)

func TestPostToApmServerDataCompressed(t *testing.T) {
s := "A long time ago in a galaxy far, far away..."

// Compress the data
pr, pw := io.Pipe()
gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed)
go func() {
gw.Write([]byte(s))
gw.Close()
pw.Close()
}()

// Create AgentData struct with compressed data
data, _ := ioutil.ReadAll(pr)
agentData := AgentData{Data: data, ContentEncoding: "gzip"}

// Create apm server and handler
apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
bytes, _ := ioutil.ReadAll(r.Body)
assert.Equal(t, string(data), string(bytes))
assert.Equal(t, "gzip", r.Header.Get("Content-Encoding"))
w.Write([]byte(`{"foo": "bar"}`))
}))
defer apmServer.Close()

config := extensionConfig{
apmServerUrl: apmServer.URL + "/",
}

err := PostToApmServer(agentData, &config)
assert.Equal(t, nil, err)
}

func TestPostToApmServerDataNotCompressed(t *testing.T) {
s := "A long time ago in a galaxy far, far away..."
body := []byte(s)
agentData := AgentData{Data: body, ContentEncoding: ""}

// Compress the data, so it can be compared with what
// the apm server receives
pr, pw := io.Pipe()
gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed)
go func() {
gw.Write(body)
gw.Close()
pw.Close()
}()

// Create apm server and handler
apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
request_bytes, _ := ioutil.ReadAll(r.Body)
compressed_bytes, _ := ioutil.ReadAll(pr)
assert.Equal(t, string(compressed_bytes), string(request_bytes))
assert.Equal(t, "gzip", r.Header.Get("Content-Encoding"))
w.Write([]byte(`{"foo": "bar"}`))
}))
defer apmServer.Close()

config := extensionConfig{
apmServerUrl: apmServer.URL + "/",
}

err := PostToApmServer(agentData, &config)
assert.Equal(t, nil, err)
}
43 changes: 2 additions & 41 deletions apm-lambda-extension/extension/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,13 @@
package extension

import (
"bytes"
"compress/gzip"
"compress/zlib"
"fmt"
"io/ioutil"
"net"
"net/http"
"time"
)

type serverHandler struct {
data chan []byte
data chan AgentData
config *extensionConfig
}

Expand All @@ -50,7 +45,7 @@ func (handler *serverHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

}

func NewHttpServer(dataChannel chan []byte, config *extensionConfig) *http.Server {
func NewHttpServer(dataChannel chan AgentData, config *extensionConfig) *http.Server {
var handler = serverHandler{data: dataChannel, config: config}
timeout := time.Duration(config.dataReceiverTimeoutSeconds) * time.Second
s := &http.Server{
Expand All @@ -70,37 +65,3 @@ func NewHttpServer(dataChannel chan []byte, config *extensionConfig) *http.Serve

return s
}

func getDecompressedBytesFromRequest(req *http.Request) ([]byte, error) {
var rawBytes []byte
if req.Body != nil {
rawBytes, _ = ioutil.ReadAll(req.Body)
}

switch req.Header.Get("Content-Encoding") {
case "deflate":
reader := bytes.NewReader([]byte(rawBytes))
zlibreader, err := zlib.NewReader(reader)
if err != nil {
return nil, fmt.Errorf("could not create zlib.NewReader: %v", err)
}
bodyBytes, err := ioutil.ReadAll(zlibreader)
if err != nil {
return nil, fmt.Errorf("could not read from zlib reader using ioutil.ReadAll: %v", err)
}
return bodyBytes, nil
case "gzip":
reader := bytes.NewReader([]byte(rawBytes))
zlibreader, err := gzip.NewReader(reader)
if err != nil {
return nil, fmt.Errorf("could not create gzip.NewReader: %v", err)
}
bodyBytes, err := ioutil.ReadAll(zlibreader)
if err != nil {
return nil, fmt.Errorf("could not read from gzip reader using ioutil.ReadAll: %v", err)
}
return bodyBytes, nil
default:
return rawBytes, nil
}
}
118 changes: 1 addition & 117 deletions apm-lambda-extension/extension/http_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,130 +18,14 @@
package extension

import (
"bytes"
"compress/gzip"
"compress/zlib"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"

"gotest.tools/assert"
)

func Test_getDecompressedBytesFromRequestUncompressed(t *testing.T) {
s := "A long time ago in a galaxy far, far away..."
body := strings.NewReader(s)

// Create the request
req, err := http.NewRequest(http.MethodPost, "example.com", body)
if err != nil {
t.Errorf("Error creating new request: %v", err)
t.Fail()
}

// Decompress the request's body
got, err1 := getDecompressedBytesFromRequest(req)
if err1 != nil {
t.Errorf("Error decompressing request body: %v", err1)
t.Fail()
}

if s != string(got) {
t.Errorf("Original string and decompressed data do not match")
t.Fail()
}
}

func Test_getDecompressedBytesFromRequestGzip(t *testing.T) {
s := "A long time ago in a galaxy far, far away..."
var b bytes.Buffer

// Compress the data
w := gzip.NewWriter(&b)
w.Write([]byte(s))
w.Close()

// Create a reader reading from the bytes on the buffer
body := bytes.NewReader(b.Bytes())

// Create the request
req, err := http.NewRequest(http.MethodPost, "example.com", body)
if err != nil {
t.Errorf("Error creating new request: %v", err)
t.Fail()
}

// Set the encoding to gzip
req.Header.Set("Content-Encoding", "gzip")

// Decompress the request's body
got, err1 := getDecompressedBytesFromRequest(req)
if err1 != nil {
t.Errorf("Error decompressing request body: %v", err1)
t.Fail()
}

if s != string(got) {
t.Errorf("Original string and decompressed data do not match")
t.Fail()
}
}

func Test_getDecompressedBytesFromRequestDeflate(t *testing.T) {
s := "A long time ago in a galaxy far, far away..."
var b bytes.Buffer

// Compress the data
w := zlib.NewWriter(&b)
w.Write([]byte(s))
w.Close()

// Create a reader reading from the bytes on the buffer
body := bytes.NewReader(b.Bytes())

// Create the request
req, err := http.NewRequest(http.MethodPost, "example.com", body)
if err != nil {
t.Errorf("Error creating new request: %v", err)
t.Fail()
}

// Set the encoding to deflate
req.Header.Set("Content-Encoding", "deflate")

// Decompress the request's body
got, err1 := getDecompressedBytesFromRequest(req)
if err1 != nil {
t.Errorf("Error decompressing request body: %v", err1)
t.Fail()
}

if s != string(got) {
t.Errorf("Original string and decompressed data do not match")
t.Fail()
}
}

func Test_getDecompressedBytesFromRequestEmptyBody(t *testing.T) {
// Create the request
req, err := http.NewRequest(http.MethodPost, "example.com", nil)
if err != nil {
t.Errorf("Error creating new request: %v", err)
}

got, err := getDecompressedBytesFromRequest(req)
if err != nil {
t.Errorf("Error decompressing request body: %v", err)
}

if len(got) != 0 {
t.Errorf("A non-empty byte slice was returned")
t.Fail()
}
}

func TestInfoProxy(t *testing.T) {
headers := map[string]string{"Authorization": "test-value"}
wantResp := "{\"foo\": \"bar\"}"
Expand All @@ -157,7 +41,7 @@ func TestInfoProxy(t *testing.T) {
defer apmServer.Close()

// Create extension config and start the server
dataChannel := make(chan []byte, 100)
dataChannel := make(chan AgentData, 100)
config := extensionConfig{
apmServerUrl: apmServer.URL,
apmServerSecretToken: "foo",
Expand Down
2 changes: 1 addition & 1 deletion apm-lambda-extension/extension/process_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func ProcessShutdown() {
log.Println("Exiting")
}

func FlushAPMData(dataChannel chan []byte, config *extensionConfig) {
func FlushAPMData(dataChannel chan AgentData, config *extensionConfig) {
log.Println("Checking for agent data")
for {
select {
Expand Down
Loading