/
messenger.go
89 lines (74 loc) · 2.32 KB
/
messenger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package common
import (
"fmt"
"os"
"github.com/ElrondNetwork/arwen-wasm-vm/ipc/marshaling"
logger "github.com/ElrondNetwork/elrond-go-logger"
)
// log is the package-level logger used by the messenger; it is obtained from
// the logger package under the name "arwen/baseMessenger".
var log = logger.GetOrCreate("arwen/baseMessenger")
// Messenger intermediates communication (message exchange) via pipes.
// It pairs a Receiver with a Sender and tracks a dialogue nonce that is
// stamped on outgoing messages and checked on incoming ones.
type Messenger struct {
// Name identifies this messenger; it is used as a prefix in trace log lines.
Name string
// Nonce is the dialogue counter: Send increments it and stamps it on the
// outgoing message, Receive requires the incoming message to carry Nonce+1
// and then adopts that value, and ResetDialogue sets it back to 0.
Nonce uint32
// receiver is used by Receive to obtain messages and is shut down by Shutdown.
receiver *Receiver
// sender is used by Send to emit messages and is shut down by Shutdown.
sender *Sender
}
// NewMessengerPipes builds a Messenger on top of two files (pipe ends).
//
// The reader is wrapped in a Receiver and the writer in a Sender, both created
// with the same marshalizer. The dialogue nonce starts at its zero value.
func NewMessengerPipes(name string, reader *os.File, writer *os.File, marshalizer marshaling.Marshalizer) *Messenger {
	created := &Messenger{Name: name}
	created.receiver = NewReceiver(reader, marshalizer)
	created.sender = NewSender(writer, marshalizer)
	return created
}
// NewMessenger builds a Messenger from an already constructed Receiver and
// Sender. The arguments are stored as given; the dialogue nonce starts at its
// zero value.
func NewMessenger(name string, receiver *Receiver, sender *Sender) *Messenger {
	created := new(Messenger)
	created.Name = name
	created.receiver = receiver
	created.sender = sender
	return created
}
// Send stamps the next dialogue nonce on the message and sends it over the pipe.
//
// The messenger's Nonce is incremented before the message is handed to the
// sender, and it stays incremented even when sending fails. Any error from the
// underlying sender is returned unchanged. The "SENT" trace line is emitted
// only when the sender reported success, so the trace never claims a delivery
// that did not happen.
func (messenger *Messenger) Send(message MessageHandler) error {
	messenger.Nonce++
	message.SetNonce(messenger.Nonce)

	length, err := messenger.sender.Send(message)
	if err != nil {
		// Nothing was (fully) sent: skip the success trace and let the caller
		// decide how to report the failure.
		return err
	}

	log.Trace(fmt.Sprintf("[%s][#%d]: SENT message", messenger.Name, message.GetNonce()), "size", length, "msg", message.DebugString())
	return nil
}
// Receive reads the next message from the pipe and validates its nonce.
//
// The timeout is passed through to the underlying receiver. If the receiver
// fails, its error is returned as is. Otherwise the message must carry a nonce
// equal to the messenger's current Nonce plus one: on a match the messenger
// adopts that nonce and the message is returned, on a mismatch
// ErrInvalidMessageNonce is returned and the messenger's Nonce is left untouched.
func (messenger *Messenger) Receive(timeout int) (MessageHandler, error) {
	log.Trace(fmt.Sprintf("[%s]: Receive message...", messenger.Name))

	received, size, err := messenger.receiver.Receive(timeout)
	if err != nil {
		return nil, err
	}

	header := fmt.Sprintf("[%s][#%d]: RECEIVED message", messenger.Name, received.GetNonce())
	log.Trace(header, "size", size, "msg", received.DebugString())

	// The trace above is emitted before validation, so rejected messages are logged too.
	receivedNonce := received.GetNonce()
	if expectedNonce := messenger.Nonce + 1; receivedNonce != expectedNonce {
		return nil, ErrInvalidMessageNonce
	}

	messenger.Nonce = receivedNonce
	return received, nil
}
// Reset brings the messenger back to its initial state. At present this only
// means restarting the dialogue, which it delegates to ResetDialogue.
func (messenger *Messenger) Reset() {
	messenger.ResetDialogue()
}
// ResetDialogue restarts the dialogue by setting the nonce back to zero, so
// the next message sent carries nonce 1 and the next message received is
// expected to carry nonce 1.
func (messenger *Messenger) ResetDialogue() {
	messenger.Nonce = 0
}
// Shutdown shuts down the receiver and then the sender.
//
// Failures are logged rather than returned, and a failure on the receiver
// does not prevent the sender from being shut down as well.
func (messenger *Messenger) Shutdown() {
	log.Debug("Messenger.Shutdown()")

	if err := messenger.receiver.Shutdown(); err != nil {
		log.Error("Cannot close receiver", "err", err)
	}

	if err := messenger.sender.Shutdown(); err != nil {
		log.Error("Cannot close sender", "err", err)
	}
}