-
-
Notifications
You must be signed in to change notification settings - Fork 29
/
crawler_api.go
125 lines (102 loc) 路 3.1 KB
/
crawler_api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package libp2p
import (
"context"
"errors"
"fmt"
manet "github.com/multiformats/go-multiaddr/net"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"github.com/dennis-tra/nebula-crawler/kubo"
)
// APIResult is the outcome of probing a peer's (potentially) exposed Kubo
// HTTP API. It is delivered exactly once on the channel returned by
// Crawler.crawlAPI.
type APIResult struct {
// Indicates if we actually found IP addresses to probe
Attempted bool
// The ID response object from the Kubo API
ID *kubo.IDResponse
// The Kubo routing table. Doesn't contain multi addresses. Don't use this to continue crawling.
RoutingTable *kubo.RoutingTableResponse
}
// crawlAPI probes each unique IP address that pi advertises for an exposed
// Kubo HTTP API. It returns a channel on which exactly one APIResult is sent
// (unless ctx is cancelled first) before the channel is closed. If the
// exposed-API check is disabled in the config, the returned channel is
// already closed and carries no result.
func (c *Crawler) crawlAPI(ctx context.Context, pi PeerInfo) <-chan APIResult {
	resultCh := make(chan APIResult)

	// if Nebula is configured to not check for an exposed API return early
	if !c.cfg.CheckExposed {
		close(resultCh)
		return resultCh
	}

	go func() {
		crawledIPs := map[string]struct{}{}
		for _, maddr := range pi.Addrs() {
			// extract IP address from multi address
			ip, err := manet.ToIP(maddr)
			if err != nil {
				log.WithField("maddr", maddr).WithError(err).Debugln("Could not parse IP from Multiaddr")
				continue
			}

			// check if we have already crawled this IP address. A peer usually advertises the same IP address
			// multiple times. E.g., for TCP/QUIC transports. We don't want to crawl an IP twice or more.
			if _, alreadyCrawled := crawledIPs[ip.String()]; alreadyCrawled {
				continue
			}
			crawledIPs[ip.String()] = struct{}{}

			// declare responses; each is written by at most one goroutine
			// below and only read after errg.Wait() returns.
			var (
				idResp *kubo.IDResponse
				rtResp *kubo.RoutingTableResponse
			)

			// init timeout context
			tCtx, cancel := context.WithTimeout(ctx, kubo.RequestTimeout)

			// start both requests in parallel and stop if either fails
			errg := errgroup.Group{}
			errg.Go(func() error {
				// Use a goroutine-local error variable. The original code
				// assigned to the loop's shared err from both closures,
				// which is a data race when TrackNeighbors is enabled.
				resp, idErr := c.client.ID(tCtx, ip.String())
				if idErr != nil {
					return fmt.Errorf("could not crawl ID api: %w", idErr)
				}
				idResp = resp
				return nil
			})

			// Only crawl routing table if we actually want to persist neighbors. The result from this API
			// call cannot be used to continue our crawls because the response does not contain multiaddresses
			// of remote peers.
			if c.cfg.TrackNeighbors {
				errg.Go(func() error {
					resp, rtErr := c.client.RoutingTable(tCtx, ip.String())
					if rtErr != nil {
						return fmt.Errorf("could not crawl routing table api: %w", rtErr)
					}
					rtResp = resp
					return nil
				})
			}

			// wait for an error or two successes; release the timeout
			// context regardless of the outcome (single cancel instead of
			// one per branch).
			err = errg.Wait()
			cancel()
			if errors.Is(err, context.Canceled) {
				break // fall through to the final send/close below
			} else if err != nil {
				log.WithField("maddr", maddr).WithError(err).Debugln("Could not crawl api")
				continue
			}

			// Report result back
			result := APIResult{
				Attempted:    true,
				ID:           idResp,
				RoutingTable: rtResp,
			}

			select {
			case resultCh <- result:
			case <-ctx.Done():
			}

			// since we have what we want, close the channel and return
			close(resultCh)
			return
		}

		select {
		case resultCh <- APIResult{Attempted: len(crawledIPs) > 0}:
		case <-ctx.Done():
		}

		// if crawling the API didn't succeed, just close the channel to indicate that we're done
		close(resultCh)
	}()

	return resultCh
}