-
Notifications
You must be signed in to change notification settings - Fork 0
/
message.go
195 lines (164 loc) · 4.66 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package twipi
import (
"net"
"net/http"
"time"
"github.com/diamondburned/twikit/utils/pubsub"
"github.com/diamondburned/twikit/utils/srvutil"
"github.com/diamondburned/twikit/logger"
"github.com/go-chi/chi/v5"
"github.com/pkg/errors"
"github.com/twilio/twilio-go/twiml"
)
// PhoneNumber is a phone number.
type PhoneNumber string

// Message describes an SMS message.
type Message struct {
	From PhoneNumber
	To   PhoneNumber
	Body string

	// replier, when non-nil, is the channel that tryReply sends replies into.
	replier chan Message
}

// tryReply offers msg to the message's reply channel without blocking. It
// reports true only if the channel accepted msg. It reports false when the
// message has no reply channel, or when the channel cannot take a value at
// the moment of the call.
func (m *Message) tryReply(msg Message) (sent bool) {
	if m.replier != nil {
		select {
		case m.replier <- msg:
			sent = true
		default:
			// The channel cannot accept a value right now; give up at once
			// rather than wait.
		}
	}
	return sent
}
// MessageHandler is a handler for incoming messages. Messages received by the
// incoming webhook are sent to msgs, which subs is set to listen on by
// NewMessageHandler.
type MessageHandler struct {
	// subs holds the message subscriptions managed through SubscribeMessages
	// and UnsubscribeMessages.
	subs pubsub.Subscriber[Message]
	// msgs carries messages from handleIncoming to subs. Close sets it to nil.
	msgs chan Message
	// stop is closed by Close; it releases senders still blocked on msgs.
	stop chan struct{}
	// incomingPath is the route of the incoming-message webhook. Mount skips
	// the webhook when it is empty.
	incomingPath string
	// deliveryPath is the route of the delivery webhook. Mount skips the
	// webhook when it is empty.
	deliveryPath string
}

