-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub.go
136 lines (109 loc) · 3.34 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Package pubsub implements a simple publish-subscribe pattern in pure Go.
package pubsub
import (
"context"
"sync"
)
// Message is the envelope delivered to subscribers: it pairs a published
// value with the name of the channel it was published on.
type Message struct {
	// ChannelName is the channel the value was published to.
	ChannelName string
	// Value is the published payload, passed through unchanged.
	Value interface{}
}
// Subscription is the handle returned by NewSubscription. It carries the
// delivery channel for one subscriber together with the bookkeeping needed
// to cancel it and detach it from its PubSub.
type Subscription struct {
	// wg counts delivery goroutines that are still in flight for this
	// subscription.
	wg *sync.WaitGroup
	// pubsub is the PubSub this subscription is registered with.
	pubsub *PubSub
	// cancel cancels context.
	cancel context.CancelFunc
	// context is the per-subscription context; once it is done, pending
	// deliveries to this subscription give up.
	context context.Context
	// pipeChannel is the channel messages are delivered on.
	pipeChannel chan Message
	// subscribedChannels lists the channel names this subscription was
	// registered under.
	subscribedChannels []string
}
// PubSub holds the state of the publish-subscribe implementation.
// Instantiate it only through New, which initialises the subscriptions map.
//
// NOTE(review): no lock guarding subscriptions is visible in this type;
// confirm that callers serialise Publish, NewSubscription and UnSubscribe.
type PubSub struct {
	// context is the application-level context; when it is done, pending
	// deliveries are abandoned.
	context context.Context
	// subscriptions maps a channel name to the subscriptions registered on
	// that channel.
	subscriptions map[string][]*Subscription
}
// New returns a PubSub bound to ctx with an empty, ready-to-use
// subscription table. ctx is stored on the PubSub as its application
// context.
func New(ctx context.Context) *PubSub {
	ps := new(PubSub)
	ps.context = ctx
	ps.subscriptions = map[string][]*Subscription{}
	return ps
}
// Publish delivers value to every subscription registered on each of
// channelNames, wrapped in a Message carrying the channel name.
//
// Each delivery runs in its own goroutine, so Publish returns without
// waiting for receivers and does not enforce any ordering between
// deliveries. A delivery goroutine ends when the subscriber receives the
// message, when the PubSub context is done, or when the subscription's own
// context is done. Channel names with no subscribers produce no deliveries.
func (pubsub *PubSub) Publish(channelNames []string, value interface{}) {
	for _, name := range channelNames {
		msg := Message{ChannelName: name, Value: value}
		for _, sub := range pubsub.subscriptions[name] {
			// Register the delivery before starting it so UnSubscribe's
			// Wait accounts for it.
			sub.wg.Add(1)
			go func(target *Subscription, m Message) {
				defer target.wg.Done()
				select {
				case target.pipeChannel <- m:
				case <-pubsub.context.Done():
				case <-target.context.Done():
				}
			}(sub, msg)
		}
	}
}
// NewSubscription creates a subscription and registers it on every channel
// named in channelNames.
//
// The subscription gets an unbuffered delivery channel, its own WaitGroup
// and a cancellable context derived from context.Background (not from the
// PubSub context). Channel names are recorded in the order given; a name
// that appears more than once is registered once per occurrence.
func (pubsub *PubSub) NewSubscription(channelNames []string) *Subscription {
	subCtx, stop := context.WithCancel(context.Background())

	created := &Subscription{
		wg:                 &sync.WaitGroup{},
		pubsub:             pubsub,
		cancel:             stop,
		context:            subCtx,
		pipeChannel:        make(chan Message),
		subscribedChannels: make([]string, 0),
	}

	for i := range channelNames {
		name := channelNames[i]
		created.subscribedChannels = append(created.subscribedChannels, name)
		pubsub.subscriptions[name] = append(pubsub.subscriptions[name], created)
	}
	return created
}
// Channel returns the receive-only view of the subscription's delivery
// channel. Messages published to any of the subscribed channels arrive on
// it.
func (subscription *Subscription) Channel() <-chan Message {
	var receiveOnly <-chan Message = subscription.pipeChannel
	return receiveOnly
}
// remove deletes the element at index i from s without preserving order:
// the last element is moved into slot i and the slice is shortened by one.
//
// s is modified in place and the shortened slice is returned. i must be a
// valid index into s; an out-of-range i panics.
func remove(s []*Subscription, i int) []*Subscription {
	last := len(s) - 1
	s[i] = s[last]
	// Clear the vacated tail slot so the backing array does not keep the
	// removed subscription reachable after the slice is truncated.
	s[last] = nil
	return s[:last]
}
// indexOf reports the position of s within subscriptions, comparing by
// pointer identity. If s occurs more than once the highest matching index
// is returned; -1 means s is not present.
func indexOf(s *Subscription, subscriptions []*Subscription) (index int) {
	// Scanning from the end yields the last occurrence directly.
	for pos := len(subscriptions) - 1; pos >= 0; pos-- {
		if subscriptions[pos] == s {
			return pos
		}
	}
	return -1
}
// UnSubscribe cancels the subscription, waits for its in-flight deliveries
// to finish, and removes it from every channel it was registered on.
//
// A channel whose last subscriber is removed is deleted from the PubSub's
// table, so channel names with no subscribers do not accumulate. Calling
// UnSubscribe again is harmless: the subscription is no longer found and
// nothing is removed.
func (subscription *Subscription) UnSubscribe() {
	pubsub := subscription.pubsub

	// Cancel first so delivery goroutines blocked on the pipe channel
	// return, then wait for all of them before touching the table.
	subscription.cancel()
	subscription.wg.Wait()

	for _, channelName := range subscription.subscribedChannels {
		subscribers := pubsub.subscriptions[channelName]
		idx := indexOf(subscription, subscribers)
		if idx == -1 {
			continue
		}
		subscribers = remove(subscribers, idx)
		if len(subscribers) == 0 {
			// Drop the key rather than storing an empty slice.
			delete(pubsub.subscriptions, channelName)
			continue
		}
		pubsub.subscriptions[channelName] = subscribers
	}
}
// GetSubscribedChannels returns the names of the channels recorded on this
// subscription.
//
// The result is a fresh copy, so callers may modify it without affecting
// the list UnSubscribe walks when detaching the subscription.
func (subscription *Subscription) GetSubscribedChannels() []string {
	channels := make([]string, len(subscription.subscribedChannels))
	copy(channels, subscription.subscribedChannels)
	return channels
}