forked from zishang520/socket.io
/
session-aware-adapter.go
173 lines (144 loc) Β· 4.47 KB
/
session-aware-adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package socket
import (
"sync"
"time"
"github.com/Presslogic-Media/engine.io/v2/types"
"github.com/Presslogic-Media/engine.io/v2/utils"
"github.com/zishang520/socket.io-go-parser/v2/parser"
)
type (
SessionAwareAdapterBuilder struct {
}
sessionAwareAdapter struct {
Adapter
maxDisconnectionDuration int64
sessions *types.Map[PrivateSessionId, *SessionWithTimestamp]
packets []*PersistedPacket
mu_packets sync.RWMutex
}
)
func (*SessionAwareAdapterBuilder) New(nsp NamespaceInterface) Adapter {
return NewSessionAwareAdapter(nsp)
}
func MakeSessionAwareAdapter() Adapter {
s := &sessionAwareAdapter{
Adapter: MakeAdapter(),
sessions: &types.Map[PrivateSessionId, *SessionWithTimestamp]{},
packets: []*PersistedPacket{},
}
s.Prototype(s)
return s
}
func NewSessionAwareAdapter(nsp NamespaceInterface) Adapter {
s := MakeSessionAwareAdapter()
s.Construct(nsp)
return s
}
func (s *sessionAwareAdapter) Construct(nsp NamespaceInterface) {
s.Adapter.Construct(nsp)
s.maxDisconnectionDuration = nsp.Server().Opts().ConnectionStateRecovery().MaxDisconnectionDuration()
timer := utils.SetInterval(func() {
threshold := time.Now().UnixMilli() - s.maxDisconnectionDuration
s.sessions.Range(func(sessionId PrivateSessionId, session *SessionWithTimestamp) bool {
if session.DisconnectedAt < threshold {
s.sessions.Delete(sessionId)
}
return true
})
s.mu_packets.Lock()
defer s.mu_packets.Unlock()
for i, packet := range s.packets {
if packet.EmittedAt < threshold {
copy(s.packets, s.packets[i+1:])
s.packets = s.packets[:len(s.packets)-i-1]
break
}
}
}, 60*1000*time.Millisecond)
// prevents the timer from keeping the process alive
timer.Unref()
}
func (s *sessionAwareAdapter) PersistSession(session *SessionToPersist) {
_session := &SessionWithTimestamp{SessionToPersist: session, DisconnectedAt: time.Now().UnixMilli()}
s.sessions.Store(_session.Pid, _session)
}
func (s *sessionAwareAdapter) RestoreSession(pid PrivateSessionId, offset string) (*Session, error) {
session, ok := s.sessions.Load(pid)
if !ok {
// the session may have expired
return nil, nil
}
hasExpired := session.DisconnectedAt+s.maxDisconnectionDuration < time.Now().UnixMilli()
if hasExpired {
// the session has expired
s.sessions.Delete(pid)
return nil, nil
}
s.mu_packets.RLock()
defer s.mu_packets.RUnlock()
// Find the index of the packet with the given offset
index := -1
for i, packet := range s.packets {
if packet.Id == offset {
index = i
break
}
}
if index == -1 {
return nil, nil
}
// Use a pre-allocated slice to avoid memory allocation in the loop
missedPackets := make([]any, 0, len(s.packets)-index-1)
missedNum := 0
// Iterate over the packets and append the data of those that should be included
for i := index + 1; i < len(s.packets); i++ {
packet := s.packets[i]
if shouldIncludePacket(session.Rooms, packet.Opts) {
missedPackets = append(missedPackets, packet.Data)
missedNum++
}
}
// Create a new Session object and return it
return &Session{
SessionToPersist: session.SessionToPersist,
MissedPackets: missedPackets[:missedNum],
}, nil
}
func (s *sessionAwareAdapter) Broadcast(packet *parser.Packet, opts *BroadcastOptions) {
isEventPacket := packet.Type == parser.EVENT
// packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and
// restored on another server upon reconnection
withoutAcknowledgement := packet.Id == nil
notVolatile := opts == nil || opts.Flags == nil || opts.Flags.Volatile == false
if isEventPacket && withoutAcknowledgement && notVolatile {
id := utils.YeastDate()
// the offset is stored at the end of the data array, so the client knows the ID of the last packet it has
// processed (and the format is backward-compatible)
packet.Data = append(packet.Data.([]any), id)
s.mu_packets.Lock()
defer s.mu_packets.Unlock()
s.packets = append(s.packets, &PersistedPacket{
Id: id,
EmittedAt: time.Now().UnixMilli(),
Data: packet.Data,
Opts: opts,
})
}
s.Adapter.Broadcast(packet, opts)
}
func shouldIncludePacket(sessionRooms *types.Set[Room], opts *BroadcastOptions) bool {
included := opts.Rooms.Len() == 0
notExcluded := true
for _, room := range sessionRooms.Keys() {
if included && !notExcluded {
break
}
if !included && opts.Rooms.Has(room) {
included = true
}
if notExcluded && opts.Except.Has(room) {
notExcluded = false
}
}
return included && notExcluded
}