Skip to content

Commit

Permalink
feat: Improved log output for transports to display more meaningful c…
Browse files Browse the repository at this point in the history
…onnection details in some instances
  • Loading branch information
driskell committed Feb 11, 2023
1 parent 05fcd48 commit 1a14888
Show file tree
Hide file tree
Showing 11 changed files with 34 additions and 31 deletions.
4 changes: 2 additions & 2 deletions lc-lib/core/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (e *ExpBackoff) Trigger() time.Duration {

// Did we recover for long enough? Reset delay
if e.expCount != 0 && time.Since(e.lastTrigger) > nextDelay {
log.Debug("[%s] Backoff had recovered, resetting failured count", e.name)
log.Debug("%s - Backoff had recovered, resetting failured count", e.name)
nextDelay = e.calculateDelay(0)
e.expCount = 0
}
Expand All @@ -79,7 +79,7 @@ func (e *ExpBackoff) Trigger() time.Duration {
nextDelay = e.maxDelay
}

log.Debug("[%s] Backoff (%d failures): %v", e.name, e.expCount, nextDelay)
log.Debug("%s - Backoff (%d failures): %v", e.name, e.expCount, nextDelay)
return nextDelay
}

Expand Down
3 changes: 2 additions & 1 deletion lc-lib/publisher/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ func (e *Endpoint) Init() {
e.ctx = context.WithValue(context.Background(), ContextSelf, e)

e.warming = true
e.backoff = core.NewExpBackoff(e.server+" Failure", e.sink.config.Backoff, e.sink.config.BackoffMax)
backoffName := fmt.Sprintf("[E %s] Failure", e.server)
e.backoff = core.NewExpBackoff(backoffName, e.sink.config.Backoff, e.sink.config.BackoffMax)

e.readyElement.Value = e
e.failedElement.Value = e
Expand Down
8 changes: 4 additions & 4 deletions lc-lib/publisher/method_failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ func (m *methodFailover) onFail(endpoint *endpoint.Endpoint) {

// Current endpoint failed, are all failed? We'd have to ignore
if m.sink.Count() == len(m.netConfig.Servers) {
log.Warning("[Failover] All endpoints have failed, awaiting recovery")
log.Warning("[P Failover] All endpoints have failed, awaiting recovery")
return
}

// Add on extra endpoints
m.failoverPosition++
newServer := m.netConfig.Servers[m.failoverPosition]
log.Warning("[Failover] Initiating failover to: %s", newServer)
log.Warning("[P Failover] Initiating failover to: %s", newServer)

// Check it's not already there (it may be still shutting down from a previous
// recovery)
Expand Down Expand Up @@ -102,7 +102,7 @@ func (m *methodFailover) onStarted(endpoint *endpoint.Endpoint) {

// This is the best endpoint, use it, close all later endpoints
m.currentEndpoint = endpoint
log.Info("[Failover] A higher priority endpoint has recovered: %s", endpoint.Server())
log.Info("[P Failover] A higher priority endpoint has recovered: %s", endpoint.Server())

for next := endpoint.Next(); next != nil; next = next.Next() {
m.sink.ShutdownEndpoint(next.Server())
Expand Down Expand Up @@ -134,7 +134,7 @@ func (m *methodFailover) reloadConfig(netConfig *transports.Config) {

// If there was no current, we're initialising, use this one
if m.currentEndpoint == nil {
log.Info("[Failover] Initialised priority endpoint: %s", last.Server())
log.Info("[P Failover] Initialised priority endpoint: %s", last.Server())
m.currentEndpoint = last
foundCurrent = true
}
Expand Down
2 changes: 1 addition & 1 deletion lc-lib/publisher/method_loadbalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (m *methodLoadbalance) reloadConfig(netConfig *transports.Config) {
m.netConfig.AddressPools[n],
last,
)
log.Info("[Loadbalance] Initialised new endpoint: %s", last.Server())
log.Info("[P Loadbalance] Initialised new endpoint: %s", last.Server())
continue
}

Expand Down
10 changes: 5 additions & 5 deletions lc-lib/publisher/method_random.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func newMethodRandom(sink *endpoint.Sink, netConfig *transports.Config) *methodR
generator: rand.New(rand.NewSource(int64(time.Now().Nanosecond()))),
}

ret.backoff = core.NewExpBackoff("Random", ret.netConfig.Backoff, ret.netConfig.BackoffMax)
ret.backoff = core.NewExpBackoff("[P Random]", ret.netConfig.Backoff, ret.netConfig.BackoffMax)

if sink.Count() == 0 {
// Empty sink, connect to a random endpoint
Expand Down Expand Up @@ -70,7 +70,7 @@ func newMethodRandom(sink *endpoint.Sink, netConfig *transports.Config) *methodR
ret.activeServer = k
foundAcceptable = true

log.Debug("[Random] Utilising existing endpoint connection: %s", server)
log.Debug("[P Random] Utilising existing endpoint connection: %s", server)

// Reload it
endpoint.ReloadConfig(netConfig)
Expand All @@ -82,7 +82,7 @@ func newMethodRandom(sink *endpoint.Sink, netConfig *transports.Config) *methodR
// should have removed endpoints that don't exist in the configuration, or
// at the very least placed them into a closing status
if !foundAcceptable {
log.Warning("[Random] Method reload discovered inconsistent Endpoint status: %s", endpoint.Server())
log.Warning("[P Random] Method reload discovered inconsistent Endpoint status: %s", endpoint.Server())
sink.ShutdownEndpoint(endpoint.Server())
}
}
Expand Down Expand Up @@ -119,7 +119,7 @@ func (m *methodRandom) connectRandom() {
}
}

log.Info("[Random] Randomly selected new endpoint: %s", server)
log.Info("[P Random] Randomly selected new endpoint: %s", server)

m.sink.AddEndpoint(server, addressPool)
}
Expand All @@ -128,7 +128,7 @@ func (m *methodRandom) onFail(endpoint *endpoint.Endpoint) {
// Failed endpoint - keep it alive until backoff triggers new connection
// This way we still have an endpoint with a last error in monitor
m.sink.Scheduler.SetCallback(m, m.backoff.Trigger(), func() {
log.Warning("[Random] Giving up on failed endpoint: %s", endpoint.Server())
log.Warning("[P Random] Giving up on failed endpoint: %s", endpoint.Server())
m.sink.ShutdownEndpoint(endpoint.Server())
})
}
Expand Down
16 changes: 8 additions & 8 deletions lc-lib/transports/es/transportes.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (t *transportES) controllerRoutine() {

// setupAssociation gathers cluster information and installs templates
func (t *transportES) setupAssociation() bool {
backoffName := fmt.Sprintf("%s Setup Retry", t.pool.Server())
backoffName := fmt.Sprintf("[T %s] Setup Retry", t.pool.Server())
backoff := core.NewExpBackoff(backoffName, t.config.Retry, t.config.RetryMax)

for {
Expand Down Expand Up @@ -170,7 +170,7 @@ func (t *transportES) populateNodeInfo(addr *addresspool.Address) error {
return fmt.Errorf("failed to calculate maximum version number for cluster: %s", err)
}

log.Infof("[T %s] Successfully retrieved Elasticsearch node information (major version: %d)", t.pool.Server(), t.maxMajorVersion)
log.Infof("[T %s] Successfully retrieved Elasticsearch node information (major version: %d)", addr.Desc(), t.maxMajorVersion)

return nil
}
Expand Down Expand Up @@ -236,7 +236,7 @@ func (t *transportES) installTemplate(addr *addresspool.Address) error {
return fmt.Errorf("unexpected status: %s [Body: %s]", httpResponse.Status, body)
}

log.Infof("[T %s] Successfully installed Elasticsearch index template: %s", t.pool.Server(), name)
log.Infof("[T %s] Successfully installed Elasticsearch index template: %s", addr.Desc(), name)

return nil
}
Expand Down Expand Up @@ -284,7 +284,7 @@ func (t *transportES) httpRoutine(id int) {
case payload := <-t.payloadChan:
if payload == nil {
// Graceful shutdown
log.Infof("[T %s - %d] Elasticsearch routine stopped gracefully", t.pool.Server(), id)
log.Infof("[T %s]{%d} Elasticsearch routine stopped gracefully", t.pool.Server(), id)
return
}

Expand Down Expand Up @@ -343,7 +343,7 @@ func (t *transportES) performBulkRequest(addr *addresspool.Address, id int, requ
} else {
url = fmt.Sprintf("/%s/_doc/_bulk", defaultIndex)
}
log.Debugf("[T %s - %d] Performing Elasticsearch bulk request of %d events via %s", t.pool.Server(), id, request.Remaining(), url)
log.Debugf("[T %s]{%d} Performing Elasticsearch bulk request of %d events to %s", addr.Desc(), id, request.Remaining(), url)

request.Reset()
bodyBuffer := new(bytes.Buffer)
Expand Down Expand Up @@ -381,14 +381,14 @@ func (t *transportES) performBulkRequest(addr *addresspool.Address, id int, requ

if len(response.Errors) != 0 {
for _, errorValue := range response.Errors {
log.Warningf("[T %s - %d] Failed to index event: %s", t.pool.Server(), id, errorValue.Error())
log.Warningf("[T %s]{%d} Failed to index event: %s", addr.Desc(), id, errorValue.Error())
}
}

if request.Remaining() == 0 {
log.Debugf("[T %s - %d] Elasticsearch request complete (took %dms; created %d; errors %d)", t.pool.Server(), id, response.Took, request.Created()-created, len(response.Errors))
log.Debugf("[T %s]{%d} Elasticsearch request complete (took %dms; created %d; errors %d)", addr.Desc(), id, response.Took, request.Created()-created, len(response.Errors))
} else {
log.Warningf("[T %s - %d] Elasticsearch request partially complete (took %dms; created %d; errors %d; retrying %d)", t.pool.Server(), id, response.Took, request.Created()-created, len(response.Errors), request.Remaining())
log.Warningf("[T %s]{%d} Elasticsearch request partially complete (took %dms; created %d; errors %d; retrying %d)", addr.Desc(), id, response.Took, request.Created()-created, len(response.Errors), request.Remaining())
}
return nil
}
Expand Down
10 changes: 4 additions & 6 deletions lc-lib/transports/tcp/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type connection struct {
shutdownFunc context.CancelFunc
socket connectionSocket
protocol Protocol
poolServer string
isClient bool
eventChan chan<- transports.Event
sendChan chan ProtocolMessage
Expand All @@ -63,12 +62,11 @@ type connection struct {
sendShutdownLock sync.RWMutex
}

func newConnection(ctx context.Context, socket connectionSocket, protocolFactory ProtocolFactory, poolServer string, isClient bool, eventChan chan<- transports.Event) *connection {
func newConnection(ctx context.Context, socket connectionSocket, protocolFactory ProtocolFactory, isClient bool, eventChan chan<- transports.Event) *connection {
ret := &connection{
socket: socket,
poolServer: poolServer,
isClient: isClient,
eventChan: eventChan,
socket: socket,
isClient: isClient,
eventChan: eventChan,
// TODO: Make configurable. Allow up to 100 pending messages.
// This will cope with a max pending payload size of 100 for each connection by allowing 100 acks to be queued
sendChan: make(chan ProtocolMessage, 100),
Expand Down
4 changes: 3 additions & 1 deletion lc-lib/transports/tcp/receiverfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package tcp

import (
"context"
"fmt"
"time"

"github.com/driskell/log-courier/lc-lib/addresspool"
Expand Down Expand Up @@ -55,14 +56,15 @@ func (f *ReceiverFactory) NewReceiver(context.Context, *addresspool.Pool, chan<-

// NewReceiverWithProtocol creates a new receiver with the given protocol
func (f *ReceiverFactory) NewReceiverWithProtocol(ctx context.Context, pool *addresspool.Pool, eventChan chan<- transports.Event, protocolFactory ProtocolFactory) transports.Receiver {
backoffName := fmt.Sprintf("[R %s] Receiver Reset", pool.Server())
ret := &receiverTCP{
config: f,
pool: pool,
eventChan: eventChan,
connections: make(map[*connection]*connection),
shutdownChan: make(chan struct{}),
// TODO: Own values
backoff: core.NewExpBackoff(pool.Server()+" Receiver Reset", 0, 300*time.Second),
backoff: core.NewExpBackoff(backoffName, 0, 300*time.Second),
protocolFactory: protocolFactory,
}

Expand Down
2 changes: 1 addition & 1 deletion lc-lib/transports/tcp/receivertcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (t *receiverTCP) startConnection(socket *net.TCPConn) {
connectionSocket = newConnectionSocketTCP(socket)
}

conn := newConnection(t.ctx, connectionSocket, t.protocolFactory, t.pool.Server(), false, t.eventChan)
conn := newConnection(t.ctx, connectionSocket, t.protocolFactory, false, t.eventChan)

t.connMutex.Lock()
t.connections[conn] = conn
Expand Down
4 changes: 3 additions & 1 deletion lc-lib/transports/tcp/transportfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package tcp

import (
"context"
"fmt"
"time"

"github.com/driskell/log-courier/lc-lib/addresspool"
Expand Down Expand Up @@ -65,14 +66,15 @@ func (f *TransportFactory) NewTransport(context.Context, *addresspool.Pool, chan
func (f *TransportFactory) NewTransportWithProtocol(ctx context.Context, pool *addresspool.Pool, eventChan chan<- transports.Event, protocolFactory ProtocolFactory) transports.Transport {
cancelCtx, shutdownFunc := context.WithCancel(ctx)

backoffName := fmt.Sprintf("[T %s] Reconnect", pool.Server())
ret := &transportTCP{
ctx: cancelCtx,
shutdownFunc: shutdownFunc,
config: f,
netConfig: transports.FetchConfig(f.config),
pool: pool,
eventChan: eventChan,
backoff: core.NewExpBackoff(pool.Server()+" Reconnect", f.Reconnect, f.ReconnectMax),
backoff: core.NewExpBackoff(backoffName, f.Reconnect, f.ReconnectMax),
protocolFactory: protocolFactory,
}

Expand Down
2 changes: 1 addition & 1 deletion lc-lib/transports/tcp/transporttcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (t *transportTCP) connect() (*connection, error) {
connectionSocket = newConnectionSocketTCP(socket.(*net.TCPConn))
}

conn := newConnection(t.ctx, connectionSocket, t.protocolFactory, t.pool.Server(), true, t.eventChan)
conn := newConnection(t.ctx, connectionSocket, t.protocolFactory, true, t.eventChan)

log.Noticef("[T %s - %s] Connected", socket.LocalAddr().String(), socket.RemoteAddr().String())
return conn, nil
Expand Down

0 comments on commit 1a14888

Please sign in to comment.