/
sessionmanager.go
140 lines (119 loc) · 3.98 KB
/
sessionmanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package sessionmanager
import (
"context"
"sync"
"time"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
notifications "github.com/SJTU-OpenNetwork/go-bitswap/notifications"
bssession "github.com/SJTU-OpenNetwork/go-bitswap/session"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
peer "github.com/libp2p/go-libp2p-core/peer"
)
// Session is a session that is managed by the session manager
type Session interface {
exchange.Fetcher
ReceiveFrom(peer.ID, []cid.Cid)
IsWanted(cid.Cid) bool
}
type sesTrk struct {
session Session
pm bssession.PeerManager
srs bssession.RequestSplitter
}
// SessionFactory generates a new session for the SessionManager to track.
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter, notif notifications.PubSub, provSearchDelay time.Duration, rebroadcastDelay delay.D) Session
// RequestSplitterFactory generates a new request splitter for a session.
type RequestSplitterFactory func(ctx context.Context) bssession.RequestSplitter
// PeerManagerFactory generates a new peer manager for a session.
type PeerManagerFactory func(ctx context.Context, id uint64) bssession.PeerManager
// SessionManager is responsible for creating, managing, and dispatching to
// sessions.
type SessionManager struct {
ctx context.Context
sessionFactory SessionFactory
peerManagerFactory PeerManagerFactory
requestSplitterFactory RequestSplitterFactory
notif notifications.PubSub
// Sessions
sessLk sync.RWMutex
sessions []sesTrk
// Session Index
sessIDLk sync.Mutex
sessID uint64
}
// New creates a new SessionManager.
func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory PeerManagerFactory,
requestSplitterFactory RequestSplitterFactory, notif notifications.PubSub) *SessionManager {
return &SessionManager{
ctx: ctx,
sessionFactory: sessionFactory,
peerManagerFactory: peerManagerFactory,
requestSplitterFactory: requestSplitterFactory,
notif: notif,
}
}
// NewSession initializes a session with the given context, and adds to the
// session manager.
func (sm *SessionManager) NewSession(ctx context.Context,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) exchange.Fetcher {
id := sm.GetNextSessionID()
sessionctx, cancel := context.WithCancel(ctx)
pm := sm.peerManagerFactory(sessionctx, id)
srs := sm.requestSplitterFactory(sessionctx)
session := sm.sessionFactory(sessionctx, id, pm, srs, sm.notif, provSearchDelay, rebroadcastDelay)
tracked := sesTrk{session, pm, srs}
sm.sessLk.Lock()
sm.sessions = append(sm.sessions, tracked)
sm.sessLk.Unlock()
go func() {
defer cancel()
select {
case <-sm.ctx.Done():
sm.removeSession(tracked)
case <-ctx.Done():
sm.removeSession(tracked)
}
}()
return session
}
func (sm *SessionManager) removeSession(session sesTrk) {
sm.sessLk.Lock()
defer sm.sessLk.Unlock()
for i := 0; i < len(sm.sessions); i++ {
if sm.sessions[i] == session {
sm.sessions[i] = sm.sessions[len(sm.sessions)-1]
sm.sessions[len(sm.sessions)-1] = sesTrk{} // free memory.
sm.sessions = sm.sessions[:len(sm.sessions)-1]
return
}
}
}
// GetNextSessionID returns the next sequentional identifier for a session.
func (sm *SessionManager) GetNextSessionID() uint64 {
sm.sessIDLk.Lock()
defer sm.sessIDLk.Unlock()
sm.sessID++
return sm.sessID
}
// ReceiveFrom receives block CIDs from a peer and dispatches to sessions.
func (sm *SessionManager) ReceiveFrom(from peer.ID, ks []cid.Cid) {
sm.sessLk.RLock()
defer sm.sessLk.RUnlock()
for _, s := range sm.sessions {
s.session.ReceiveFrom(from, ks)
}
}
// IsWanted indicates whether any of the sessions are waiting to receive
// the block with the given CID.
func (sm *SessionManager) IsWanted(cid cid.Cid) bool {
sm.sessLk.RLock()
defer sm.sessLk.RUnlock()
for _, s := range sm.sessions {
if s.session.IsWanted(cid) {
return true
}
}
return false
}