-
Notifications
You must be signed in to change notification settings - Fork 10
/
pubsub.go
222 lines (187 loc) · 5.23 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package p2p
import (
"context"
"encoding/json"
"fmt"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
// Fallback names applied by JoinPubSub when it is given an empty
// client name or an empty topic name.
const (
	// defaultclient is the client name used when none is supplied.
	defaultclient = "client"
	// defaulttopic is the topic name used when none is supplied.
	defaulttopic = "pubsub"
)
// PubSub represents a chat room backed by a single pub/sub topic.
// It is created by JoinPubSub, which also starts the PubLoop and SubLoop
// goroutines that move messages between the topic and the channels below.
type PubSub struct {
	// Host is the P2P host this room was created on.
	Host *P2P
	// Inbound carries messages decoded by SubLoop from the subscription.
	// SubLoop closes this channel when reading from the subscription fails.
	Inbound chan chatmessage
	// Outbound carries message texts to publish; PubLoop reads from it and
	// InsertOutbound writes to it.
	Outbound chan string
	// Logs carries error reports emitted by the publish and subscribe loops.
	Logs chan chatlog
	// ClientName is the name sent as SenderName on published messages.
	ClientName string
	// TopicName is the topic (room) name this PubSub was joined with.
	TopicName string
	// selfid is the peer ID of the local host; it is sent as SenderID and
	// used by SubLoop to skip messages received from the local peer.
	selfid peer.ID
	// psctx is the room's lifecycle context; both loops stop when it is done.
	psctx context.Context
	// pscancel cancels psctx; it is invoked by Exit.
	pscancel context.CancelFunc
	// pstopic is the handle of the joined pub/sub topic.
	pstopic *pubsub.Topic
	// psub is the subscription to pstopic.
	psub *pubsub.Subscription
}
// chatmessage is the payload exchanged over the topic. PubLoop marshals it
// to JSON before publishing and SubLoop unmarshals received data into it.
type chatmessage struct {
	// Message is the text of the chat message.
	Message string `json:"message"`
	// SenderID is the string form of the sender's peer ID.
	SenderID string `json:"senderid"`
	// SenderName is the sender's client name.
	SenderName string `json:"sendername"`
}
// chatlog is a single log entry emitted on the PubSub Logs channel.
type chatlog struct {
	// logprefix tags the origin of the entry (for example "puberr" or "suberr").
	logprefix string
	// logmsg is the human-readable description.
	logmsg string
}

// String renders the entry as "<prefix>: <message>".
func (entry chatlog) String() string {
	prefix, body := entry.logprefix, entry.logmsg
	return fmt.Sprintf("%s: %s", prefix, body)
}
// JoinPubSub joins the pub/sub topic for topicName on p2phost, subscribes to
// it and returns a PubSub whose publish and subscribe loops are already
// running.
//
// An empty clientName or topicName is replaced by the package defaults
// before the topic is joined, so the joined topic always matches the
// TopicName reported by the returned PubSub.
//
// It returns an error if the topic cannot be joined or subscribed to.
func JoinPubSub(p2phost *P2P, clientName string, topicName string) (*PubSub, error) {
	// Resolve the defaults first: the topic name is part of the joined topic ID.
	if clientName == "" {
		clientName = defaultclient
	}
	if topicName == "" {
		topicName = defaulttopic
	}

	// Create a PubSub topic with the room name
	topic, err := p2phost.PubSub.Join(fmt.Sprintf("pub-sub-p2p-%s", topicName))
	if err != nil {
		return nil, fmt.Errorf("joining topic %q: %w", topicName, err)
	}

	// Subscribe to the PubSub topic
	sub, err := topic.Subscribe()
	if err != nil {
		// Release the topic handle so it does not leak; the subscribe
		// error is the one worth reporting, so a close failure is ignored.
		_ = topic.Close()
		return nil, fmt.Errorf("subscribing to topic %q: %w", topicName, err)
	}

	// Create the cancellable lifecycle context for the room
	pubsubctx, cancel := context.WithCancel(context.Background())

	// The variable is deliberately not named PubSub, to avoid shadowing the type.
	room := &PubSub{
		Host:       p2phost,
		Inbound:    make(chan chatmessage),
		Outbound:   make(chan string),
		Logs:       make(chan chatlog),
		psctx:      pubsubctx,
		pscancel:   cancel,
		pstopic:    topic,
		psub:       sub,
		ClientName: clientName,
		TopicName:  topicName,
		selfid:     p2phost.Host.ID(),
	}

	// Start the subscribe and publish loops; both stop when psctx is cancelled.
	go room.SubLoop()
	go room.PubLoop()

	return room, nil
}
// PubLoop publishes every string received on Outbound to the topic as a
// JSON-encoded chatmessage, until the room's context is cancelled.
//
// Failures are reported on Logs with the "puberr" prefix. Reporting never
// outlives the room: if the context is cancelled while a report is waiting
// for a reader, the report is dropped and the loop returns.
func (cr *PubSub) PubLoop() {
	// report delivers a publish error to Logs. It returns false when the
	// context was cancelled first, in which case the loop must stop instead
	// of blocking on a channel nobody may be reading any more.
	report := func(msg string) bool {
		select {
		case cr.Logs <- chatlog{logprefix: "puberr", logmsg: msg}:
			return true
		case <-cr.psctx.Done():
			return false
		}
	}

	for {
		select {
		case <-cr.psctx.Done():
			return

		case message := <-cr.Outbound:
			// Create a ChatMessage
			m := chatmessage{
				Message:    message,
				SenderID:   cr.selfid.Pretty(),
				SenderName: cr.ClientName,
			}

			// Marshal the ChatMessage into a JSON
			messagebytes, err := json.Marshal(m)
			if err != nil {
				if !report("could not marshal JSON") {
					return
				}
				continue
			}

			// Publish the message to the topic
			if err := cr.pstopic.Publish(cr.psctx, messagebytes); err != nil {
				if !report("could not publish to topic") {
					return
				}
			}
		}
	}
}
// SubLoop continuously reads from the subscription until either the
// subscription fails or the room's context is cancelled. Each received
// message that did not come from the local peer is decoded from JSON and
// sent on Inbound.
//
// Inbound is closed when the loop returns, whatever the reason, so readers
// can detect that no further messages will arrive. Failures are reported on
// Logs with the "suberr" prefix; a report or message that cannot be
// delivered before the context is cancelled is dropped, so the goroutine
// never outlives the room.
func (cr *PubSub) SubLoop() {
	// SubLoop is the only sender on Inbound, so it owns closing it.
	defer close(cr.Inbound)

	for {
		// Next blocks until a message arrives, the subscription is
		// cancelled, or psctx is done, so no separate context check is needed.
		message, err := cr.psub.Next(cr.psctx)
		if err != nil {
			select {
			case cr.Logs <- chatlog{logprefix: "suberr", logmsg: "subscription has closed"}:
			case <-cr.psctx.Done():
			}
			return
		}

		// Skip messages that originate from the local peer
		if message.ReceivedFrom == cr.selfid {
			continue
		}

		// Unmarshal the message data into a ChatMessage
		var cm chatmessage
		if err := json.Unmarshal(message.Data, &cm); err != nil {
			select {
			case cr.Logs <- chatlog{logprefix: "suberr", logmsg: "could not unmarshal JSON"}:
				continue
			case <-cr.psctx.Done():
				return
			}
		}

		// Hand the ChatMessage to the reader, unless the room shuts down first
		select {
		case cr.Inbound <- cm:
		case <-cr.psctx.Done():
			return
		}
	}
}
// PeerList reports the IDs of the peers that the topic handle currently
// lists for this room's topic.
func (cr *PubSub) PeerList() []peer.ID {
	topic := cr.pstopic
	return topic.ListPeers()
}
// Exit leaves the chat room: it cancels the subscription, closes the topic
// handle and finally cancels the room's lifecycle context, which stops
// PubLoop and SubLoop.
func (cr *PubSub) Exit() {
	// Deferred so the context is cancelled last, even if a call below panics.
	defer cr.pscancel()

	// Cancel the existing subscription before closing the topic handle
	cr.psub.Cancel()

	// Close the topic handle. Exit has no error return, so a failure is
	// offered on Logs instead of being silently discarded. The send is
	// non-blocking so Exit can never hang when nobody is reading Logs.
	if err := cr.pstopic.Close(); err != nil {
		select {
		case cr.Logs <- chatlog{logprefix: "exiterr", logmsg: "could not close topic: " + err.Error()}:
		default:
		}
	}
}
// UpdateUser changes the client name attached to messages published from
// now on. An empty username falls back to the default client name, the same
// way JoinPubSub treats an empty client name, so messages are never sent
// with a blank sender name.
//
// NOTE(review): ClientName is also read by PubLoop without synchronization;
// confirm callers do not rely on this being safe for concurrent use.
func (cr *PubSub) UpdateUser(username string) {
	if username == "" {
		username = defaultclient
	}
	cr.ClientName = username
}
// InsertOutbound queues str for publication by handing it to PubLoop via
// the Outbound channel. It blocks until PubLoop accepts the message.
//
// If the room's context is cancelled first (for example after Exit), the
// message is dropped and the call returns, instead of blocking forever on a
// channel that no longer has a reader.
func (cr *PubSub) InsertOutbound(str string) {
	select {
	case cr.Outbound <- str:
	case <-cr.psctx.Done():
		// Room is shut down; nothing will ever read Outbound again.
	}
}