-
Notifications
You must be signed in to change notification settings - Fork 14
/
gateway.clj
315 lines (273 loc) · 13.9 KB
/
gateway.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
(ns discord.gateway
"This implements the Discord Gateway protocol"
(:require [clojure.core.async :refer [>! <! go go-loop] :as async]
[clojure.data.json :as json]
[clojure.set :refer [map-invert]]
[gniazdo.core :as ws]
[taoensso.timbre :as timbre]
[discord.http :as http]
[discord.permissions :as perm]
[discord.types :refer [Authenticated] :as types]
[discord.config :as config]))
;;; Representing a message from the API.
;;; build-message (below) populates every field of this record except sent-time, which nothing
;;; in this file assigns, so it is nil on records produced by build-message.
(defrecord Message [content attachments embeds sent-time channel author user-mentions role-mentions
pinned? everyone-mentioned? id])
(defn- string->long
  "Parses `s` as a long. When the text is not a valid number, `s` itself is handed back
  unchanged rather than letting the NumberFormatException escape."
  [s]
  (try
    (Long/parseLong s)
    (catch NumberFormatException _
      s)))
(defn build-message
  "Turns a message received from the Discord Gateway into a Message record.

  message-map : the parsed gateway message; all values are read from beneath its :d key.
  gateway     : the Gateway record that received the message, used to look up the channel.

  The :sent-time field of the resulting record is not set here."
  [message-map gateway]
  (let [payload-field (fn [k] (get-in message-map [:d k]))
        ;; http/build-user is always given the raw map wrapped under a :user key.
        wrapped-user  (fn [raw] (http/build-user {:user raw}))
        author        (wrapped-user (payload-field :author))
        channel       (http/get-channel gateway (payload-field :channel_id))
        users         (map wrapped-user (payload-field :mentions))
        roles         (map wrapped-user (payload-field :role_mentions))]
    (map->Message
      {:id                  (string->long (payload-field :id))
       :content             (payload-field :content)
       :author              author
       :channel             channel
       :user-mentions       users
       :role-mentions       roles
       :everyone-mentioned? (payload-field :mention_everyone)
       :pinned?             (payload-field :pinned)
       :embeds              (payload-field :embeds)
       :attachments         (payload-field :attachments)})))
;;; Implementing Discord Gateway behaviour
;; Protocol for anything that can push an outbound message to the Discord gateway.
;; The DiscordGateway record in this file implements send-message by JSON-encoding `message`
;; and writing it to its websocket.
(defprotocol Gateway
(send-message [this message]))
;; A connection to the Discord gateway. The websocket field holds a reference that is
;; dereferenced on every use, so the underlying socket can be swapped without rebuilding the
;; record.
(defrecord DiscordGateway [url shards websocket auth seq-num session-id heartbeat-interval
                           stop-heartbeat-channel]
  java.io.Closeable
  (close [_]
    ;; Shut down the socket first, then close the channel that stops the heartbeat loop.
    (when websocket
      (ws/close @websocket))
    (when stop-heartbeat-channel
      (async/close! stop-heartbeat-channel)))

  Authenticated
  ;; Both authentication calls are delegated to the wrapped auth value.
  (token [_]
    (types/token auth))
  (token-type [_]
    (types/token-type auth))

  Gateway
  (send-message [_ message]
    (let [encoded (json/write-str message)]
      (ws/send-msg @websocket encoded))))
