-
Notifications
You must be signed in to change notification settings - Fork 3
/
events_gateway.go
167 lines (129 loc) · 4.36 KB
/
events_gateway.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package internal
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
"github.com/WelcomerTeam/Discord/discord"
sandwich_structs "github.com/WelcomerTeam/Sandwich-Daemon/structs"
"nhooyr.io/websocket"
)
const MagicDecimalBase = 10
func gatewayOpDispatch(ctx context.Context, sh *Shard, msg discord.GatewayPayload, trace sandwich_structs.SandwichTrace) error {
sh.Sequence.Store(msg.Sequence)
trace["dispatch"] = discord.Int64(time.Now().Unix())
go func(msg discord.GatewayPayload, trace sandwich_structs.SandwichTrace) {
sh.Sandwich.EventsInflight.Inc()
defer sh.Sandwich.EventsInflight.Dec()
err := sh.OnDispatch(ctx, msg, trace)
if err != nil && !errors.Is(err, ErrNoDispatchHandler) {
sh.Logger.Error().Err(err).Msg("State dispatch failed")
}
}(msg, trace)
return nil
}
func gatewayOpHeartbeat(ctx context.Context, sh *Shard, msg discord.GatewayPayload, trace sandwich_structs.SandwichTrace) error {
err := sh.SendEvent(ctx, discord.GatewayOpHeartbeat, sh.Sequence.Load())
if err != nil {
go sh.Sandwich.PublishSimpleWebhook(
"Failed to send heartbeat",
"`"+err.Error()+"`",
fmt.Sprintf(
"Manager: %s ShardGroup: %d ShardID: %d/%d",
sh.Manager.Configuration.Identifier,
sh.ShardGroup.ID,
sh.ShardID,
sh.ShardGroup.ShardCount,
),
EmbedColourDanger,
)
err = sh.Reconnect(websocket.StatusNormalClosure)
if err != nil {
sh.Logger.Error().Err(err).Msg("Failed to reconnect")
return err
}
}
return nil
}
func gatewayOpReconnect(ctx context.Context, sh *Shard, msg discord.GatewayPayload, trace sandwich_structs.SandwichTrace) error {
sh.Logger.Info().Msg("Reconnecting in response to gateway")
err := sh.Reconnect(WebsocketReconnectCloseCode)
if err != nil {
sh.Logger.Error().Err(err).Msg("Failed to reconnect")
return err
}
return nil
}
type invalidSession struct {
Resumable bool `json:"d"`
}
func gatewayOpInvalidSession(ctx context.Context, sh *Shard, msg discord.GatewayPayload, trace sandwich_structs.SandwichTrace) error {
invalidSession := invalidSession{}
err := json.Unmarshal(msg.Data, &invalidSession)
if err != nil {
return err
}
if !invalidSession.Resumable {
sh.SessionID.Store("")
sh.Sequence.Store(0)
}
sh.Logger.Warn().Bool("resumable", invalidSession.Resumable).Msg("Received invalid session")
go sh.Sandwich.PublishSimpleWebhook(
"Received invalid session from gateway",
"",
fmt.Sprintf(
"Manager: %s ShardGroup: %d ShardID: %d/%d",
sh.Manager.Configuration.Identifier,
sh.ShardGroup.ID,
sh.ShardID,
sh.ShardGroup.ShardCount,
),
EmbedColourSandwich,
)
err = sh.Reconnect(WebsocketReconnectCloseCode)
if err != nil {
sh.Logger.Error().Err(err).Msg("Failed to reconnect")
return err
}
return nil
}
func gatewayOpHello(ctx context.Context, sh *Shard, msg discord.GatewayPayload, trace sandwich_structs.SandwichTrace) error {
hello := discord.Hello{}
err := sh.decodeContent(msg, &hello)
if err != nil {
return err
}
now := time.Now().UTC()
sh.LastHeartbeatSent.Store(now)
sh.LastHeartbeatAck.Store(now)
sh.HeartbeatInterval = time.Duration(hello.HeartbeatInterval) * time.Millisecond
sh.HeartbeatFailureInterval = sh.HeartbeatInterval * ShardMaxHeartbeatFailures
sh.Heartbeater = time.NewTicker(sh.HeartbeatInterval)
sh.Logger.Debug().
Dur("interval", sh.HeartbeatInterval).
Msg("Received HELLO event from discord")
return nil
}
func gatewayOpHeartbeatACK(ctx context.Context, sh *Shard, msg discord.GatewayPayload, trace sandwich_structs.SandwichTrace) error {
sh.LastHeartbeatAck.Store(time.Now().UTC())
heartbeatRTT := sh.LastHeartbeatAck.Load().Sub(sh.LastHeartbeatSent.Load()).Milliseconds()
sh.Logger.Debug().
Int64("RTT", heartbeatRTT).
Msg("Received heartbeat ACK")
sandwichGatewayLatency.WithLabelValues(
sh.Manager.Identifier.Load(),
strconv.FormatInt(int64(sh.ShardGroup.ID), MagicDecimalBase),
strconv.Itoa(int(sh.ShardID)),
).Set(float64(heartbeatRTT))
return nil
}
func init() {
registerGatewayEvent(discord.GatewayOpDispatch, gatewayOpDispatch)
registerGatewayEvent(discord.GatewayOpHeartbeat, gatewayOpHeartbeat)
registerGatewayEvent(discord.GatewayOpReconnect, gatewayOpReconnect)
registerGatewayEvent(discord.GatewayOpInvalidSession, gatewayOpInvalidSession)
registerGatewayEvent(discord.GatewayOpHello, gatewayOpHello)
registerGatewayEvent(discord.GatewayOpHeartbeatACK, gatewayOpHeartbeatACK)
}