Skip to content

Commit

Permalink
fix(plc4go): fixed some quality issues
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Jun 3, 2023
1 parent 7d745da commit dd568f9
Show file tree
Hide file tree
Showing 18 changed files with 81 additions and 55 deletions.
2 changes: 1 addition & 1 deletion plc4go/examples/ads/discovery/Discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

func main() {
discoverer := ads.NewDiscoverer()
discoverer.Discover(context.Background(), func(event apiModel.PlcDiscoveryItem) {
_ = discoverer.Discover(context.Background(), func(event apiModel.PlcDiscoveryItem) {
print(event)
})
time.Sleep(time.Second * 5)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,6 @@ func main() {
}
log.Info().Str("connection string", connStr).Msg("Connected")
connection := connectionResult.GetConnection()
defer connection.BlockingClose()
connection.BlockingClose()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func main() {
}
log.Info().Str("connection string", connStr).Msg("Connected")
connection := connectionResult.GetConnection()
defer connection.BlockingClose()
connection.BlockingClose()

// Try to find all KNX devices on the current network
browseRequest, err := connection.BrowseRequestBuilder().
Expand Down
4 changes: 3 additions & 1 deletion plc4go/internal/ads/Discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
defer func() {
for _, discoveryItem := range discoveryItems {
if discoveryItem.socket != nil {
discoveryItem.socket.Close()
if err := discoveryItem.socket.Close(); err != nil {
d.log.Debug().Err(err).Msg("errored")
}
}
}
}()
Expand Down
8 changes: 6 additions & 2 deletions plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,13 @@ func (m *UDPMultiplexer) Close() error {
log.Debug().Msg("Close")

// pass along the close to the director(s)
m.directPort.Close()
if err := m.directPort.Close(); err != nil {
log.Debug().Err(err).Msg("errored")
}
if m.broadcastPort != nil {
m.broadcastPort.Close()
if err := m.broadcastPort.Close(); err != nil {
log.Debug().Err(err).Msg("errored")
}
}
return nil
}
Expand Down
23 changes: 14 additions & 9 deletions plc4go/internal/bacnetip/Connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,7 @@ func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
}()
for c.IsConnected() {
c.log.Trace().Msg("Polling data")
incomingMessageChannel := c.messageCodec.GetDefaultIncomingMessageChannel()
timeout := time.NewTimer(20 * time.Millisecond)
defer utils.CleanupTimer(timeout)
select {
case message := <-incomingMessageChannel:
// TODO: implement mapping to subscribers
log.Info().Msgf("Received \n%v", message)
case <-timeout.C:
}
c.passToDefaultIncomingMessageChannel()
}
c.log.Info().Msg("Ending incoming message transfer")
}()
Expand All @@ -118,6 +110,19 @@ func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
return ch
}

func (c *Connection) passToDefaultIncomingMessageChannel() {
incomingMessageChannel := c.messageCodec.GetDefaultIncomingMessageChannel()
timeout := time.NewTimer(20 * time.Millisecond)
defer utils.CleanupTimer(timeout)
select {
case message := <-incomingMessageChannel:
// TODO: implement mapping to subscribers
log.Info().Msgf("Received \n%v", message)
case <-timeout.C:
log.Info().Msg("Message was not handled")
}
}

