Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add syslog batching poc implementation #491

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
42 changes: 25 additions & 17 deletions src/pkg/egress/syslog/https.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func NewHTTPSWriter(
) egress.WriteCloser {

client := httpClient(netConf, tlsConf)

return &HTTPSWriter{
url: binding.URL,
appID: binding.AppID,
Expand All @@ -43,31 +42,40 @@ func NewHTTPSWriter(
}
}

func (w *HTTPSWriter) sendHttpRequest(msg []byte, msgCount float64) error {
req := fasthttp.AcquireRequest()
req.SetRequestURI(w.url.String())
req.Header.SetMethod("POST")
req.Header.SetContentType("text/plain")
req.SetBody(msg)

resp := fasthttp.AcquireResponse()

err := w.client.Do(req, resp)
if err != nil {
return w.sanitizeError(w.url, err)
}

if resp.StatusCode() < 200 || resp.StatusCode() > 299 {
return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode())
}

w.egressMetric.Add(msgCount)

return nil
}

func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error {
msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname)
if err != nil {
return err
}

for _, msg := range msgs {
req := fasthttp.AcquireRequest()
req.SetRequestURI(w.url.String())
req.Header.SetMethod("POST")
req.Header.SetContentType("text/plain")
req.SetBody(msg)

resp := fasthttp.AcquireResponse()

err := w.client.Do(req, resp)
err = w.sendHttpRequest(msg, 1)
if err != nil {
return w.sanitizeError(w.url, err)
}

if resp.StatusCode() < 200 || resp.StatusCode() > 299 {
return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode())
return err
}

w.egressMetric.Add(1)
}

return nil
Expand Down
88 changes: 88 additions & 0 deletions src/pkg/egress/syslog/https_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package syslog

import (
"bytes"
"crypto/tls"
"time"

"code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2"
metrics "code.cloudfoundry.org/go-metric-registry"
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress"
)

const BATCHSIZE = 256 * 1024

type HTTPSBatchWriter struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We understand the goal of the HTTPSBatchWriter to be:

  • Buffer & batch incoming envelopes.
  • If the batch reaches a certain size, flush the batch to a destination.
  • On some interval, flush the batch to a destination.

❓ Is that right?

Currently, HTTPSBatchWriter and TriggerTimer doesn't appear to do that. We think that code actually results in the following behaviour:

  • Add one message to the batch and sleep for some interval, then flush to a destination.
  • After that, add messages to the batch as they come in.
  • If the batch reaches a certain size, flush the batch to a destination.

➡️ Can you please have a look at adjusting this code.

Here are some examples of ways we've done batching in the past:

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

first thing:
Yes your understanding seems right on what we try to do.

I have pushed a newer version, that should comply with the envisioned behaviour.
Tests for the new version are also added, confirming the implementation complying with the wanted behaviour.
I am currently looking through your pointers to see if I can leverage some of the implementations shown there.

Copy link
Author

@nicklas-dohrn nicklas-dohrn Jun 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the code proposed:
The implementation for the signal batcher uses slices and an append structure.
It has information of how long the batch is going to be, speeding up the execution.
In contrast, the syslog batching feature will not know beforehand, how long batches are going to be,
Giving the edge of speed to Byte.buffers:
https://stackoverflow.com/questions/39319024/builtin-append-vs-bytes-buffer-write

Also, the actual buffering code is pretty short, and the expected type for sending data with the http client used is Byte[], so the choice still seems obvious to me.

HTTPSWriter
msgs chan []byte
batchSize int
sendInterval time.Duration
egrMsgCount float64
}

func NewHTTPSBatchWriter(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add tests for this writer.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some tests are already added, do we need tests for the things already done by the httpsWriter (Error handling and the likes, which is anyways the same?)

binding *URLBinding,
netConf NetworkTimeoutConfig,
tlsConf *tls.Config,
egressMetric metrics.Counter,
c *Converter,
) egress.WriteCloser {
client := httpClient(netConf, tlsConf)
binding.URL.Scheme = "https" // reset the scheme for usage to a valid http scheme
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of changing the scheme here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the scheme is used to differentiate between the different endpoints (https and https-batched)
If i do not change it back to https for sends, the queried url will be https-batched://... , which is just not working then.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does mutating the scheme here affect the metrics emitted?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will check for the metrics being changed here.

BatchWriter := &HTTPSBatchWriter{
HTTPSWriter: HTTPSWriter{
url: binding.URL,
appID: binding.AppID,
hostname: binding.Hostname,
client: client,
egressMetric: egressMetric,
syslogConverter: c,
},
batchSize: BATCHSIZE,
sendInterval: 1 * time.Second,
egrMsgCount: 0,
msgs: make(chan []byte),
}
go BatchWriter.startSender()
return BatchWriter
}

// Modified Write function
func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error {
msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname)
if err != nil {
return err
}

for _, msg := range msgs {
w.msgs <- msg
}
return nil
}

