Skip to content

Commit

Permalink
fix(plc4go): ensure options are passed downstream
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Jun 22, 2023
1 parent c970c27 commit 840ca2a
Show file tree
Hide file tree
Showing 24 changed files with 626 additions and 375 deletions.
2 changes: 2 additions & 0 deletions plc4go/internal/ads/Connection.go
Expand Up @@ -58,6 +58,7 @@ type Connection struct {

passLogToModel bool
log zerolog.Logger
_options []options.WithOption // Used to pass them downstream
}

func NewConnection(messageCodec spi.MessageCodec, configuration model.Configuration, connectionOptions map[string][]string, _options ...options.WithOption) (*Connection, error) {
Expand All @@ -74,6 +75,7 @@ func NewConnection(messageCodec spi.MessageCodec, configuration model.Configurat
subscriptions: map[uint32]apiModel.PlcSubscriptionHandle{},
passLogToModel: passLoggerToModel,
log: customLogger,
_options: _options,
}
if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
Expand Down
21 changes: 16 additions & 5 deletions plc4go/internal/ads/Driver.go
Expand Up @@ -38,13 +38,15 @@ import (
type Driver struct {
_default.DefaultDriver

log zerolog.Logger
log zerolog.Logger
_options []options.WithOption // Used to pass them downstream
}

func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
driver := &Driver{
log: customLogger,
log: customLogger,
_options: _options,
}
driver.DefaultDriver = _default.NewDefaultDriver(driver, "ads", "Beckhoff TwinCat ADS", "tcp", NewTagHandler())
return driver
Expand All @@ -63,7 +65,11 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
driverOptions["defaultTcpPort"] = []string{strconv.Itoa(int(adsModel.AdsConstants_ADSTCPDEFAULTPORT))}
// Have the transport create a new transport-instance.
transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions, options.WithCustomLogger(m.log))
transportInstance, err := transport.CreateTransportInstance(
transportUrl,
driverOptions,
append(m._options, options.WithCustomLogger(m.log))...,
)
if err != nil {
m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
Expand All @@ -72,7 +78,10 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
}

// Create a new codec for taking care of encoding/decoding of messages
codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
codec := NewMessageCodec(
transportInstance,
append(m._options, options.WithCustomLogger(m.log))...,
)
m.log.Debug().Msgf("working with codec %#v", codec)

configuration, err := model.ParseFromOptions(m.log, driverOptions)
Expand All @@ -99,5 +108,7 @@ func (m *Driver) SupportsDiscovery() bool {
}

func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
return NewDiscoverer(options.WithCustomLogger(m.log)).Discover(ctx, callback, discoveryOptions...)
return NewDiscoverer(
append(m._options, options.WithCustomLogger(m.log))...,
).Discover(ctx, callback, discoveryOptions...)
}
11 changes: 8 additions & 3 deletions plc4go/internal/ads/Subscriber.go
Expand Up @@ -150,14 +150,19 @@ func (m *Connection) subscribe(ctx context.Context, subscriptionRequest apiModel
)
}
// Create a new subscription handle.
subscriptionHandle := dirverModel.NewAdsSubscriptionHandle(m, tagName, directTag, options.WithCustomLogger(m.log))
subscriptionHandle := dirverModel.NewAdsSubscriptionHandle(
m,
tagName,
directTag,
append(m._options, options.WithCustomLogger(m.log))...,
)
responseChan <- spiModel.NewDefaultPlcSubscriptionRequestResult(
subscriptionRequest,
spiModel.NewDefaultPlcSubscriptionResponse(
subscriptionRequest,
map[string]apiModel.PlcResponseCode{tagName: apiModel.PlcResponseCode_OK},
map[string]apiModel.PlcSubscriptionHandle{tagName: subscriptionHandle},
options.WithCustomLogger(m.log),
append(m._options, options.WithCustomLogger(m.log))...,
),
nil,
)
Expand Down Expand Up @@ -210,7 +215,7 @@ func (m *Connection) processSubscriptionResponses(_ context.Context, subscriptio
subscriptionRequest,
responseCodes,
subscriptionHandles,
options.WithCustomLogger(m.log),
append(m._options, options.WithCustomLogger(m.log))...,
),
err,
)
Expand Down
6 changes: 4 additions & 2 deletions plc4go/internal/ads/model/AdsSubscriptionHandle.go
Expand Up @@ -36,7 +36,8 @@ type AdsSubscriptionHandle struct {
directTag DirectPlcTag
consumers []apiModel.PlcSubscriptionEventConsumer

log zerolog.Logger
log zerolog.Logger
_options []options.WithOption // Used to pass them downstream
}

