diff --git a/src/pkg/egress/syslog/https.go b/src/pkg/egress/syslog/https.go index 8003d7815..1dd8b8639 100644 --- a/src/pkg/egress/syslog/https.go +++ b/src/pkg/egress/syslog/https.go @@ -32,7 +32,6 @@ func NewHTTPSWriter( ) egress.WriteCloser { client := httpClient(netConf, tlsConf) - return &HTTPSWriter{ url: binding.URL, appID: binding.AppID, @@ -43,6 +42,29 @@ func NewHTTPSWriter( } } +func (w *HTTPSWriter) sendHttpRequest(msg []byte, msgCount float64) error { + req := fasthttp.AcquireRequest() + req.SetRequestURI(w.url.String()) + req.Header.SetMethod("POST") + req.Header.SetContentType("text/plain") + req.SetBody(msg) + + resp := fasthttp.AcquireResponse() + + err := w.client.Do(req, resp) + if err != nil { + return w.sanitizeError(w.url, err) + } + + if resp.StatusCode() < 200 || resp.StatusCode() > 299 { + return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode()) + } + + w.egressMetric.Add(msgCount) + + return nil +} + func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error { msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname) if err != nil { @@ -50,24 +72,10 @@ func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error { } for _, msg := range msgs { - req := fasthttp.AcquireRequest() - req.SetRequestURI(w.url.String()) - req.Header.SetMethod("POST") - req.Header.SetContentType("text/plain") - req.SetBody(msg) - - resp := fasthttp.AcquireResponse() - - err := w.client.Do(req, resp) + err = w.sendHttpRequest(msg, 1) if err != nil { - return w.sanitizeError(w.url, err) - } - - if resp.StatusCode() < 200 || resp.StatusCode() > 299 { - return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode()) + return err } - - w.egressMetric.Add(1) } return nil diff --git a/src/pkg/egress/syslog/https_batch.go b/src/pkg/egress/syslog/https_batch.go new file mode 100644 index 000000000..874fc9a1f --- /dev/null +++ b/src/pkg/egress/syslog/https_batch.go @@ -0,0 +1,88 @@ +package syslog + +import ( + "bytes" + "crypto/tls" + "time" + + "code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2" + metrics "code.cloudfoundry.org/go-metric-registry" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" +) + +const BATCHSIZE = 256 * 1024 + +type HTTPSBatchWriter struct { + HTTPSWriter + msgs chan []byte + batchSize int + sendInterval time.Duration + egrMsgCount float64 +} + +func NewHTTPSBatchWriter( + binding *URLBinding, + netConf NetworkTimeoutConfig, + tlsConf *tls.Config, + egressMetric metrics.Counter, + c *Converter, +) egress.WriteCloser { + client := httpClient(netConf, tlsConf) + binding.URL.Scheme = "https" // reset the scheme for usage to a valid http scheme + BatchWriter := &HTTPSBatchWriter{ + HTTPSWriter: HTTPSWriter{ + url: binding.URL, + appID: binding.AppID, + hostname: binding.Hostname, + client: client, + egressMetric: egressMetric, + syslogConverter: c, + }, + batchSize: BATCHSIZE, + sendInterval: 1 * time.Second, + egrMsgCount: 0, + msgs: make(chan []byte), + } + go BatchWriter.startSender() + return BatchWriter +} + +// Modified Write function +func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error { + msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname) + if err != nil { + return err + } + + for _, msg := range msgs { + w.msgs <- msg + } + return nil +} + +func (w *HTTPSBatchWriter) startSender() { + t := time.NewTimer(w.sendInterval) + + var msgBatch bytes.Buffer + var msgCount float64 + for { + select { + case msg := <-w.msgs: + msgBatch.Write(msg) + msgCount++ + if msgBatch.Len() >= w.batchSize { + w.sendHttpRequest(msgBatch.Bytes(), msgCount) + msgBatch.Reset() + msgCount = 0 + t.Reset(w.sendInterval) + } + case <-t.C: + if msgBatch.Len() > 0 { + w.sendHttpRequest(msgBatch.Bytes(), msgCount) + msgBatch.Reset() + msgCount = 0 + } + t.Reset(w.sendInterval) + } + } +} diff --git a/src/pkg/egress/syslog/https_batch_test.go b/src/pkg/egress/syslog/https_batch_test.go new file mode 100644 index 000000000..2a328f9b9 --- /dev/null +++ b/src/pkg/egress/syslog/https_batch_test.go @@ -0,0 +1,144 @@ +package syslog_test + +import ( + "bytes" + "crypto/tls" + "io" + "net/http" + "net/http/httptest" + "time" + + "code.cloudfoundry.org/go-loggregator/v9/rfc5424" + "code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2" + metricsHelpers "code.cloudfoundry.org/go-metric-registry/testhelpers" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var string_to_1024_chars = "saljdflajsdssdfsdfljkfkajafjajlköflkjöjaklgljksdjlakljkflkjweljklkwjejlkfekljwlkjefjklwjklsdajkljklwerlkaskldgjksakjekjwrjkljasdjkgfkljwejklrkjlklasdkjlsadjlfjlkadfljkajklsdfjklslkdfjkllkjasdjkflsdlakfjklasldfkjlasdjfkjlsadlfjklaljsafjlslkjawjklerkjljklasjkdfjklwerjljalsdjkflwerjlkwejlkarjklalkklfsdjlfhkjsdfkhsewhkjjasdjfkhwkejrkjahjefkhkasdjhfkashfkjwehfkksadfjaskfkhjdshjfhewkjhasdfjdajskfjwehkfajkankaskjdfasdjhfkkjhjjkasdfjhkjahksdf" + +var _ = Describe("HTTPS_batch_testing", func() { + var ( + netConf syslog.NetworkTimeoutConfig + skipSSLTLSConfig = &tls.Config{ + InsecureSkipVerify: true, //nolint:gosec + } + c = syslog.NewConverter() + drain *SpyDrain + b *syslog.URLBinding + writer egress.WriteCloser + ) + string_to_1024_chars += string_to_1024_chars + + BeforeEach(func() { + drain = newBatchMockDrain(200) + b = buildURLBinding( + drain.URL, + "test-app-id", + "test-hostname", + ) + writer = syslog.NewHTTPSBatchWriter( + b, + netConf, + skipSSLTLSConfig, + &metricsHelpers.SpyMetric{}, + c, + ) + }) + + It("testing simple appending of one log", func() { + env1 := buildLogEnvelope("APP", "1", "message 1", loggregator_v2.Log_OUT) + Expect(writer.Write(env1)).To(Succeed()) + env2 := buildLogEnvelope("APP", "2", "message 2", loggregator_v2.Log_OUT) + Expect(writer.Write(env2)).To(Succeed()) + time.Sleep(2 * time.Second) + + Expect(drain.messages).To(HaveLen(2)) + expected := &rfc5424.Message{ + AppName: "test-app-id", + Hostname: "test-hostname", + Priority: rfc5424.Priority(14), + ProcessID: "[APP/1]", + Message: []byte("message 1\n"), + } + Expect(drain.messages[0].AppName).To(Equal(expected.AppName)) + Expect(drain.messages[0].Hostname).To(Equal(expected.Hostname)) + Expect(drain.messages[0].Priority).To(BeEquivalentTo(expected.Priority)) + Expect(drain.messages[0].ProcessID).To(Equal(expected.ProcessID)) + Expect(drain.messages[0].Message).To(Equal(expected.Message)) + expected = &rfc5424.Message{ + AppName: "test-app-id", + Hostname: "test-hostname", + Priority: rfc5424.Priority(14), + ProcessID: "[APP/2]", + Message: []byte("message 2\n"), + } + Expect(drain.messages[1].AppName).To(Equal(expected.AppName)) + Expect(drain.messages[1].Hostname).To(Equal(expected.Hostname)) + Expect(drain.messages[1].Priority).To(BeEquivalentTo(expected.Priority)) + Expect(drain.messages[1].ProcessID).To(Equal(expected.ProcessID)) + Expect(drain.messages[1].Message).To(Equal(expected.Message)) + }) + + It("test early dispatch on high message load", func() { + env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT) + for i := 0; i < 300; i++ { + writer.Write(env1) + } + time.Sleep(100 * time.Millisecond) + Expect(drain.messages).To(HaveLen(256)) + }) + + It("test batch dispatching with all logs in a given timeframe", func() { + env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT) + for i := 0; i < 10; i++ { + writer.Write(env1) + time.Sleep(99 * time.Millisecond) + } + Expect(drain.messages).To(HaveLen(0)) + time.Sleep(100 * time.Millisecond) + Expect(drain.messages).To(HaveLen(10)) + }) + + It("probabilistic test for race condition", func() { + env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT) + for i := 0; i < 10; i++ { + writer.Write(env1) + time.Sleep(99 * time.Millisecond) + } + time.Sleep(100 * time.Millisecond) + Expect(drain.messages).To(HaveLen(10)) + }) +}) + +func newBatchMockDrain(status int) *SpyDrain { + drain := &SpyDrain{} + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + body, err := io.ReadAll(r.Body) + Expect(err).ToNot(HaveOccurred()) + defer r.Body.Close() + + println(body) + + message := &rfc5424.Message{} + + messages := bytes.SplitAfter(body, []byte("\n")) + for _, raw := range messages { + if bytes.Equal(raw, []byte("")) { + continue + } + message = &rfc5424.Message{} + err = message.UnmarshalBinary(raw) + Expect(err).ToNot(HaveOccurred()) + drain.messages = append(drain.messages, message) + drain.headers = append(drain.headers, r.Header) + } + w.WriteHeader(status) + }) + server := httptest.NewTLSServer(handler) + drain.Server = server + return drain +} diff --git a/src/pkg/egress/syslog/https_test.go b/src/pkg/egress/syslog/https_test.go index 77e9ef0c4..ebf999067 100644 --- a/src/pkg/egress/syslog/https_test.go +++ b/src/pkg/egress/syslog/https_test.go @@ -78,7 +78,6 @@ var _ = Describe("HTTPWriter", func() { &metricsHelpers.SpyMetric{}, c, ) - env := buildLogEnvelope("APP", "1", "just a test", loggregator_v2.Log_OUT) Expect(writer.Write(env)).To(HaveOccurred()) }) @@ -106,60 +105,6 @@ var _ = Describe("HTTPWriter", func() { Expect(err.Error()).ToNot(ContainSubstring("password")) }) - It("writes syslog formatted messages to http drain", func() { - drain := newMockOKDrain() - - b := buildURLBinding( - drain.URL, - "test-app-id", - "test-hostname", - ) - - writer := syslog.NewHTTPSWriter( - b, - netConf, - skipSSLTLSConfig, - &metricsHelpers.SpyMetric{}, - c, - ) - - env1 := buildLogEnvelope("APP", "1", "just a test", loggregator_v2.Log_OUT) - Expect(writer.Write(env1)).To(Succeed()) - env2 := buildLogEnvelope("CELL", "5", "log from cell", loggregator_v2.Log_ERR) - Expect(writer.Write(env2)).To(Succeed()) - env3 := buildLogEnvelope("CELL", "", "log from cell", loggregator_v2.Log_ERR) - Expect(writer.Write(env3)).To(Succeed()) - - Expect(drain.messages).To(HaveLen(3)) - expected := &rfc5424.Message{ - AppName: "test-app-id", - Hostname: "test-hostname", - Priority: rfc5424.Priority(14), - ProcessID: "[APP/1]", - Message: []byte("just a test\n"), - } - Expect(drain.messages[0].AppName).To(Equal(expected.AppName)) - Expect(drain.messages[0].Hostname).To(Equal(expected.Hostname)) - Expect(drain.messages[0].Priority).To(BeEquivalentTo(expected.Priority)) - Expect(drain.messages[0].ProcessID).To(Equal(expected.ProcessID)) - Expect(drain.messages[0].Message).To(Equal(expected.Message)) - - expected = &rfc5424.Message{ - AppName: "test-app-id", - Hostname: "test-hostname", - Priority: rfc5424.Priority(11), - ProcessID: "[CELL/5]", - Message: []byte("log from cell\n"), - } - Expect(drain.messages[1].AppName).To(Equal(expected.AppName)) - Expect(drain.messages[1].Hostname).To(Equal(expected.Hostname)) - Expect(drain.messages[1].Priority).To(BeEquivalentTo(expected.Priority)) - Expect(drain.messages[1].ProcessID).To(Equal(expected.ProcessID)) - Expect(drain.messages[1].Message).To(Equal(expected.Message)) - - Expect(drain.messages[2].ProcessID).To(Equal("[CELL]")) - }) - It("sets Content-Type to text/plain", func() { drain := newMockOKDrain() @@ -342,6 +287,7 @@ func newMockDrain(status int) *SpyDrain { Expect(err).ToNot(HaveOccurred()) defer r.Body.Close() + println(body) err = message.UnmarshalBinary(body) Expect(err).ToNot(HaveOccurred()) diff --git a/src/pkg/egress/syslog/writer_factory.go b/src/pkg/egress/syslog/writer_factory.go index 84b45d24c..8ec8249ab 100644 --- a/src/pkg/egress/syslog/writer_factory.go +++ b/src/pkg/egress/syslog/writer_factory.go @@ -106,6 +106,14 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { egressMetric, converter, ) + case "https-batch": + w = NewHTTPSBatchWriter( + ub, + f.netConf, + tlsCfg, + egressMetric, + converter, + ) case "syslog": w = NewTCPWriter( ub, diff --git a/src/pkg/egress/syslog/writer_factory_test.go b/src/pkg/egress/syslog/writer_factory_test.go index 1fbbe35e0..5707bbf56 100644 --- a/src/pkg/egress/syslog/writer_factory_test.go +++ b/src/pkg/egress/syslog/writer_factory_test.go @@ -23,7 +23,7 @@ var _ = Describe("EgressFactory", func() { }) Context("when the url begins with https", func() { - It("returns an https writer", func() { + It("returns an single https writer", func() { url, err := url.Parse("https://syslog.example.com") Expect(err).ToNot(HaveOccurred()) urlBinding := &syslog.URLBinding{ @@ -41,6 +41,25 @@ var _ = Describe("EgressFactory", func() { }) }) + Context("when the url begins with https and enables batching", func() { + It("returns an single https writer", func() { + url, err := url.Parse("https-batch://syslog.example.com") + Expect(err).ToNot(HaveOccurred()) + urlBinding := &syslog.URLBinding{ + URL: url, + } + + writer, err := f.NewWriter(urlBinding) + Expect(err).ToNot(HaveOccurred()) + + retryWriter, ok := writer.(*syslog.RetryWriter) + Expect(ok).To(BeTrue()) + + _, ok = retryWriter.Writer.(*syslog.HTTPSBatchWriter) + Expect(ok).To(BeTrue()) + }) + }) + Context("when the url begins with syslog://", func() { It("returns a tcp writer", func() { url, err := url.Parse("syslog://syslog.example.com")