func (w *HTTPSBatchWriter) startSender() {
t := time.NewTimer(w.sendInterval)

var msgBatch bytes.Buffer
var msgCount float64
for {
nicklas-dohrn marked this conversation as resolved.
Show resolved Hide resolved
select {
case msg := <-w.msgs:
msgBatch.Write(msg)
msgCount++
if msgBatch.Len() >= w.batchSize {
w.sendHttpRequest(msgBatch.Bytes(), msgCount)
msgBatch.Reset()
msgCount = 0
t.Reset(w.sendInterval)
}
case <-t.C:
if msgBatch.Len() > 0 {
w.sendHttpRequest(msgBatch.Bytes(), msgCount)
msgBatch.Reset()
msgCount = 0
}
t.Reset(w.sendInterval)
}
}
}
144 changes: 144 additions & 0 deletions src/pkg/egress/syslog/https_batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package syslog_test

import (
"bytes"
"crypto/tls"
"io"
"net/http"
"net/http/httptest"
"time"

"code.cloudfoundry.org/go-loggregator/v9/rfc5424"
"code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2"
metricsHelpers "code.cloudfoundry.org/go-metric-registry/testhelpers"
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress"
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var string_to_1024_chars = "saljdflajsdssdfsdfljkfkajafjajlköflkjöjaklgljksdjlakljkflkjweljklkwjejlkfekljwlkjefjklwjklsdajkljklwerlkaskldgjksakjekjwrjkljasdjkgfkljwejklrkjlklasdkjlsadjlfjlkadfljkajklsdfjklslkdfjkllkjasdjkflsdlakfjklasldfkjlasdjfkjlsadlfjklaljsafjlslkjawjklerkjljklasjkdfjklwerjljalsdjkflwerjlkwejlkarjklalkklfsdjlfhkjsdfkhsewhkjjasdjfkhwkejrkjahjefkhkasdjhfkashfkjwehfkksadfjaskfkhjdshjfhewkjhasdfjdajskfjwehkfajkankaskjdfasdjhfkkjhjjkasdfjhkjahksdf"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the name here a little confusing. The string is ~440 bytes in length.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this might be confusing.
just did it that way to not put way to many characters to reach the 1024.


var _ = Describe("HTTPS_batch_testing", func() {
var (
netConf syslog.NetworkTimeoutConfig
skipSSLTLSConfig = &tls.Config{
InsecureSkipVerify: true, //nolint:gosec
}
c = syslog.NewConverter()
drain *SpyDrain
b *syslog.URLBinding
writer egress.WriteCloser
)
string_to_1024_chars += string_to_1024_chars
nicklas-dohrn marked this conversation as resolved.
Show resolved Hide resolved

BeforeEach(func() {
drain = newBatchMockDrain(200)
b = buildURLBinding(
drain.URL,
"test-app-id",
"test-hostname",
)
writer = syslog.NewHTTPSBatchWriter(
b,
netConf,
skipSSLTLSConfig,
&metricsHelpers.SpyMetric{},
c,
)
})

It("testing simple appending of one log", func() {
env1 := buildLogEnvelope("APP", "1", "message 1", loggregator_v2.Log_OUT)
Expect(writer.Write(env1)).To(Succeed())
env2 := buildLogEnvelope("APP", "2", "message 2", loggregator_v2.Log_OUT)
Expect(writer.Write(env2)).To(Succeed())
time.Sleep(2 * time.Second)

Expect(drain.messages).To(HaveLen(2))
expected := &rfc5424.Message{
AppName: "test-app-id",
Hostname: "test-hostname",
Priority: rfc5424.Priority(14),
ProcessID: "[APP/1]",
Message: []byte("message 1\n"),
}
Expect(drain.messages[0].AppName).To(Equal(expected.AppName))
Expect(drain.messages[0].Hostname).To(Equal(expected.Hostname))
Expect(drain.messages[0].Priority).To(BeEquivalentTo(expected.Priority))
Expect(drain.messages[0].ProcessID).To(Equal(expected.ProcessID))
Expect(drain.messages[0].Message).To(Equal(expected.Message))
expected = &rfc5424.Message{
AppName: "test-app-id",
Hostname: "test-hostname",
Priority: rfc5424.Priority(14),
ProcessID: "[APP/2]",
Message: []byte("message 2\n"),
}
Expect(drain.messages[1].AppName).To(Equal(expected.AppName))
Expect(drain.messages[1].Hostname).To(Equal(expected.Hostname))
Expect(drain.messages[1].Priority).To(BeEquivalentTo(expected.Priority))
Expect(drain.messages[1].ProcessID).To(Equal(expected.ProcessID))
Expect(drain.messages[1].Message).To(Equal(expected.Message))
})

It("test early dispatch on high message load", func() {
env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT)
for i := 0; i < 300; i++ {
writer.Write(env1)
}
time.Sleep(100 * time.Millisecond)
Expect(drain.messages).To(HaveLen(256))
})

It("test batch dispatching with all logs in a given timeframe", func() {
env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT)
for i := 0; i < 10; i++ {
writer.Write(env1)
time.Sleep(99 * time.Millisecond)
}
Expect(drain.messages).To(HaveLen(0))
time.Sleep(100 * time.Millisecond)
Expect(drain.messages).To(HaveLen(10))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general in these tests can we use Eventually rather than relying on timing?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are not final yet.
this test especially is set to test for the time window being exactly 1s, like defined in the class, and not something else.
If I reread it right now, it does not exactly test that, but an eventually would not test, if the timings defined would be adhered to.

})

It("probabilistic test for race condition", func() {
env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT)
for i := 0; i < 10; i++ {
writer.Write(env1)
time.Sleep(99 * time.Millisecond)
}
time.Sleep(100 * time.Millisecond)
Expect(drain.messages).To(HaveLen(10))
})
})