func NewAdsSubscriptionHandle(subscriber spi.PlcSubscriber, tagName string, directTag DirectPlcTag, _options ...options.WithOption) *AdsSubscriptionHandle {
Expand All @@ -47,6 +48,7 @@ func NewAdsSubscriptionHandle(subscriber spi.PlcSubscriber, tagName string, dire
directTag: directTag,
consumers: []apiModel.PlcSubscriptionEventConsumer{},
log: customLogger,
_options: _options,
}
}

Expand All @@ -70,7 +72,7 @@ func (t *AdsSubscriptionHandle) PublishPlcValue(value apiValues.PlcValue) {
map[string]time.Duration{t.tagName: time.Second},
map[string]apiModel.PlcResponseCode{t.tagName: apiModel.PlcResponseCode_OK},
map[string]apiValues.PlcValue{t.tagName: value},
options.WithCustomLogger(t.log),
append(t._options, options.WithCustomLogger(t.log))...,
)
for _, consumer := range t.consumers {
consumer(&event)
Expand Down
23 changes: 20 additions & 3 deletions plc4go/internal/bacnetip/Connection.go
Expand Up @@ -50,7 +50,8 @@ type Connection struct {
connectionId string
tracer tracer.Tracer

log zerolog.Logger
log zerolog.Logger
_options []options.WithOption // Used to pass them downstream
}

func NewConnection(messageCodec spi.MessageCodec, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, connectionOptions map[string][]string, _options ...options.WithOption) *Connection {
Expand All @@ -60,6 +61,7 @@ func NewConnection(messageCodec spi.MessageCodec, tagHandler spi.PlcTagHandler,
messageCodec: messageCodec,
tm: tm,
log: customLogger,
_options: _options,
}
if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
Expand Down Expand Up @@ -134,11 +136,26 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
}

func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.invokeIdGenerator, c.messageCodec, c.tm, options.WithCustomLogger(c.log)))
return spiModel.NewDefaultPlcReadRequestBuilder(
c.GetPlcTagHandler(),
NewReader(
&c.invokeIdGenerator,
c.messageCodec,
c.tm,
append(c._options, options.WithCustomLogger(c.log))...,
),
)
}

func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
return spiModel.NewDefaultPlcSubscriptionRequestBuilder(c.GetPlcTagHandler(), c.GetPlcValueHandler(), NewSubscriber(c, options.WithCustomLogger(c.log)))
return spiModel.NewDefaultPlcSubscriptionRequestBuilder(
c.GetPlcTagHandler(),
c.GetPlcValueHandler(),
NewSubscriber(
c,
append(c._options, options.WithCustomLogger(c.log))...,
),
)
}

func (c *Connection) addSubscriber(subscriber *Subscriber) {
Expand Down
8 changes: 5 additions & 3 deletions plc4go/internal/bacnetip/Subscriber.go
Expand Up @@ -33,7 +33,8 @@ type Subscriber struct {
connection *Connection
consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer

log zerolog.Logger
log zerolog.Logger
_options []options.WithOption // Used to pass them downstream
}

func NewSubscriber(connection *Connection, _options ...options.WithOption) *Subscriber {
Expand All @@ -42,7 +43,8 @@ func NewSubscriber(connection *Connection, _options ...options.WithOption) *Subs
connection: connection,
consumers: make(map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer),

log: logger,
log: logger,
_options: _options,
}
}

