-
Notifications
You must be signed in to change notification settings - Fork 0
/
sender.go
106 lines (91 loc) · 2.44 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// Copyright (c) 2023 Janik Liebrecht
// Use of this source code is governed by the MIT License that can be found in the LICENSE file.
package artemis
import (
"bytes"
"encoding/gob"
"encoding/json"
"fmt"
"github.com/go-stomp/stomp/v3"
)
// encoding identifies how a message is serialized into the bytes that are
// sent to the broker.
type encoding int
// Supported encodings. EncodingGob is the zero value of encoding.
const (
// EncodingGob serializes messages with encoding/gob.
EncodingGob encoding = iota
// EncodingJson serializes messages with encoding/json.
EncodingJson
)
// A Sender sends messages to the artemis broker.
// Addr must be set before sending; Dest is only required when Send is used.
type Sender struct {
// Addr is the address of the broker.
// It is passed unchanged to stomp.Dial using the "tcp" network.
Addr string
// Dest is the default destination.
// It is used by Send, which returns an error when Dest is empty.
Dest string
// PubSub configures the type of destination.
// 'true' for the publish-subscribe pattern (topics),
// 'false' for the producer-consumer pattern (queues).
// It determines the "destination-type" header of each sent message:
// "MULTICAST" when true, "ANYCAST" when false.
PubSub bool
// Enc specifies the encoding.
// The default encoding is gob, because EncodingGob is the zero value.
Enc encoding
}
// SendTo sends messages to a specified destination.
//
// Every call dials a new STOMP connection to s.Addr over TCP and disconnects
// again before returning. The messages are encoded with s.Enc and sent in the
// given order; each one carries a "destination-type" header derived from
// s.PubSub.
//
// SendTo stops at the first message that cannot be encoded or sent and
// returns an error that wraps the underlying cause. Messages preceding the
// failing one have already been handed to the connection at that point.
func (s *Sender) SendTo(destination string, messages ...any) error {
	conn, err := stomp.Dial("tcp", s.Addr)
	if err != nil {
		return fmt.Errorf("could not connect to broker %s: %w", s.Addr, err)
	}
	// Best effort: an error from Disconnect is deliberately not reported, so
	// it cannot mask the result of the send loop below.
	defer conn.Disconnect()

	destType := s.destType()
	for _, msg := range messages {
		m, err := encode(msg, s.Enc)
		if err != nil {
			return fmt.Errorf("failed to encode message: %v: %w", msg, err)
		}
		// NOTE(review): the content type is "text/plain" for every encoding,
		// including the binary gob format — confirm that receivers rely on
		// this before changing it.
		err = conn.Send(destination, "text/plain", m,
			stomp.SendOpt.Header("destination-type", destType))
		if err != nil {
			return fmt.Errorf("could not send to destination %s: %w", destination, err)
		}
	}
	return nil
}
// Send delivers the messages to the Sender's default destination, s.Dest,
// by delegating to SendTo. It returns an error without sending anything
// when no default destination is configured.
func (s *Sender) Send(messages ...any) error {
	if dest := s.Dest; dest != "" {
		return s.SendTo(dest, messages...)
	}
	return fmt.Errorf("no default destination specified")
}
// destType returns the value for the "destination-type" header:
// "MULTICAST" when s.PubSub is set, "ANYCAST" otherwise.
func (s *Sender) destType() string {
	if !s.PubSub {
		return "ANYCAST"
	}
	return "MULTICAST"
}
// encode serializes message using the encoding selected by enc.
// It returns an error for an encoding value that is neither EncodingGob
// nor EncodingJson, or when the selected encoder fails.
func encode(message any, enc encoding) ([]byte, error) {
	if enc == EncodingGob {
		return encodeGob(message)
	}
	if enc == EncodingJson {
		return encodeJson(message)
	}
	return nil, fmt.Errorf("unknown encoding: %v", enc)
}
// encodeGob serializes message with encoding/gob and returns the encoded
// bytes.
//
// The message is encoded as an interface value, so its concrete type is
// registered with gob before encoding. A nil message is rejected with an
// error up front, because gob.Register panics when given nil. Encoding
// failures are returned as errors wrapping the cause reported by gob.
func encodeGob(message any) ([]byte, error) {
	if message == nil {
		return nil, fmt.Errorf("could not encode as gob: message is nil")
	}
	gob.Register(message)

	var buf bytes.Buffer
	// Pass pointer to interface so Encode sees a value of interface type.
	if err := gob.NewEncoder(&buf).Encode(&message); err != nil {
		return nil, fmt.Errorf("could not encode as gob: %w", err)
	}
	return buf.Bytes(), nil
}
// encodeJson serializes message with json.Marshal and returns the encoded
// bytes. A marshalling failure is returned as an error that wraps the cause
// reported by encoding/json, so callers can inspect it with errors.Is and
// errors.As.
func encodeJson(message any) ([]byte, error) {
	b, err := json.Marshal(message)
	if err != nil {
		return nil, fmt.Errorf("could not marshal as json: %w", err)
	}
	return b, nil
}