-
Notifications
You must be signed in to change notification settings - Fork 2
/
leader.go
126 lines (102 loc) · 3.31 KB
/
leader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package cache_torrent
import (
"github.com/danalex97/nfsTorrent/torrent"
"github.com/danalex97/nfsTorrent/log"
)
// A Leader is a privileaged node. It was appointed as a leader in an election
// and acts as a cache at the margin of the autonomous system.
// A Leader can:
// - download from anybody
// - upload to anybody(see race condition)
// A Leader can not:
// - upload to different AS via an indirect connection
//
// The Leader forwards the 'have' message towards the Follower. When a Follower
// asks the Leader for a piece, the Leader with either have the piece and
// respond, or, in case it does not have it, it will try acquire the piece as
// fast as possbile. To to that, the Leader gives priority to pieces requested
// by Followers. This is achieved by using a new Picker.
type Leader struct {
*Peer
followerFwd map[string][]*Forwarder
peerFwd map[string][]*Forwarder
}
func NewLeader(p *Peer) *Leader {
return &Leader{
Peer : p,
followerFwd : make(map[string][]*Forwarder),
peerFwd : make(map[string][]*Forwarder),
}
}
func (l *Leader) Run() {
log.LogLeader(log.Leader{
Id : l.Id,
})
l.Peer.Run(l.outgoingConnection)
l.Picker = NewPicker(l.Storage)
}
func (l *Leader) Recv(m interface {}) {
switch msg := m.(type) {
case LeaderStart:
follower := msg.Id
peer := msg.Dest
// Make bidirectional connection to follower
if _, ok := l.Connectors[follower]; !ok {
l.outgoingConnection(follower)
}
if _, ok := l.Connectors[peer]; !ok {
// If there is no connection with the Peer, we make a download
// only connection. That is, we do no handshake and send a message
// to the peer.
// We add the upload capability even though it will not be used.
// This is necessary in case we will contacted for a bidirectional
// leader-leader connection.
// log.Println(l.Id, "<-", peer)
torrent.
NewConnector(l.Id, peer, l.Components).
WithUpload(NewUpload).
WithDownload(NewDownload).
Register(l.Peer.Peer)
}
// Once the connections are made, we only need to register the forwarder
l.registerForwarder(follower, peer)
}
// Send the messages to corresponding forwarders
l.forward(m)
l.Peer.RunRecv(l.GetId(m), m, l.incomingConnection)
}
func (l *Leader) forward(m interface {}) {
id := l.GetId(m)
forward := func(mp map[string][]*Forwarder) {
if _, ok := mp[id]; ok {
for _, fwd := range mp[id] {
fwd.Recv(m)
}
}
}
forward(l.followerFwd)
forward(l.peerFwd)
}
func (l *Leader) registerForwarder(follower, peer string) {
fwd := NewForwarder(l, follower, peer)
if _, ok := l.followerFwd[follower]; !ok {
l.followerFwd[follower] = []*Forwarder{}
}
l.followerFwd[follower] = append(l.followerFwd[follower], fwd)
if _, ok := l.peerFwd[peer]; !ok {
l.peerFwd[peer] = []*Forwarder{}
}
l.peerFwd[peer] = append(l.peerFwd[peer], fwd)
}
func (l *Leader) outgoingConnection(id string) {
// A leader has the initial protocol capabilities, that is
// it's able to upload to anybody for outgoing connections.
torrent.
NewConnector(l.Id, id, l.Components).
WithUpload(NewUpload).
WithDownload(NewDownload).
Register(l.Peer.Peer)
}
func (l *Leader) incomingConnection(id string) {
l.outgoingConnection(id)
}