-
-
Notifications
You must be signed in to change notification settings - Fork 89
/
main.go
124 lines (103 loc) · 3.18 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
_ "net/http/pprof"
"github.com/centrifugal/centrifuge"
)
func handleLog(e centrifuge.LogEntry) {
log.Printf("%s: %v", e.Message, e.Fields)
}
func authMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
newCtx := centrifuge.SetCredentials(ctx, ¢rifuge.Credentials{
UserID: "42",
})
r = r.WithContext(newCtx)
h.ServeHTTP(w, r)
})
}
func waitExitSignal(n *centrifuge.Node) {
sigCh := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
_ = n.Shutdown(context.Background())
done <- true
}()
<-done
}
func main() {
node, _ := centrifuge.New(centrifuge.Config{
LogLevel: centrifuge.LogLevelDebug,
LogHandler: handleLog,
})
broker, _ := centrifuge.NewMemoryBroker(node, centrifuge.MemoryBrokerConfig{})
node.SetBroker(broker)
node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) {
cred, _ := centrifuge.GetCredentials(ctx)
return centrifuge.ConnectReply{
Data: []byte(`{}`),
// Subscribe to personal several server-side channel.
Subscriptions: map[string]centrifuge.SubscribeOptions{
"#" + cred.UserID: {},
},
}, nil
})
node.OnConnect(func(client *centrifuge.Client) {
// Declare concurrency semaphore for client connection closure.
semaphore := make(chan struct{}, 16)
client.OnAlive(func() {
log.Printf("user %s connection is still active", client.UserID())
})
client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
log.Printf("user %s subscribes on %s", client.UserID(), e.Channel)
semaphore <- struct{}{}
go func() {
defer func() { <-semaphore }()
time.Sleep(200 * time.Millisecond)
cb(centrifuge.SubscribeReply{}, nil)
}()
})
client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) {
log.Printf("user %s unsubscribed from %s", client.UserID(), e.Channel)
})
client.OnRPC(func(e centrifuge.RPCEvent, cb centrifuge.RPCCallback) {
log.Printf("RPC from user: %s, data: %s, method: %s", client.UserID(), string(e.Data), e.Method)
semaphore <- struct{}{}
go func() {
defer func() { <-semaphore }()
time.Sleep(100 * time.Millisecond)
cb(centrifuge.RPCReply{Data: []byte(`{"year": "2020"}`)}, nil)
}()
})
client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
log.Printf("user %s disconnected, disconnect: %s", client.UserID(), e.Disconnect)
})
transport := client.Transport()
log.Printf("user %s connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol())
})
if err := node.Run(); err != nil {
log.Fatal(err)
}
websocketHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{
ReadBufferSize: 1024,
UseWriteBufferPool: true,
})
http.Handle("/connection/websocket", authMiddleware(websocketHandler))
http.Handle("/", http.FileServer(http.Dir("./")))
go func() {
if err := http.ListenAndServe(":8000", nil); err != nil {
log.Fatal(err)
}
}()
waitExitSignal(node)
log.Println("bye!")
}