(defn build-gateway
  "Builds a DiscordGateway record from `gateway-response`, replacing its :url with the same
  URL extended by the API version and the \"json\" encoding as query parameters."
  [gateway-response]
  (-> (into {} gateway-response)
      (update :url #(format "%s?v=%s&encoding=%s" % types/api-version "json"))
      (map->DiscordGateway)))
;;; The different kinds of messages that we can receive from Discord
;; Op-codes are consecutive integers starting at 0, so each name's position in this vector is
;; its code.
(defonce message-name->code
  (zipmap [:dispatch            ; 0
           :heartbeat           ; 1
           :identify            ; 2
           :presence            ; 3
           :voice-state         ; 4
           :voice-ping          ; 5
           :resume              ; 6
           :reconnect           ; 7
           :request-members     ; 8
           :invalidate-session  ; 9
           :hello               ; 10
           :heartbeat-ack       ; 11
           :guild-sync]         ; 12
          (range)))
;; Reverse lookup table: numeric op-code -> message name keyword.
(defonce message-code->name
  (reduce-kv (fn [inverted op-name code]
               (assoc inverted code op-name))
             {}
             message-name->code))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Handling server EVENTS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defmulti handle-gateway-control-event
  "Handles control events: messages the Gateway sends to a connected client about the
  connection itself (disconnects, reconnects, rate limits, etc.).

  Dispatches on the name that the event's numeric :op code maps to in message-code->name."
  (fn [discord-event _gateway _receive-chan]
    (-> discord-event :op message-code->name)))
(defmethod handle-gateway-control-event :hello
  [discord-event gateway receive-chan]
  ;; The initial "HELLO" message carries the heartbeat interval; store it in the gateway's
  ;; :heartbeat-interval atom.
  (let [interval-ms (-> discord-event :d :heartbeat_interval)]
    (timbre/infof "Setting heartbeat interval to %d milliseconds" interval-ms)
    (reset! (:heartbeat-interval gateway) interval-ms)))
;;; Since there is nothing to do in response to a heartbeat ACK message, we'll just ignore it.
(defmethod handle-gateway-control-event :heartbeat-ack
  [& _ignored]
  ;; Deliberate no-op.
  nil)
