-
-
Notifications
You must be signed in to change notification settings - Fork 577
/
grpc.go
79 lines (67 loc) · 2.07 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package unigrpc
import (
"time"
"github.com/centrifugal/centrifugo/v5/internal/unigrpc/unistream"
"github.com/centrifugal/centrifuge"
"google.golang.org/grpc"
)
// RegisterService registers service on the given gRPC server as the
// CentrifugoUniStream handler. The returned error is always nil; the error
// result exists only as part of the function signature.
func RegisterService(server *grpc.Server, service unistream.CentrifugoUniStreamServer) error {
	register := unistream.RegisterCentrifugoUniStreamServer
	register(server, service)
	return nil
}
// Service can work with client GRPC connections.
//
// It serves the unidirectional stream API by turning every incoming Consume
// call into a centrifuge client attached to node.
type Service struct {
// Embedded generated stub.
// NOTE(review): presumably supplies default implementations for RPC methods
// this type does not define itself — confirm against the generated code.
unistream.UnimplementedCentrifugoUniStreamServer
// config holds the options passed to NewService.
config Config
// node is the centrifuge node used to create clients and to write log entries.
node *centrifuge.Node
}
// NewService builds a Service bound to centrifuge node n and carrying
// configuration c.
func NewService(n *centrifuge.Node, c Config) *Service {
	svc := new(Service)
	svc.node = n
	svc.config = c
	return svc
}
// Consume is a unidirectional server->client stream with real-time data.
//
// It wraps the gRPC stream into a transport, creates a centrifuge client on
// top of it and issues a connect built from req (token, data, name, version
// and optional per-channel subscription recovery state). The call then blocks
// until either the transport's closeCh becomes ready or the stream context is
// done, and returns nil in both cases. An error is returned only when the
// client cannot be created.
func (s *Service) Consume(req *unistream.ConnectRequest, stream unistream.CentrifugoUniStream_ConsumeServer) error {
	frames := make(chan rawFrame)
	transport := newGRPCTransport(stream, frames)

	var connectRequest centrifuge.ConnectRequest
	connectRequest.Token = req.Token
	connectRequest.Data = req.Data
	connectRequest.Name = req.Name
	connectRequest.Version = req.Version

	// Only populate Subs when the request carried a (possibly empty) map, so
	// that a nil map in the request stays nil in the connect request.
	if requested := req.Subs; requested != nil {
		converted := make(map[string]centrifuge.SubscribeRequest, len(requested))
		for channel, sub := range requested {
			converted[channel] = centrifuge.SubscribeRequest{
				Recover: sub.Recover,
				Offset:  sub.Offset,
				Epoch:   sub.Epoch,
			}
		}
		connectRequest.Subs = converted
	}

	ctx := stream.Context()
	client, closeClient, err := centrifuge.NewClient(ctx, s.node, transport)
	if err != nil {
		return err
	}
	// Close error is intentionally ignored: the stream is finishing anyway.
	defer func() { _ = closeClient() }()

	if s.node.LogEnabled(centrifuge.LogLevelDebug) {
		established := map[string]any{
			"transport": transport.Name(),
			"client":    client.ID(),
		}
		s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection established", established))

		// Registered after the close defer, so the completion entry is logged
		// before the client is closed.
		started := time.Now()
		defer func() {
			completed := map[string]any{
				"duration":  time.Since(started),
				"transport": transport.Name(),
				"client":    client.ID(),
			}
			s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection completed", completed))
		}()
	}

	client.Connect(connectRequest)

	// Park the handler until the connection is over; returning ends the RPC.
	select {
	case <-transport.closeCh:
	case <-ctx.Done():
	}
	return nil
}