forked from boramalper/magnetico
/
sink.go
179 lines (149 loc) · 4.39 KB
/
sink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package metadata
import (
"math/rand"
"net"
"sync"
"time"
"go.uber.org/zap"
"github.com/fabricio20/magnetico/pkg/persistence"
"github.com/fabricio20/magnetico/pkg/util"
"github.com/fabricio20/magnetico/src/dht"
)
// Metadata describes one torrent. A Sink sends a Metadata value on its drain channel each
// time a leech reports success.
type Metadata struct {
// InfoHash identifies the torrent. The sink copies it into a [20]byte map key when the
// metadata is flushed, so it is expected to hold 20 bytes.
InfoHash []byte
// Name should be thought of as the "Title" of the torrent. For single-file torrents, it is
// the name of the file, and for multi-file torrents, it is the name of the root directory.
Name string
// TotalSize is presumably the combined size of all Files, in bytes — TODO confirm against
// the code that fills it in.
TotalSize uint64
// DiscoveredOn is presumably a Unix timestamp of when the metadata was obtained — TODO
// confirm against the code that fills it in.
DiscoveredOn int64
// Files must be populated for both single-file and multi-file torrents!
Files []persistence.File
}
// Sink receives DHT results, starts a leech for each new info-hash, and publishes the
// metadata of successful leeches on a channel obtained through Drain.
type Sink struct {
// PeerID is the peer ID handed to every leech; NewSink fills it with randomID().
PeerID []byte
// deadline is both the interval of the periodic status log and the time budget given to
// each leech (time.Now().Add(deadline)).
deadline time.Duration
// maxNLeeches caps the number of entries in incomingInfoHashes; Sink() drops results
// once the cap is reached.
maxNLeeches int
// drain carries fetched metadata to the consumer. NewSink creates it with a buffer of 10,
// and Terminate closes it.
drain chan Metadata
// incomingInfoHashes holds, for every info-hash currently being leeched, the peer
// addresses that have not been tried yet.
incomingInfoHashes map[[20]byte][]net.TCPAddr
// incomingInfoHashesMx guards incomingInfoHashes.
incomingInfoHashesMx sync.Mutex
// terminated is set by Terminate and checked by Sink, Drain and flush.
// NOTE(review): it is read and written without synchronisation.
terminated bool
// termination is closed by Terminate.
termination chan interface{}
// deleted counts info-hashes dropped because all of their peers failed; the status
// goroutine started by NewSink resets it to zero after each log line.
deleted int
}
// randomID returns a freshly generated BitTorrent peer ID: the fixed client prefix
// "-MC0008-" followed by random decimal digits, exactly 20 bytes in total.
func randomID() []byte {
	/* > The peer_id is exactly 20 bytes (characters) long.
	 * >
	 * > There are mainly two conventions how to encode client and client version information into the peer_id,
	 * > Azureus-style and Shadow's-style.
	 * >
	 * > Azureus-style uses the following encoding: '-', two characters for client id, four ascii digits for version
	 * > number, '-', followed by random numbers.
	 * >
	 * > For example: '-AZ2060-'...
	 *
	 * https://wiki.theory.org/index.php/BitTorrentSpecification
	 *
	 * We encode the version number as:
	 *  - First two digits for the major version number
	 *  - Last two digits for the minor version number
	 *  - Patch version number is not encoded.
	 */
	const peerIDLen = 20
	prefix := []byte("-MC0008-")

	id := make([]byte, 0, peerIDLen)
	id = append(id, prefix...)
	// Pad with digits until the ID is exactly peerIDLen bytes long. Looping on the length
	// (rather than on a countdown index) rules out the off-by-one that produced 21 bytes.
	for len(id) < peerIDLen {
		id = append(id, randomDigit())
	}
	return id
}
// randomDigit returns one ASCII decimal digit in the inclusive range '0'..'9', drawn from
// the math/rand default source.
func randomDigit() byte {
	const lo, hi = '0', '9'
	// rand.Intn(n) yields a value in [0, n), so the span has to be hi-lo+1 for '9' to be
	// reachable.
	return byte(lo + rand.Intn(hi-lo+1))
}
// NewSink creates a Sink with a fresh random peer ID.
//
// deadline is the time budget given to every leech and also the interval at which a
// background goroutine logs the sink's status (active leeches, info-hashes dropped since
// the previous report, and the length of the drain queue). maxNLeeches caps the number of
// info-hashes being leeched at the same time.
//
// The status goroutine exits when Terminate closes the termination channel. No status
// goroutine is started when deadline is not positive.
func NewSink(deadline time.Duration, maxNLeeches int) *Sink {
	ms := &Sink{
		PeerID:             randomID(),
		deadline:           deadline,
		maxNLeeches:        maxNLeeches,
		drain:              make(chan Metadata, 10),
		incomingInfoHashes: make(map[[20]byte][]net.TCPAddr),
		termination:        make(chan interface{}),
	}

	// time.NewTicker panics on a non-positive interval, so there is nothing to schedule.
	if deadline <= 0 {
		return ms
	}

	go func() {
		ticker := time.NewTicker(deadline)
		defer ticker.Stop()

		for {
			select {
			case <-ms.termination:
				return
			case <-ticker.C:
			}

			// deleted is incremented under this mutex by onLeechError, so it must be read
			// and reset under the same mutex.
			ms.incomingInfoHashesMx.Lock()
			active := len(ms.incomingInfoHashes)
			deleted := ms.deleted
			ms.deleted = 0
			ms.incomingInfoHashesMx.Unlock()

			zap.L().Info("Sink status",
				zap.Int("activeLeeches", active),
				zap.Int("nDeleted", deleted),
				zap.Int("drainQueue", len(ms.drain)),
			)
		}
	}()

	return ms
}
// Sink offers a DHT result to the sink. If the leech cap has not been reached and the
// info-hash is not already being worked on, a leech is started against the first peer and
// the remaining peers are remembered as fallbacks. Panics (through zap) when called after
// Terminate.
func (ms *Sink) Sink(res dht.Result) {
	if ms.terminated {
		zap.L().Panic("Trying to Sink() an already closed Sink!")
	}

	ms.incomingInfoHashesMx.Lock()
	defer ms.incomingInfoHashesMx.Unlock()

	// Refuse new work once the configured number of in-flight info-hashes is reached.
	if ms.maxNLeeches <= len(ms.incomingInfoHashes) {
		return
	}

	hash, addrs := res.InfoHash(), res.PeerAddrs()

	// This info-hash is already being leeched.
	if _, busy := ms.incomingInfoHashes[hash]; busy {
		return
	}

	if len(addrs) != 0 {
		first := addrs[0]
		ms.incomingInfoHashes[hash] = addrs[1:]

		handlers := LeechEventHandlers{
			OnSuccess: ms.flush,
			OnError:   ms.onLeechError,
		}
		leech := NewLeech(hash, &first, ms.PeerID, handlers)
		go leech.Do(time.Now().Add(ms.deadline))
	}

	zap.L().Debug("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", hash[:]))
}
// Drain returns the receive-only channel on which fetched metadata is published. Panics
// (through zap) when called after Terminate.
func (ms *Sink) Drain() <-chan Metadata {
	if !ms.terminated {
		return ms.drain
	}

	zap.L().Panic("Trying to Drain() an already closed Sink!")
	return ms.drain
}
// Terminate marks the sink as closed and closes both the termination and the drain
// channels. Calling it again is a no-op; previously a second call panicked with
// "close of closed channel".
//
// NOTE(review): terminated is a plain bool, so Terminate must not be called concurrently
// with itself.
func (ms *Sink) Terminate() {
	if ms.terminated {
		return
	}

	ms.terminated = true
	close(ms.termination)
	close(ms.drain)
}
// flush is the leech success handler: it publishes the metadata on the drain channel and
// then forgets the info-hash. Nothing happens once the sink has been terminated.
func (ms *Sink) flush(result Metadata) {
	if !ms.terminated {
		ms.drain <- result

		// The info-hash is removed from ms.incomingInfoHashes only AFTER the metadata has
		// been handed over.
		var key [20]byte
		copy(key[:], result.InfoHash)

		ms.incomingInfoHashesMx.Lock()
		delete(ms.incomingInfoHashes, key)
		ms.incomingInfoHashesMx.Unlock()
	}
}
// onLeechError is the leech failure handler. It retries the info-hash against the next
// remembered peer; when no peers are left the info-hash is dropped and counted as deleted.
func (ms *Sink) onLeechError(infoHash [20]byte, err error) {
	zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err))

	ms.incomingInfoHashesMx.Lock()
	defer ms.incomingInfoHashesMx.Unlock()

	remaining := ms.incomingInfoHashes[infoHash]
	if len(remaining) == 0 {
		// Out of fallback peers: give up on this info-hash.
		ms.deleted++
		delete(ms.incomingInfoHashes, infoHash)
		return
	}

	next := remaining[0]
	ms.incomingInfoHashes[infoHash] = remaining[1:]

	handlers := LeechEventHandlers{
		OnSuccess: ms.flush,
		OnError:   ms.onLeechError,
	}
	leech := NewLeech(infoHash, &next, ms.PeerID, handlers)
	go leech.Do(time.Now().Add(ms.deadline))
}