/
listeners.go
170 lines (150 loc) · 5.03 KB
/
listeners.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package websocket
import (
"context"
"sync"
"time"
"emperror.dev/errors"
"github.com/goccy/go-json"
"github.com/blademindeu/wings/events"
"github.com/blademindeu/wings/system"
"github.com/blademindeu/wings/server"
)
// registerListenerEvents starts the two background goroutines that serve an
// authenticated websocket connection: one forwarding server events to the
// client and one watching the JWT for expiration. Both run until ctx is
// canceled.
//
// It is meant to be invoked once, on the first successful authentication of
// the socket; reconnections are not expected to call it again. Calling it
// before the socket is authenticated produces a flood of confusing errors,
// because Docker events written to the socket fail while it is not yet
// properly initialized.
//
// @see https://github.com/blademindeu/panel/issues/3295
func (h *Handler) registerListenerEvents(ctx context.Context) {
	h.Logger().Debug("registering event listeners for connection")

	go func() {
		listenErr := h.listenForServerEvents(ctx)
		if listenErr == nil {
			// The listener stopped without reporting an error; leave the
			// connection alone.
			return
		}

		// The event listener failed, so the connection is torn down.
		h.Logger().Warn("error while processing server event; closing websocket connection")
		closeErr := h.Connection.Close()
		if closeErr == nil {
			return
		}
		h.Logger().WithField("error", errors.WithStack(closeErr)).Error("error closing websocket connection")
	}()

	go h.listenForExpiration(ctx)
}
// listenForExpiration polls the JWT attached to this handler every 30 seconds
// until ctx is canceled. On each tick:
//
//   - if the token has already expired, a TokenExpiredEvent is sent (this
//     repeats on every tick for as long as the token stays expired);
//   - otherwise, if 60 seconds or less remain, a TokenExpiringEvent is sent;
//   - if no JWT is set, nothing happens.
func (h *Handler) listenForExpiration(ctx context.Context) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			jwt := h.GetJwt()
			if jwt == nil {
				continue
			}

			// Read the clock once so both thresholds are evaluated against the
			// same instant.
			remaining := jwt.ExpirationTime.Unix() - time.Now().Unix()

			// Send errors are deliberately ignored, as before: these notices
			// are best-effort and nothing here acts on a failed write.
			switch {
			case remaining <= 0:
				_ = h.SendJson(Message{Event: TokenExpiredEvent})
			case remaining <= 60:
				_ = h.SendJson(Message{Event: TokenExpiringEvent})
			}
		}
	}
}
// e lists server event names defined by the server package.
//
// NOTE(review): nothing in the visible part of this file reads this slice, and
// listenForServerEvents declares a local "e" that shadows it. Presumably it is
// the set of topics meant to be forwarded to websocket clients — confirm
// against other users in the package before renaming or removing it.
var e = []string{
server.StatsEvent,
server.StatusEvent,
server.ConsoleOutputEvent,
server.InstallOutputEvent,
server.InstallStartedEvent,
server.InstallCompletedEvent,
server.DaemonMessageEvent,
server.BackupCompletedEvent,
server.BackupRestoreCompletedEvent,
server.TransferLogsEvent,
server.TransferStatusEvent,
}
// listenForServerEvents subscribes to the server's event bus, its log sink and
// its install sink, and relays everything received to the connected websocket
// client. It blocks until the supplied context is canceled or until a message
// cannot be encoded or sent.
//
// It returns nil when the loop ended because the context was canceled, and the
// first encode/send error (wrapped with a stack trace) otherwise.
func (h *Handler) listenForServerEvents(ctx context.Context) error {
	var (
		once     sync.Once
		firstErr error
	)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	busEvents := make(chan []byte)
	consoleLines := make(chan []byte, 8)
	installLines := make(chan []byte, 4)

	h.server.Events().On(busEvents) // TODO: make a sinky
	h.server.Sink(system.LogSink).On(consoleLines)
	h.server.Sink(system.InstallSink).On(installLines)

	// fail logs the failure, remembers only the first error seen and cancels
	// the derived context so the relay loop stops.
	fail := func(topic string, cause error) {
		h.Logger().WithField("event", topic).WithField("error", cause).Error("failed to send event over server websocket")
		once.Do(func() {
			firstErr = cause
		})
		cancel()
	}

	// forward relays one chunk of sink output under the given topic and
	// reports whether the relay loop should keep running.
	forward := func(topic string, line []byte) bool {
		sendErr := h.SendJson(Message{Event: topic, Args: []string{string(line)}})
		if sendErr != nil {
			fail(topic, sendErr)
			return false
		}
		return true
	}

	// NOTE(review): the receives below do not check whether a channel has been
	// closed; a closed channel would yield nil payloads — confirm that nothing
	// closes these channels while the loop is still running.
relay:
	for {
		select {
		case <-ctx.Done():
			break relay

		case line := <-consoleLines:
			if !forward(server.ConsoleOutputEvent, line) {
				break relay
			}

		case line := <-installLines:
			if !forward(server.InstallOutputEvent, line) {
				break relay
			}

		case raw := <-busEvents:
			var evt events.Event
			if decodeErr := events.DecodeTo(raw, &evt); decodeErr != nil {
				// Payloads that cannot be decoded are dropped silently.
				continue
			}

			msg := Message{Event: evt.Topic}

			// Strings and byte slices are passed through verbatim; any other
			// payload is JSON-encoded first.
			var sendErr error
			switch data := evt.Data.(type) {
			case string:
				msg.Args = []string{data}
			case []byte:
				msg.Args = []string{string(data)}
			default:
				var encoded []byte
				if encoded, sendErr = json.Marshal(evt.Data); sendErr == nil {
					msg.Args = []string{string(encoded)}
				}
			}

			if sendErr == nil {
				sendErr = h.SendJson(msg)
			}
			if sendErr != nil {
				fail(msg.Event, sendErr)
				break relay
			}
		}
	}

	// Unsubscribe in the same order the subscriptions were made. Per the
	// original note, Off also closes the channel if it is still open (not
	// verifiable from this file).
	h.server.Events().Off(busEvents)
	h.server.Sink(system.LogSink).Off(consoleLines)
	h.server.Sink(system.InstallSink).Off(installLines)

	// A nil firstErr means the loop ended because the context was canceled
	// rather than because of a failure, so there is nothing to report.
	if firstErr == nil {
		return nil
	}
	return errors.WithStack(firstErr)
}