feat(plc4go/spi): use more local loggers
sruehl committed May 31, 2023
1 parent a4f5155 commit 7c14c99
Showing 20 changed files with 251 additions and 630 deletions.
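Most hunks below follow the same pattern: a constructor gains a variadic `_options ...options.WithOption` parameter, extracts a logger from it, and the component then logs through that struct-local `zerolog.Logger` instead of the global `github.com/rs/zerolog/log` package. A minimal sketch of the pattern (illustrative only, not code from the repository):

```go
package example

import (
	"github.com/apache/plc4x/plc4go/spi/options"
	"github.com/rs/zerolog"
)

// component keeps its own logger instead of relying on the global zerolog instance.
type component struct {
	log zerolog.Logger
}

// NewComponent extracts the logger from the variadic options, exactly like the
// constructors changed in this commit.
func NewComponent(_options ...options.WithOption) *component {
	return &component{
		log: options.ExtractCustomLogger(_options...),
	}
}

func (c *component) doWork() {
	// Previously this would have been log.Debug() on the package-level logger.
	c.log.Debug().Msg("work started")
}
```

Callers that pass no logger option keep working; `ExtractCustomLogger` presumably falls back to whatever default the options package defines.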
2 changes: 1 addition & 1 deletion plc4go/internal/cbus/Browser.go
@@ -51,7 +51,7 @@ func NewBrowser(connection plc4go.PlcConnection, _options ...options.WithOption)

log: options.ExtractCustomLogger(_options...),
}
browser.DefaultBrowser = _default.NewDefaultBrowser(browser)
browser.DefaultBrowser = _default.NewDefaultBrowser(browser, _options...)
return &browser
}

5 changes: 3 additions & 2 deletions plc4go/internal/knxnetip/Browser.go
@@ -23,6 +23,7 @@ import (
"context"
"encoding/hex"
"fmt"
"github.com/apache/plc4x/plc4go/spi/options"
"strconv"
"strings"
"time"
@@ -46,13 +47,13 @@ type Browser struct {
sequenceCounter uint8
}

func NewBrowser(connection *Connection, messageCodec spi.MessageCodec) *Browser {
func NewBrowser(connection *Connection, messageCodec spi.MessageCodec, _options ...options.WithOption) *Browser {
browser := Browser{
connection: connection,
messageCodec: messageCodec,
sequenceCounter: 0,
}
browser.DefaultBrowser = _default.NewDefaultBrowser(browser)
browser.DefaultBrowser = _default.NewDefaultBrowser(browser, _options...)
return &browser
}

4 changes: 3 additions & 1 deletion plc4go/internal/knxnetip/Connection.go
@@ -24,6 +24,7 @@ import (
"context"
"encoding/hex"
"fmt"
"github.com/apache/plc4x/plc4go/spi/options"
"strconv"
"strings"
"sync"
@@ -165,7 +166,7 @@ type InternalResult struct {
err error
}

func NewConnection(transportInstance transports.TransportInstance, options map[string][]string, tagHandler spi.PlcTagHandler) *Connection {
func NewConnection(transportInstance transports.TransportInstance, options map[string][]string, tagHandler spi.PlcTagHandler, _options ...options.WithOption) *Connection {
connection := &Connection{
options: options,
tagHandler: tagHandler,
@@ -175,6 +176,7 @@ func NewConnection(transportInstance transports.TransportInstance, options map[s
spiModel.NewDefaultPlcWriteRequest,
spiModel.NewDefaultPlcReadResponse,
spiModel.NewDefaultPlcWriteResponse,
_options...,
),
subscribers: []*Subscriber{},
valueCache: map[uint16][]byte{},
4 changes: 3 additions & 1 deletion plc4go/internal/modbus/Connection.go
@@ -22,6 +22,7 @@ package modbus
import (
"context"
"fmt"
"github.com/apache/plc4x/plc4go/spi/options"
"time"

"github.com/apache/plc4x/plc4go/pkg/api"
@@ -47,7 +48,7 @@ type Connection struct {
tracer *spi.Tracer
}

func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, options map[string][]string, tagHandler spi.PlcTagHandler) *Connection {
func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, options map[string][]string, tagHandler spi.PlcTagHandler, _options ...options.WithOption) *Connection {
connection := &Connection{
unitIdentifier: unitIdentifier,
messageCodec: messageCodec,
@@ -57,6 +58,7 @@ func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, options
spiModel.NewDefaultPlcWriteRequest,
spiModel.NewDefaultPlcReadResponse,
spiModel.NewDefaultPlcWriteResponse,
_options...,
),
}
if traceEnabledOption, ok := options["traceEnabled"]; ok {
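Note that `NewConnection` now mixes two different kinds of options: the parsed connection-string options (`options map[string][]string`, e.g. `traceEnabled`) and the Go-level functional options (`_options ...options.WithOption`) that carry the custom logger. A rough sketch of how the two are told apart (names are illustrative, not repository code):

```go
package example

import (
	"github.com/apache/plc4x/plc4go/spi/options"
	"github.com/rs/zerolog"
)

// connectionSketch shows the two option flavours side by side: the
// connection-string map drives protocol behaviour (e.g. traceEnabled), while
// the functional options carry cross-cutting concerns such as the logger.
func connectionSketch(connectionOptions map[string][]string, _options ...options.WithOption) zerolog.Logger {
	logger := options.ExtractCustomLogger(_options...)
	if _, ok := connectionOptions["traceEnabled"]; ok {
		// Mirrors the traceEnabled handling in the modbus Connection above.
		logger.Trace().Msg("tracing requested via connection string")
	}
	return logger
}
```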
14 changes: 9 additions & 5 deletions plc4go/spi/default/DefaultBrowser.go
@@ -21,11 +21,11 @@ package _default

import (
"context"
"github.com/rs/zerolog/log"

apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/rs/zerolog"
)

// DefaultBrowserRequirements adds required methods to Browser that are needed when using DefaultBrowser
@@ -37,9 +37,11 @@ type DefaultBrowser interface {
spi.PlcBrowser
}

func NewDefaultBrowser(defaultBrowserRequirements DefaultBrowserRequirements) DefaultBrowser {
func NewDefaultBrowser(defaultBrowserRequirements DefaultBrowserRequirements, _options ...options.WithOption) DefaultBrowser {
return &defaultBrowser{
defaultBrowserRequirements,
DefaultBrowserRequirements: defaultBrowserRequirements,

log: options.ExtractCustomLogger(_options...),
}
}

@@ -51,6 +53,8 @@ func NewDefaultBrowser(defaultBrowserRequirements DefaultBrowserRequirements) De

type defaultBrowser struct {
DefaultBrowserRequirements

log zerolog.Logger
}

//
@@ -70,7 +74,7 @@ func (m *defaultBrowser) BrowseWithInterceptor(ctx context.Context, browseReques
go func() {
defer func() {
if err := recover(); err != nil {
log.Error().Interface("err", err).Msg("caught panic")
m.log.Error().Interface("err", err).Msg("caught panic")
}
}()
responseCodes := map[string]apiModel.PlcResponseCode{}
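For the change to have an effect, a custom logger has to be supplied at the top of the option chain and forwarded through every constructor, as the `_options...` forwarding above does. A hypothetical wiring sketch follows; only `ExtractCustomLogger` appears in this diff, so the `WithCustomLogger` option used here is an assumption about the options package:

```go
package example

import (
	"os"

	_default "github.com/apache/plc4x/plc4go/spi/default"
	"github.com/apache/plc4x/plc4go/spi/options"
	"github.com/rs/zerolog"
)

// wireBrowser builds a DefaultBrowser that logs through a caller-supplied logger.
// options.WithCustomLogger is assumed to exist as the counterpart of ExtractCustomLogger.
func wireBrowser(requirements _default.DefaultBrowserRequirements) _default.DefaultBrowser {
	logger := zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr}).With().Timestamp().Logger()
	return _default.NewDefaultBrowser(requirements, options.WithCustomLogger(logger))
}
```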
51 changes: 30 additions & 21 deletions plc4go/spi/interceptors/SingleItemRequestInterceptor.go
@@ -25,8 +25,9 @@ import (
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/rs/zerolog/log"
"github.com/rs/zerolog"
)

type ReaderExposer interface {
@@ -74,10 +75,18 @@ type SingleItemRequestInterceptor struct {
writeRequestFactory writeRequestFactory
readResponseFactory readResponseFactory
writeResponseFactory writeResponseFactory

log zerolog.Logger
}

func NewSingleItemRequestInterceptor(readRequestFactory readRequestFactory, writeRequestFactory writeRequestFactory, readResponseFactory readResponseFactory, writeResponseFactory writeResponseFactory) SingleItemRequestInterceptor {
return SingleItemRequestInterceptor{readRequestFactory, writeRequestFactory, readResponseFactory, writeResponseFactory}
func NewSingleItemRequestInterceptor(readRequestFactory readRequestFactory, writeRequestFactory writeRequestFactory, readResponseFactory readResponseFactory, writeResponseFactory writeResponseFactory, _options ...options.WithOption) SingleItemRequestInterceptor {
return SingleItemRequestInterceptor{
readRequestFactory: readRequestFactory,
writeRequestFactory: writeRequestFactory,
readResponseFactory: readResponseFactory,
writeResponseFactory: writeResponseFactory,
log: options.ExtractCustomLogger(_options...),
}
}

///////////////////////////////////////
@@ -134,18 +143,18 @@ func (m SingleItemRequestInterceptor) InterceptReadRequest(ctx context.Context,
}
// If this request just has one tag, go the shortcut
if len(readRequest.GetTagNames()) == 1 {
log.Debug().Msg("We got only one request, no splitting required")
m.log.Debug().Msg("We got only one request, no splitting required")
return []apiModel.PlcReadRequest{readRequest}
}
log.Trace().Msg("Splitting requests")
m.log.Trace().Msg("Splitting requests")
// In all other cases, create a new read request containing only one item
var readRequests []apiModel.PlcReadRequest
for _, tagName := range readRequest.GetTagNames() {
if err := ctx.Err(); err != nil {
log.Warn().Err(err).Msg("aborting early")
m.log.Warn().Err(err).Msg("aborting early")
return nil
}
log.Debug().Str("tagName", tagName).Msg("Splitting into own request")
m.log.Debug().Str("tagName", tagName).Msg("Splitting into own request")
tag := readRequest.GetTag(tagName)
subReadRequest := m.readRequestFactory(
map[string]apiModel.PlcTag{tagName: tag},
@@ -160,16 +169,16 @@ func (m SingleItemRequestInterceptor) InterceptReadRequest(ctx context.Context,

func (m SingleItemRequestInterceptor) ProcessReadResponses(ctx context.Context, readRequest apiModel.PlcReadRequest, readResults []apiModel.PlcReadRequestResult) apiModel.PlcReadRequestResult {
if len(readResults) == 1 {
log.Debug().Msg("We got only one response, no merging required")
m.log.Debug().Msg("We got only one response, no merging required")
return readResults[0]
}
log.Trace().Msg("Merging requests")
m.log.Trace().Msg("Merging requests")
responseCodes := map[string]apiModel.PlcResponseCode{}
val := map[string]values.PlcValue{}
var err error = nil
for _, readResult := range readResults {
if ctxErr := ctx.Err(); ctxErr != nil {
log.Warn().Err(ctxErr).Msg("aborting early")
m.log.Warn().Err(ctxErr).Msg("aborting early")
if err != nil {
multiError := err.(utils.MultiError)
multiError.Errors = append(multiError.Errors, ctxErr)
@@ -179,7 +188,7 @@ func (m SingleItemRequestInterceptor) ProcessReadResponses(ctx context.Context,
break
}
if readResult.GetErr() != nil {
log.Debug().Err(readResult.GetErr()).Msgf("Error during read")
m.log.Debug().Err(readResult.GetErr()).Msgf("Error during read")
if err == nil {
// Lazy initialization of multi error
err = utils.MultiError{MainError: errors.New("while aggregating results"), Errors: []error{readResult.GetErr()}}
@@ -190,7 +199,7 @@ func (m SingleItemRequestInterceptor) ProcessReadResponses(ctx context.Context,
} else if response := readResult.GetResponse(); response != nil {
request := response.GetRequest()
if len(request.GetTagNames()) > 1 {
log.Error().Int("numberOfTags", len(request.GetTagNames())).Msg("We should only get 1")
m.log.Error().Int("numberOfTags", len(request.GetTagNames())).Msg("We should only get 1")
}
for _, tagName := range request.GetTagNames() {
responseCodes[tagName] = response.GetResponseCode(tagName)
@@ -211,18 +220,18 @@ func (m SingleItemRequestInterceptor) InterceptWriteRequest(ctx context.Context,
}
// If this request just has one tag, go the shortcut
if len(writeRequest.GetTagNames()) == 1 {
log.Debug().Msg("We got only one request, no splitting required")
m.log.Debug().Msg("We got only one request, no splitting required")
return []apiModel.PlcWriteRequest{writeRequest}
}
log.Trace().Msg("Splitting requests")
m.log.Trace().Msg("Splitting requests")
// In all other cases, create a new write request containing only one item
var writeRequests []apiModel.PlcWriteRequest
for _, tagName := range writeRequest.GetTagNames() {
if err := ctx.Err(); err != nil {
log.Warn().Err(err).Msg("aborting early")
m.log.Warn().Err(err).Msg("aborting early")
return nil
}
log.Debug().Str("tagName", tagName).Msg("Splitting into own request")
m.log.Debug().Str("tagName", tagName).Msg("Splitting into own request")
tag := writeRequest.GetTag(tagName)
subWriteRequest := m.writeRequestFactory(
map[string]apiModel.PlcTag{tagName: tag},
@@ -238,15 +247,15 @@ func (m SingleItemRequestInterceptor) InterceptWriteRequest(ctx context.Context,

func (m SingleItemRequestInterceptor) ProcessWriteResponses(ctx context.Context, writeRequest apiModel.PlcWriteRequest, writeResults []apiModel.PlcWriteRequestResult) apiModel.PlcWriteRequestResult {
if len(writeResults) == 1 {
log.Debug().Msg("We got only one response, no merging required")
m.log.Debug().Msg("We got only one response, no merging required")
return writeResults[0]
}
log.Trace().Msg("Merging requests")
m.log.Trace().Msg("Merging requests")
responseCodes := map[string]apiModel.PlcResponseCode{}
var err error = nil
for _, writeResult := range writeResults {
if ctxErr := ctx.Err(); ctxErr != nil {
log.Warn().Err(ctxErr).Msg("aborting early")
m.log.Warn().Err(ctxErr).Msg("aborting early")
if err != nil {
multiError := err.(utils.MultiError)
multiError.Errors = append(multiError.Errors, ctxErr)
@@ -256,7 +265,7 @@ func (m SingleItemRequestInterceptor) ProcessWriteResponses(ctx context.Context,
break
}
if writeResult.GetErr() != nil {
log.Debug().Err(writeResult.GetErr()).Msgf("Error during write")
m.log.Debug().Err(writeResult.GetErr()).Msgf("Error during write")
if err == nil {
// Lazy initialization of multi error
err = utils.MultiError{MainError: errors.New("while aggregating results"), Errors: []error{writeResult.GetErr()}}
@@ -266,7 +275,7 @@ func (m SingleItemRequestInterceptor) ProcessWriteResponses(ctx context.Context,
}
} else if writeResult.GetResponse() != nil {
if len(writeResult.GetResponse().GetRequest().GetTagNames()) > 1 {
log.Error().Int("numberOfTags", len(writeResult.GetResponse().GetRequest().GetTagNames())).Msg("We should only get 1")
m.log.Error().Int("numberOfTags", len(writeResult.GetResponse().GetRequest().GetTagNames())).Msg("We should only get 1")
}
for _, tagName := range writeResult.GetResponse().GetRequest().GetTagNames() {
responseCodes[tagName] = writeResult.GetResponse().GetResponseCode(tagName)
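The callers shown earlier (the knxnetip and modbus `NewConnection` functions) pass the four default model factories plus `_options...` into this constructor; the call itself sits above the visible hunks, so the reconstruction below is an assumption about how a driver wires it up:

```go
package example

import (
	"github.com/apache/plc4x/plc4go/spi/interceptors"
	spiModel "github.com/apache/plc4x/plc4go/spi/model"
	"github.com/apache/plc4x/plc4go/spi/options"
)

// newRequestInterceptor forwards the driver's variadic options so the
// interceptor's local log field is populated by ExtractCustomLogger.
func newRequestInterceptor(_options ...options.WithOption) interceptors.SingleItemRequestInterceptor {
	return interceptors.NewSingleItemRequestInterceptor(
		spiModel.NewDefaultPlcReadRequest,
		spiModel.NewDefaultPlcWriteRequest,
		spiModel.NewDefaultPlcReadResponse,
		spiModel.NewDefaultPlcWriteResponse,
		_options...,
	)
}
```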
21 changes: 12 additions & 9 deletions plc4go/spi/testutils/TestUtils.go
@@ -20,20 +20,23 @@
package testutils

import (
"os"
"runtime/debug"
"strings"
"testing"

"github.com/apache/plc4x/plc4go/spi/utils"

"github.com/ajankovic/xdiff"
"github.com/ajankovic/xdiff/parser"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
"os"
"runtime/debug"
"strings"
"testing"
)

func CompareResults(t *testing.T, actualString []byte, referenceString []byte) error {
localLog := ProduceTestingLogger(t)
// Now parse the xml strings of the actual and the reference in xdiff's dom
p := parser.New()
actual, err := p.ParseBytes(actualString)
@@ -56,7 +59,7 @@ func CompareResults(t *testing.T, actualString []byte, referenceString []byte) e
cleanDiff := make([]xdiff.Delta, 0)
for _, delta := range diff {
if delta.Operation == xdiff.Delete && delta.Subject.Value == nil || delta.Operation == xdiff.Insert && delta.Subject.Value == nil {
log.Info().Msgf("We ignore empty elements which should be deleted %v", delta)
localLog.Info().Msgf("We ignore empty elements which should be deleted %v", delta)
continue
}
// Workaround for different precisions on float
Expand All @@ -66,7 +69,7 @@ func CompareResults(t *testing.T, actualString []byte, referenceString []byte) e
string(delta.Object.Parent.FirstChild.Name) == "dataType" &&
string(delta.Object.Parent.FirstChild.Value) == "float" {
if strings.Contains(string(delta.Subject.Value), string(delta.Object.Value)) || strings.Contains(string(delta.Object.Value), string(delta.Subject.Value)) {
log.Info().Msgf("We ignore precision diffs %v", delta)
localLog.Info().Msgf("We ignore precision diffs %v", delta)
continue
}
}
Expand All @@ -76,7 +79,7 @@ func CompareResults(t *testing.T, actualString []byte, referenceString []byte) e
string(delta.Object.Parent.FirstChild.Name) == "dataType" &&
string(delta.Object.Parent.FirstChild.Value) == "string" {
if diff, err := xdiff.Compare(delta.Subject, delta.Object); diff == nil && err == nil {
log.Info().Msgf("We ignore newline diffs %v", delta)
localLog.Info().Msgf("We ignore newline diffs %v", delta)
continue
}
}
Expand All @@ -88,7 +91,7 @@ func CompareResults(t *testing.T, actualString []byte, referenceString []byte) e
return errors.Wrap(err, "Error outputting results")
}
if len(cleanDiff) <= 0 {
log.Warn().Msg("We only found non relevant changes")
localLog.Warn().Msg("We only found non relevant changes")
return nil
}

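`ProduceTestingLogger(t)` is referenced above but defined outside the visible hunks (elsewhere in this commit). Below is only a plausible sketch of such a helper, routing zerolog output to the test's own log; the actual implementation may differ:

```go
package example

import (
	"testing"

	"github.com/rs/zerolog"
)

// testLogWriter adapts *testing.T to io.Writer so zerolog events land in the
// per-test output instead of on stderr.
type testLogWriter struct {
	t *testing.T
}

func (w testLogWriter) Write(p []byte) (int, error) {
	w.t.Log(string(p))
	return len(p), nil
}

// produceTestingLoggerSketch is an assumption about what ProduceTestingLogger
// could look like, not the implementation from this commit.
func produceTestingLoggerSketch(t *testing.T) zerolog.Logger {
	return zerolog.New(testLogWriter{t: t}).With().Timestamp().Logger()
}
```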
12 changes: 6 additions & 6 deletions plc4go/spi/transactions/RequestTransactionManager.go
@@ -141,11 +141,11 @@ type requestTransactionManager struct {
///////////////////////////////////////

func (r *requestTransactionManager) SetNumberOfConcurrentRequests(numberOfConcurrentRequests int) {
log.Info().Msgf("Setting new number of concurrent requests %d", numberOfConcurrentRequests)
r.log.Info().Msgf("Setting new number of concurrent requests %d", numberOfConcurrentRequests)
// If we reduced the number of concurrent requests and more requests are in-flight
// than should be, at least log a warning.
if numberOfConcurrentRequests < len(r.runningRequests) {
log.Warn().Msg("The number of concurrent requests was reduced and currently more requests are in flight.")
r.log.Warn().Msg("The number of concurrent requests was reduced and currently more requests are in flight.")
}

r.numberOfConcurrentRequests = numberOfConcurrentRequests
@@ -167,11 +167,11 @@ func (r *requestTransactionManager) processWorklog() {
func (r *requestTransactionManager) processWorklog() {
r.workLogMutex.RLock()
defer r.workLogMutex.RUnlock()
log.Debug().Msgf("Processing work log with size of %d (%d concurrent requests allowed)", r.workLog.Len(), r.numberOfConcurrentRequests)
r.log.Debug().Msgf("Processing work log with size of %d (%d concurrent requests allowed)", r.workLog.Len(), r.numberOfConcurrentRequests)
for len(r.runningRequests) < r.numberOfConcurrentRequests && r.workLog.Len() > 0 {
front := r.workLog.Front()
next := front.Value.(*requestTransaction)
log.Debug().Msgf("Handling next %v. (Adding to running requests (length: %d))", next, len(r.runningRequests))
r.log.Debug().Msgf("Handling next %v. (Adding to running requests (length: %d))", next, len(r.runningRequests))
r.runningRequests = append(r.runningRequests, next)
completionFuture := r.executor.Submit(context.Background(), next.transactionId, next.operation)
next.completionFuture = completionFuture
@@ -258,7 +258,7 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
}()
select {
case <-timer.C:
log.Warn().Msgf("timout after %d", timeout)
r.log.Warn().Msgf("timout after %d", timeout)
case <-signal:
}
}
@@ -283,7 +283,7 @@ func (t *requestTransaction) EndRequest() error {

func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
if t.operation != nil {
log.Warn().Msg("Operation already set")
t.transactionLog.Warn().Msg("Operation already set")
}
t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
t.operation = func() {
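`t.transactionLog` above suggests each transaction carries its own logger, presumably derived from the manager's local logger; that wiring is outside the visible hunks, so the derivation below is only a guess at the idea:

```go
package example

import "github.com/rs/zerolog"

// transactionLogger attaches the transaction id once, so every later event on
// this logger carries it automatically. The field name and id type are
// illustrative assumptions.
func transactionLogger(managerLog zerolog.Logger, transactionId int32) zerolog.Logger {
	return managerLog.With().Int32("transactionId", transactionId).Logger()
}
```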
(Diffs for the remaining changed files in this commit are not shown here.)