(defmethod handle-gateway-control-event :default
  [discord-event gateway receive-chan]
  ;; Any control event without a dedicated handler is only logged by name.
  (let [event-name (-> discord-event :op message-code->name)]
    (timbre/infof "Event of Type: %s" event-name)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Handles non-control-flow messages from the Discord gateway. Any messages dealing with metadata
;;; surrounding our gateway connection is handled above by handle-gateway-control-event
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defmulti handle-gateway-message
  "Handles messages received from the Discord API gateway. These include text messages sent
  by users/clients as well as control messages sent by the gateway itself.

  Dispatches on the message's :t value converted to a keyword (nil when :t is absent)."
  (fn [discord-message _gateway _receive-chan]
    (-> discord-message :t keyword)))
;;; These are messages that the framework does not currently act on. They are registered as
;;; no-ops so that they do not fall through to the :default handler and get logged as unknown.
;; Register a do-nothing handler for each of these message types.
(doseq [ignored-type [:PRESENCE_UPDATE
                      :GUILD_CREATE
                      :CHANNEL_CREATE
                      :TYPING_START
                      :MESSAGE_DELETE
                      :MESSAGE_UPDATE]]
  (defmethod handle-gateway-message ignored-type [& _]))
(defmethod handle-gateway-message :READY
  [discord-message gateway receive-chan]
  ;; Stores the session id carried by the READY message in the gateway's :session-id atom.
  ;;
  ;; Incoming JSON keys are turned into keywords verbatim (handle-message uses :key-fn keyword),
  ;; so the field is :session_id with an underscore, like :heartbeat_interval and :guild_id
  ;; elsewhere in this file. The earlier hyphenated lookup never matched and always stored nil.
  (let [session-id (get-in discord-message [:d :session_id])]
    (reset! (:session-id gateway) session-id)))
(defmethod handle-gateway-message :HELLO
  [_discord-message _gateway _receive-chan]
  ;; Nothing to do beyond recording that the message arrived.
  (timbre/info "RECEIVED HELLO MESSAGE"))
;;; If it's a user message, put it on the receive channel for parsing by the client
(defmethod handle-gateway-message :MESSAGE_CREATE
  [discord-message gateway receive-chan]
  ;; The Message record is built before entering the go block; only the channel put happens
  ;; asynchronously.
  (let [built (build-message discord-message gateway)]
    (async/go
      (async/>! receive-chan built))))
(defmethod handle-gateway-message :GUILD_ROLE_UPDATE
  [discord-message gateway receive-chan]
  ;; A role changed, so the role cache for that guild is cleared.
  (let [payload   (:d discord-message)
        guild-id  (:guild_id payload)
        role-name (-> payload :role :name)]
    (timbre/infof "Role \"%s\" updated, clearing guild role cache for guild %s." role-name guild-id)
    (perm/clear-role-cache! guild-id)))
(defmethod handle-gateway-message :GUILD_ROLE_CREATE
  [discord-message gateway receive-chan]
  ;; A role was added, so the role cache for that guild is cleared.
  (let [payload   (:d discord-message)
        guild-id  (:guild_id payload)
        role-name (-> payload :role :name)]
    (timbre/infof "Role \"%s\" created, clearing guild role cache for guild %s." role-name guild-id)
    (perm/clear-role-cache! guild-id)))
(defmethod handle-gateway-message nil
  [control-event gateway receive-chan]
  ;; A message with no :t value dispatches to nil. Such messages are control events and are
  ;; routed to the control-event multimethod instead.
  (handle-gateway-control-event control-event gateway receive-chan))
(defmethod handle-gateway-message :default
  [discord-message gateway receive-chan]
  ;; Message types without a registered handler are logged together with their full content.
  (let [message-type (keyword (:t discord-message))]
    (timbre/infof "Unknown message of type %s received: %s" message-type discord-message)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Helper functions to send some common control messages, such as heartbeats or identification
;;; messages, to the Discord gateway.
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn format-gateway-message
  "Wraps `data` in the map structure sent to the gateway: {:op <numeric op-code> :d data}.

  op   : a message name keyword that is looked up in message-name->code.
  data : the payload stored under :d.

  Throws an ex-info (with the offending name under :op-code) when `op` is not a known name."
  [op data]
  (let [op-code (get message-name->code op)]
    (when-not op-code
      (throw (ex-info "Unknown op-code" {:op-code op})))
    {:op op-code :d data}))
(defn send-identify
  "Sends an :identify message over `gateway`, carrying the gateway's token, a fixed set of
  client properties and the shard information taken from the gateway's :shards value."
  [gateway]
  (let [client-properties {"$os"               "linux"
                           "$browser"          "discord.clj"
                           "$device"           "discord.clj"
                           "$referrer"         ""
                           "$referring_domain" ""}
        payload           {:token           (types/token gateway)
                           :properties      client-properties
                           :compress        false
                           :large_threshold 250
                           :shard           [0 (:shards gateway)]}]
    (->> payload
         (format-gateway-message :identify)
         (send-message gateway))))
(defn send-heartbeat
  "Sends a :heartbeat message over `gateway`. `seq-num` is a reference (dereferenced here)
  whose current value becomes the heartbeat payload."
  [gateway seq-num]
  (->> (deref seq-num)
       (format-gateway-message :heartbeat)
       (send-message gateway)))
(defn send-resume
  "Sends a :resume message over `gateway`, carrying the gateway's token, the current session
  id and the current sequence number.

  Both :session-id and :seq-num on the gateway are atoms (connect-to-gateway creates them and
  the :READY handler resets the session id), so both are dereferenced here. Previously the
  session-id atom itself was placed in the payload handed to json/write-str instead of the
  value it holds."
  [gateway]
  (let [session-id @(:session-id gateway)
        seq-num    @(:seq-num gateway)]
    (send-message
      gateway
      (format-gateway-message
        :resume
        {:token      (types/token gateway)
         :session_id session-id
         :seq        seq-num}))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Establishing a connection to the Discord gateway and begin reading messages from it.
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- handle-message
  "Processes one raw message coming from the server.

  1: Parses the JSON body, turning keys into keywords.
  2: When the message carries a sequence number (:s), raises the gateway's :seq-num atom to
     the larger of its current value and the new one.
  3: Hands the parsed message to handle-gateway-message, which dispatches it to the handler
     registered for its type."
  [raw-message gateway receive-channel]
  (let [parsed (json/read-str raw-message :key-fn keyword)]
    (when-let [incoming-seq (:s parsed)]
      (swap! (:seq-num gateway) max incoming-seq))
    (handle-gateway-message parsed gateway receive-channel)))
(declare reconnect-gateway)

(defn- create-websocket
  "Opens a websocket to the gateway's :url and returns the connection.

  Callbacks:
    on-receive : passes each raw message to handle-message together with the gateway and its
                 :receive-channel.
    on-connect : logs that the connection was established.
    on-error   : logs the error.
    on-close   : reconnects when the close status denotes an abnormal closure, otherwise only
                 logs that the socket is closing."
  [gateway]
  (let [receive-channel (:receive-channel gateway)
        gateway-url     (:url gateway)]
    (ws/connect
      gateway-url
      :on-receive (fn [message]
                    (handle-message message gateway receive-channel))
      :on-connect (fn [message] (timbre/info "Connected to Discord Gateway"))
      :on-error   (fn [message] (timbre/errorf "Error: %s" message))
      :on-close   (fn [status reason]
                    ;; Codes above 1001 denote erroneous closure states; 1000 (normal closure)
                    ;; and 1001 (going away) do not.
                    ;; https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
                    ;; The comparison used to be (> 1001 status), which is true only for codes
                    ;; below 1001: it reconnected after a normal closure and never after an
                    ;; error.
                    (if (> status 1001)
                      (do
                        (timbre/warnf "Socket closed for unexpected reason (%d): %s" status reason)
                        (timbre/warnf "Attempting to reconnect to websocket...")
                        (reconnect-gateway gateway))
                      (timbre/infof "Closing Gateway websocket, not reconnecting (%d)." status))))))
;;; There are a few elements of state that a Discord gateway connection needs to track, such as
;;; its sequence number, its heartbeat interval, the websocket connection, and its I/O channels.
(defn connect-to-gateway
  "Connects to the Discord gateway using the supplied authentication source and starts the
  heartbeat loop.

  Arguments:
    auth            : Authenticated -- An implementation of the Authenticated protocol used to
                      authenticate with the Discord APIs.
    receive-channel : Channel -- An asynchronous channel (core.async) that messages from the
                      server will be pushed onto.

  Returns the gateway record, extended with the connection state created here: atoms for the
  websocket, sequence number, session id and heartbeat interval, plus the stop-heartbeat and
  receive channels."
  [auth receive-channel]
  (let [socket                 (atom nil)
        seq-num                (atom 0)
        heartbeat-interval     (atom 1000)
        stop-heartbeat-channel (async/chan)
        session-id             (atom nil)
        gateway                (-> (http/get-bot-gateway auth)
                                   (build-gateway)
                                   (assoc :auth auth
                                          :session-id session-id
                                          :seq-num seq-num
                                          :heartbeat-interval heartbeat-interval
                                          :stop-heartbeat-channel stop-heartbeat-channel
                                          :receive-channel receive-channel
                                          :websocket socket))]
    ;; The socket callbacks receive `gateway`, so the connection is opened only once the record
    ;; is complete; the result is then stored in the atom the record already carries.
    (reset! socket (create-websocket gateway))
    ;; Heartbeat loop: send a heartbeat, then wait for either the current heartbeat interval to
    ;; elapse (loop again) or the stop channel to yield (log and finish).
    (go
      (loop []
        (send-heartbeat gateway seq-num)
        (async/alt!
          stop-heartbeat-channel (timbre/warn "Websocket closed! Terminating heartbeat channel...")
          (async/timeout @heartbeat-interval) (recur))))
    ;; Hand the assembled gateway back to the caller.
    gateway))
(defn reconnect-gateway
  "Re-establishes the connection after a disconnect: opens a fresh websocket for `gateway`,
  stores it in the gateway's :websocket atom, and then sends a 'resume' message so the Discord
  gateway knows the session has been resumed."
  [gateway]
  (let [socket-atom (:websocket gateway)]
    (reset! socket-atom (create-websocket gateway))
    (send-resume gateway)))