-
Notifications
You must be signed in to change notification settings - Fork 8
/
nats_stream.go
53 lines (46 loc) · 1.39 KB
/
nats_stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package nats
import (
"context"
"github.com/aperturerobotics/bifrost/link"
"github.com/aperturerobotics/bifrost/peer"
"github.com/aperturerobotics/bifrost/pubsub"
stream_netconn "github.com/aperturerobotics/bifrost/stream/netconn"
"github.com/sirupsen/logrus"
)
// streamHandler is a remote floodsub peer with a stream.
type streamHandler struct {
tpl pubsub.PeerLinkTuple
m *Nats
initiator bool
le *logrus.Entry
peerID peer.ID
mstrm link.MountedStream
strmType NatsConnType
ctx context.Context
ctxCancel context.CancelFunc
}
// executeSession executes the stream session.
func (s *streamHandler) executeSession() {
ctx := s.ctx
defer s.ctxCancel()
defer s.mstrm.GetStream().Close()
// c contains the nats client
le := s.le.
WithField("initiator", s.initiator).
WithField("nats-stream-type", s.strmType.String())
le.Debug("executing session")
nconn := stream_netconn.NewNetConn(s.mstrm)
peerInfo := s.mstrm.GetPeerID()
switch s.strmType {
case NatsConnType_NatsConnType_CLIENT:
_ = s.m.natsServer.HandleClientConnection(nconn, peerInfo.Pretty())
case NatsConnType_NatsConnType_ROUTER:
s.m.natsServer.HandleRouterConnection(nconn, peerInfo.Pretty())
default:
// err := errors.Errorf("unknown nats conn type: %v", s.strmType.String())
le.Warn("rejecting session with unknown nats conn type")
return
}
<-ctx.Done()
le.Debug("session closed")
}