/
memory.go
125 lines (105 loc) · 3.65 KB
/
memory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package memory
import (
"fmt"
"sync"
"github.com/Bendomey/nucleo-go"
"github.com/Bendomey/nucleo-go/serializer"
"github.com/Bendomey/nucleo-go/transit"
"github.com/Bendomey/nucleo-go/utils"
log "github.com/sirupsen/logrus"
)
// Subscription is one handler registered on a topic inside a SharedMemory.
// NOTE: Subscribe constructs this with a positional literal, so the field
// order below must not change without updating that call site.
type Subscription struct {
// id is assigned in Subscribe as utils.RandomString(5) + "_" + command.
id string
// transporterId is the instanceID of the transporter that registered the
// subscription; Disconnect uses it to drop that transporter's entries.
transporterId string
// handler is the callback Publish invokes (in a new goroutine) with the message.
handler transit.TransportHandler
// active gates delivery: Publish skips subscriptions whose flag is false.
// Subscribe always sets it to true.
active bool
}
// SharedMemory is the state handed to Create and referenced by the resulting
// transporter; every transporter created with the same *SharedMemory sees the
// same subscriptions. Both fields may be left nil: Create initializes them.
type SharedMemory struct {
// handlers maps a topic (as built by topicName) to its subscriptions.
handlers map[string][]Subscription
// mutex is locked by Subscribe and Publish around their access to handlers.
mutex *sync.Mutex
}
// MemoryTransporter publishes and subscribes through a SharedMemory instead
// of a network connection.
type MemoryTransporter struct {
// prefix is the leading segment of every topic; set through SetPrefix.
prefix string
// instanceID is a random identifier generated in Create. It appears in log
// lines and tags the subscriptions this transporter registers.
instanceID string
// logger receives the debug/info/trace output of the transporter's methods.
logger *log.Entry
// memory is the SharedMemory passed to Create.
memory *SharedMemory
}
// Create returns a MemoryTransporter bound to the given SharedMemory.
//
// A nil mutex or handlers map on memory is initialized here, so callers can
// pass a zero-value &SharedMemory{}. memory itself must be non-nil. The
// transporter gets a fresh random instanceID from utils.RandomString(5).
func Create(logger *log.Entry, memory *SharedMemory) MemoryTransporter {
	if memory.mutex == nil {
		memory.mutex = &sync.Mutex{}
	}
	if memory.handlers == nil {
		memory.handlers = make(map[string][]Subscription)
	}

	return MemoryTransporter{
		logger:     logger,
		memory:     memory,
		instanceID: utils.RandomString(5),
	}
}
// SetPrefix stores the prefix that topicName places at the start of every
// topic built for this transporter.
func (transporter *MemoryTransporter) SetPrefix(prefix string) {
transporter.prefix = prefix
}
// SetNodeID accepts a node ID and discards it; this transporter keeps no
// node ID state.
// NOTE(review): presumably exists to satisfy a transporter interface declared
// elsewhere — confirm.
func (transporter *MemoryTransporter) SetNodeID(nodeID string) {
// Intentionally empty.
}
// SetSerializer accepts a serializer and discards it; Publish hands the
// payload value directly to the subscribed handlers.
// NOTE(review): presumably exists to satisfy a transporter interface declared
// elsewhere — confirm.
func (transporter *MemoryTransporter) SetSerializer(serializer serializer.Serializer) {
// Intentionally empty.
}
// Connect reports the transporter as connected. There is nothing to dial, so
// the returned channel already holds a single nil error when it is handed back.
//
// The channel has a one-slot buffer and is filled synchronously. Sending from
// a goroutine on an unbuffered channel would leave that goroutine blocked
// forever whenever a caller ignores the returned channel.
func (transporter *MemoryTransporter) Connect() chan error {
	transporter.logger.Debugln("[Mem-Trans-", transporter.instanceID, "] -> Connecting() ...")

	endChan := make(chan error, 1)
	endChan <- nil

	transporter.logger.Infoln("[Mem-Trans-", transporter.instanceID, "] -> Connected() !")
	return endChan
}
// Disconnect removes every subscription registered by this transporter
// (matched on instanceID) from the shared memory and returns a channel that
// already holds a single nil error.
//
// The handlers map is rebuilt while holding the shared mutex: Subscribe and
// Publish on other transporters access the same map under that lock, so
// rebuilding it unlocked would be a data race. Topics are kept in the map even
// when no subscriptions remain for them.
func (transporter *MemoryTransporter) Disconnect() chan error {
	transporter.logger.Debugln("[Mem-Trans-", transporter.instanceID, "] -> Disconnecting() ...")

	shared := transporter.memory
	shared.mutex.Lock()
	newHandlers := make(map[string][]Subscription, len(shared.handlers))
	for key, subscriptions := range shared.handlers {
		// Build a fresh slice rather than filtering in place: Publish may still
		// be iterating a snapshot that shares the old backing array.
		keep := make([]Subscription, 0, len(subscriptions))
		for _, subscription := range subscriptions {
			if subscription.transporterId != transporter.instanceID {
				keep = append(keep, subscription)
			}
		}
		newHandlers[key] = keep
	}
	shared.handlers = newHandlers
	shared.mutex.Unlock()

	// Buffered and pre-filled so no goroutine is left blocked if the caller
	// never reads the result.
	endChan := make(chan error, 1)
	endChan <- nil

	transporter.logger.Infoln("[Mem-Trans-", transporter.instanceID, "] -> Disconnected() !")
	return endChan
}
// topicName builds the key used in the shared handlers map:
// "<prefix>.<command>" when nodeID is empty, otherwise
// "<prefix>.<command>.<nodeID>".
func topicName(transporter *MemoryTransporter, command string, nodeID string) string {
	topic := fmt.Sprint(transporter.prefix, ".", command)
	if nodeID == "" {
		return topic
	}
	return fmt.Sprint(topic, ".", nodeID)
}
// Subscribe registers handler for the topic derived from command and nodeID.
//
// The new subscription is tagged with this transporter's instanceID, marked
// active, and appended to the topic's list in the shared memory while the
// shared mutex is held.
func (transporter *MemoryTransporter) Subscribe(command string, nodeID string, handler transit.TransportHandler) {
	topic := topicName(transporter, command, nodeID)
	transporter.logger.Traceln("[Mem-Trans-", transporter.instanceID, "] Subscribe() listen for command: ", command, " nodeID: ", nodeID, " topic: ", topic)

	entry := Subscription{
		id:            utils.RandomString(5) + "_" + command,
		transporterId: transporter.instanceID,
		handler:       handler,
		active:        true,
	}

	shared := transporter.memory
	shared.mutex.Lock()
	// Appending to the nil slice of a missing topic creates its list, so no
	// separate "first subscriber" branch is needed.
	shared.handlers[topic] = append(shared.handlers[topic], entry)
	shared.mutex.Unlock()
}
// Publish delivers message to every active subscription on the topic derived
// from command and nodeID.
//
// The subscription list is read under the shared mutex, then each handler is
// started in its own goroutine after the lock is released. Publish therefore
// returns without waiting for the handlers.
func (transporter *MemoryTransporter) Publish(command, nodeID string, message nucleo.Payload) {
	topic := topicName(transporter, command, nodeID)
	transporter.logger.Traceln("[Mem-Trans-", transporter.instanceID, "] Publish() command: ", command, " nodeID: ", nodeID, " message: \n", message, "\n - end")

	shared := transporter.memory
	shared.mutex.Lock()
	listeners := shared.handlers[topic]
	shared.mutex.Unlock()

	// A topic with no entry yields a nil slice, and the loop simply does not run.
	for i := range listeners {
		if !listeners[i].active {
			continue
		}
		go listeners[i].handler(message)
	}
}