func newBatchMockDrain(status int) *SpyDrain {
drain := &SpyDrain{}
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

body, err := io.ReadAll(r.Body)
Expect(err).ToNot(HaveOccurred())
defer r.Body.Close()

println(body)

message := &rfc5424.Message{}

messages := bytes.SplitAfter(body, []byte("\n"))
for _, raw := range messages {
if bytes.Equal(raw, []byte("")) {
continue
}
message = &rfc5424.Message{}
err = message.UnmarshalBinary(raw)
Expect(err).ToNot(HaveOccurred())
drain.messages = append(drain.messages, message)
drain.headers = append(drain.headers, r.Header)
}
w.WriteHeader(status)
})
server := httptest.NewTLSServer(handler)
drain.Server = server
return drain
}
56 changes: 1 addition & 55 deletions src/pkg/egress/syslog/https_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ var _ = Describe("HTTPWriter", func() {
&metricsHelpers.SpyMetric{},
c,
)

env := buildLogEnvelope("APP", "1", "just a test", loggregator_v2.Log_OUT)
Expect(writer.Write(env)).To(HaveOccurred())
})
Expand Down Expand Up @@ -106,60 +105,6 @@ var _ = Describe("HTTPWriter", func() {
Expect(err.Error()).ToNot(ContainSubstring("password"))
})

It("writes syslog formatted messages to http drain", func() {
drain := newMockOKDrain()

b := buildURLBinding(
drain.URL,
"test-app-id",
"test-hostname",
)

writer := syslog.NewHTTPSWriter(
b,
netConf,
skipSSLTLSConfig,
&metricsHelpers.SpyMetric{},
c,
)

env1 := buildLogEnvelope("APP", "1", "just a test", loggregator_v2.Log_OUT)
Expect(writer.Write(env1)).To(Succeed())
env2 := buildLogEnvelope("CELL", "5", "log from cell", loggregator_v2.Log_ERR)
Expect(writer.Write(env2)).To(Succeed())
env3 := buildLogEnvelope("CELL", "", "log from cell", loggregator_v2.Log_ERR)
Expect(writer.Write(env3)).To(Succeed())

Expect(drain.messages).To(HaveLen(3))
expected := &rfc5424.Message{
AppName: "test-app-id",
Hostname: "test-hostname",
Priority: rfc5424.Priority(14),
ProcessID: "[APP/1]",
Message: []byte("just a test\n"),
}
Expect(drain.messages[0].AppName).To(Equal(expected.AppName))
Expect(drain.messages[0].Hostname).To(Equal(expected.Hostname))
Expect(drain.messages[0].Priority).To(BeEquivalentTo(expected.Priority))
Expect(drain.messages[0].ProcessID).To(Equal(expected.ProcessID))
Expect(drain.messages[0].Message).To(Equal(expected.Message))

expected = &rfc5424.Message{
AppName: "test-app-id",
Hostname: "test-hostname",
Priority: rfc5424.Priority(11),
ProcessID: "[CELL/5]",
Message: []byte("log from cell\n"),
}
Expect(drain.messages[1].AppName).To(Equal(expected.AppName))
Expect(drain.messages[1].Hostname).To(Equal(expected.Hostname))
Expect(drain.messages[1].Priority).To(BeEquivalentTo(expected.Priority))
Expect(drain.messages[1].ProcessID).To(Equal(expected.ProcessID))
Expect(drain.messages[1].Message).To(Equal(expected.Message))

Expect(drain.messages[2].ProcessID).To(Equal("[CELL]"))
})

It("sets Content-Type to text/plain", func() {
drain := newMockOKDrain()

Expand Down Expand Up @@ -342,6 +287,7 @@ func newMockDrain(status int) *SpyDrain {
Expect(err).ToNot(HaveOccurred())
defer r.Body.Close()

println(body)
err = message.UnmarshalBinary(body)
Expect(err).ToNot(HaveOccurred())

Expand Down
8 changes: 8 additions & 0 deletions src/pkg/egress/syslog/writer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) {
egressMetric,
converter,
)
case "https-batch":
w = NewHTTPSBatchWriter(
ub,
f.netConf,
tlsCfg,
egressMetric,
converter,
)
case "syslog":
w = NewTCPWriter(
ub,
Expand Down
Loading