Expand Down Expand Up @@ -72,7 +74,7 @@ func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel
subscriptionRequest,
responseCodes,
subscriptionValues,
options.WithCustomLogger(m.log),
append(m._options, options.WithCustomLogger(m.log))...,
),
nil,
)
Expand Down
29 changes: 24 additions & 5 deletions plc4go/internal/cbus/Connection.go
Expand Up @@ -74,7 +74,8 @@ type Connection struct {
connectionId string
tracer tracer.Tracer

log zerolog.Logger `ignore:"true"`
log zerolog.Logger `ignore:"true"`
_options []options.WithOption `ignore:"true"` // Used to pass them downstream
}

func NewConnection(messageCodec *MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, connectionOptions map[string][]string, _options ...options.WithOption) *Connection {
Expand All @@ -86,7 +87,8 @@ func NewConnection(messageCodec *MessageCodec, configuration Configuration, driv
driverContext: driverContext,
tm: tm,

log: customLogger,
log: customLogger,
_options: _options,
}
if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
Expand Down Expand Up @@ -175,7 +177,15 @@ func (c *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
}

func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.alphaGenerator, c.messageCodec, c.tm, options.WithCustomLogger(c.log)))
return spiModel.NewDefaultPlcReadRequestBuilder(
c.GetPlcTagHandler(),
NewReader(
&c.alphaGenerator,
c.messageCodec,
c.tm,
append(c._options, options.WithCustomLogger(c.log))...,
),
)
}

func (c *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
Expand All @@ -186,7 +196,10 @@ func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionReques
return spiModel.NewDefaultPlcSubscriptionRequestBuilder(
c.GetPlcTagHandler(),
c.GetPlcValueHandler(),
NewSubscriber(c.addSubscriber, options.WithCustomLogger(c.log)),
NewSubscriber(
c.addSubscriber,
append(c._options, options.WithCustomLogger(c.log))...,
),
)
}

Expand All @@ -196,7 +209,13 @@ func (c *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRe
}

func (c *Connection) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder {
return spiModel.NewDefaultPlcBrowseRequestBuilder(c.GetPlcTagHandler(), NewBrowser(c, options.WithCustomLogger(c.log)))
return spiModel.NewDefaultPlcBrowseRequestBuilder(
c.GetPlcTagHandler(),
NewBrowser(
c,
append(c._options, options.WithCustomLogger(c.log))...,
),
)
}

