-
-
Notifications
You must be signed in to change notification settings - Fork 76
/
discovered_nodes.go
59 lines (54 loc) · 1.7 KB
/
discovered_nodes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package dhtcrawler
import (
"context"
"github.com/bitmagnet-io/bitmagnet/internal/concurrency"
"github.com/bitmagnet-io/bitmagnet/internal/protocol/dht/ktable"
"go.uber.org/fx"
"net/netip"
"time"
)
type DiscoveredNodesParams struct {
fx.In
Config Config
}
type DiscoveredNodesResult struct {
fx.Out
DiscoveredNodes concurrency.BatchingChannel[ktable.Node] `name:"dht_discovered_nodes"`
}
// NewDiscoveredNodes creates the channel for discovered nodes.
// It receives nodes discovered by the crawler, as well as nodes from incoming requests to the DHT server.
// It is provided as a separate service to avoid a circular dependency with the DHT server.
func NewDiscoveredNodes(params DiscoveredNodesParams) DiscoveredNodesResult {
return DiscoveredNodesResult{
DiscoveredNodes: concurrency.NewBatchingChannel[ktable.Node](int(100*params.Config.ScalingFactor), 10, time.Second/100),
}
}
func (c *crawler) runDiscoveredNodes(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case ps := <-c.discoveredNodes.Out():
addrs := make([]netip.Addr, 0, 1)
m := make(map[string]ktable.Node, 1)
for _, p := range ps {
if _, ok := m[p.Addr().Addr().String()]; !ok {
m[p.Addr().Addr().String()] = p
addrs = append(addrs, p.Addr().Addr())
}
}
// for any discovered node not already in the routing table, we will block until it can be sent to any one of the pipeline channels.
unknownAddrs := c.kTable.FilterKnownAddrs(addrs)
for _, addr := range unknownAddrs {
p := m[addr.String()]
select {
case <-ctx.Done():
return
case c.nodesForFindNode.In() <- p:
case c.nodesForSampleInfoHashes.In() <- p:
case c.nodesForPing.In() <- p:
}
}
}
}
}