Skip to content

Commit

Permalink
fix(plc4go/opcua): fixed reading
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Aug 7, 2023
1 parent 3e808f0 commit a197af1
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 17 deletions.
5 changes: 4 additions & 1 deletion plc4go/internal/opcua/EncryptionHandler.go
Expand Up @@ -135,6 +135,7 @@ func (h *EncryptionHandler) decodeMessage(ctx context.Context, pdu readWriteMode
case readWriteModel.OpcuaMessageResponseExactly:
message = pduMessage.GetMessage()
default:
h.log.Trace().Type("pdu", pdu).Msg("unhandled type")
return pdu, nil
}
encryptedLength := int(pdu.GetLengthInBytes(ctx))
Expand Down Expand Up @@ -168,8 +169,10 @@ func (h *EncryptionHandler) decodeMessage(ctx context.Context, pdu readWriteMode

readBuffer := utils.NewReadBufferByteBased(buf.GetBytes(), utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian))
return readWriteModel.OpcuaAPUParseWithBuffer(ctx, readBuffer, true)
default:
h.log.Trace().Msg("unmapped security policy")
return pdu, nil
}
return pdu, nil
}

func (h *EncryptionHandler) decryptBlock(buf utils.WriteBufferByteBased, data []byte) error {
Expand Down
7 changes: 4 additions & 3 deletions plc4go/internal/opcua/Reader.go
Expand Up @@ -126,15 +126,16 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Wrapf(err, "Unable to read the reply"))
return
}
if _readResponse, ok := reply.(readWriteModel.ReadResponseExactly); ok {
extensionObjectDefinition := reply.GetBody()
if _readResponse, ok := extensionObjectDefinition.(readWriteModel.ReadResponseExactly); ok {
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, spiModel.NewDefaultPlcReadResponse(readResponse(m.log, readRequest, readRequest.GetTagNames(), _readResponse.GetResults())), nil)
return
} else {
if serviceFault, ok := reply.(readWriteModel.ServiceFaultExactly); ok {
if serviceFault, ok := extensionObjectDefinition.(readWriteModel.ServiceFaultExactly); ok {
header := serviceFault.GetResponseHeader()
m.log.Error().Stringer("header", header).Msg("Read request ended up with ServiceFault")
} else {
m.log.Error().Stringer("reply", reply).Msg("Remote party returned an error")
m.log.Error().Stringer("extensionObjectDefinition", extensionObjectDefinition).Msg("Remote party returned an error")
}

responseCodes := map[string]apiModel.PlcResponseCode{}
Expand Down
32 changes: 19 additions & 13 deletions plc4go/internal/opcua/SecureChannel.go
Expand Up @@ -228,6 +228,7 @@ func (s *SecureChannel) submit(ctx context.Context, codec *MessageCodec, errorDi
opcuaAPU = decodedOpcuaAPU.(readWriteModel.OpcuaAPUExactly)
}
messagePDU := opcuaAPU.GetMessage()
s.log.Trace().Stringer("messagePDU", messagePDU).Msg("looking at messagePDU")
opcuaResponse, ok := messagePDU.(readWriteModel.OpcuaMessageResponseExactly)
if !ok {
s.log.Debug().Type("type", message).Msg("Not relevant")
Expand All @@ -252,12 +253,15 @@ func (s *SecureChannel) submit(ctx context.Context, codec *MessageCodec, errorDi
opcuaAPU := message.(readWriteModel.OpcuaAPU)
opcuaAPU, _ = s.encryptionHandler.decodeMessage(ctx, opcuaAPU)
messagePDU := opcuaAPU.GetMessage()
s.log.Trace().Stringer("messagePDU", messagePDU).Msg("looking at messagePDU")
opcuaResponse := messagePDU.(readWriteModel.OpcuaMessageResponse)
if opcuaResponse.GetChunk() == (FINAL_CHUNK) {
s.tokenId.Store(opcuaResponse.GetSecureTokenId())
s.channelId.Store(opcuaResponse.GetSecureChannelId())

consumer(messageBuffer)
} else {
s.log.Warn().Str("chunk", opcuaResponse.GetChunk()).Msg("Message discarded")
}
return nil
},
Expand Down Expand Up @@ -554,13 +558,14 @@ func (s *SecureChannel) onConnectCreateSessionRequest(ctx context.Context, conne
}

consumer := func(opcuaResponse []byte) {
message, err := readWriteModel.ExtensionObjectParseWithBuffer(ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer(ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
if err != nil {
s.log.Error().Err(err).Msg("error parsing")
connection.fireConnectionError(err, ch)
return
}
if fault, ok := message.GetBody().(readWriteModel.ServiceFaultExactly); ok {
s.log.Trace().Stringer("extensionObject", extensionObject).Msg("looking at message")
if fault, ok := extensionObject.GetBody().(readWriteModel.ServiceFaultExactly); ok {
statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
s.log.Error().
Expand All @@ -572,16 +577,11 @@ func (s *SecureChannel) onConnectCreateSessionRequest(ctx context.Context, conne
}
s.log.Debug().Msg("Got Create Session Response Connection Response")

extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer(ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
if err != nil {
s.log.Error().Err(err).Msg("error parsing")
return
}
unknownExtensionObject := extensionObject.GetBody()
if responseMessage, ok := unknownExtensionObject.(readWriteModel.CreateSessionResponseExactly); ok {
s.authenticationToken = responseMessage.GetAuthenticationToken().GetNodeId()

go s.onConnectActivateSessionRequest(ctx, connection, ch, responseMessage, message.GetBody().(readWriteModel.CreateSessionResponse))
go s.onConnectActivateSessionRequest(ctx, connection, ch, responseMessage, responseMessage)
} else {
serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault)
header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader)
Expand All @@ -607,6 +607,7 @@ func (s *SecureChannel) onConnectActivateSessionRequest(ctx context.Context, con
connection.fireConnectionError(err, ch)
return
}
s.log.Debug().Interface("senderCertificate", certificate).Msg("working with senderCertificate")
s.encryptionHandler.setServerCertificate(certificate)
s.senderNonce = sessionResponse.GetServerNonce().GetStringValue()
endpoints := make([]string, 3)
Expand All @@ -625,8 +626,10 @@ func (s *SecureChannel) onConnectActivateSessionRequest(ctx context.Context, con
connection.fireConnectionError(err, ch)
return
}
println("wadafack\n" + s.tokenType.String() + "\n" + s.policyId.String())

userIdentityToken := s.getIdentityToken(s.tokenType, s.policyId.GetStringValue())
println("ananas\n" + userIdentityToken.String())

requestHandle := s.getRequestHandle()

Expand All @@ -649,7 +652,8 @@ func (s *SecureChannel) onConnectActivateSessionRequest(ctx context.Context, con
0,
nil,
userIdentityToken,
clientSignature)
clientSignature,
)

identifier, err := strconv.ParseUint(activateSessionRequest.GetIdentifier(), 10, 16)
if err != nil {
Expand Down Expand Up @@ -684,6 +688,7 @@ func (s *SecureChannel) onConnectActivateSessionRequest(ctx context.Context, con
s.log.Error().Err(err).Msg("error parsing")
return
}
s.log.Trace().Stringer("message", message).Msg("looking at message")
if fault, ok := message.GetBody().(readWriteModel.ServiceFaultExactly); ok {
statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
Expand Down Expand Up @@ -780,6 +785,7 @@ func (s *SecureChannel) onDisconnect(ctx context.Context, connection *Connection
s.log.Error().Err(err).Msg("error parsing")
return
}
s.log.Trace().Stringer("message", message).Msg("looking at message")
if fault, ok := message.GetBody().(readWriteModel.ServiceFaultExactly); ok {
statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
Expand Down Expand Up @@ -1566,9 +1572,9 @@ func (s *SecureChannel) hasIdentity(policies []readWriteModel.UserTokenPolicy) {

// getIdentityToken creates an IdentityToken to authenticate with a server.
// - @param tokenType the token type
// - @param securityPolicy the security policy
// - @param policyId the policy id
// - @return returns an ExtensionObject with an IdentityToken.
func (s *SecureChannel) getIdentityToken(tokenType readWriteModel.UserTokenType, value string) readWriteModel.ExtensionObject {
func (s *SecureChannel) getIdentityToken(tokenType readWriteModel.UserTokenType, policyId string) readWriteModel.ExtensionObject {
switch tokenType {
case readWriteModel.UserTokenType_userTokenTypeAnonymous:
//If we aren't using authentication tell the server we would like to log in anonymously
Expand All @@ -1584,7 +1590,7 @@ func (s *SecureChannel) getIdentityToken(tokenType readWriteModel.UserTokenType,
return readWriteModel.NewExtensionObject(
extExpandedNodeId,
readWriteModel.NewExtensionObjectEncodingMask(false, false, true),
readWriteModel.NewUserIdentityToken(readWriteModel.NewPascalString(s.securityPolicy), anonymousIdentityToken),
readWriteModel.NewUserIdentityToken(readWriteModel.NewPascalString(policyId), anonymousIdentityToken),
false,
)
case readWriteModel.UserTokenType_userTokenTypeUserName:
Expand Down Expand Up @@ -1620,7 +1626,7 @@ func (s *SecureChannel) getIdentityToken(tokenType readWriteModel.UserTokenType,
return readWriteModel.NewExtensionObject(
extExpandedNodeId,
readWriteModel.NewExtensionObjectEncodingMask(false, false, true),
readWriteModel.NewUserIdentityToken(readWriteModel.NewPascalString(s.securityPolicy), userNameIdentityToken),
readWriteModel.NewUserIdentityToken(readWriteModel.NewPascalString(policyId), userNameIdentityToken),
false,
)
}
Expand Down

0 comments on commit a197af1

Please sign in to comment.