package brink

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"time"
)

// Start starts the crawler at the specified rootDomain. It will scrape the page for
// links and then visit each of them, provided the domains are allowed. It will keep
// repeating this process on each page until it runs out of pages to visit.
//
// Start requires at least one handler to be registered; otherwise it returns an error.
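//
// A minimal usage sketch; NewCrawler and its signature are assumed here for
// illustration and are not defined in this file:
//
//	c, err := brink.NewCrawler("https://example.com")
//	if err != nil {
//		log.Fatal(err)
//	}
//	c.HandleDefaultFunc(func(linkedFrom, url string, status int, body string, cached bool) {
//		log.Printf("%d %s (linked from %s)", status, url, linkedFrom)
//	})
//	if err := c.Start(); err != nil {
//		log.Fatal(err)
//	}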
func (c *Crawler) Start() error {
	// Preflight checks
	if c.RootDomain == "" {
		return fmt.Errorf("root domain not specified")
	}

	if c.defaultHandler == nil && len(c.handlers) == 0 {
		return fmt.Errorf("no handlers specified")
	}

	// Spawn workers and seed the queue with the root domain.
	var wg sync.WaitGroup
	c.spawnWorkers(&wg)

	c.urls <- Link{LinkedFrom: "start", Href: c.RootDomain}

	// Spawn the idle checker: when no worker is busy at a tick, assume the
	// crawl has drained and stop.
	go func() {
		interval := time.Duration(c.opts.IdleWorkCheckInterval)

	ticker:
		for range time.Tick(interval * time.Millisecond) {
			for _, running := range c.workersRunning {
				if *running {
					continue ticker
				}
			}

			log.Println("No URLs left to parse, exiting.")
			c.Stop()
			break ticker
		}
	}()

	wg.Wait()
	return nil
}

func (c *Crawler) spawnWorkers(wg *sync.WaitGroup) {
	wg.Add(c.opts.WorkerCount)
	for i := 0; i < c.opts.WorkerCount; i++ {
		name := fmt.Sprintf("worker-%d", i+1)
		log.Printf("Spawning %s", name)

		running := false
		c.workersRunning[i] = &running

		go func(name string, running *bool) {
			defer wg.Done()

		loop:
			for link := range c.urls {
				*running = true

				_url, err := c.normalizeURL(link.Href)
				if err != nil {
					log.Printf("%s: failed to normalize %q: %v", name, link.Href, err)
					*running = false
					continue
				}

				// Already visited: replay the cached status to the handler
				// without re-fetching. The body is not cached.
				if cached, ok := c.visitedURLs.Load(_url); ok {
					st, _ := strconv.Atoi(cached)
					if f, ok := c.handlers[st]; ok {
						f(link.LinkedFrom, _url, st, "", true)
					} else {
						c.defaultHandler(link.LinkedFrom, _url, st, "", true)
					}
					*running = false
					continue
				}

				st, bod, err := c.Fetch(_url)
				if err != nil && st == 0 {
					// A zero status means the request itself failed. Errors
					// such as NotAllowed and ContentTooLarge still carry a
					// usable status code, which is reported below.
					*running = false
					continue
				}

				c.visitedURLs.Store(_url, strconv.Itoa(st))

				if f, ok := c.handlers[st]; ok {
					f(link.LinkedFrom, _url, st, string(bod), false)
				} else {
					c.defaultHandler(link.LinkedFrom, _url, st, string(bod), false)
				}

				// Only traverse pages that were fetched cleanly with a 200
				// from an allowed, non-forbidden path.
				if err != nil || st != http.StatusOK || pathForbidden(c, _url) {
					*running = false
					continue
				}

				// Parse links and send them all to the urls channel.
				links, err := AbsoluteLinksIn(link.Href, link.Href, bod, true)
				if err != nil {
					log.Printf("%s: failed to extract links: %v", name, err)
					*running = false
					continue
				}

				for _, l := range links {
					if l.Href == "" {
						continue
					}
					if c.stopping {
						break loop
					}
					c.urls <- l
				}

				*running = false
			}

			*running = false
		}(name, &running)
	}
}

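// Note: the per-worker bool flags above are written by the workers and read
// by the idle checker without synchronization, which is a data race under
// the Go memory model. A race-free sketch of the same idea, using an atomic
// counter (busyWorkers is illustrative, not a field of Crawler in this file):
//
//	var busyWorkers atomic.Int64
//
//	// in each worker, around the handling of one link:
//	busyWorkers.Add(1)
//	handle(link)
//	busyWorkers.Add(-1)
//
//	// in the idle checker:
//	if busyWorkers.Load() == 0 && len(c.urls) == 0 {
//		c.Stop()
//	}
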
// Stop attempts to stop the crawler.
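//
// A sketch of wiring Stop to SIGINT so an in-progress crawl can be
// interrupted (illustrative; signal handling is not part of this file):
//
//	sig := make(chan os.Signal, 1)
//	signal.Notify(sig, os.Interrupt)
//	go func() {
//		<-sig
//		c.Stop()
//	}()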
func (c *Crawler) Stop() {
	log.Println("Received signal to stop... Will finish the links already queued.")
	c.stopping = true
	close(c.urls)
}

// AllowDomains instructs the crawler which domains it is allowed
// to visit. The RootDomain is automatically added to this list.
// Domains not allowed will be checked for http status, but will
// not be traversed.
//
// Subsequent calls to AllowDomains add to the list of domains the
// crawler is allowed to traverse.
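//
// For example (the crawler matches entries against scheme://host, as
// assembled in Fetch below; the domains here are illustrative):
//
//	c.AllowDomains("https://example.com", "https://docs.example.com")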
func (c *Crawler) AllowDomains(domains ...string) {
	for _, domain := range domains {
		c.allowedDomains.StoreKey(domain)
	}
}

// Fetch fetches the URL and returns its status, body and/or any errors it
// encountered.
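//
// Callers can distinguish the sentinel error types, which still carry a
// valid status code (sketch; both error types are defined elsewhere in this
// package):
//
//	st, body, err := c.Fetch("https://example.com/page")
//	switch err.(type) {
//	case nil:
//		// use body
//	case NotAllowed, ContentTooLarge:
//		// st is valid, body is nil
//	default:
//		// the request itself failed; st is 0
//	}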
func (c *Crawler) Fetch(url string) (status int, body []byte, err error) {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return 0, nil, fmt.Errorf("failed creating new request: %v", err)
	}

	// Add cookies. If one of the configured session cookies is being sent,
	// remove the Authorization header.
	for _, cookie := range c.cookies() {
		req.AddCookie(cookie)

		for _, sessionCookieName := range c.opts.SessionCookieNames {
			if strings.EqualFold(cookie.Name, sessionCookieName) {
				c.reqHeaders.Delete(authorizationHeaderName)
				break
			}
		}
	}

	// Add headers
	for key, value := range c.reqHeaders.ToMap() {
		req.Header.Add(key, value)
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return 0, nil, fmt.Errorf("get failed: %v", err)
	}
	defer resp.Body.Close()

	// Store any cookies the response set, for use in subsequent requests.
	if respCookies := resp.Cookies(); len(respCookies) != 0 {
		c.addCookies(respCookies)
	}

	scheme, host, err := schemeAndHost(url)
	if err != nil {
		return 0, nil, fmt.Errorf("malformed url: %v", err)
	}
	domain := fmt.Sprintf("%s://%s", scheme, host)

	// If the domain is not allowed, return with only the status code.
	if !c.domainAllowed(domain) {
		return resp.StatusCode, nil, NotAllowed{domain}
	}

	// If the declared Content-Length exceeds the configured maximum, return
	// early with only the status code. (An unknown length is reported as -1
	// and passes this check.)
	if resp.ContentLength > c.opts.MaxContentLength {
		return resp.StatusCode, nil, ContentTooLarge{url}
	}

	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return 0, nil, fmt.Errorf("failed reading response body: %v", err)
	}

	return resp.StatusCode, b, nil
}

// HandleDefaultFunc registers the handler called for every page whose status
// does not have a separate handler registered via HandleFunc. Subsequent
// calls to HandleDefaultFunc overwrite the previously set handler, if any.
func (c *Crawler) HandleDefaultFunc(h func(linkedFrom string, url string, status int, body string, cached bool)) {
	c.defaultHandler = h
}

// HandleFunc registers a function to be called when a page is found with the
// specified status. Subsequent calls to register a function for the same
// status silently overwrite the previously set handler, if any.
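//
// For example, reporting broken links (the handler body is illustrative):
//
//	c.HandleFunc(http.StatusNotFound, func(linkedFrom, url string, status int, body string, cached bool) {
//		log.Printf("broken link: %s -> %s", linkedFrom, url)
//	})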
func (c *Crawler) HandleFunc(status int, h func(linkedFrom string, url string, status int, body string, cached bool)) {
	c.handlers[status] = h
}

// seenURL reports whether the crawler has already visited url.
func (c *Crawler) seenURL(url string) bool {
	return c.visitedURLs.Contains(url)
}

// domainAllowed reports whether the crawler may traverse the given domain.
func (c *Crawler) domainAllowed(domain string) bool {
	_, ok := c.allowedDomains.Load(domain)
	return ok
}

// cookies returns a snapshot of the configured cookies under a read lock, so
// callers can iterate over it without holding the lock.
func (c *Crawler) cookies() (cks []*http.Cookie) {
	c.cmu.RLock()
	defer c.cmu.RUnlock()

	for _, cookie := range c.opts.Cookies {
		cks = append(cks, cookie)
	}
	return cks
}

// addCookies merges the given cookies into the crawler's cookie store,
// overwriting existing cookies of the same name.
func (c *Crawler) addCookies(cookies []*http.Cookie) {
	c.cmu.Lock()
	defer c.cmu.Unlock()

	for _, newCookie := range cookies {
		c.opts.Cookies[newCookie.Name] = newCookie
	}
}