Skip to content

Commit

Permalink
feat(plc4go/opcua): add writer
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Jul 28, 2023
1 parent 7c21ea8 commit b74e9f3
Show file tree
Hide file tree
Showing 3 changed files with 394 additions and 11 deletions.
4 changes: 2 additions & 2 deletions plc4go/internal/opcua/Reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
requestHeader := readWriteModel.NewRequestHeader(
m.messageCodec.channel.getAuthenticationToken(),
m.messageCodec.channel.getCurrentDateTime(),
0,
m.messageCodec.channel.getRequestHandle(),
0,
NULL_STRING,
REQUEST_TIMEOUT_LONG,
Expand Down Expand Up @@ -150,7 +150,7 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, err)
}

m.messageCodec.channel.submit(ctx, m.messageCodec, errorDispatcher, result, consumer, buffer)
m.messageCodec.channel.submit(ctx, m.messageCodec, errorDispatcher, consumer, buffer)
}

func (m *Reader) readResponse(readRequestIn apiModel.PlcReadRequest, results []readWriteModel.DataValue) (readRequest apiModel.PlcReadRequest, responseCodes map[string]apiModel.PlcResponseCode, values map[string]apiValues.PlcValue) {
Expand Down
12 changes: 4 additions & 8 deletions plc4go/internal/opcua/SecureChannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"sync/atomic"
"time"

apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/opcua/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/utils"
Expand Down Expand Up @@ -175,7 +174,7 @@ func NewSecureChannel(log zerolog.Logger, ctx DriverContext, configuration Confi
return s
}

func (s *SecureChannel) submit(ctx context.Context, codec *MessageCodec, errorDispatcher func(err error), result chan apiModel.PlcReadRequestResult, consumer func(opcuaResponse []byte), buffer utils.WriteBufferByteBased) {
func (s *SecureChannel) submit(ctx context.Context, codec *MessageCodec, errorDispatcher func(err error), consumer func(opcuaResponse []byte), buffer utils.WriteBufferByteBased) {
transactionId := s.channelTransactionManager.getTransactionIdentifier()

//TODO: We need to split large messages up into chunks if it is larger than the sendBufferSize
Expand Down Expand Up @@ -562,8 +561,7 @@ func (s *SecureChannel) onConnectCreateSessionRequest(ctx context.Context, codec
s.log.Error().Err(err).Msg("Error while waiting for subscription response")
}

result := make(chan apiModel.PlcReadRequestResult, 1)
s.submit(ctx, codec, errorDispatcher, result, consumer, buffer)
s.submit(ctx, codec, errorDispatcher, consumer, buffer)
}

func (s *SecureChannel) onConnectActivateSessionRequest(ctx context.Context, codec *MessageCodec, opcuaMessageResponse readWriteModel.CreateSessionResponse, sessionResponse readWriteModel.CreateSessionResponse) {
Expand Down Expand Up @@ -683,8 +681,7 @@ func (s *SecureChannel) onConnectActivateSessionRequest(ctx context.Context, cod
s.log.Error().Err(err).Msg("Error while waiting for subscription response")
}

result := make(chan apiModel.PlcReadRequestResult, 1)
s.submit(ctx, codec, errorDispatcher, result, consumer, buffer)
s.submit(ctx, codec, errorDispatcher, consumer, buffer)
}

func (s *SecureChannel) onDisconnect(ctx context.Context, codec *MessageCodec) {
Expand Down Expand Up @@ -760,8 +757,7 @@ func (s *SecureChannel) onDisconnect(ctx context.Context, codec *MessageCodec) {
s.log.Error().Err(err).Msg("Error while waiting for close session response")
}

result := make(chan apiModel.PlcReadRequestResult, 1)
s.submit(ctx, codec, errorDispatcher, result, consumer, buffer)
s.submit(ctx, codec, errorDispatcher, consumer, buffer)
}

func (s *SecureChannel) onDisconnectCloseSecureChannel(ctx context.Context, codec *MessageCodec, message readWriteModel.CloseSessionResponseExactly, response readWriteModel.CloseSessionResponse) {
Expand Down
Loading

0 comments on commit b74e9f3

Please sign in to comment.