Skip to content

Commit

Permalink
Refactor approach to use different protocol instead of parameter.
Browse files Browse the repository at this point in the history
This is a new approach to switch between http and http batching.
It only is different in this regard from the previous attempts,
and only contains refactorings besides this change.
  • Loading branch information
nicklas-dohrn committed May 6, 2024
1 parent f206257 commit 35104c5
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 113 deletions.
79 changes: 7 additions & 72 deletions src/pkg/egress/syslog/https.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package syslog

import (
"bytes"
"crypto/tls"
"errors"
"fmt"
Expand All @@ -15,8 +14,6 @@ import (
"github.com/valyala/fasthttp"
)

const BATCHSIZE = 256 * 1024

type HTTPSWriter struct {
hostname string
appID string
Expand All @@ -26,15 +23,6 @@ type HTTPSWriter struct {
syslogConverter *Converter
}

type BatchHTTPSWriter struct {
HTTPSWriter
msgBatch bytes.Buffer
batchSize int
sendInterval time.Duration
sendTimer TriggerTimer
egrMsgCount float64
}

func NewHTTPSWriter(
binding *URLBinding,
netConf NetworkTimeoutConfig,
Expand All @@ -44,66 +32,13 @@ func NewHTTPSWriter(
) egress.WriteCloser {

client := httpClient(netConf, tlsConf)
if binding.URL.Query().Get("batching") == "true" {
return &BatchHTTPSWriter{
HTTPSWriter: HTTPSWriter{
url: binding.URL,
appID: binding.AppID,
hostname: binding.Hostname,
client: client,
egressMetric: egressMetric,
syslogConverter: c,
},
batchSize: BATCHSIZE,
sendInterval: time.Second,
egrMsgCount: 0,
}
} else {
return &HTTPSWriter{
url: binding.URL,
appID: binding.AppID,
hostname: binding.Hostname,
client: client,
egressMetric: egressMetric,
syslogConverter: c,
}
}
}

func (w *BatchHTTPSWriter) sendMsgBatch() error {
currentEgrCount := w.egrMsgCount
currentMsg := w.msgBatch.Bytes()

w.egrMsgCount = 0
w.msgBatch.Reset()

return w.sendHttpRequest(currentMsg, currentEgrCount)
}

// Modified Write function
func (w *BatchHTTPSWriter) Write(env *loggregator_v2.Envelope) error {
msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname)
if err != nil {
return err
}

for _, msg := range msgs {
w.msgBatch.Write(msg)
w.egrMsgCount += 1
w.startAndTriggerSend()
}
return nil
}

// TODO: Error back propagation. Errors are not looked at further down the call chain
func (w *BatchHTTPSWriter) startAndTriggerSend() {
if !w.sendTimer.Running() {
w.sendTimer.Start(w.sendInterval, func() {
w.sendMsgBatch()
})
}
if w.msgBatch.Len() >= w.batchSize {
w.sendTimer.Trigger()
return &HTTPSWriter{
url: binding.URL,
appID: binding.AppID,
hostname: binding.Hostname,
client: client,
egressMetric: egressMetric,
syslogConverter: c,
}
}

Expand Down
119 changes: 119 additions & 0 deletions src/pkg/egress/syslog/https_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package syslog

import (
"bytes"
"crypto/tls"
"time"

"code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2"
metrics "code.cloudfoundry.org/go-metric-registry"
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress"
)

const BATCHSIZE = 256 * 1024

type HTTPSBatchWriter struct {
HTTPSWriter
msgBatch bytes.Buffer
batchSize int
sendInterval time.Duration
sendTimer TriggerTimer
egrMsgCount float64
}

func NewHTTPSBatchWriter(
binding *URLBinding,
netConf NetworkTimeoutConfig,
tlsConf *tls.Config,
egressMetric metrics.Counter,
c *Converter,
) egress.WriteCloser {
client := httpClient(netConf, tlsConf)
binding.URL.Scheme = "https" // reset the scheme for usage to a valid http scheme
return &HTTPSBatchWriter{
HTTPSWriter: HTTPSWriter{
url: binding.URL,
appID: binding.AppID,
hostname: binding.Hostname,
client: client,
egressMetric: egressMetric,
syslogConverter: c,
},
batchSize: BATCHSIZE,
sendInterval: time.Second,
egrMsgCount: 0,
}
}

func (w *HTTPSBatchWriter) sendMsgBatch() error {
currentEgrCount := w.egrMsgCount
currentMsg := w.msgBatch.Bytes()

w.egrMsgCount = 0
w.msgBatch.Reset()

return w.sendHttpRequest(currentMsg, currentEgrCount)
}

// Modified Write function
func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error {
msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname)
if err != nil {
return err
}

for _, msg := range msgs {
w.msgBatch.Write(msg)
w.egrMsgCount += 1
w.startAndTriggerSend()
}
return nil
}

// TODO: Error back propagation. Errors are not looked at further down the call chain
func (w *HTTPSBatchWriter) startAndTriggerSend() {
if !w.sendTimer.Running() {
w.sendTimer.Start(w.sendInterval, func() {
w.sendMsgBatch()
})
}
if w.msgBatch.Len() >= w.batchSize {
w.sendTimer.Trigger()
}
}

type TriggerTimer struct {
trigger chan int
running bool
}

type Timer interface {
Start(d time.Duration, f func())
}

func NewTriggerTimer() Timer {
return &TriggerTimer{
running: false,
}
}

func (t *TriggerTimer) Start(d time.Duration, f func()) {
t.running = true
for {
timer := time.NewTimer(d)
select {
case <-timer.C:
case <-t.trigger:
f()
t.running = false
}
}
}

func (t *TriggerTimer) Trigger() {
t.trigger <- 1
}

func (t *TriggerTimer) Running() bool {
return t.running
}
39 changes: 0 additions & 39 deletions src/pkg/egress/syslog/triggerTimer.go

This file was deleted.

8 changes: 8 additions & 0 deletions src/pkg/egress/syslog/writer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) {
egressMetric,
converter,
)
case "https-batch":
w = NewHTTPSBatchWriter(
ub,
f.netConf,
tlsCfg,
egressMetric,
converter,
)
case "syslog":
w = NewTCPWriter(
ub,
Expand Down
4 changes: 2 additions & 2 deletions src/pkg/egress/syslog/writer_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var _ = Describe("EgressFactory", func() {

Context("when the url begins with https and enables batching", func() {
It("returns an single https writer", func() {
url, err := url.Parse("https://syslog.example.com?batching=true")
url, err := url.Parse("https-batch://syslog.example.com")
Expect(err).ToNot(HaveOccurred())
urlBinding := &syslog.URLBinding{
URL: url,
Expand All @@ -55,7 +55,7 @@ var _ = Describe("EgressFactory", func() {
retryWriter, ok := writer.(*syslog.RetryWriter)
Expect(ok).To(BeTrue())

_, ok = retryWriter.Writer.(*syslog.BatchHTTPSWriter)
_, ok = retryWriter.Writer.(*syslog.HTTPSBatchWriter)
Expect(ok).To(BeTrue())
})
})
Expand Down

0 comments on commit 35104c5

Please sign in to comment.