// Compile-time check that *MessageHandler satisfies WebhookRegisterer.
var _ WebhookRegisterer = (*MessageHandler)(nil)
// NewMessageHandler returns a MessageHandler that serves the given webhook
// paths. incomingPath is the route for incoming messages and deliveryPath is
// the route for delivery callbacks; leaving either one empty disables the
// corresponding webhook.
func NewMessageHandler(incomingPath, deliveryPath string) *MessageHandler {
	handler := new(MessageHandler)
	handler.incomingPath = incomingPath
	handler.deliveryPath = deliveryPath
	handler.msgs = make(chan Message)
	handler.stop = make(chan struct{})

	// Point the subscriber at the channel that handleIncoming sends into.
	handler.subs.Listen(handler.msgs)

	return handler
}
// SubscribeMessages registers ch to receive incoming messages. An empty
// recipient subscribes ch to every message; otherwise only messages whose To
// field equals recipient pass the filter.
func (l *MessageHandler) SubscribeMessages(recipient PhoneNumber, ch chan<- Message) {
	l.subs.Subscribe(ch, func(msg Message) bool {
		// An empty recipient acts as a wildcard.
		return recipient == "" || msg.To == recipient
	})
}
// UnsubscribeMessages unsubscribes the given channel from incoming messages.
// It forwards ch to the Unsubscribe method of the handler's subscriber.
func (l *MessageHandler) UnsubscribeMessages(ch chan<- Message) {
	l.subs.Unsubscribe(ch)
}
// Mount implements WebhookRegisterer. It installs srvutil.ParseForm as
// middleware on r, then registers the incoming and delivery webhook handlers
// for both GET and POST. A webhook whose path is empty is not registered.
func (l *MessageHandler) Mount(r chi.Router) {
	// NOTE(review): chi documents that Use must be called before any route is
	// defined on the same mux — confirm callers pass a router (or group) that
	// has no routes yet.
	r.Use(srvutil.ParseForm)

	webhooks := []struct {
		path    string
		handler http.HandlerFunc
	}{
		{path: l.incomingPath, handler: l.handleIncoming},
		{path: l.deliveryPath, handler: l.postDelivery},
	}

	for _, hook := range webhooks {
		if hook.path == "" {
			// Disabled webhook.
			continue
		}
		r.Get(hook.path, hook.handler)
		r.Post(hook.path, hook.handler)
	}
}
// Close implements WebhookRegisterer. The first call closes the stop channel
// and clears msgs, then returns nil. Once msgs is nil (for example after a
// previous Close), it returns net.ErrClosed and does nothing else.
func (l *MessageHandler) Close() error {
	alreadyClosed := l.msgs == nil
	if alreadyClosed {
		return net.ErrClosed
	}

	// Closing stop releases any handleIncoming goroutine still waiting to
	// hand its message over.
	close(l.stop)
	// A nil msgs marks the handler as closed for later Close calls.
	l.msgs = nil

	return nil
}
// incomingMessage holds the form values that handleIncoming reads from an
// incoming-message webhook request. handleIncoming copies only From, To and
// Body into the Message it publishes; the other fields are populated but not
// read again there.
type incomingMessage struct {
	// MessageSID is the "MessageSid" form value.
	MessageSID string `schema:"MessageSid"`
	// SMSSID is the "SmsSid" form value.
	SMSSID string `schema:"SmsSid"`
	// AccountSID is the "AccountSid" form value.
	AccountSID string `schema:"AccountSid"`
	// MessagingServiceSID is the "MessagingServiceSid" form value.
	MessagingServiceSID string `schema:"MessagingServiceSid"`
	// From is the "From" form value; it becomes Message.From.
	From string `schema:"From"`
	// To is the "To" form value; it becomes Message.To.
	To string `schema:"To"`
	// Body is the "Body" form value; it becomes Message.Body.
	Body string `schema:"Body"`
}
// handleIncoming is the HTTP handler for the incoming-message webhook. It
// builds a Message from the request's form values, hands it to the handler's
// msgs channel, and waits for a reply to arrive on the message's reply
// channel. The reply is written back as TwiML; if no reply arrives within the
// timeout, an empty Message is encoded instead.
//
// Failure handling:
//   - If the request context ends while waiting, nothing is written.
//   - If the reply cannot be encoded, the error is logged and the response is
//     a 500 rather than an implicit, empty 200.
//   - If writing the response fails, the error is logged.
func (l *MessageHandler) handleIncoming(w http.ResponseWriter, r *http.Request) {
	// replyTimeout is how long to wait for a reply before responding with
	// an empty message.
	const replyTimeout = 3 * time.Second

	incoming := incomingMessage{
		MessageSID:          r.FormValue("MessageSid"),
		SMSSID:              r.FormValue("SmsSid"),
		AccountSID:          r.FormValue("AccountSid"),
		MessagingServiceSID: r.FormValue("MessagingServiceSid"),
		From:                r.FormValue("From"),
		To:                  r.FormValue("To"),
		Body:                r.FormValue("Body"),
	}

	// Capacity 1: a single reply can be parked here without a receiver, and a
	// full buffer makes every later non-blocking tryReply fail.
	replyCh := make(chan Message, 1)

	msg := Message{
		From:    PhoneNumber(incoming.From),
		To:      PhoneNumber(incoming.To),
		Body:    incoming.Body,
		replier: replyCh,
	}

	// Hand the message over in the background so that this handler is never
	// blocked on the send. The goroutine ends once the message is taken or
	// the handler's stop channel is closed.
	go func() {
		select {
		case l.msgs <- msg:
		case <-l.stop:
		}
	}()

	timeout := time.NewTimer(replyTimeout)
	defer timeout.Stop()

	var reply Message

	select {
	case reply = <-replyCh:
		// A reply arrived in time.
	case <-timeout.C:
		// Timed out. Fill the buffer with an empty message so that a late
		// tryReply fails instead of being silently lost. If a reply slipped
		// in just before this point, the buffer is already full and that
		// reply is taken instead.
		select {
		case replyCh <- Message{}:
			// Buffer is now occupied; respond with the empty reply.
		case reply = <-replyCh:
			// A reply was already waiting; use it.
		}
	case <-r.Context().Done():
		// The request was cancelled; there is nobody left to respond to.
		return
	}

	xml, err := messageToTwiML(reply)
	if err != nil {
		log := logger.FromContext(r.Context())
		log.Println("twipi: failed to encode TwiML:", err)
		// Report the failure to the caller instead of returning an implicit
		// 200 with an empty body.
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/xml")
	if _, err := w.Write(xml); err != nil {
		// The status line is already sent, so logging is all that is left.
		log := logger.FromContext(r.Context())
		log.Println("twipi: failed to write TwiML response:", err)
	}
}
// messageToTwiML renders msg as a TwiML document. A "Message" element holding
// msg.Body is attached to the element returned by twiml.CreateDocument only
// when the body is non-empty; otherwise the document is rendered as created.
// It returns the rendered XML, or a wrapped error if rendering fails.
func messageToTwiML(msg Message) ([]byte, error) {
	doc, response := twiml.CreateDocument()

	if body := msg.Body; body != "" {
		messageElem := doc.CreateElement("Message")
		messageElem.SetText(body)
		response.AddChild(messageElem)
	}

	rendered, err := twiml.ToXML(doc)
	if err != nil {
		return nil, errors.Wrap(err, "failed to marshal TwiML")
	}

	return []byte(rendered), nil
}
// postDelivery is the HTTP handler that Mount registers on the delivery
// webhook path. Its body is currently empty: it neither reads the request nor
// writes anything to the response itself.
func (l *MessageHandler) postDelivery(w http.ResponseWriter, r *http.Request) {
}