Skip to content

Commit

Permalink
fix(plc4go/cbus): display MMI's with a default incoming message handl…
Browse files Browse the repository at this point in the history
…er for now
  • Loading branch information
sruehl committed Aug 2, 2022
1 parent 9b7cee0 commit a5af69e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 4 deletions.
2 changes: 2 additions & 0 deletions plc4go/internal/bacnetip/Connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/rs/zerolog/log"
"sync"
"time"
)

type Connection struct {
Expand Down Expand Up @@ -84,6 +85,7 @@ func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
case message := <-incomingMessageChannel:
// TODO: implement mapping to subscribers
log.Info().Msgf("Received \n%v", message)
case <-time.After(20 * time.Millisecond):
}
}
log.Info().Msg("Ending incoming message transfer")
Expand Down
23 changes: 21 additions & 2 deletions plc4go/internal/cbus/Connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,15 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
return
}

startTime := time.Now()
select {
case <-receivedResetEchoChan:
log.Debug().Msgf("We received the echo")
case err := <-receivedResetEchoErrorChan:
c.fireConnectionError(errors.Wrap(err, "Error receiving of Reset"), ch)
return
case timeout := <-time.After(time.Second * 2):
c.fireConnectionError(errors.Errorf("Timeout after %v", timeout), ch)
c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
return
}
log.Debug().Msg("Reset done")
Expand Down Expand Up @@ -279,6 +280,23 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
}
}()
log.Debug().Msg("Subscription handler stated")

log.Debug().Msg("Starting default incoming message handler")
go func() {
for c.IsConnected() {
log.Debug().Msg("Polling data")
incomingMessageChannel := c.messageCodec.GetDefaultIncomingMessageChannel()
select {
case message := <-incomingMessageChannel:
// TODO: forward that to the subscriber...
// TODO: implement mapping to subscribers
log.Info().Msgf("Received \n%v", message)
case <-time.After(20 * time.Millisecond):
}
}
log.Info().Msg("Ending default incoming message handler")
}()
log.Debug().Msg("default incoming message handler started")
}

func (c *Connection) sendCalDataWrite(ch chan plc4go.PlcConnectionConnectResult, paramNo readWriteModel.Parameter, parameterValue readWriteModel.ParameterValue, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) bool {
Expand Down Expand Up @@ -348,14 +366,15 @@ func (c *Connection) sendCalDataWrite(ch chan plc4go.PlcConnectionConnectResult,
return false
}

startTime := time.Now()
select {
case <-directCommandAckChan:
log.Debug().Msgf("We received the ack")
case err := <-directCommandAckErrorChan:
c.fireConnectionError(errors.Wrap(err, "Error receiving of ack"), ch)
return false
case timeout := <-time.After(time.Second * 2):
c.fireConnectionError(errors.Errorf("Timeout after %v", timeout), ch)
c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
return false
}
return true
Expand Down
3 changes: 2 additions & 1 deletion plc4go/internal/spi/transports/tcp/Transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package tcp

import (
"bufio"
"fmt"
"github.com/apache/plc4x/plc4go/internal/spi/transports"
"github.com/apache/plc4x/plc4go/internal/spi/utils"
"github.com/pkg/errors"
Expand Down Expand Up @@ -84,7 +85,7 @@ func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[str
}

// Potentially resolve the ip address, if a hostname was provided
tcpAddr, err := net.ResolveTCPAddr("tcp", address+":"+strconv.Itoa(port))
tcpAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", address, port))
if err != nil {
return nil, errors.Wrap(err, "error resolving typ address")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (!smart && !connect) {
// In this mode every message will be echoed
LOGGER.info("Sending echo");
ctx.writeAndFlush(msg);
ctx.write(msg);
}
try {
writeLock.lock();
Expand Down Expand Up @@ -338,6 +338,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
return;
}
} finally {
ctx.flush();
writeLock.unlock();
}
}
Expand Down

0 comments on commit a5af69e

Please sign in to comment.