func (c *Connection) GetConnection() plc4go.PlcConnection {
return c
}
Expand Down
14 changes: 7 additions & 7 deletions plc4go/internal/bacnetip/IOCBModule.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,17 @@ type PriorityItem struct {
// A PriorityQueue implements heap.Interface and holds Items.
type PriorityQueue []*PriorityItem

func (pq PriorityQueue) Len() int { return len(pq) }
func (pq *PriorityQueue) Len() int { return len(*pq) }

func (pq PriorityQueue) Less(i, j int) bool {
func (pq *PriorityQueue) Less(i, j int) bool {
// We want Pop to give us the highest, not lowest, priority so we use greater than here.
return pq[i].priority > pq[j].priority
return (*pq)[i].priority > (*pq)[j].priority
}

func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
func (pq *PriorityQueue) Swap(i, j int) {
(*pq)[i], (*pq)[j] = (*pq)[j], (*pq)[i]
(*pq)[i].index = i
(*pq)[j].index = j
}

func (pq *PriorityQueue) Push(x any) {
Expand Down
12 changes: 10 additions & 2 deletions plc4go/internal/bacnetip/MessageCodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
return errors.Wrap(err, "error creating IOCB")
}
go func() {
go m.bipSimpleApplication.RequestIO(iocb)
go func() {
if err := m.bipSimpleApplication.RequestIO(iocb); err != nil {
log.Debug().Err(err).Msg("errored")
}
}()
iocb.Wait()
if iocb.ioError != nil {
// TODO: handle error
Expand Down Expand Up @@ -142,7 +146,11 @@ func (m *ApplicationLayerMessageCodec) SendRequest(ctx context.Context, message
return errors.Wrap(err, "error creating IOCB")
}
go func() {
go m.bipSimpleApplication.RequestIO(iocb)
go func() {
if err := m.bipSimpleApplication.RequestIO(iocb); err != nil {

}
}()
iocb.Wait()
if err := iocb.ioError; err != nil {
if err := handleError(err); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions plc4go/internal/bacnetip/PDU.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (a *Address) decodeAddress(addr any) error {
if m {
log.Debug().Msg("combined pattern")
groups := combined_pattern.FindStringSubmatch(addr)
net := groups[0]
_net := groups[0]
global_broadcast := groups[1]
local_broadcast := groups[2]
local_addr := groups[3]
Expand All @@ -223,7 +223,7 @@ func (a *Address) decodeAddress(addr any) error {
a := func(...any) {

}
a(net, global_broadcast, local_broadcast, local_addr, local_ip_addr, local_ip_net, local_ip_port, route_addr, route_ip_addr, route_ip_port)
a(_net, global_broadcast, local_broadcast, local_addr, local_ip_addr, local_ip_net, local_ip_port, route_addr, route_ip_addr, route_ip_port)
}
panic("parsing not yet ported")
case AddressTuple[string, uint16]:
Expand Down
6 changes: 5 additions & 1 deletion plc4go/internal/bacnetip/UDPCommunicationsModule.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,11 @@ func (d *UDPDirector) handleRead() {
}
pdu := NewPDU(bvlc, WithPDUSource(saddr), WithPDUDestination(daddr))
// send the PDU up to the client
go d._response(pdu)
go func() {
if err := d._response(pdu); err != nil {
log.Debug().Err(err).Msg("errored")
}
}()
}

func (d *UDPDirector) handleError(err error) {
Expand Down
12 changes: 6 additions & 6 deletions plc4go/internal/knxnetip/Browser.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,10 +554,10 @@ func (m Browser) calculateAddresses(query DeviceQuery) ([]driverModel.KnxAddress
}

func (m Browser) explodeSegment(segment string, min uint8, max uint8) ([]uint8, error) {
var options []uint8
var segmentOptions []uint8
if strings.Contains(segment, "*") {
for i := min; i <= max; i++ {
options = append(options, i)
segmentOptions = append(segmentOptions, i)
}
} else if strings.HasPrefix(segment, "[") && strings.HasSuffix(segment, "]") {
segment = strings.TrimPrefix(segment, "[")
Expand All @@ -574,14 +574,14 @@ func (m Browser) explodeSegment(segment string, min uint8, max uint8) ([]uint8,
return nil, err
}
for i := localMin; i <= localMax; i++ {
options = append(options, uint8(i))
segmentOptions = append(segmentOptions, uint8(i))
}
} else {
option, err := strconv.ParseUint(segment, 10, 8)
if err != nil {
return nil, err
}
options = append(options, uint8(option))
segmentOptions = append(segmentOptions, uint8(option))
}
}
} else {
Expand All @@ -590,10 +590,10 @@ func (m Browser) explodeSegment(segment string, min uint8, max uint8) ([]uint8,
return nil, err
}
if uint8(value) >= min && uint8(value) <= max {
options = append(options, uint8(value))
segmentOptions = append(segmentOptions, uint8(value))
}
}
return options, nil
return segmentOptions, nil
}

func (m Browser) parseAssociationTable(deviceDescriptor uint16, knxGroupAddresses []driverModel.KnxGroupAddress, value values.PlcValue) (driverModel.KnxGroupAddress, uint16) {
Expand Down
8 changes: 4 additions & 4 deletions plc4go/pkg/api/cache/plcConnectionLease.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,13 @@ func (t *plcConnectionLease) Close() <-chan plc4go.PlcConnectionCloseResult {
// Extract the trace entries from the connection.
var traces []tracer.TraceEntry
if t.IsTraceEnabled() {
tracer := t.GetTracer()
_tracer := t.GetTracer()
// Save all traces.
traces = tracer.GetTraces()
traces = _tracer.GetTraces()
// Clear the log.
tracer.ResetTraces()
_tracer.ResetTraces()
// Reset the connection id back to the one without the lease-id.
tracer.SetConnectionId(t.connection.GetConnectionId())
_tracer.SetConnectionId(t.connection.GetConnectionId())
}

// Return the connection to the connection container and don't actually close it.
Expand Down
23 changes: 13 additions & 10 deletions plc4go/spi/default/DefaultCodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,8 @@ mainLoop:
if m.customMessageHandling(codec, message) {
workerLog.Trace().Msg("Custom handling handled the message")
continue mainLoop
} else {
workerLog.Trace().Msg("Custom handling didn't handle the message")
}
workerLog.Trace().Msg("Custom handling didn't handle the message")
}

workerLog.Trace().Msg("Handle message")
Expand All @@ -340,14 +339,18 @@ mainLoop:
// If the message has not been handled and a default handler is provided, call this ...
if !messageHandled {
workerLog.Trace().Msg("Message was not handled")
timeout := time.NewTimer(time.Millisecond * 40)
defer utils.CleanupTimer(timeout)
select {
case m.defaultIncomingMessageChannel <- message:
case <-timeout.C:
timeout.Stop()
workerLog.Warn().Msgf("Message discarded\n%s", message)
}
m.passToDefaultIncomingMessageChannel(workerLog, message)
}
}
}

func (m *defaultCodec) passToDefaultIncomingMessageChannel(workerLog zerolog.Logger, message spi.Message) {
timeout := time.NewTimer(time.Millisecond * 40)
defer utils.CleanupTimer(timeout)
select {
case m.defaultIncomingMessageChannel <- message:
case <-timeout.C:
timeout.Stop()
workerLog.Warn().Msgf("Message discarded\n%s", message)
}
}
4 changes: 2 additions & 2 deletions plc4go/spi/model/DefaultPlcBrowseRequest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func TestDefaultPlcBrowseRequest_ExecuteWithInterceptor(t *testing.T) {
queryNames: tt.fields.queryNames,
queries: tt.fields.queries,
}
assert.Equalf(t, tt.want, d.ExecuteWithInterceptor(tt.args.interceptor), "ExecuteWithInterceptor(%v)", tt.args.interceptor)
assert.Equalf(t, tt.want, d.ExecuteWithInterceptor(tt.args.interceptor), "ExecuteWithInterceptor(func(%t))", tt.args.interceptor != nil)
})
}
}
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestDefaultPlcBrowseRequest_ExecuteWithInterceptorWithContext(t *testing.T)
queryNames: tt.fields.queryNames,
queries: tt.fields.queries,
}
assert.Equalf(t, tt.want, d.ExecuteWithInterceptorWithContext(tt.args.ctx, tt.args.interceptor), "ExecuteWithInterceptorWithContext(%v, %v)", tt.args.ctx, tt.args.interceptor)
assert.Equalf(t, tt.want, d.ExecuteWithInterceptorWithContext(tt.args.ctx, tt.args.interceptor), "ExecuteWithInterceptorWithContext(%v, func(%t))", tt.args.ctx, tt.args.interceptor != nil)
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion plc4go/spi/model/DefaultPlcSubscriptionRequest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func TestDefaultPlcSubscriptionRequestBuilder_AddPreRegisteredConsumer(t *testin
intervals: tt.fields.intervals,
preRegisteredConsumers: tt.fields.preRegisteredConsumers,
}
assert.Equalf(t, tt.want, d.AddPreRegisteredConsumer(tt.args.name, tt.args.consumer), "AddPreRegisteredConsumer(%v, %v)", tt.args.name, tt.args.consumer)
assert.Equalf(t, tt.want, d.AddPreRegisteredConsumer(tt.args.name, tt.args.consumer), "AddPreRegisteredConsumer(%v, func(%t))", tt.args.name, tt.args.consumer != nil)
})
}
}
Expand Down
6 changes: 3 additions & 3 deletions plc4go/spi/model/DefaultPlcUnsubscriptionRequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ func NewDefaultPlcUnsubscriptionRequest() *DefaultPlcUnsubscriptionRequest {
return &DefaultPlcUnsubscriptionRequest{}
}

