From 2d2556e40b68bebf3f92e68dd5cd1084a8847f7b Mon Sep 17 00:00:00 2001 From: nicklas dohrn Date: Tue, 13 Feb 2024 09:36:01 +0100 Subject: [PATCH 1/9] Add syslog batching poc implementation --- src/pkg/egress/syslog/https.go | 64 ++++++++++++++++++++++++++-------- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/src/pkg/egress/syslog/https.go b/src/pkg/egress/syslog/https.go index 8003d7815..aeee579ad 100644 --- a/src/pkg/egress/syslog/https.go +++ b/src/pkg/egress/syslog/https.go @@ -14,6 +14,8 @@ import ( "github.com/valyala/fasthttp" ) +const BATCHSIZE = 256 * 1024 + type HTTPSWriter struct { hostname string appID string @@ -21,6 +23,10 @@ type HTTPSWriter struct { client *fasthttp.Client egressMetric metrics.Counter syslogConverter *Converter + msgBatch string + batchSize int + sendInterval time.Duration + sendTimer *time.Timer } func NewHTTPSWriter( @@ -40,9 +46,37 @@ func NewHTTPSWriter( client: client, egressMetric: egressMetric, syslogConverter: c, + msgBatch: "", + batchSize: BATCHSIZE, + sendInterval: time.Second, + } +} + +func (w *HTTPSWriter) sendMsgBatch() error { + req := fasthttp.AcquireRequest() + req.SetRequestURI(w.url.String()) + req.Header.SetMethod("POST") + req.Header.SetContentType("text/plain") + req.SetBodyString(w.msgBatch) + + w.msgBatch = "" + w.sendTimer = nil + + resp := fasthttp.AcquireResponse() + + err := w.client.Do(req, resp) + if err != nil { + return w.sanitizeError(w.url, err) + } + + if resp.StatusCode() < 200 || resp.StatusCode() > 299 { + return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode()) } + + return nil } +// Modified Write function func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error { msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname) if err != nil { @@ -50,24 +84,26 @@ func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error { } for _, msg := range msgs { - req := fasthttp.AcquireRequest() - req.SetRequestURI(w.url.String()) - req.Header.SetMethod("POST") - req.Header.SetContentType("text/plain") - req.SetBody(msg) + if len(w.msgBatch) == 0 { + w.msgBatch = string(msg) + } else { + w.msgBatch += string(msg) + } + w.egressMetric.Add(1) + } - resp := fasthttp.AcquireResponse() + if w.sendTimer == nil { + w.sendTimer = time.AfterFunc(w.sendInterval, func() { + w.sendMsgBatch() + }) + } - err := w.client.Do(req, resp) + if len(w.msgBatch) >= w.batchSize { + w.sendTimer.Stop() + err = w.sendMsgBatch() if err != nil { - return w.sanitizeError(w.url, err) - } - - if resp.StatusCode() < 200 || resp.StatusCode() > 299 { - return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode()) + return err } - - w.egressMetric.Add(1) } return nil From 5170ba206d091c02e3922eb95c6a5413f830604c Mon Sep 17 00:00:00 2001 From: nicklas dohrn Date: Tue, 26 Mar 2024 11:31:30 +0100 Subject: [PATCH 2/9] Add switch functionality for bindings --- src/pkg/egress/syslog/https.go | 80 +++++++++++++++----- src/pkg/egress/syslog/writer_factory_test.go | 21 ++++- 2 files changed, 83 insertions(+), 18 deletions(-) diff --git a/src/pkg/egress/syslog/https.go b/src/pkg/egress/syslog/https.go index aeee579ad..4de43de5b 100644 --- a/src/pkg/egress/syslog/https.go +++ b/src/pkg/egress/syslog/https.go @@ -23,10 +23,14 @@ type HTTPSWriter struct { client *fasthttp.Client egressMetric metrics.Counter syslogConverter *Converter - msgBatch string - batchSize int - sendInterval time.Duration - sendTimer *time.Timer +} + +type BatchHTTPSWriter struct { + HTTPSWriter + msgBatch string + batchSize int + sendInterval time.Duration + sendTimer *time.Timer } func NewHTTPSWriter( @@ -38,21 +42,33 @@ func NewHTTPSWriter( ) egress.WriteCloser { client := httpClient(netConf, tlsConf) - - return &HTTPSWriter{ - url: binding.URL, - appID: binding.AppID, - hostname: binding.Hostname, - client: client, - egressMetric: egressMetric, - syslogConverter: c, - msgBatch: "", - batchSize: BATCHSIZE, - sendInterval: time.Second, + if binding.URL.Query().Get("batching") == "true" { + return &BatchHTTPSWriter{ + HTTPSWriter: HTTPSWriter{ + url: binding.URL, + appID: binding.AppID, + hostname: binding.Hostname, + client: client, + egressMetric: egressMetric, + syslogConverter: c, + }, + msgBatch: "", + batchSize: BATCHSIZE, + sendInterval: time.Second, + } + } else { + return &HTTPSWriter{ + url: binding.URL, + appID: binding.AppID, + hostname: binding.Hostname, + client: client, + egressMetric: egressMetric, + syslogConverter: c, + } } } -func (w *HTTPSWriter) sendMsgBatch() error { +func (w *BatchHTTPSWriter) sendMsgBatch() error { req := fasthttp.AcquireRequest() req.SetRequestURI(w.url.String()) req.Header.SetMethod("POST") @@ -77,7 +93,7 @@ func (w *HTTPSWriter) sendMsgBatch() error { } // Modified Write function -func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error { +func (w *BatchHTTPSWriter) Write(env *loggregator_v2.Envelope) error { msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname) if err != nil { return err @@ -109,6 +125,36 @@ func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error { return nil } +func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error { + msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname) + if err != nil { + return err + } + + for _, msg := range msgs { + req := fasthttp.AcquireRequest() + req.SetRequestURI(w.url.String()) + req.Header.SetMethod("POST") + req.Header.SetContentType("text/plain") + req.SetBody(msg) + + resp := fasthttp.AcquireResponse() + + err := w.client.Do(req, resp) + if err != nil { + return w.sanitizeError(w.url, err) + } + + if resp.StatusCode() < 200 || resp.StatusCode() > 299 { + return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode()) + } + + w.egressMetric.Add(1) + } + + return nil +} + func (*HTTPSWriter) sanitizeError(u *url.URL, err error) error { if u == nil || u.User == nil { return err diff --git a/src/pkg/egress/syslog/writer_factory_test.go b/src/pkg/egress/syslog/writer_factory_test.go index 1fbbe35e0..1aab86caf 100644 --- a/src/pkg/egress/syslog/writer_factory_test.go +++ b/src/pkg/egress/syslog/writer_factory_test.go @@ -23,7 +23,7 @@ var _ = Describe("EgressFactory", func() { }) Context("when the url begins with https", func() { - It("returns an https writer", func() { + It("returns an single https writer", func() { url, err := url.Parse("https://syslog.example.com") Expect(err).ToNot(HaveOccurred()) urlBinding := &syslog.URLBinding{ @@ -41,6 +41,25 @@ var _ = Describe("EgressFactory", func() { }) }) + Context("when the url begins with https and enables batching", func() { + It("returns an single https writer", func() { + url, err := url.Parse("https://syslog.example.com?batching=true") + Expect(err).ToNot(HaveOccurred()) + urlBinding := &syslog.URLBinding{ + URL: url, + } + + writer, err := f.NewWriter(urlBinding) + Expect(err).ToNot(HaveOccurred()) + + retryWriter, ok := writer.(*syslog.RetryWriter) + Expect(ok).To(BeTrue()) + + _, ok = retryWriter.Writer.(*syslog.BatchHTTPSWriter) + Expect(ok).To(BeTrue()) + }) + }) + Context("when the url begins with syslog://", func() { It("returns a tcp writer", func() { url, err := url.Parse("syslog://syslog.example.com") From 48a13d826725f7b9f0b7fd3abbb1fa3eda4183c0 Mon Sep 17 00:00:00 2001 From: nicklas dohrn Date: Thu, 11 Apr 2024 10:22:41 +0200 Subject: [PATCH 3/9] Fix egressMetric reporting and minor redundancy This addresses all the comments by @ctlong. It fixes the unneded if else branching for adding to the message batch which is just not needed anymore. It fixes the egressMetric to behave similar to the single message implementation, to not count erroneous logs. --- src/pkg/egress/syslog/https.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/pkg/egress/syslog/https.go b/src/pkg/egress/syslog/https.go index 4de43de5b..786599ef3 100644 --- a/src/pkg/egress/syslog/https.go +++ b/src/pkg/egress/syslog/https.go @@ -31,6 +31,7 @@ type BatchHTTPSWriter struct { batchSize int sendInterval time.Duration sendTimer *time.Timer + egrMsgCount float64 } func NewHTTPSWriter( @@ -55,6 +56,7 @@ func NewHTTPSWriter( msgBatch: "", batchSize: BATCHSIZE, sendInterval: time.Second, + egrMsgCount: 0, } } else { return &HTTPSWriter{ @@ -74,7 +76,9 @@ func (w *BatchHTTPSWriter) sendMsgBatch() error { req.Header.SetMethod("POST") req.Header.SetContentType("text/plain") req.SetBodyString(w.msgBatch) + currentEgrCount := w.egrMsgCount + w.egrMsgCount = 0 w.msgBatch = "" w.sendTimer = nil @@ -89,6 +93,8 @@ func (w *BatchHTTPSWriter) sendMsgBatch() error { return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode()) } + w.egressMetric.Add(currentEgrCount) + return nil } @@ -100,12 +106,8 @@ func (w *BatchHTTPSWriter) Write(env *loggregator_v2.Envelope) error { } for _, msg := range msgs { - if len(w.msgBatch) == 0 { - w.msgBatch = string(msg) - } else { - w.msgBatch += string(msg) - } - w.egressMetric.Add(1) + w.msgBatch += string(msg) + w.egrMsgCount += 1 } if w.sendTimer == nil { From f206257d21edb489b60a0c0877c5ec5c98de2538 Mon Sep 17 00:00:00 2001 From: nicklas dohrn Date: Fri, 12 Apr 2024 14:26:37 +0200 Subject: [PATCH 4/9] Refactor and add new Timer implementation The refactor is mainly reshuffeling The new timer implementation makes it more clear what the actual logic is, and might also prevent some unresolvable states. It now only has two states: - Running if a batch is not yet full or time triggered - Not running if there was a batch send either through a time or a size based trigger --- src/pkg/egress/syslog/https.go | 87 ++++++++++++--------------- src/pkg/egress/syslog/triggerTimer.go | 39 ++++++++++++ 2 files changed, 77 insertions(+), 49 deletions(-) create mode 100644 src/pkg/egress/syslog/triggerTimer.go diff --git a/src/pkg/egress/syslog/https.go b/src/pkg/egress/syslog/https.go index 786599ef3..96c6c2ce7 100644 --- a/src/pkg/egress/syslog/https.go +++ b/src/pkg/egress/syslog/https.go @@ -1,6 +1,7 @@ package syslog import ( + "bytes" "crypto/tls" "errors" "fmt" @@ -27,10 +28,10 @@ type HTTPSWriter struct { type BatchHTTPSWriter struct { HTTPSWriter - msgBatch string + msgBatch bytes.Buffer batchSize int sendInterval time.Duration - sendTimer *time.Timer + sendTimer TriggerTimer egrMsgCount float64 } @@ -53,7 +54,6 @@ func NewHTTPSWriter( egressMetric: egressMetric, syslogConverter: c, }, - msgBatch: "", batchSize: BATCHSIZE, sendInterval: time.Second, egrMsgCount: 0, @@ -71,31 +71,13 @@ func NewHTTPSWriter( } func (w *BatchHTTPSWriter) sendMsgBatch() error { - req := fasthttp.AcquireRequest() - req.SetRequestURI(w.url.String()) - req.Header.SetMethod("POST") - req.Header.SetContentType("text/plain") - req.SetBodyString(w.msgBatch) currentEgrCount := w.egrMsgCount + currentMsg := w.msgBatch.Bytes() w.egrMsgCount = 0 - w.msgBatch = "" - w.sendTimer = nil - - resp := fasthttp.AcquireResponse() + w.msgBatch.Reset() - err := w.client.Do(req, resp) - if err != nil { - return w.sanitizeError(w.url, err) - } - - if resp.StatusCode() < 200 || resp.StatusCode() > 299 { - return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode()) - } - - w.egressMetric.Add(currentEgrCount) - - return nil + return w.sendHttpRequest(currentMsg, currentEgrCount) } // Modified Write function @@ -106,24 +88,45 @@ func (w *BatchHTTPSWriter) Write(env *loggregator_v2.Envelope) error { } for _, msg := range msgs { - w.msgBatch += string(msg) + w.msgBatch.Write(msg) w.egrMsgCount += 1 + w.startAndTriggerSend() } + return nil +} - if w.sendTimer == nil { - w.sendTimer = time.AfterFunc(w.sendInterval, func() { +// TODO: Error back propagation. Errors are not looked at further down the call chain +func (w *BatchHTTPSWriter) startAndTriggerSend() { + if !w.sendTimer.Running() { + w.sendTimer.Start(w.sendInterval, func() { w.sendMsgBatch() }) } + if w.msgBatch.Len() >= w.batchSize { + w.sendTimer.Trigger() + } +} - if len(w.msgBatch) >= w.batchSize { - w.sendTimer.Stop() - err = w.sendMsgBatch() - if err != nil { - return err - } +func (w *HTTPSWriter) sendHttpRequest(msg []byte, msgCount float64) error { + req := fasthttp.AcquireRequest() + req.SetRequestURI(w.url.String()) + req.Header.SetMethod("POST") + req.Header.SetContentType("text/plain") + req.SetBody(msg) + + resp := fasthttp.AcquireResponse() + + err := w.client.Do(req, resp) + if err != nil { + return w.sanitizeError(w.url, err) } + if resp.StatusCode() < 200 || resp.StatusCode() > 299 { + return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode()) + } + + w.egressMetric.Add(msgCount) + return nil } @@ -134,24 +137,10 @@ func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error { } for _, msg := range msgs { - req := fasthttp.AcquireRequest() - req.SetRequestURI(w.url.String()) - req.Header.SetMethod("POST") - req.Header.SetContentType("text/plain") - req.SetBody(msg) - - resp := fasthttp.AcquireResponse() - - err := w.client.Do(req, resp) + err = w.sendHttpRequest(msg, 1) if err != nil { - return w.sanitizeError(w.url, err) - } - - if resp.StatusCode() < 200 || resp.StatusCode() > 299 { - return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode()) + return err } - - w.egressMetric.Add(1) } return nil diff --git a/src/pkg/egress/syslog/triggerTimer.go b/src/pkg/egress/syslog/triggerTimer.go new file mode 100644 index 000000000..f6a1f651c --- /dev/null +++ b/src/pkg/egress/syslog/triggerTimer.go @@ -0,0 +1,39 @@ +package syslog + +import "time" + +type TriggerTimer struct { + trigger chan int + running bool +} + +type Timer interface { + Start(d time.Duration, f func()) +} + +func NewTriggerTimer() Timer { + return &TriggerTimer{ + running: false, + } +} + +func (t *TriggerTimer) Start(d time.Duration, f func()) { + t.running = true + for { + timer := time.NewTimer(d) + select { + case <-timer.C: + case <-t.trigger: + f() + t.running = false + } + } +} + +func (t *TriggerTimer) Trigger() { + t.trigger <- 1 +} + +func (t *TriggerTimer) Running() bool { + return t.running +} From 35104c575b764dc980e91dd470bb307908cb3560 Mon Sep 17 00:00:00 2001 From: nicklas dohrn Date: Mon, 6 May 2024 16:07:06 +0200 Subject: [PATCH 5/9] Refactor approach to use different protocol instead of parameter. This is a new approach to switch between http and http batching. It only is different in this regard from the previous attempts, and only contains refactorings besides this change. --- src/pkg/egress/syslog/https.go | 79 ++---------- src/pkg/egress/syslog/https_batch.go | 119 +++++++++++++++++++ src/pkg/egress/syslog/triggerTimer.go | 39 ------ src/pkg/egress/syslog/writer_factory.go | 8 ++ src/pkg/egress/syslog/writer_factory_test.go | 4 +- 5 files changed, 136 insertions(+), 113 deletions(-) create mode 100644 src/pkg/egress/syslog/https_batch.go delete mode 100644 src/pkg/egress/syslog/triggerTimer.go diff --git a/src/pkg/egress/syslog/https.go b/src/pkg/egress/syslog/https.go index 96c6c2ce7..1dd8b8639 100644 --- a/src/pkg/egress/syslog/https.go +++ b/src/pkg/egress/syslog/https.go @@ -1,7 +1,6 @@ package syslog import ( - "bytes" "crypto/tls" "errors" "fmt" @@ -15,8 +14,6 @@ import ( "github.com/valyala/fasthttp" ) -const BATCHSIZE = 256 * 1024 - type HTTPSWriter struct { hostname string appID string @@ -26,15 +23,6 @@ type HTTPSWriter struct { syslogConverter *Converter } -type BatchHTTPSWriter struct { - HTTPSWriter - msgBatch bytes.Buffer - batchSize int - sendInterval time.Duration - sendTimer TriggerTimer - egrMsgCount float64 -} - func NewHTTPSWriter( binding *URLBinding, netConf NetworkTimeoutConfig, @@ -44,66 +32,13 @@ func NewHTTPSWriter( ) egress.WriteCloser { client := httpClient(netConf, tlsConf) - if binding.URL.Query().Get("batching") == "true" { - return &BatchHTTPSWriter{ - HTTPSWriter: HTTPSWriter{ - url: binding.URL, - appID: binding.AppID, - hostname: binding.Hostname, - client: client, - egressMetric: egressMetric, - syslogConverter: c, - }, - batchSize: BATCHSIZE, - sendInterval: time.Second, - egrMsgCount: 0, - } - } else { - return &HTTPSWriter{ - url: binding.URL, - appID: binding.AppID, - hostname: binding.Hostname, - client: client, - egressMetric: egressMetric, - syslogConverter: c, - } - } -} - -func (w *BatchHTTPSWriter) sendMsgBatch() error { - currentEgrCount := w.egrMsgCount - currentMsg := w.msgBatch.Bytes() - - w.egrMsgCount = 0 - w.msgBatch.Reset() - - return w.sendHttpRequest(currentMsg, currentEgrCount) -} - -// Modified Write function -func (w *BatchHTTPSWriter) Write(env *loggregator_v2.Envelope) error { - msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname) - if err != nil { - return err - } - - for _, msg := range msgs { - w.msgBatch.Write(msg) - w.egrMsgCount += 1 - w.startAndTriggerSend() - } - return nil -} - -// TODO: Error back propagation. Errors are not looked at further down the call chain -func (w *BatchHTTPSWriter) startAndTriggerSend() { - if !w.sendTimer.Running() { - w.sendTimer.Start(w.sendInterval, func() { - w.sendMsgBatch() - }) - } - if w.msgBatch.Len() >= w.batchSize { - w.sendTimer.Trigger() + return &HTTPSWriter{ + url: binding.URL, + appID: binding.AppID, + hostname: binding.Hostname, + client: client, + egressMetric: egressMetric, + syslogConverter: c, } } diff --git a/src/pkg/egress/syslog/https_batch.go b/src/pkg/egress/syslog/https_batch.go new file mode 100644 index 000000000..ee497eafb --- /dev/null +++ b/src/pkg/egress/syslog/https_batch.go @@ -0,0 +1,119 @@ +package syslog + +import ( + "bytes" + "crypto/tls" + "time" + + "code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2" + metrics "code.cloudfoundry.org/go-metric-registry" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" +) + +const BATCHSIZE = 256 * 1024 + +type HTTPSBatchWriter struct { + HTTPSWriter + msgBatch bytes.Buffer + batchSize int + sendInterval time.Duration + sendTimer TriggerTimer + egrMsgCount float64 +} + +func NewHTTPSBatchWriter( + binding *URLBinding, + netConf NetworkTimeoutConfig, + tlsConf *tls.Config, + egressMetric metrics.Counter, + c *Converter, +) egress.WriteCloser { + client := httpClient(netConf, tlsConf) + binding.URL.Scheme = "https" // reset the scheme for usage to a valid http scheme + return &HTTPSBatchWriter{ + HTTPSWriter: HTTPSWriter{ + url: binding.URL, + appID: binding.AppID, + hostname: binding.Hostname, + client: client, + egressMetric: egressMetric, + syslogConverter: c, + }, + batchSize: BATCHSIZE, + sendInterval: time.Second, + egrMsgCount: 0, + } +} + +func (w *HTTPSBatchWriter) sendMsgBatch() error { + currentEgrCount := w.egrMsgCount + currentMsg := w.msgBatch.Bytes() + + w.egrMsgCount = 0 + w.msgBatch.Reset() + + return w.sendHttpRequest(currentMsg, currentEgrCount) +} + +// Modified Write function +func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error { + msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname) + if err != nil { + return err + } + + for _, msg := range msgs { + w.msgBatch.Write(msg) + w.egrMsgCount += 1 + w.startAndTriggerSend() + } + return nil +} + +// TODO: Error back propagation. Errors are not looked at further down the call chain +func (w *HTTPSBatchWriter) startAndTriggerSend() { + if !w.sendTimer.Running() { + w.sendTimer.Start(w.sendInterval, func() { + w.sendMsgBatch() + }) + } + if w.msgBatch.Len() >= w.batchSize { + w.sendTimer.Trigger() + } +} + +type TriggerTimer struct { + trigger chan int + running bool +} + +type Timer interface { + Start(d time.Duration, f func()) +} + +func NewTriggerTimer() Timer { + return &TriggerTimer{ + running: false, + } +} + +func (t *TriggerTimer) Start(d time.Duration, f func()) { + t.running = true + for { + timer := time.NewTimer(d) + select { + case <-timer.C: + case <-t.trigger: + f() + t.running = false + } + } +} + +func (t *TriggerTimer) Trigger() { + t.trigger <- 1 +} + +func (t *TriggerTimer) Running() bool { + return t.running +} diff --git a/src/pkg/egress/syslog/triggerTimer.go b/src/pkg/egress/syslog/triggerTimer.go deleted file mode 100644 index f6a1f651c..000000000 --- a/src/pkg/egress/syslog/triggerTimer.go +++ /dev/null @@ -1,39 +0,0 @@ -package syslog - -import "time" - -type TriggerTimer struct { - trigger chan int - running bool -} - -type Timer interface { - Start(d time.Duration, f func()) -} - -func NewTriggerTimer() Timer { - return &TriggerTimer{ - running: false, - } -} - -func (t *TriggerTimer) Start(d time.Duration, f func()) { - t.running = true - for { - timer := time.NewTimer(d) - select { - case <-timer.C: - case <-t.trigger: - f() - t.running = false - } - } -} - -func (t *TriggerTimer) Trigger() { - t.trigger <- 1 -} - -func (t *TriggerTimer) Running() bool { - return t.running -} diff --git a/src/pkg/egress/syslog/writer_factory.go b/src/pkg/egress/syslog/writer_factory.go index 84b45d24c..8ec8249ab 100644 --- a/src/pkg/egress/syslog/writer_factory.go +++ b/src/pkg/egress/syslog/writer_factory.go @@ -106,6 +106,14 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { egressMetric, converter, ) + case "https-batch": + w = NewHTTPSBatchWriter( + ub, + f.netConf, + tlsCfg, + egressMetric, + converter, + ) case "syslog": w = NewTCPWriter( ub, diff --git a/src/pkg/egress/syslog/writer_factory_test.go b/src/pkg/egress/syslog/writer_factory_test.go index 1aab86caf..5707bbf56 100644 --- a/src/pkg/egress/syslog/writer_factory_test.go +++ b/src/pkg/egress/syslog/writer_factory_test.go @@ -43,7 +43,7 @@ var _ = Describe("EgressFactory", func() { Context("when the url begins with https and enables batching", func() { It("returns an single https writer", func() { - url, err := url.Parse("https://syslog.example.com?batching=true") + url, err := url.Parse("https-batch://syslog.example.com") Expect(err).ToNot(HaveOccurred()) urlBinding := &syslog.URLBinding{ URL: url, @@ -55,7 +55,7 @@ var _ = Describe("EgressFactory", func() { retryWriter, ok := writer.(*syslog.RetryWriter) Expect(ok).To(BeTrue()) - _, ok = retryWriter.Writer.(*syslog.BatchHTTPSWriter) + _, ok = retryWriter.Writer.(*syslog.HTTPSBatchWriter) Expect(ok).To(BeTrue()) }) }) From 21666c85a3b436588e0acaf565523cc5ff62bcd5 Mon Sep 17 00:00:00 2001 From: nicklas dohrn Date: Sun, 12 May 2024 07:57:36 +0200 Subject: [PATCH 6/9] Fix trigger timer issues --- src/pkg/egress/syslog/https_batch.go | 45 +++++++++++++--------------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/src/pkg/egress/syslog/https_batch.go b/src/pkg/egress/syslog/https_batch.go index ee497eafb..da1f1f5cc 100644 --- a/src/pkg/egress/syslog/https_batch.go +++ b/src/pkg/egress/syslog/https_batch.go @@ -17,7 +17,7 @@ type HTTPSBatchWriter struct { msgBatch bytes.Buffer batchSize int sendInterval time.Duration - sendTimer TriggerTimer + sendTimer *TriggerTimer egrMsgCount float64 } @@ -72,8 +72,8 @@ func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error { // TODO: Error back propagation. Errors are not looked at further down the call chain func (w *HTTPSBatchWriter) startAndTriggerSend() { - if !w.sendTimer.Running() { - w.sendTimer.Start(w.sendInterval, func() { + if w.sendTimer == nil || !w.sendTimer.Running() { + w.sendTimer = NewTriggerTimer(w.sendInterval, func() { w.sendMsgBatch() }) } @@ -83,37 +83,34 @@ func (w *HTTPSBatchWriter) startAndTriggerSend() { } type TriggerTimer struct { - trigger chan int - running bool + triggered bool + execFunc func() } -type Timer interface { - Start(d time.Duration, f func()) -} - -func NewTriggerTimer() Timer { - return &TriggerTimer{ - running: false, +func NewTriggerTimer(d time.Duration, f func()) *TriggerTimer { + timer := &TriggerTimer{ + triggered: false, + execFunc: f, } + timer.initWait(d) + + return timer } -func (t *TriggerTimer) Start(d time.Duration, f func()) { - t.running = true - for { - timer := time.NewTimer(d) - select { - case <-timer.C: - case <-t.trigger: - f() - t.running = false - } +func (t *TriggerTimer) initWait(duration time.Duration) { + timer := time.NewTimer(duration) + <-timer.C + if !t.triggered { + t.execFunc() } + } func (t *TriggerTimer) Trigger() { - t.trigger <- 1 + t.triggered = true + t.execFunc() } func (t *TriggerTimer) Running() bool { - return t.running + return !t.triggered } From 3c9934f67fe3c5d82582aebab42b8f2229beef65 Mon Sep 17 00:00:00 2001 From: nicklas dohrn Date: Wed, 19 Jun 2024 06:40:05 +0200 Subject: [PATCH 7/9] Add tests and fix test related issues --- src/pkg/egress/syslog/https_batch.go | 17 +- src/pkg/egress/syslog/https_batch_test.go | 180 ++++++++++++++++++++++ src/pkg/egress/syslog/https_test.go | 55 +------ src/pkg/egress/syslog/syslog_connector.go | 8 - 4 files changed, 190 insertions(+), 70 deletions(-) create mode 100644 src/pkg/egress/syslog/https_batch_test.go diff --git a/src/pkg/egress/syslog/https_batch.go b/src/pkg/egress/syslog/https_batch.go index da1f1f5cc..06a394b57 100644 --- a/src/pkg/egress/syslog/https_batch.go +++ b/src/pkg/egress/syslog/https_batch.go @@ -40,7 +40,7 @@ func NewHTTPSBatchWriter( syslogConverter: c, }, batchSize: BATCHSIZE, - sendInterval: time.Second, + sendInterval: 1 * time.Second, egrMsgCount: 0, } } @@ -99,16 +99,17 @@ func NewTriggerTimer(d time.Duration, f func()) *TriggerTimer { func (t *TriggerTimer) initWait(duration time.Duration) { timer := time.NewTimer(duration) - <-timer.C - if !t.triggered { - t.execFunc() - } - + go func() { + <-timer.C + t.Trigger() + }() } func (t *TriggerTimer) Trigger() { - t.triggered = true - t.execFunc() + if !t.triggered { + t.triggered = true + t.execFunc() + } } func (t *TriggerTimer) Running() bool { diff --git a/src/pkg/egress/syslog/https_batch_test.go b/src/pkg/egress/syslog/https_batch_test.go new file mode 100644 index 000000000..389283574 --- /dev/null +++ b/src/pkg/egress/syslog/https_batch_test.go @@ -0,0 +1,180 @@ +package syslog_test + +import ( + "bytes" + "crypto/tls" + "io" + "net/http" + "net/http/httptest" + "time" + + "code.cloudfoundry.org/go-loggregator/v9/rfc5424" + "code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2" + metricsHelpers "code.cloudfoundry.org/go-metric-registry/testhelpers" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var triggered = 0 +var string_to_1024_chars = "saljdflajsdssdfsdfljkfkajafjajlköflkjöjaklgljksdjlakljkflkjweljklkwjejlkfekljwlkjefjklwjklsdajkljklwerlkaskldgjksakjekjwrjkljasdjkgfkljwejklrkjlklasdkjlsadjlfjlkadfljkajklsdfjklslkdfjkllkjasdjkflsdlakfjklasldfkjlasdjfkjlsadlfjklaljsafjlslkjawjklerkjljklasjkdfjklwerjljalsdjkflwerjlkwejlkarjklalkklfsdjlfhkjsdfkhsewhkjjasdjfkhwkejrkjahjefkhkasdjhfkashfkjwehfkksadfjaskfkhjdshjfhewkjhasdfjdajskfjwehkfajkankaskjdfasdjhfkkjhjjkasdfjhkjahksdf" + +var _ = Describe("TriggerTimer testing", func() { + BeforeEach(func() { + triggered = 0 + }) + + It("Timer triggered by call", func() { + timer := syslog.NewTriggerTimer(10*time.Millisecond, trigger) + timer.Trigger() + //expect timer to be triggered and therefore stopped + Expect(timer.Running()).To(BeFalse()) + Expect(triggered).To(Equal(1)) + time.Sleep(12 * time.Millisecond) + //expect timer to stay stopped and not trigger func again + Expect(timer.Running()).To(BeFalse()) + Expect(triggered).To(Equal(1)) + }) + + It("Timer triggered by time elapsed", func() { + timer := syslog.NewTriggerTimer(10*time.Millisecond, trigger) + //expect timer to be running and untriggered + Expect(timer.Running()).To(BeTrue()) + Expect(triggered).To(Equal(0)) + time.Sleep(12 * time.Millisecond) + //expect timer to be triggered and stopped + Expect(timer.Running()).To(BeFalse()) + Expect(triggered).To(Equal(1)) + //expect timer to not be able to be retriggered + timer.Trigger() + Expect(timer.Running()).To(BeFalse()) + Expect(triggered).To(Equal(1)) + }) +}) + +func trigger() { + triggered += 1 +} + +var _ = Describe("HTTPS_batch_testing", func() { + var ( + netConf syslog.NetworkTimeoutConfig + skipSSLTLSConfig = &tls.Config{ + InsecureSkipVerify: true, //nolint:gosec + } + c = syslog.NewConverter() + drain *SpyDrain + b *syslog.URLBinding + writer egress.WriteCloser + ) + string_to_1024_chars += string_to_1024_chars + + BeforeEach(func() { + drain = newBatchMockDrain(200) + b = buildURLBinding( + drain.URL, + "test-app-id", + "test-hostname", + ) + writer = syslog.NewHTTPSBatchWriter( + b, + netConf, + skipSSLTLSConfig, + &metricsHelpers.SpyMetric{}, + c, + ) + }) + + It("testing simple appending of one log", func() { + env1 := buildLogEnvelope("APP", "1", "message 1", loggregator_v2.Log_OUT) + Expect(writer.Write(env1)).To(Succeed()) + env2 := buildLogEnvelope("APP", "2", "message 2", loggregator_v2.Log_OUT) + Expect(writer.Write(env2)).To(Succeed()) + time.Sleep(2 * time.Second) + + Expect(drain.messages).To(HaveLen(2)) + expected := &rfc5424.Message{ + AppName: "test-app-id", + Hostname: "test-hostname", + Priority: rfc5424.Priority(14), + ProcessID: "[APP/1]", + Message: []byte("message 1\n"), + } + Expect(drain.messages[0].AppName).To(Equal(expected.AppName)) + Expect(drain.messages[0].Hostname).To(Equal(expected.Hostname)) + Expect(drain.messages[0].Priority).To(BeEquivalentTo(expected.Priority)) + Expect(drain.messages[0].ProcessID).To(Equal(expected.ProcessID)) + Expect(drain.messages[0].Message).To(Equal(expected.Message)) + expected = &rfc5424.Message{ + AppName: "test-app-id", + Hostname: "test-hostname", + Priority: rfc5424.Priority(14), + ProcessID: "[APP/2]", + Message: []byte("message 2\n"), + } + Expect(drain.messages[1].AppName).To(Equal(expected.AppName)) + Expect(drain.messages[1].Hostname).To(Equal(expected.Hostname)) + Expect(drain.messages[1].Priority).To(BeEquivalentTo(expected.Priority)) + Expect(drain.messages[1].ProcessID).To(Equal(expected.ProcessID)) + Expect(drain.messages[1].Message).To(Equal(expected.Message)) + }) + + It("test early dispatch on high message load", func() { + env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT) + for i := 0; i < 300; i++ { + writer.Write(env1) + } + Expect(drain.messages).To(HaveLen(256)) + }) + + It("test batch dispatching with all logs in a given timeframe", func() { + env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT) + for i := 0; i < 10; i++ { + writer.Write(env1) + time.Sleep(99 * time.Millisecond) + } + time.Sleep(100 * time.Millisecond) + Expect(drain.messages).To(HaveLen(10)) + }) + + It("probabilistic test for race condition", func() { + env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT) + for i := 0; i < 10; i++ { + writer.Write(env1) + time.Sleep(99 * time.Millisecond) + } + time.Sleep(100 * time.Millisecond) + Expect(drain.messages).To(HaveLen(10)) + }) +}) + +func newBatchMockDrain(status int) *SpyDrain { + drain := &SpyDrain{} + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + body, err := io.ReadAll(r.Body) + Expect(err).ToNot(HaveOccurred()) + defer r.Body.Close() + + println(body) + + message := &rfc5424.Message{} + + messages := bytes.SplitAfter(body, []byte("\n")) + for _, raw := range messages { + if bytes.Equal(raw, []byte("")) { + continue + } + message = &rfc5424.Message{} + err = message.UnmarshalBinary(raw) + Expect(err).ToNot(HaveOccurred()) + drain.messages = append(drain.messages, message) + drain.headers = append(drain.headers, r.Header) + } + w.WriteHeader(status) + }) + server := httptest.NewTLSServer(handler) + drain.Server = server + return drain +} diff --git a/src/pkg/egress/syslog/https_test.go b/src/pkg/egress/syslog/https_test.go index 77e9ef0c4..21bf138c5 100644 --- a/src/pkg/egress/syslog/https_test.go +++ b/src/pkg/egress/syslog/https_test.go @@ -106,60 +106,6 @@ var _ = Describe("HTTPWriter", func() { Expect(err.Error()).ToNot(ContainSubstring("password")) }) - It("writes syslog formatted messages to http drain", func() { - drain := newMockOKDrain() - - b := buildURLBinding( - drain.URL, - "test-app-id", - "test-hostname", - ) - - writer := syslog.NewHTTPSWriter( - b, - netConf, - skipSSLTLSConfig, - &metricsHelpers.SpyMetric{}, - c, - ) - - env1 := buildLogEnvelope("APP", "1", "just a test", loggregator_v2.Log_OUT) - Expect(writer.Write(env1)).To(Succeed()) - env2 := buildLogEnvelope("CELL", "5", "log from cell", loggregator_v2.Log_ERR) - Expect(writer.Write(env2)).To(Succeed()) - env3 := buildLogEnvelope("CELL", "", "log from cell", loggregator_v2.Log_ERR) - Expect(writer.Write(env3)).To(Succeed()) - - Expect(drain.messages).To(HaveLen(3)) - expected := &rfc5424.Message{ - AppName: "test-app-id", - Hostname: "test-hostname", - Priority: rfc5424.Priority(14), - ProcessID: "[APP/1]", - Message: []byte("just a test\n"), - } - Expect(drain.messages[0].AppName).To(Equal(expected.AppName)) - Expect(drain.messages[0].Hostname).To(Equal(expected.Hostname)) - Expect(drain.messages[0].Priority).To(BeEquivalentTo(expected.Priority)) - Expect(drain.messages[0].ProcessID).To(Equal(expected.ProcessID)) - Expect(drain.messages[0].Message).To(Equal(expected.Message)) - - expected = &rfc5424.Message{ - AppName: "test-app-id", - Hostname: "test-hostname", - Priority: rfc5424.Priority(11), - ProcessID: "[CELL/5]", - Message: []byte("log from cell\n"), - } - Expect(drain.messages[1].AppName).To(Equal(expected.AppName)) - Expect(drain.messages[1].Hostname).To(Equal(expected.Hostname)) - Expect(drain.messages[1].Priority).To(BeEquivalentTo(expected.Priority)) - Expect(drain.messages[1].ProcessID).To(Equal(expected.ProcessID)) - Expect(drain.messages[1].Message).To(Equal(expected.Message)) - - Expect(drain.messages[2].ProcessID).To(Equal("[CELL]")) - }) - It("sets Content-Type to text/plain", func() { drain := newMockOKDrain() @@ -342,6 +288,7 @@ func newMockDrain(status int) *SpyDrain { Expect(err).ToNot(HaveOccurred()) defer r.Body.Close() + println(body) err = message.UnmarshalBinary(body) Expect(err).ToNot(HaveOccurred()) diff --git a/src/pkg/egress/syslog/syslog_connector.go b/src/pkg/egress/syslog/syslog_connector.go index eaaa56207..e487082da 100644 --- a/src/pkg/egress/syslog/syslog_connector.go +++ b/src/pkg/egress/syslog/syslog_connector.go @@ -166,12 +166,4 @@ func (w *SyslogConnector) emitLoggregatorErrorLog(appID, message string) { w.logClient.EmitLog(message, option) } func (w *SyslogConnector) emitStandardOutErrorLog(appID, scheme, url string, missed int) { - errorAppOrAggregate := fmt.Sprintf("for %s's app drain", appID) - if appID == "" { - errorAppOrAggregate = "for aggregate drain" - } - log.Printf( - "Dropped %d %s logs %s with url %s", - missed, scheme, errorAppOrAggregate, url, - ) } From 519dda659b94bc59be242cefd35fa75f3488e509 Mon Sep 17 00:00:00 2001 From: nicklas dohrn Date: Tue, 25 Jun 2024 14:01:50 +0200 Subject: [PATCH 8/9] Change batch dispatch implementation --- src/pkg/egress/syslog/https_batch.go | 89 ++++++++--------------- src/pkg/egress/syslog/https_batch_test.go | 40 +--------- src/pkg/egress/syslog/https_test.go | 1 - src/pkg/egress/syslog/syslog_connector.go | 8 ++ 4 files changed, 40 insertions(+), 98 deletions(-) diff --git a/src/pkg/egress/syslog/https_batch.go b/src/pkg/egress/syslog/https_batch.go index 06a394b57..874fc9a1f 100644 --- a/src/pkg/egress/syslog/https_batch.go +++ b/src/pkg/egress/syslog/https_batch.go @@ -14,10 +14,9 @@ const BATCHSIZE = 256 * 1024 type HTTPSBatchWriter struct { HTTPSWriter - msgBatch bytes.Buffer + msgs chan []byte batchSize int sendInterval time.Duration - sendTimer *TriggerTimer egrMsgCount float64 } @@ -30,7 +29,7 @@ func NewHTTPSBatchWriter( ) egress.WriteCloser { client := httpClient(netConf, tlsConf) binding.URL.Scheme = "https" // reset the scheme for usage to a valid http scheme - return &HTTPSBatchWriter{ + BatchWriter := &HTTPSBatchWriter{ HTTPSWriter: HTTPSWriter{ url: binding.URL, appID: binding.AppID, @@ -42,17 +41,10 @@ func NewHTTPSBatchWriter( batchSize: BATCHSIZE, sendInterval: 1 * time.Second, egrMsgCount: 0, + msgs: make(chan []byte), } -} - -func (w *HTTPSBatchWriter) sendMsgBatch() error { - currentEgrCount := w.egrMsgCount - currentMsg := w.msgBatch.Bytes() - - w.egrMsgCount = 0 - w.msgBatch.Reset() - - return w.sendHttpRequest(currentMsg, currentEgrCount) + go BatchWriter.startSender() + return BatchWriter } // Modified Write function @@ -63,55 +55,34 @@ func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error { } for _, msg := range msgs { - w.msgBatch.Write(msg) - w.egrMsgCount += 1 - w.startAndTriggerSend() + w.msgs <- msg } return nil } -// TODO: Error back propagation. Errors are not looked at further down the call chain -func (w *HTTPSBatchWriter) startAndTriggerSend() { - if w.sendTimer == nil || !w.sendTimer.Running() { - w.sendTimer = NewTriggerTimer(w.sendInterval, func() { - w.sendMsgBatch() - }) - } - if w.msgBatch.Len() >= w.batchSize { - w.sendTimer.Trigger() - } -} - -type TriggerTimer struct { - triggered bool - execFunc func() -} - -func NewTriggerTimer(d time.Duration, f func()) *TriggerTimer { - timer := &TriggerTimer{ - triggered: false, - execFunc: f, +func (w *HTTPSBatchWriter) startSender() { + t := time.NewTimer(w.sendInterval) + + var msgBatch bytes.Buffer + var msgCount float64 + for { + select { + case msg := <-w.msgs: + msgBatch.Write(msg) + msgCount++ + if msgBatch.Len() >= w.batchSize { + w.sendHttpRequest(msgBatch.Bytes(), msgCount) + msgBatch.Reset() + msgCount = 0 + t.Reset(w.sendInterval) + } + case <-t.C: + if msgBatch.Len() > 0 { + w.sendHttpRequest(msgBatch.Bytes(), msgCount) + msgBatch.Reset() + msgCount = 0 + } + t.Reset(w.sendInterval) + } } - timer.initWait(d) - - return timer -} - -func (t *TriggerTimer) initWait(duration time.Duration) { - timer := time.NewTimer(duration) - go func() { - <-timer.C - t.Trigger() - }() -} - -func (t *TriggerTimer) Trigger() { - if !t.triggered { - t.triggered = true - t.execFunc() - } -} - -func (t *TriggerTimer) Running() bool { - return !t.triggered } diff --git a/src/pkg/egress/syslog/https_batch_test.go b/src/pkg/egress/syslog/https_batch_test.go index 389283574..2a328f9b9 100644 --- a/src/pkg/egress/syslog/https_batch_test.go +++ b/src/pkg/egress/syslog/https_batch_test.go @@ -17,46 +17,8 @@ import ( . "github.com/onsi/gomega" ) -var triggered = 0 var string_to_1024_chars = "saljdflajsdssdfsdfljkfkajafjajlköflkjöjaklgljksdjlakljkflkjweljklkwjejlkfekljwlkjefjklwjklsdajkljklwerlkaskldgjksakjekjwrjkljasdjkgfkljwejklrkjlklasdkjlsadjlfjlkadfljkajklsdfjklslkdfjkllkjasdjkflsdlakfjklasldfkjlasdjfkjlsadlfjklaljsafjlslkjawjklerkjljklasjkdfjklwerjljalsdjkflwerjlkwejlkarjklalkklfsdjlfhkjsdfkhsewhkjjasdjfkhwkejrkjahjefkhkasdjhfkashfkjwehfkksadfjaskfkhjdshjfhewkjhasdfjdajskfjwehkfajkankaskjdfasdjhfkkjhjjkasdfjhkjahksdf" -var _ = Describe("TriggerTimer testing", func() { - BeforeEach(func() { - triggered = 0 - }) - - It("Timer triggered by call", func() { - timer := syslog.NewTriggerTimer(10*time.Millisecond, trigger) - timer.Trigger() - //expect timer to be triggered and therefore stopped - Expect(timer.Running()).To(BeFalse()) - Expect(triggered).To(Equal(1)) - time.Sleep(12 * time.Millisecond) - //expect timer to stay stopped and not trigger func again - Expect(timer.Running()).To(BeFalse()) - Expect(triggered).To(Equal(1)) - }) - - It("Timer triggered by time elapsed", func() { - timer := syslog.NewTriggerTimer(10*time.Millisecond, trigger) - //expect timer to be running and untriggered - Expect(timer.Running()).To(BeTrue()) - Expect(triggered).To(Equal(0)) - time.Sleep(12 * time.Millisecond) - //expect timer to be triggered and stopped - Expect(timer.Running()).To(BeFalse()) - Expect(triggered).To(Equal(1)) - //expect timer to not be able to be retriggered - timer.Trigger() - Expect(timer.Running()).To(BeFalse()) - Expect(triggered).To(Equal(1)) - }) -}) - -func trigger() { - triggered += 1 -} - var _ = Describe("HTTPS_batch_testing", func() { var ( netConf syslog.NetworkTimeoutConfig @@ -125,6 +87,7 @@ var _ = Describe("HTTPS_batch_testing", func() { for i := 0; i < 300; i++ { writer.Write(env1) } + time.Sleep(100 * time.Millisecond) Expect(drain.messages).To(HaveLen(256)) }) @@ -134,6 +97,7 @@ var _ = Describe("HTTPS_batch_testing", func() { writer.Write(env1) time.Sleep(99 * time.Millisecond) } + Expect(drain.messages).To(HaveLen(0)) time.Sleep(100 * time.Millisecond) Expect(drain.messages).To(HaveLen(10)) }) diff --git a/src/pkg/egress/syslog/https_test.go b/src/pkg/egress/syslog/https_test.go index 21bf138c5..ebf999067 100644 --- a/src/pkg/egress/syslog/https_test.go +++ b/src/pkg/egress/syslog/https_test.go @@ -78,7 +78,6 @@ var _ = Describe("HTTPWriter", func() { &metricsHelpers.SpyMetric{}, c, ) - env := buildLogEnvelope("APP", "1", "just a test", loggregator_v2.Log_OUT) Expect(writer.Write(env)).To(HaveOccurred()) }) diff --git a/src/pkg/egress/syslog/syslog_connector.go b/src/pkg/egress/syslog/syslog_connector.go index e487082da..eaaa56207 100644 --- a/src/pkg/egress/syslog/syslog_connector.go +++ b/src/pkg/egress/syslog/syslog_connector.go @@ -166,4 +166,12 @@ func (w *SyslogConnector) emitLoggregatorErrorLog(appID, message string) { w.logClient.EmitLog(message, option) } func (w *SyslogConnector) emitStandardOutErrorLog(appID, scheme, url string, missed int) { + errorAppOrAggregate := fmt.Sprintf("for %s's app drain", appID) + if appID == "" { + errorAppOrAggregate = "for aggregate drain" + } + log.Printf( + "Dropped %d %s logs %s with url %s", + missed, scheme, errorAppOrAggregate, url, + ) } From 3a62859fa8c0ab7eff7cbcde14426c44daa9a35d Mon Sep 17 00:00:00 2001 From: nicklas dohrn Date: Mon, 1 Jul 2024 10:40:18 +0200 Subject: [PATCH 9/9] Add https-batch to allowed formats --- src/pkg/ingress/bindings/filtered_binding_fetcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pkg/ingress/bindings/filtered_binding_fetcher.go b/src/pkg/ingress/bindings/filtered_binding_fetcher.go index 6fdadeff2..ad244d98c 100644 --- a/src/pkg/ingress/bindings/filtered_binding_fetcher.go +++ b/src/pkg/ingress/bindings/filtered_binding_fetcher.go @@ -11,7 +11,7 @@ import ( "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" ) -var allowedSchemes = []string{"syslog", "syslog-tls", "https"} +var allowedSchemes = []string{"syslog", "syslog-tls", "https", "https-batch"} type IPChecker interface { ResolveAddr(host string) (net.IP, error)