-
Notifications
You must be signed in to change notification settings - Fork 41
/
wsHub.go
160 lines (142 loc) · 4.25 KB
/
wsHub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package api
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/TUM-Dev/gocast/dao"
"github.com/TUM-Dev/gocast/tools"
"github.com/TUM-Dev/gocast/tools/realtime"
"github.com/getsentry/sentry-go"
"github.com/gin-gonic/gin"
)
var (
	// wsMapLock is the lock taken around writes to sessionsMap.
	wsMapLock sync.RWMutex

	// sessionsMap holds the registered websocket sessions, keyed by stream ID.
	sessionsMap = make(map[uint][]*sessionWrapper)
)
// Severity tags for server messages.
// NOTE(review): presumably these are the values passed as the "type" field of
// a server message (see sendServerMessage) — confirm against callers.
const (
	// TypeServerInfo tags an informational server message.
	TypeServerInfo = "info"
	// TypeServerWarn tags a warning server message.
	TypeServerWarn = "warn"
	// TypeServerErr tags an error server message.
	TypeServerErr = "error"
)
// sessionWrapper pairs a realtime session with the permission flag that was
// computed for its user when the session was registered.
type sessionWrapper struct {
	// session is the realtime context used to send messages to the client.
	session *realtime.Context
	// isAdminOfCourse is true when the connected user is an admin of the
	// course the stream belongs to; admin-only broadcasts filter on it.
	isAdminOfCourse bool
}
// connHandler registers a newly connected realtime client in sessionsMap under
// the ID of the stream it is watching and sends the client the current number
// of registered sessions for that stream.
//
// The TUMLiveContext is expected to be stored on the realtime context under
// the key "TUMLiveContext". If it is missing or has an unexpected type the
// problem is reported to sentry and the session is not registered.
var connHandler = func(ctx *realtime.Context) {
	foundContext, exists := ctx.Get("TUMLiveContext") // get gin context
	if !exists {
		sentry.CaptureException(errors.New("context should exist but doesn't"))
		return
	}
	// Use the comma-ok form so a wrongly typed value is reported instead of
	// panicking inside the connection handler.
	tumLiveContext, ok := foundContext.(tools.TUMLiveContext)
	if !ok {
		sentry.CaptureException(errors.New("TUMLiveContext has unexpected type"))
		return
	}

	// Anonymous users (no User) are never admins; the Course check guards the
	// dereference below.
	isAdmin := false
	if tumLiveContext.User != nil && tumLiveContext.Course != nil {
		isAdmin = tumLiveContext.User.IsAdminOfCourse(*tumLiveContext.Course)
	}

	streamID := tumLiveContext.Stream.ID

	// Register the session and read the resulting viewer count inside the same
	// critical section; reading the map after unlocking would race with other
	// writers.
	wsMapLock.Lock()
	sessionsMap[streamID] = append(sessionsMap[streamID], &sessionWrapper{session: ctx, isAdminOfCourse: isAdmin})
	viewers := len(sessionsMap[streamID])
	wsMapLock.Unlock()

	msg, err := json.Marshal(gin.H{"viewers": viewers})
	if err != nil {
		logger.Error("can't write initial stats to session", "err", err)
		return
	}
	if err := ctx.Send(msg); err != nil {
		logger.Error("can't write initial stats to session", "err", err)
	}
}
// sendServerMessageWithBackoff delivers a server message to a single session,
// rate-limited to one message per user and stream every 10 minutes.
//
// Nothing is sent when userId is 0 (treated as "not logged in") or when a
// cache entry for this user/stream pair still exists. The cache entry is
// written after every send attempt, including attempts where Send returned an
// error.
//
//lint:ignore U1000 Ignore unused function
func sendServerMessageWithBackoff(session *realtime.Context, userId uint, streamId uint, msg string, t string) {
	if userId == 0 {
		return
	}

	cacheKey := fmt.Sprintf("shouldSendServerMsg_%d_%d", userId, streamId)

	// An existing entry means this user already got a server message for this
	// stream within the backoff window.
	if _, recentlySent := tools.GetCacheItem(cacheKey); recentlySent {
		return
	}

	payload, _ := json.Marshal(gin.H{"server": msg, "type": t})
	if sendErr := session.Send(payload); sendErr != nil {
		logger.Error("can't write server message to session", "err", sendErr)
	}

	// Start the 10 minute backoff window for this user/stream pair.
	tools.SetCacheItem(cacheKey, true, 10*time.Minute)
}
// sendServerMessage serializes msg and its type tag t once and sends the
// resulting payload to every given session. A failed send is logged and does
// not stop delivery to the remaining sessions.
func sendServerMessage(msg string, t string, sessions ...*realtime.Context) {
	payload, _ := json.Marshal(gin.H{"server": msg, "type": t})
	for i := range sessions {
		if sendErr := sessions[i].Send(payload); sendErr != nil {
			logger.Error("can't write server message to session", "err", sendErr)
		}
	}
}
// BroadcastStats sends the current viewer count of every stream that has at
// least one registered session to all sessions of that stream.
//
// Streams that cannot be loaded through streamsDao and streams flagged as
// recordings are skipped.
func BroadcastStats(streamsDao dao.StreamsDao) {
	// Snapshot the per-stream counts under the read lock. Ranging over
	// sessionsMap without it races with concurrent registrations and cleanup.
	// The lock is released before the DAO lookups and before broadcastStream,
	// which acquires wsMapLock itself.
	wsMapLock.RLock()
	viewers := make(map[uint]int, len(sessionsMap))
	for sID, sessions := range sessionsMap {
		if len(sessions) > 0 {
			viewers[sID] = len(sessions)
		}
	}
	wsMapLock.RUnlock()

	for sID, count := range viewers {
		stream, err := streamsDao.GetStreamByID(context.Background(), strconv.FormatUint(uint64(sID), 10))
		if err != nil || stream.Recording {
			continue
		}
		// Marshalling a map with a single int value cannot fail.
		msg, _ := json.Marshal(gin.H{"viewers": count})
		broadcastStream(sID, msg)
	}
}
func cleanupSessions() {
for id, sessions := range sessionsMap {
roomName := strings.Replace(ChatRoomName, ":streamID", strconv.Itoa(int(id)), -1)
var newSessions []*sessionWrapper
for i, session := range sessions {
if RealtimeInstance.IsSubscribed(roomName, session.session.Client.Id) {
newSessions = append(newSessions, sessions[i])
}
}
wsMapLock.Lock()
sessionsMap[id] = newSessions
wsMapLock.Unlock()
}
}
func broadcastStream(streamID uint, msg []byte) {
sessions, f := sessionsMap[streamID]
if !f {
return
}
wsMapLock.Lock()
sessions = removeClosed(sessions)
wsMapLock.Unlock()
for _, wrapper := range sessions {
_ = wrapper.session.Send(msg) // ignore "session closed" error, nothing we can do about it at this point
}
}
func broadcastStreamToAdmins(streamID uint, msg []byte) {
sessions, f := sessionsMap[streamID]
if !f {
return
}
wsMapLock.Lock()
sessions = removeClosed(sessions)
wsMapLock.Unlock()
for _, wrapper := range sessions {
if wrapper.isAdminOfCourse {
_ = wrapper.session.Send(msg)
}
}
}
// removeClosed returns a new slice containing only the sessions whose client
// RealtimeInstance still reports as connected. The input slice is left
// untouched; the result is nil when no session qualifies.
func removeClosed(sessions []*sessionWrapper) []*sessionWrapper {
	var connected []*sessionWrapper
	for i := range sessions {
		if !RealtimeInstance.IsConnected(sessions[i].session.Client.Id) {
			continue
		}
		connected = append(connected, sessions[i])
	}
	return connected
}