func (d DefaultPlcUnsubscriptionRequest) Execute() <-chan apiModel.PlcUnsubscriptionRequestResult {
func (d *DefaultPlcUnsubscriptionRequest) Execute() <-chan apiModel.PlcUnsubscriptionRequestResult {
//TODO implement me
panic("implement me")
}

func (d DefaultPlcUnsubscriptionRequest) ExecuteWithContext(ctx context.Context) <-chan apiModel.PlcUnsubscriptionRequestResult {
func (d *DefaultPlcUnsubscriptionRequest) ExecuteWithContext(ctx context.Context) <-chan apiModel.PlcUnsubscriptionRequestResult {
//TODO implement me
panic("implement me")
}

func (d DefaultPlcUnsubscriptionRequest) IsAPlcMessage() bool {
func (d *DefaultPlcUnsubscriptionRequest) IsAPlcMessage() bool {
return true
}
2 changes: 1 addition & 1 deletion plc4go/spi/transports/utils/TransportLogger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestWithLogger(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := WithLogger(tt.args.log); !assert.Equal(t, tt.want, got) {
t.Errorf("WithLogger() = %v, want %v", got, tt.want)
t.Errorf("WithLogger() = func(%t), want (%t)", got != nil, tt.want != nil)
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion plc4go/tools/plc4xgenerator/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ type Generator struct {
}

func (g *Generator) Printf(format string, args ...any) {
fmt.Fprintf(&g.buf, format, args...)
_, _ = fmt.Fprintf(&g.buf, format, args...)
}

// File holds a single parsed file and associated data.
Expand Down

0 comments on commit dd568f9

Please sign in to comment.