func (c *Connection) addSubscriber(subscriber *Subscriber) {
Expand Down
11 changes: 8 additions & 3 deletions plc4go/internal/cbus/Discoverer.go
Expand Up @@ -47,7 +47,8 @@ type Discoverer struct {
deviceScanningWorkItemId atomic.Int32
deviceScanningQueue pool.Executor

log zerolog.Logger
log zerolog.Logger
_options []options.WithOption // Used to pass them downstream
}

func NewDiscoverer(_options ...options.WithOption) *Discoverer {
Expand All @@ -57,7 +58,8 @@ func NewDiscoverer(_options ...options.WithOption) *Discoverer {
transportInstanceCreationQueue: pool.NewFixedSizeExecutor(50, 100, _options...),
deviceScanningQueue: pool.NewFixedSizeExecutor(50, 100, _options...),

log: customLogger,
log: customLogger,
_options: _options,
}
}

Expand Down Expand Up @@ -222,7 +224,10 @@ func (d *Discoverer) createDeviceScanDispatcher(tcpTransportInstance *tcp.Transp
transportInstanceLogger := d.log.With().Stringer("transportInstance", tcpTransportInstance).Logger()
transportInstanceLogger.Debug().Msgf("Scanning %v", tcpTransportInstance)
// Create a codec for sending and receiving messages.
codec := NewMessageCodec(tcpTransportInstance, options.WithCustomLogger(d.log))
codec := NewMessageCodec(
tcpTransportInstance,
append(d._options, options.WithCustomLogger(d.log))...,
)
// Explicitly start the worker
if err := codec.ConnectWithContext(context.TODO()); err != nil {
transportInstanceLogger.Debug().Err(err).Msg("Error connecting")
Expand Down
28 changes: 22 additions & 6 deletions plc4go/internal/cbus/Driver.go
Expand Up @@ -41,7 +41,8 @@ type Driver struct {
awaitSetupComplete bool
awaitDisconnectComplete bool

log zerolog.Logger
log zerolog.Logger
_options []options.WithOption // Used to pass them downstream
}

func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
Expand All @@ -51,6 +52,7 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
awaitSetupComplete: true,
awaitDisconnectComplete: true,
log: customLogger,
_options: _options,
}
driver.DefaultDriver = _default.NewDefaultDriver(driver, "c-bus", "Clipsal Bus", "tcp", NewTagHandler())
return driver
Expand All @@ -67,7 +69,11 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
driverOptions["defaultTcpPort"] = []string{strconv.FormatUint(uint64(readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT), 10)}
// Have the transport create a new transport-instance.
transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions, options.WithCustomLogger(m.log))
transportInstance, err := transport.CreateTransportInstance(
transportUrl,
driverOptions,
append(m._options, options.WithCustomLogger(m.log))...,
)
if err != nil {
m.log.Error().Err(err).Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
return m.reportError(errors.Wrapf(err, "couldn't initialize transport configuration for given transport url %s", transportUrl.String()))
Expand All @@ -78,16 +84,24 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
m.log.Error().Err(err).Msgf("Invalid options")
return m.reportError(errors.Wrap(err, "Invalid options"))
}
// TODO: we might need to remember the original options
codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
codec := NewMessageCodec(
transportInstance,
append(m._options, options.WithCustomLogger(m.log))...,
)
m.log.Debug().Msgf("working with codec:\n%s", codec)

driverContext := NewDriverContext(configuration)
driverContext.awaitSetupComplete = m.awaitSetupComplete
driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete

// Create the new connection
connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, driverOptions, options.WithCustomLogger(m.log))
connection := NewConnection(
codec, configuration,
driverContext,
m.GetPlcTagHandler(),
m.tm, driverOptions,
append(m._options, options.WithCustomLogger(m.log))...,
)
m.log.Debug().Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
Expand All @@ -111,7 +125,9 @@ func (m *Driver) SupportsDiscovery() bool {
}

func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
return NewDiscoverer(options.WithCustomLogger(m.log)).Discover(ctx, callback, discoveryOptions...)
return NewDiscoverer(
append(m._options, options.WithCustomLogger(m.log))...,
).Discover(ctx, callback, discoveryOptions...)
}

func (m *Driver) Close() error {
Expand Down
8 changes: 5 additions & 3 deletions plc4go/internal/cbus/Subscriber.go
Expand Up @@ -44,7 +44,8 @@ type Subscriber struct {

consumersMutex sync.RWMutex

log zerolog.Logger `ignore:"true"`
log zerolog.Logger `ignore:"true"`
_options []options.WithOption `ignore:"true"` // Used to pass them downstream
}

func NewSubscriber(addSubscriber func(subscriber *Subscriber), _options ...options.WithOption) *Subscriber {
Expand All @@ -53,7 +54,8 @@ func NewSubscriber(addSubscriber func(subscriber *Subscriber), _options ...optio
addSubscriber: addSubscriber,
consumers: make(map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer),

log: customLogger,
log: customLogger,
_options: _options,
}
}

Expand Down Expand Up @@ -95,7 +97,7 @@ func (s *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.P
subscriptionRequest,
responseCodes,
subscriptionValues,
options.WithCustomLogger(s.log),
append(s._options, options.WithCustomLogger(s.log))...,
),
nil,
)
Expand Down

0 comments on commit 840ca2a

Please sign in to comment.