From ac16bea15b5e638ef8468df6edb7af302503d637 Mon Sep 17 00:00:00 2001
From: zhengchun
Date: Wed, 29 Nov 2017 15:22:20 +0800
Subject: [PATCH] BREAKING: rewrite architecture and all code

Why rewrite the architecture? To make it simpler, more powerful, and faster
to get started with in any project.

1. new Middleware interface and implementations.
2. new Pipeline interface and implementations.
3. new Spider handler interface and implementations.
4. make the cookies, compression, robots.txt and proxy middlewares the
   built-in middleware used by the crawler.
5. delete the SpiderMux type; Crawler now implements the SpiderMux feature
   for handling different websites.
6. delete deprecated middleware (e.g. redirect, url_dupe, delay, ...).
---
 README.md                           |  35 +-
 compression.go                      |  84 ++++
 cookies.go                          |  35 ++
 crawler.go                          | 649 +++++++++++++++++-----------
 downloader.go                       | 231 ----------
 examples/amazon/main.go             |  58 ---
 examples/simple/main.go             |  19 -
 parse.go => html.go                 |  44 +-
 internal/util/atomic.go             |  32 --
 internal/util/wait_group_wrapper.go |  15 -
 mediatype.go                        |  39 --
 middleware.go                       |  25 ++
 middleware/compression.go           |  87 ----
 middleware/compression_test.go      |  49 ---
 middleware/cookie.go                |  46 --
 middleware/cookie_test.go           |  60 ---
 middleware/delay.go                 |  43 --
 middleware/delay_test.go            |  43 --
 middleware/middleware.go            |  36 --
 middleware/proxy_test.go            |  14 -
 middleware/redirect.go              | 254 ----------
 middleware/redirect_test.go         |  34 --
 middleware/robotstxt.go             |  95 ----
 middleware/robotstxt_test.go        |  69 ---
 middleware/urldupe_filter.go        |  52 ---
 middleware/urldupe_filter_test.go   |  38 --
 pipeline.go                         |  19 +
 middleware/proxy.go => proxy.go     | 100 ++---
 queue.go                            |  37 --
 robotstxt.go                        |  84 ++++
 spider.go                           | 105 +----
 xml.go                              |  12 +
 32 files changed, 778 insertions(+), 1765 deletions(-)
 create mode 100644 compression.go
 create mode 100644 cookies.go
 delete mode 100644 downloader.go
 delete mode 100644 examples/amazon/main.go
 delete mode 100644 examples/simple/main.go
 rename parse.go => html.go (52%)
 delete mode 100644 internal/util/atomic.go
 delete mode 100644 internal/util/wait_group_wrapper.go
 delete mode 100644 mediatype.go
 create mode 100644 middleware.go
 delete mode 100644 middleware/compression.go
 delete mode 100644 middleware/compression_test.go
 delete mode 100644 middleware/cookie.go
 delete mode 100644 middleware/cookie_test.go
 delete mode 100644 middleware/delay.go
 delete mode 100644 middleware/delay_test.go
 delete mode 100644 middleware/middleware.go
 delete mode 100644 middleware/proxy_test.go
 delete mode 100644 middleware/redirect.go
 delete mode 100644 middleware/redirect_test.go
 delete mode 100644 middleware/robotstxt.go
 delete mode 100644 middleware/robotstxt_test.go
 delete mode 100644 middleware/urldupe_filter.go
 delete mode 100644 middleware/urldupe_filter_test.go
 create mode 100644 pipeline.go
 rename middleware/proxy.go => proxy.go (50%)
 delete mode 100644 queue.go
 create mode 100644 robotstxt.go
 create mode 100644 xml.go

diff --git a/README.md b/README.md
index 461878e..a547aa2 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,36 @@
 Antch
 ====
+
+[![Go Report Card](https://goreportcard.com/badge/github.com/antchfx/antch)](https://goreportcard.com/report/github.com/antchfx/antch) [![GoDoc](https://godoc.org/github.com/antchfx/antch?status.svg)](https://godoc.org/github.com/antchfx/antch)
-Overview
-===
-Antch is a fast high-level scalable and extensible web crawling and web scraping framework, used to crawl websites and extract structured data from their pages.
\ No newline at end of file
+Antch is inspired by the [Scrapy](https://scrapy.org/) project.
If you're already familiar +with scrapy, you can quickly get started. + +Antch is a fast high-level web crawling & scraping framework for Go, used +to crawl websites and extract structured data from their pages. + +Features +==== + +TODO + +Requirements +==== + +TODO + +Tutorial +==== + +TODO + +Middlewares +==== + +TODO + +Pipelines +==== + +TODO diff --git a/compression.go b/compression.go new file mode 100644 index 0000000..47090d1 --- /dev/null +++ b/compression.go @@ -0,0 +1,84 @@ +package antch + +import ( + "compress/gzip" + "compress/zlib" + "io" + "net/http" +) + +func decompress(name string, rc io.ReadCloser) (io.ReadCloser, bool) { + switch name { + case "gzip": + return &gzipReader{rc: rc}, true + case "deflate": + return &deflateReader{rc: rc}, true + } + return nil, false +} + +func compressionHandler(next HttpMessageHandler) HttpMessageHandler { + return HttpMessageHandlerFunc(func(req *http.Request) (*http.Response, error) { + req.Header.Set("Accept-Encoding", "gzip, deflate") + + resp, err := next.Send(req) + if err != nil { + return nil, err + } + if rc, ok := decompress(resp.Header.Get("Content-Encoding"), resp.Body); ok { + resp.Header.Del("Content-Encoding") + resp.Header.Del("Content-Length") + + resp.Body = rc + resp.ContentLength = -1 + resp.Uncompressed = true + } + return resp, err + }) +} + +// gzipReader is a reader with gzip decompress mode. +type gzipReader struct { + rr io.Reader + rc io.ReadCloser +} + +func (z *gzipReader) Read(p []byte) (n int, err error) { + if z.rr == nil { + z.rr, err = gzip.NewReader(z.rc) + if err != nil { + return n, err + } + } + return z.rr.Read(p) +} + +func (z *gzipReader) Close() error { + return z.rc.Close() +} + +// deflateReader is a reader with deflate decompress mode. +type deflateReader struct { + rr io.Reader + rc io.ReadCloser +} + +func (r *deflateReader) Read(p []byte) (n int, err error) { + if r.rr == nil { + r.rr, err = zlib.NewReader(r.rc) + if err != nil { + return n, err + } + } + return r.rr.Read(p) +} + +func (r *deflateReader) Close() error { + return r.rc.Close() +} + +// CompressionMiddleware is a middleware to allows compressed +// (gzip, deflate) traffic to be sent/received from sites. +func CompressionMiddleware() Middleware { + return Middleware(compressionHandler) +} diff --git a/cookies.go b/cookies.go new file mode 100644 index 0000000..cfd5e2d --- /dev/null +++ b/cookies.go @@ -0,0 +1,35 @@ +package antch + +import ( + "net/http" + "net/http/cookiejar" + + "golang.org/x/net/publicsuffix" +) + +func cookiesHandler(next HttpMessageHandler) HttpMessageHandler { + jar, _ := cookiejar.New(&cookiejar.Options{PublicSuffixList: publicsuffix.List}) + return HttpMessageHandlerFunc(func(req *http.Request) (*http.Response, error) { + // Delete previous cookie value before set new cookie value. + req.Header.Del("Cookie") + + for _, cookie := range jar.Cookies(req.URL) { + req.AddCookie(cookie) + } + + resp, err := next.Send(req) + if err != nil { + return nil, err + } + if rc := resp.Cookies(); len(rc) > 0 { + jar.SetCookies(req.URL, rc) + } + return resp, err + }) +} + +// CookiesMiddleware is an HTTP cookies middleware to allows cookies +// to tracking for each of HTTP requests. 
+func CookiesMiddleware() Middleware { + return Middleware(cookiesHandler) +} diff --git a/crawler.go b/crawler.go index 6c3f048..fc95262 100644 --- a/crawler.go +++ b/crawler.go @@ -1,128 +1,309 @@ package antch import ( - "context" "errors" - "io" + "fmt" + "net" "net/http" - "runtime" + "net/url" + "strings" "sync" - "sync/atomic" "time" "github.com/sirupsen/logrus" - - "github.com/antchfx/antch/internal/util" ) -const DefaultMaxFetchersPerHost = 1 +// Item is represents an item object. +type Item interface{} -// Crawler is a web crawl server for crawl websites. +// Crawler is core of web crawl server that provides crawl websites +// and calls pipeline to process for received data from their pages. type Crawler struct { - // MaxWorkers specifies the maximum number of worker to working. - MaxWorkers int + // CheckRedirect specifies the policy for handling redirects. + CheckRedirect func(req *http.Request, via []*http.Request) error + + // MaxConcurrentRequests specifies the maximum number of concurrent + // requests that will be performed. + MaxConcurrentRequests int + + // MaxConcurrentRequestsPerHost specifies the maximum number of + // concurrent requests that will be performed to any single domain. + MaxConcurrentRequestsPerSite int + + // RequestTimeout specifies a time to wait before the request times out. + RequestTimeout time.Duration + + // DownloadDelay specifies delay time to wait before access same website. + DownloadDelay time.Duration + + // MaxConcurrentItems specifies the maximum number of concurrent items + // to process parallel in the pipeline. + MaxConcurrentItems int + + // UserAgent specifies the user-agent for the remote server. + UserAgent string + + // Exit is an optional channel whose closure indicates that the Crawler + // instance should be stop work and exit. + Exit <-chan struct{} + + readCh chan *http.Request + writeCh chan Item - // MaxFetchersPerHost Specifies the number of fetcher - // that should be allowed to access same host at one time. - // If zero, DefaultMaxFetchersPerHost is used. - MaxFetchersPerHost int + client *http.Client + pipeHandler PipelineHandler + mids []Middleware + pipes []Pipeline - // DownloadHandler specifies a download handler to fetching HTTP - // response from remote server when server received a crawl request. - // If No specifies Downloader, then default use http.DefaultClient to - // execute HTTP request. - DownloadHandler Downloader + spider map[string]*spider + spiderMu sync.Mutex - // MessageHandler specifies a message handler to handling received - // HTTP response. - // If No specifies Spider, then default use NoOpSpider to reponse all - // received HTTP response. - MessageHandler Spider + once sync.Once + mu sync.RWMutex + m map[string]muxEntry +} - isRunning int32 - poolSize int +// NewCrawler returns a new Crawler with default settings. +func NewCrawler() *Crawler { + return &Crawler{ + UserAgent: "antch(github.com)", + Exit: make(chan struct{}), + } +} - fetcherMu sync.RWMutex - fetchers map[string]*fetcher +type muxEntry struct { + pattern string + h Handler +} - exitCh chan struct{} - waitGroup util.WaitGroupWrapper +// StartURLs starts crawling for the given URL list. +func (c *Crawler) StartURLs(URLs []string) { + for _, URL := range URLs { + req, _ := http.NewRequest("GET", URL, nil) + c.Request(req) + } } -// DefaultCrawler is the default Crawler used to crawl website. -var DefaultCrawler = &Crawler{} +// Request puts an HTTP request into the working queue to crawling. 
+func (c *Crawler) Request(req *http.Request) error { + c.once.Do(c.init) + if req == nil { + return errors.New("req is nil") + } + return c.enqueue(req, 5*time.Second) +} -// Stop stops crawling and exit. -func (c *Crawler) Stop() error { - if atomic.LoadInt32(&c.isRunning) == 0 { - return errors.New("antch: crawler is not running") +func (c *Crawler) enqueue(req *http.Request, timeout time.Duration) error { + select { + case c.readCh <- req: + case <-time.After(timeout): + return errors.New("crawler: timeout, worker is busy") } - close(c.exitCh) - c.waitGroup.Wait() return nil } -// Run starts server to begin crawling with URLs Queue. -func (c *Crawler) Run(q Queue) { - if q == nil { - panic("antch: nil queue") +// Handle registers the Handler for the given pattern. +// If pattern is "*" means matches all requests. +func (c *Crawler) Handle(pattern string, handler Handler) { + c.mu.Lock() + defer c.mu.Unlock() + + if pattern == "" { + panic("antch: invalid domain") + } + if handler == nil { + panic("antch: handler is nil") } - c.exitCh = make(chan struct{}) + if c.m == nil { + c.m = make(map[string]muxEntry) + } + c.m[pattern] = muxEntry{pattern: pattern, h: handler} +} + +// Handler returns a Handler for the give URL. +func (c *Crawler) Handler(u *url.URL) (h Handler, pattern string) { + return c.handler(u) +} + +// UseMiddleware adds a Middleware to the crawler. +func (c *Crawler) UseMiddleware(m Middleware) *Crawler { + c.mids = append(c.mids, m) + return c +} + +// UsePipeline adds a Pipeline to the crawler. +func (c *Crawler) UsePipeline(p Pipeline) *Crawler { + c.pipes = append(c.pipes, p) + return c +} + +// UseCookies enables the cookies middleware to working. +func (c *Crawler) UseCookies() *Crawler { + return c.UseMiddleware(CookiesMiddleware()) +} - c.waitGroup.Wrap(func() { c.queueScanLoop(q) }) +// UseCompression enables the HTTP compression middleware to +// supports gzip, deflate for HTTP Request/Response. +func (c *Crawler) UseCompression() *Crawler { + return c.UseMiddleware(CompressionMiddleware()) +} - atomic.StoreInt32(&c.isRunning, 1) +// UseProxy enables proxy for each of HTTP requests. +func (c *Crawler) UseProxy(proxyURL *url.URL) *Crawler { + return c.UseMiddleware(ProxyMiddleware(http.ProxyURL(proxyURL))) } -func (c *Crawler) downloader() Downloader { - if c.DownloadHandler != nil { - return c.DownloadHandler +// UseRobotstxt enables support robots.txt. 
+func (c *Crawler) UseRobotstxt() *Crawler { + return c.UseMiddleware(RobotstxtMiddleware()) +} + +func (c *Crawler) transport() http.RoundTripper { + ts := &http.Transport{ + MaxIdleConns: 1000, + MaxIdleConnsPerHost: c.maxConcurrentRequestsPerSite() * 2, + IdleConnTimeout: 120 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + DialContext: proxyDialContext, } - return httpClient() + + var stack HttpMessageHandler = HttpMessageHandlerFunc(func(req *http.Request) (*http.Response, error) { + return ts.RoundTrip(req) + }) + for i := len(c.mids) - 1; i >= 0; i-- { + stack = c.mids[i](stack) + } + + return roundTripperFunc(stack.Send) } -func (c *Crawler) spider() Spider { - if c.MessageHandler != nil { - return c.MessageHandler +func (c *Crawler) pipeline() PipelineHandler { + var stack PipelineHandler = PipelineHandlerFunc(func(item Item) {}) + for i := len(c.pipes) - 1; i >= 0; i-- { + stack = c.pipes[i](stack) } - return NoOpSpider() + return stack } -func (c *Crawler) queueScanWorker(requestCh chan *http.Request, closeCh chan int) { - newFetcher := func() *fetcher { - return &fetcher{ - c: c, - queueCh: make(chan requestAndChan, c.maxFetchersPerHost()), - quitCh: make(chan struct{}), +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req) +} + +func (c *Crawler) pathMatch(path string) (h Handler, pattern string) { + var n = 0 + for k, v := range c.m { + if strings.Index(k, path) == -1 { + continue + } + if h == nil || len(k) > n { + n = len(k) + h = v.h + pattern = v.pattern } } + return +} + +func (c *Crawler) handler(u *url.URL) (h Handler, pattern string) { + c.mu.RLock() + defer c.mu.RUnlock() + + host, _, _ := net.SplitHostPort(u.Host) + h, pattern = c.pathMatch(host) + if h == nil { + h, pattern = c.pathMatch("*") + } + if h == nil { + h, pattern = VoidHandler(), "" + } + return +} + +func (c *Crawler) maxConcurrentRequestsPerSite() int { + if v := c.MaxConcurrentRequestsPerSite; v > 0 { + return v + } + return 1 +} +func (c *Crawler) maxConcurrentRequests() int { + if v := c.MaxConcurrentRequests; v > 0 { + return v + } + return 16 +} + +func (c *Crawler) maxConcurrentItems() int { + if v := c.MaxConcurrentItems; v > 0 { + return v + } + return 32 +} + +func (c *Crawler) downloadDelay() time.Duration { + if v := c.DownloadDelay; v > 0 { + return v + } + return 250 * time.Millisecond // 0.25s +} + +func (c *Crawler) requestTimeout() time.Duration { + if v := c.RequestTimeout; v > 0 { + return v + } + return 30 * time.Second +} + +func (c *Crawler) init() { + c.client = &http.Client{ + Transport: c.transport(), + CheckRedirect: c.CheckRedirect, + Timeout: c.requestTimeout(), + } + + c.pipeHandler = c.pipeline() + c.readCh = make(chan *http.Request) + c.writeCh = make(chan Item) + go c.readLoop() + go c.writeLoop() +} + +func (c *Crawler) scanRequestWork(workCh chan chan *http.Request, closeCh chan int) { + reqch := make(chan *http.Request) for { + workCh <- reqch select { - case req := <-requestCh: - var ( - f = c.getFetcher(req.URL.Host, newFetcher) - resch = make(chan responseAndError) - reqch = requestAndChan{ctx: context.Background(), req: req, ch: resch} - ) + case req := <-reqch: + resc := make(chan responseAndError) + spider := c.getSpider(req.URL) + + if req.Header.Get("User-Agent") == "" && c.UserAgent != "" { + req.Header.Set("User-Agent", c.UserAgent) + } + spider.reqch <- requestAndChan{req: req, ch: resc} select { 
- case f.queueCh <- reqch: - // Waiting an HTTP response. - select { - case re := <-resch: - if re.err != nil { - logrus.Error(re.err) - } else { - c.spider().ProcessResponse(re.ctx, re.res) - re.res.Body.Close() - } - case <-closeCh: - return + case re := <-resc: + closeRequest(req) + if re.err != nil { + logrus.Warnf("antch: send HTTP request got error: %v", re.err) + } else { + go func(res *http.Response) { + defer closeResponse(res) + defer func() { + if r := recover(); r != nil { + logrus.Panicf("antch: Handler got panic error: %v", r) + } + }() + h, _ := c.Handler(res.Request.URL) + h.ServeSpider(c.writeCh, res) + }(re.res) } - case <-f.quitCh: - // fetcher has exit work. case <-closeCh: + closeRequest(req) return } case <-closeCh: @@ -131,244 +312,190 @@ func (c *Crawler) queueScanWorker(requestCh chan *http.Request, closeCh chan int } } -func (c *Crawler) resizePool(requestCh chan *http.Request, closeCh chan int) { +// readLoop reads HTTP crawl request from queue and to execute. +func (c *Crawler) readLoop() { + closeCh := make(chan int) + workCh := make(chan chan *http.Request, c.maxConcurrentRequests()) + + for i := 0; i < c.maxConcurrentRequests(); i++ { + go func() { + c.scanRequestWork(workCh, closeCh) + }() + } + for { - if c.poolSize == c.maxWorkers() { - break - } else if c.poolSize > c.maxWorkers() { - // contract - closeCh <- 1 - c.poolSize-- - } else { - // expand - c.waitGroup.Wrap(func() { - c.queueScanWorker(requestCh, closeCh) - }) - c.poolSize++ + select { + case req := <-c.readCh: + reqch := <-workCh + reqch <- req + case <-c.Exit: + goto exit } } +exit: + close(closeCh) } -func (c *Crawler) queueScanLoop(q Queue) { - requestCh := make(chan *http.Request, c.maxWorkers()) - closeCh := make(chan int, c.maxWorkers()) - - refreshTicker := time.NewTicker(6 * time.Second) - - c.resizePool(requestCh, closeCh) +// writeLoop writes a received Item into the item pippeline. +func (c *Crawler) writeLoop() { + closeCh := make(chan int) + workCh := make(chan Item, c.maxConcurrentItems()) + for i := 0; i < c.maxConcurrentItems(); i++ { + go func() { + c.scanPipelineWork(workCh, closeCh) + }() + } for { select { - case <-refreshTicker.C: - c.resizePool(requestCh, closeCh) - case <-c.exitCh: + case item := <-c.writeCh: + workCh <- item + case <-c.Exit: goto exit - default: - urlStr, err := q.Dequeue() - switch { - case err == io.EOF: - // No URLs in the queue q. - select { - case <-time.After(200 * time.Millisecond): - } - continue - case err != nil: - // Got error. - logrus.Error(err) - continue - } - req, err := http.NewRequest("GET", urlStr, nil) - if err != nil { - continue - } - // Settings a User-Agent to identify by remote server. - req.Header.Set("User-Agent", "antch") - requestCh <- req } } - exit: close(closeCh) - refreshTicker.Stop() } -// getFetcher returns a fetcher to access for the specified website. 
-func (c *Crawler) getFetcher(host string, newf func() *fetcher) *fetcher { - c.fetcherMu.RLock() - f, ok := c.fetchers[host] - c.fetcherMu.RUnlock() - if ok { - return f - } - - c.fetcherMu.Lock() - defer c.fetcherMu.Unlock() - - if c.fetchers == nil { - c.fetchers = make(map[string]*fetcher) +func (c *Crawler) scanPipelineWork(workCh chan Item, closeCh chan int) { + for { + select { + case v := <-workCh: + done := make(chan int) + go func() { + defer close(done) + defer func() { + if r := recover(); r != nil { + logrus.Panicf("antch: Handler got panic error: %v", r) + } + }() + c.pipeHandler.ServePipeline(v) + }() + select { + case <-done: + case <-closeCh: + return + } + case <-closeCh: + return + } } - f = newf() - c.fetchers[host] = f - - go func() { f.fetchLoop() }() - - return f } -func (c *Crawler) removeFetcher(f *fetcher) { - c.fetcherMu.Lock() - defer c.fetcherMu.Unlock() - - var host string - for k, v := range c.fetchers { - if v == f { - host = k - break - } - } - delete(c.fetchers, host) +// removeIdleSpider makes spider as dead. +func (c *Crawler) removeSpider(s *spider) { + c.spiderMu.Lock() + defer c.spiderMu.Unlock() + delete(c.spider, s.key) } -func (c *Crawler) maxWorkers() int { - if v := c.MaxWorkers; v != 0 { - return v +// getSpider returns a spider for the given URL. +func (c *Crawler) getSpider(url *url.URL) *spider { + c.spiderMu.Lock() + defer c.spiderMu.Unlock() + + if c.spider == nil { + c.spider = make(map[string]*spider) } - return runtime.NumCPU() * 3 -} -func (c *Crawler) maxFetchersPerHost() int { - if v := c.MaxFetchersPerHost; v != 0 { - return v + host, _, _ := net.SplitHostPort(url.Host) + key := fmt.Sprintf("%s%s", url.Scheme, host) + s, ok := c.spider[key] + if !ok { + s = &spider{ + c: c, + reqch: make(chan requestAndChan), + key: key, + } + c.spider[key] = s + go s.crawlLoop() } - return DefaultMaxFetchersPerHost + return s } type requestAndChan struct { req *http.Request - ctx context.Context ch chan responseAndError } type responseAndError struct { res *http.Response - ctx context.Context err error } -// fetcher is a crawler worker to limit number of worker to fetch at one time -// for the same host. -type fetcher struct { - c *Crawler - n int // number of worker running - queueCh chan requestAndChan // URLS queue with in same host. - quitCh chan struct{} // worker exit channel. +// spider is http spider for the single site. +type spider struct { + c *Crawler + reqch chan requestAndChan + key string } -func (f *fetcher) maxWorkers() int { - return f.c.maxFetchersPerHost() +func (s *spider) queueScanWorker(workCh chan chan requestAndChan, respCh chan int, closeCh chan struct{}) { + rc := make(chan requestAndChan) + for { + workCh <- rc + select { + case c := <-rc: + resp, err := s.c.client.Do(c.req) + select { + case c.ch <- responseAndError{resp, err}: + respCh <- 1 + case <-closeCh: + return + } + case <-closeCh: + return + } + } } -// fetchLoop runs in a single goroutine to execute download for a receved crawl request -// from its own URLs channel. 
-func (f *fetcher) fetchLoop() { - workerPool := make(chan chan requestAndChan, f.maxWorkers()) - closeCh := make(chan int, f.maxWorkers()) +func (s *spider) crawlLoop() { + const idleTimeout = 120 * time.Second - const idleTimeout = 10 * time.Minute + respCh := make(chan int) + closeCh := make(chan struct{}) idleTimer := time.NewTimer(idleTimeout) - refreshTicker := time.NewTicker(8 * time.Second) + workCh := make(chan chan requestAndChan, s.c.maxConcurrentRequestsPerSite()) + + for i := 0; i < s.c.maxConcurrentRequestsPerSite(); i++ { + go func() { + s.queueScanWorker(workCh, respCh, closeCh) + }() + } - f.resizePool(workerPool, closeCh) for { select { - case reqCh := <-f.queueCh: - select { - case workCh := <-workerPool: - select { - case workCh <- reqCh: - idleTimer.Reset(idleTimeout) - default: - // Workch has closed. - d := reqCh - go func() { - f.queueCh <- d - }() - } - case <-f.c.exitCh: - // Main server has exit. - goto exit + case rc := <-s.reqch: + // Wait a moment time before start fetching. + if t := s.c.downloadDelay(); t > 0 { + <-time.After(t) } - case <-refreshTicker.C: - f.resizePool(workerPool, closeCh) + c := <-workCh + c <- rc + case <-respCh: + idleTimer.Reset(idleTimeout) case <-idleTimer.C: - // Worker has inactive. - f.c.removeFetcher(f) goto exit - case <-f.c.exitCh: - // Main server has exit. + case <-s.c.Exit: goto exit } } + exit: + s.c.removeSpider(s) close(closeCh) - close(f.quitCh) - refreshTicker.Stop() + idleTimer.Stop() } -func (f *fetcher) resizePool(workerPool chan chan requestAndChan, closeCh chan int) { - for { - if f.n == f.maxWorkers() { - break - } else if f.n > f.maxWorkers() { - // Decrease a worker number - closeCh <- 1 - f.n-- - } else { - // Increase a worker number - go func() { f.fetchWorker(workerPool, closeCh) }() - f.n++ - } - } -} - -func (f *fetcher) fetchWorker(workerPool chan chan requestAndChan, closeCh chan int) { - workCh := make(chan requestAndChan) - for { - select { - case workerPool <- workCh: - select { - case rc := <-workCh: - resch := make(chan responseAndError) - ctx, cancel := context.WithCancel(rc.ctx) - - go func() { - defer closeRequest(rc.req) - resp, err := f.c.downloader().ProcessRequest(ctx, rc.req) - resch <- responseAndError{res: resp, err: err, ctx: rc.ctx} - }() - - select { - case re := <-resch: - rc.ch <- re - case <-closeCh: - cancel() - rc.ch <- responseAndError{err: errors.New("antch: request has canceled")} - return - } - case <-closeCh: - // exit a worker - return - } - case <-closeCh: - // exit a worker - return - } +func closeRequest(r *http.Request) { + if r != nil && r.Body != nil { + r.Body.Close() } } -// Close HTTP request object. -func closeRequest(req *http.Request) { - if req.Body != nil { - req.Body.Close() +func closeResponse(r *http.Response) { + if r != nil && r.Body != nil { + r.Body.Close() } } diff --git a/downloader.go b/downloader.go deleted file mode 100644 index 529081d..0000000 --- a/downloader.go +++ /dev/null @@ -1,231 +0,0 @@ -package antch - -import ( - "context" - "errors" - "io" - "net" - "net/http" - "sync" - "time" - - "github.com/antchfx/antch/internal/util" -) - -var ( - // DefaultDialer is a default HTTP dial for HTTP connection - // and is used by DownloaderStack. - DefaultDialer DialFunc = (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - DualStack: true, - }).DialContext -) - -// Dialer specifies the dial function for creating HTTP connections. 
-type DialFunc func(ctx context.Context, network, address string) (net.Conn, error) - -// Downloader is a web crawler handler to download web page from remote server -// for the given crawl request. -type Downloader interface { - ProcessRequest(context.Context, *http.Request) (*http.Response, error) -} - -// DownloaderFunc is an adapter type to allow the use of ordinary -// functions as Downloader handlers. -type DownloaderFunc func(context.Context, *http.Request) (*http.Response, error) - -func (f DownloaderFunc) ProcessRequest(ctx context.Context, req *http.Request) (*http.Response, error) { - return f(ctx, req) -} - -// DownloaderMiddleware is middleware type for downloader. -type DownloaderMiddleware func(Downloader) Downloader - -// httpClient returns Downloader that uses http.DefaultClient -// to execute HTTP request. -func httpClient() Downloader { - return DownloaderFunc(func(_ context.Context, req *http.Request) (*http.Response, error) { - return http.DefaultClient.Do(req) - }) -} - -// DownloaderStack is an implementation of Downloader to allows register custom Downloader -// middleware to handle HTTP request and Http response. -type DownloaderStack struct { - handler Downloader - - mids []DownloaderMiddleware - midOnce sync.Once - - ts http.RoundTripper - tsOnce sync.Once - - // Timeout specifies the maximum time to wait before - // receive HTTP response. - Timeout time.Duration -} - -// UseMiddleware registeres a new middleware into Downloader. -func (dl *DownloaderStack) UseMiddleware(mid DownloaderMiddleware) *DownloaderStack { - dl.mids = append(dl.mids, mid) - return dl -} - -// onceSetupMiddleware is setup all middlewares that registered by UseMiddleware function. -func (dl *DownloaderStack) onceSetupMiddleware() { - var stack Downloader - stack = DownloaderFunc(dl.send) - for i := len(dl.mids) - 1; i >= 0; i-- { - stack = dl.mids[i](stack) - } - dl.handler = stack -} - -func (dl *DownloaderStack) deadline() time.Time { - if dl.Timeout > 0 { - return time.Now().Add(dl.Timeout) - } - return time.Time{} -} - -// setRequestCancel sets the cancel HTTP request operation if deadline was reached. -func setRequestCancel(deadline time.Time, doCancel func()) (stopTimer func(), didTimeout func() bool) { - var nop = func() {} - var alwaysFalse = func() bool { return false } - - if deadline.IsZero() { - return nop, alwaysFalse - } - - stopTimerCh := make(chan struct{}) - var once sync.Once - stopTimer = func() { once.Do(func() { close(stopTimerCh) }) } - - timer := time.NewTimer(time.Until(deadline)) - var timeout util.AtomicBool - - go func() { - select { - case <-timer.C: - timeout.SetTrue() - doCancel() - case <-stopTimerCh: - timer.Stop() - } - }() - - return stopTimer, timeout.IsSet -} - -// send is sends a HTTP request and receive HTTP response from remote server. 
-func (dl *DownloaderStack) send(ctx context.Context, req *http.Request) (*http.Response, error) { - deadline := dl.deadline() - reqctx, doCancel := context.WithCancel(req.Context()) - stopTimer, didTimeout := setRequestCancel(deadline, doCancel) - - resc := make(chan responseAndError) - go func() { - resp, err := dl.transport().RoundTrip(req.WithContext(reqctx)) - resc <- responseAndError{res: resp, err: err} - }() - - var re responseAndError - select { - case <-ctx.Done(): - doCancel() - re = responseAndError{err: ctx.Err()} - case re = <-resc: - } - - if re.err != nil { - stopTimer() - err := re.err - if !deadline.IsZero() && didTimeout() { - err = &httpError{ - err: err.Error() + " (spider timeout exceeded while awaiting headers)", - timeout: true, - } - } - return nil, err - } - - if !deadline.IsZero() { - re.res.Body = &cancelTimerBody{ - stop: stopTimer, - rc: re.res.Body, - reqDidTimeout: didTimeout, - } - } - return re.res, nil -} - -func (dl *DownloaderStack) initTransport() { - dl.ts = &http.Transport{ - DialContext: DefaultDialer, - DisableCompression: false, - MaxIdleConns: 500, - MaxIdleConnsPerHost: 10, - IdleConnTimeout: 120 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - } -} - -func (dl *DownloaderStack) transport() http.RoundTripper { - dl.tsOnce.Do(dl.initTransport) - return dl.ts -} - -func (dl *DownloaderStack) ProcessRequest(ctx context.Context, req *http.Request) (*http.Response, error) { - if req == nil { - return nil, errors.New("antch: req is nil") - } - // Setup middlewares by once. - dl.midOnce.Do(dl.onceSetupMiddleware) - return dl.handler.ProcessRequest(ctx, req) -} - -type httpError struct { - err string - timeout bool -} - -func (e *httpError) Error() string { return e.err } -func (e *httpError) Timeout() bool { return e.timeout } -func (e *httpError) Temporary() bool { return true } - -// cancelTimerBody is an io.ReadCloser that wraps rc with two features: -// 1) on Read error or close, the stop func is called. -// 2) On Read failure, if reqDidTimeout is true, the error is wrapped and -// marked as net.Error that hit its timeout. -// https://github.com/golang/go/blob/master/src/net/http/client.go#L800 -type cancelTimerBody struct { - stop func() - rc io.ReadCloser - reqDidTimeout func() bool -} - -func (b *cancelTimerBody) Read(p []byte) (n int, err error) { - n, err = b.rc.Read(p) - if err == nil { - return n, nil - } - b.stop() - if err == io.EOF { - return n, err - } - if b.reqDidTimeout() { - err = &httpError{ - err: err.Error() + " (spider timeout exceeded while reading body)", - timeout: true, - } - } - return n, err -} - -func (b *cancelTimerBody) Close() error { - err := b.rc.Close() - b.stop() - return err -} diff --git a/examples/amazon/main.go b/examples/amazon/main.go deleted file mode 100644 index dabd8ab..0000000 --- a/examples/amazon/main.go +++ /dev/null @@ -1,58 +0,0 @@ -package main - -import ( - "context" - "fmt" - "net/http" - - "github.com/antchfx/antch" - "github.com/antchfx/antch/middleware" - "github.com/antchfx/xquery/html" -) - -func amazonSpider(q antch.Queue) antch.Spider { - return antch.SpiderFunc(func(ctx context.Context, resp *http.Response) error { - fmt.Println(resp.Request.URL) - // Parse HTTP response as HTML document. - doc, err := antch.ParseHTML(resp) - if err != nil { - return err - } - // extract all links from HTML document using XPath. 
- for _, n := range htmlquery.Find(doc, "//a[@href]") { - urlstr := htmlquery.SelectAttr(n, "href") - if u, err := resp.Request.URL.Parse(urlstr); err == nil { - q.Enqueue(u.String()) - } - } - return nil - }) -} - -func main() { - exitCh := make(chan int) - var startURL = "https://www.amazon.com/" - - // Declare a Queue is used by Crawler. - queue := &antch.SimpleHeapQueue{} - queue.Enqueue(startURL) - - // Declare a new instance of Downloader. - downloader := &antch.DownloaderStack{} - // Registers a new middleware for Downloader - for _, mid := range middleware.DefaultDownloaderMiddlewares { - downloader.UseMiddleware(mid) - } - - // Declare a spider to handle all receives HTTP response. - spider := amazonSpider(queue) - var crawler = &antch.Crawler{ - MaxWorkers: 1, - DownloadHandler: downloader, - MessageHandler: spider, - } - crawler.Run(queue) - - <-exitCh - crawler.Stop() -} diff --git a/examples/simple/main.go b/examples/simple/main.go deleted file mode 100644 index 1daed9d..0000000 --- a/examples/simple/main.go +++ /dev/null @@ -1,19 +0,0 @@ -package main - -import "github.com/antchfx/antch" - -func main() { - var startURLs = []string{ - "https://www.amazon.com/", - "https://www.reddit.com/", - "https://news.ycombinator.com/news", - } - // Declare a Queue is used by Crawler. - queue := &antch.SimpleHeapQueue{} - for _, URL := range startURLs { - queue.Enqueue(URL) - } - - // Start to crawling website. - antch.DefaultCrawler.Run(queue) -} diff --git a/parse.go b/html.go similarity index 52% rename from parse.go rename to html.go index 4aff7ab..af9be2e 100644 --- a/parse.go +++ b/html.go @@ -2,11 +2,12 @@ package antch import ( "bytes" + "fmt" "io" + "mime" "net/http" "github.com/antchfx/xquery/html" - "github.com/antchfx/xquery/xml" "golang.org/x/net/html" "golang.org/x/net/html/charset" "golang.org/x/text/encoding" @@ -14,6 +15,39 @@ import ( "golang.org/x/text/transform" ) +// MediaType describe the content type of an HTTP request or HTTP response. +type MediaType struct { + // Type is the HTTP content type represents. such as + // "text/html", "image/jpeg". + Type string + // Charset is the HTTP content encoding represents. + Charset string +} + +// ContentType returns the HTTP header content-type value. +func (m MediaType) ContentType() string { + if len(m.Type) > 0 && m.Charset != "" { + return fmt.Sprintf("%s; charset=%s", m.Type, m.Charset) + } + return m.Type +} + +// ParseMediaType parsing a specified string v to MediaType struct. +func ParseMediaType(v string) MediaType { + if v == "" { + return MediaType{} + } + + mimetype, params, err := mime.ParseMediaType(v) + if err != nil { + return MediaType{} + } + return MediaType{ + Type: mimetype, + Charset: params["charset"], + } +} + // ParseHTML parses an HTTP response as HTML document. func ParseHTML(resp *http.Response) (*html.Node, error) { var ( @@ -23,7 +57,8 @@ func ParseHTML(resp *http.Response) (*html.Node, error) { mediatype := ParseMediaType(resp.Header.Get("Content-Type")) if mediatype.Charset == "" { - // If response HTTP header not include charset. + // If HTTP Response's header not include a charset field, + // reads 1024 bytes from Response body and geting encoding. preview := make([]byte, 1024) n, err := io.ReadFull(r, preview) switch { @@ -50,8 +85,3 @@ func ParseHTML(resp *http.Response) (*html.Node, error) { } return htmlquery.Parse(r) } - -// ParseXML parses an HTTP response as XML document. 
-func ParseXML(resp *http.Response) (*xmlquery.Node, error) { - return xmlquery.Parse(resp.Body) -} diff --git a/internal/util/atomic.go b/internal/util/atomic.go deleted file mode 100644 index a25e67e..0000000 --- a/internal/util/atomic.go +++ /dev/null @@ -1,32 +0,0 @@ -package util - -import ( - "strconv" - "sync/atomic" -) - -// An AtomicBool is an boolean value updated atomically. -type AtomicBool int32 - -func (b *AtomicBool) IsSet() bool { - return atomic.LoadInt32((*int32)(b)) != 0 -} - -func (b *AtomicBool) SetTrue() { - atomic.StoreInt32((*int32)(b), 1) -} - -// An AtomicInt is an int64 to be accessed atomically. -type AtomicInt int64 - -func (i *AtomicInt) Add(n int64) { - atomic.AddInt64((*int64)(i), n) -} - -func (i *AtomicInt) Get() int64 { - return atomic.LoadInt64((*int64)(i)) -} - -func (i *AtomicInt) String() string { - return strconv.FormatInt(i.Get(), 10) -} diff --git a/internal/util/wait_group_wrapper.go b/internal/util/wait_group_wrapper.go deleted file mode 100644 index abba6e7..0000000 --- a/internal/util/wait_group_wrapper.go +++ /dev/null @@ -1,15 +0,0 @@ -package util - -import "sync" - -type WaitGroupWrapper struct { - sync.WaitGroup -} - -func (w *WaitGroupWrapper) Wrap(cb func()) { - w.Add(1) - go func() { - cb() - w.Done() - }() -} diff --git a/mediatype.go b/mediatype.go deleted file mode 100644 index 3e96667..0000000 --- a/mediatype.go +++ /dev/null @@ -1,39 +0,0 @@ -package antch - -import ( - "fmt" - "mime" -) - -// MediaType describe the content type of an HTTP request or HTTP response. -type MediaType struct { - // Type is the HTTP content type represents. such as - // "text/html", "image/jpeg". - Type string - // Charset is the HTTP content encoding represents. - Charset string -} - -// ContentType returns the HTTP header content-type value. -func (m MediaType) ContentType() string { - if len(m.Type) > 0 && m.Charset != "" { - return fmt.Sprintf("%s; charset=%s", m.Type, m.Charset) - } - return m.Type -} - -// ParseMediaType parsing a specified string v to MediaType struct. -func ParseMediaType(v string) MediaType { - if v == "" { - return MediaType{} - } - - mimetype, params, err := mime.ParseMediaType(v) - if err != nil { - return MediaType{} - } - return MediaType{ - Type: mimetype, - Charset: params["charset"], - } -} diff --git a/middleware.go b/middleware.go new file mode 100644 index 0000000..505c5ed --- /dev/null +++ b/middleware.go @@ -0,0 +1,25 @@ +package antch + +import ( + "net/http" +) + +// HttpMessageHandler is an interface that receives an HTTP request +// and returns an HTTP response. +type HttpMessageHandler interface { + Send(*http.Request) (*http.Response, error) +} + +// Middleware is the HTTP message transport middle layer that send +// HTTP request passed one message Handler to the next message Handler +// until returns an HTTP response. +type Middleware func(HttpMessageHandler) HttpMessageHandler + +// HttpMessageHandlerFunc is an adapter to allow the use of ordinary +// functions as HttpMessageHandler. +type HttpMessageHandlerFunc func(*http.Request) (*http.Response, error) + +// Send sends a HTTP request and receives HTTP response. 
+func (f HttpMessageHandlerFunc) Send(req *http.Request) (*http.Response, error) { + return f(req) +} diff --git a/middleware/compression.go b/middleware/compression.go deleted file mode 100644 index aeff047..0000000 --- a/middleware/compression.go +++ /dev/null @@ -1,87 +0,0 @@ -package middleware - -import ( - "compress/gzip" - "compress/zlib" - "context" - "io" - "net/http" - - "github.com/antchfx/antch" -) - -// HttpCompression is a middleware of Downloader to allows compressed -// (gzip, deflate) traffic to be sent/received from web sites. -type HttpCompression struct { - Next antch.Downloader -} - -func (c *HttpCompression) ProcessRequest(ctx context.Context, req *http.Request) (resp *http.Response, err error) { - req.Header.Set("Accept-Encoding", "gzip, deflate") - - resp, err = c.Next.ProcessRequest(ctx, req) - if err != nil { - return nil, err - } - - if body, ok := decompress(resp.Header.Get("Content-Encoding"), resp.Body); ok { - resp.Body = body - resp.Header.Del("Content-Encoding") - resp.Header.Del("Content-Length") - resp.ContentLength = -1 - resp.Uncompressed = true - } - return -} - -func decompress(name string, body io.ReadCloser) (io.ReadCloser, bool) { - switch name { - case "gzip": - return &gzipReader{body: body}, true - case "deflate": - return &deflateReader{body: body}, true - } - return nil, false -} - -// gzipReader is a reader with gzip decompress mode. -type gzipReader struct { - r io.Reader - body io.ReadCloser -} - -func (z *gzipReader) Read(p []byte) (int, error) { - if z.r == nil { - var err error - z.r, err = gzip.NewReader(z.body) - if err != nil { - return 0, err - } - } - return z.r.Read(p) -} - -func (z *gzipReader) Close() error { - return z.body.Close() -} - -// deflateReader is a reader with deflate decompress mode. 
-type deflateReader struct { - r io.Reader - body io.ReadCloser -} - -func (r *deflateReader) Read(p []byte) (int, error) { - if r.r == nil { - rc, err := zlib.NewReader(r.body) - if err != nil { - return 0, err - } - r.r = rc - } - return r.r.Read(p) -} - -func (r *deflateReader) Close() error { - return r.body.Close() -} diff --git a/middleware/compression_test.go b/middleware/compression_test.go deleted file mode 100644 index 2a0cbe9..0000000 --- a/middleware/compression_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package middleware - -import ( - "compress/gzip" - "context" - "io/ioutil" - "net/http" - "net/http/httptest" - "testing" - - "github.com/antchfx/antch" -) - -func TestHttpCompressionMiddleware(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Encoding", "gzip") - gw := gzip.NewWriter(w) - gw.Write([]byte("hello world")) - gw.Close() - })) - defer ts.Close() - - ds := &antch.DownloaderStack{} - fn := antch.DownloaderFunc(func(ctx context.Context, req *http.Request) (*http.Response, error) { - return ds.ProcessRequest(ctx, req) - }) - - req, _ := http.NewRequest("GET", ts.URL, nil) - mid := &HttpCompression{Next: fn} - - resp, err := mid.ProcessRequest(context.Background(), req) - if err != nil { - t.Fatal(err) - } - defer resp.Body.Close() - - b, err := ioutil.ReadAll(resp.Body) - if err != nil { - t.Fatalf("Read error: %v", err) - } - - if resp.ContentLength != -1 { - t.Fatalf("ContentLength = %d; want -1", resp.ContentLength) - } - - if given, excepted := string(b), "hello world"; given != excepted { - t.Fatalf("response body = %s; want %s", given, excepted) - } -} diff --git a/middleware/cookie.go b/middleware/cookie.go deleted file mode 100644 index d437c09..0000000 --- a/middleware/cookie.go +++ /dev/null @@ -1,46 +0,0 @@ -package middleware - -import ( - "context" - "net/http" - "net/http/cookiejar" - - "github.com/antchfx/antch" - "golang.org/x/net/publicsuffix" -) - -var defaultCookieJar, _ = cookiejar.New(&cookiejar.Options{PublicSuffixList: publicsuffix.List}) - -// Cookie is a middleware of Downloader for HTTP cookies -// management for each of HTTP request and HTTP response. -type Cookie struct { - // Jar specifies the cookie jar. - Jar http.CookieJar - - Next antch.Downloader -} - -func (c *Cookie) jar() http.CookieJar { - if c.Jar != nil { - return c.Jar - } - return defaultCookieJar -} - -func (c *Cookie) ProcessRequest(ctx context.Context, req *http.Request) (resp *http.Response, err error) { - // Delete previous cookie value before set new cookie value. 
- req.Header.Del("Cookie") - - jar := c.jar() - - for _, cookie := range jar.Cookies(req.URL) { - req.AddCookie(cookie) - } - - if resp, err = c.Next.ProcessRequest(ctx, req); err == nil { - if rc := resp.Cookies(); len(rc) > 0 { - jar.SetCookies(req.URL, rc) - } - } - return resp, err -} diff --git a/middleware/cookie_test.go b/middleware/cookie_test.go deleted file mode 100644 index 992939a..0000000 --- a/middleware/cookie_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package middleware - -import ( - "context" - "net/http" - "net/http/httptest" - "net/url" - "testing" - - "github.com/antchfx/antch" -) - -func TestCookieMiddleware(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - for _, cookie := range r.Cookies() { - http.SetCookie(w, cookie) - } - })) - defer ts.Close() - - var expectedCookies = []*http.Cookie{ - {Name: "ChocolateChip", Value: "tasty"}, - {Name: "First", Value: "Hit"}, - {Name: "Second", Value: "Hit"}, - } - - ds := antch.DownloaderStack{} - fn := antch.DownloaderFunc(func(ctx context.Context, req *http.Request) (*http.Response, error) { - return ds.ProcessRequest(ctx, req) - }) - - u, _ := url.Parse(ts.URL) - mid := &Cookie{Jar: defaultCookieJar, Next: fn} - mid.Jar.SetCookies(u, expectedCookies) - - req, _ := http.NewRequest("GET", ts.URL, nil) - resp, err := mid.ProcessRequest(context.Background(), req) - if err != nil { - t.Fatal(err) - } - defer resp.Body.Close() - - givenCookies := resp.Cookies() - if len(givenCookies) != len(expectedCookies) { - t.Errorf("Expected %d cookies, got %d", len(expectedCookies), len(givenCookies)) - } - - for _, ec := range expectedCookies { - foundC := false - for _, c := range givenCookies { - if ec.Name == c.Name && ec.Value == c.Value { - foundC = true - break - } - } - if !foundC { - t.Errorf("Missing cookie %v", ec) - } - } -} diff --git a/middleware/delay.go b/middleware/delay.go deleted file mode 100644 index 7898e04..0000000 --- a/middleware/delay.go +++ /dev/null @@ -1,43 +0,0 @@ -package middleware - -import ( - "context" - "net/http" - "time" - - "github.com/antchfx/antch" -) - -// DownloadDelay is a middleware of Downloader to delay to download -// for each of HTTP request. -type DownloadDelay struct { - // DelayTime specifies delay time to wait before access website. - // If Zero, then default delay time(200ms) is used. - DelayTime time.Duration - - Next antch.Downloader -} - -// -type DownloadDelayKey struct{} - -const DefaultDelayTime = 200 * time.Millisecond - -func (d *DownloadDelay) ProcessRequest(ctx context.Context, req *http.Request) (*http.Response, error) { - dt := d.DelayTime - // If request context have a crawl-delay value, - // then this delay is used instead. 
- if v := req.Context().Value(DownloadDelayKey{}); v != nil { - dt = v.(time.Duration) - } - - if dt > 0 { - select { - case <-ctx.Done(): - return nil, context.Canceled - case <-time.After(dt): - } - } - - return d.Next.ProcessRequest(ctx, req) -} diff --git a/middleware/delay_test.go b/middleware/delay_test.go deleted file mode 100644 index 984e7c3..0000000 --- a/middleware/delay_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package middleware - -import ( - "context" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/antchfx/antch" -) - -func TestDelayMiddleware(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - })) - defer ts.Close() - - ds := &antch.DownloaderStack{} - fn := antch.DownloaderFunc(func(ctx context.Context, req *http.Request) (*http.Response, error) { - return ds.ProcessRequest(ctx, req) - }) - - req, _ := http.NewRequest("GET", ts.URL, nil) - - mid := &DownloadDelay{DelayTime: 100 * time.Millisecond, Next: fn} - _, err := mid.ProcessRequest(context.Background(), req) - if err != nil { - t.Fatal(err) - } - - ctx, cancel := context.WithCancel(context.Background()) - go func() { - select { - case <-time.After(50 * time.Millisecond): - cancel() - } - }() - _, err = mid.ProcessRequest(ctx, req) - if err != context.Canceled { - t.Fatalf("expected error %q, got %q", err, context.Canceled) - } -} diff --git a/middleware/middleware.go b/middleware/middleware.go deleted file mode 100644 index 23df42a..0000000 --- a/middleware/middleware.go +++ /dev/null @@ -1,36 +0,0 @@ -package middleware - -import ( - "github.com/antchfx/antch" -) - -var DefaultDownloaderMiddlewares = []antch.DownloaderMiddleware{ - downloadDelayMiddleware(), - httpCompressionMiddleware(), - redirectMiddleware(), - cookieMiddleware(), -} - -func redirectMiddleware() antch.DownloaderMiddleware { - return func(next antch.Downloader) antch.Downloader { - return &Redirect{Next: next} - } -} - -func cookieMiddleware() antch.DownloaderMiddleware { - return func(next antch.Downloader) antch.Downloader { - return &Cookie{Next: next} - } -} - -func httpCompressionMiddleware() antch.DownloaderMiddleware { - return func(next antch.Downloader) antch.Downloader { - return &HttpCompression{Next: next} - } -} - -func downloadDelayMiddleware() antch.DownloaderMiddleware { - return func(next antch.Downloader) antch.Downloader { - return &DownloadDelay{Next: next} - } -} diff --git a/middleware/proxy_test.go b/middleware/proxy_test.go deleted file mode 100644 index 4057d56..0000000 --- a/middleware/proxy_test.go +++ /dev/null @@ -1,14 +0,0 @@ -package middleware - -import ( - "net/http" - "net/http/httptest" - "testing" -) - -func TestProxyMiddleware(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - - })) - defer ts.Close() -} diff --git a/middleware/redirect.go b/middleware/redirect.go deleted file mode 100644 index 883ef83..0000000 --- a/middleware/redirect.go +++ /dev/null @@ -1,254 +0,0 @@ -package middleware - -import ( - "context" - "errors" - "fmt" - "io" - "io/ioutil" - "net/http" - "net/url" - "sort" - "strings" - - "github.com/antchfx/antch" -) - -// Redirect is a middleware of Downloader for handling redirects -// from server. -type Redirect struct { - // CheckRedirect specifies the policy for handling redirects. 
- CheckRedirect func(req *http.Request, via []*http.Request) error - - // - Next antch.Downloader -} - -func defaultCheckRedirect(req *http.Request, via []*http.Request) error { - if len(via) >= 10 { - return errors.New("stopped after 10 redirects") - } - return nil -} - -func (r *Redirect) checkRedirect(req *http.Request, via []*http.Request) error { - fn := r.CheckRedirect - if fn == nil { - fn = defaultCheckRedirect - } - return fn(req, via) -} - -func (rd *Redirect) ProcessRequest(ctx context.Context, req *http.Request) (*http.Response, error) { - var ( - reqs []*http.Request - resp *http.Response - copyHeaders = makeHeadersCopier(req) - - redirectMethod string - includeBody bool - ) - - uerr := func(err error) error { - method := reqs[0].Method - var urlStr string - if resp != nil && resp.Request != nil { - urlStr = resp.Request.URL.String() - } else { - urlStr = req.URL.String() - } - return &url.Error{ - Op: method[:1] + strings.ToLower(method[1:]), - URL: urlStr, - Err: err, - } - } - - for { - if len(reqs) > 0 { - loc := resp.Header.Get("Location") - if loc == "" { - return nil, uerr(fmt.Errorf("redirct: %d response missing Location header", resp.StatusCode)) - } - u, err := req.URL.Parse(loc) - if err != nil { - return nil, uerr(fmt.Errorf("redirct: failed to parse Location header %q: %v", loc, err)) - } - - ireq := reqs[0] - req = &http.Request{ - Method: redirectMethod, - Response: resp, - URL: u, - Header: make(http.Header), - Cancel: ireq.Cancel, - } - req = req.WithContext(ireq.Context()) - - if includeBody && ireq.GetBody != nil { - req.Body, err = ireq.GetBody() - if err != nil { - return nil, uerr(err) - } - req.ContentLength = ireq.ContentLength - } - - copyHeaders(req) - - if ref := refererForURL(reqs[len(reqs)-1].URL, req.URL); ref != "" { - req.Header.Set("Referer", ref) - } - - err = rd.checkRedirect(req, reqs) - if err == http.ErrUseLastResponse { - return resp, nil - } - - const maxBodySlurpSize = 2 << 10 - if resp.ContentLength == -1 || resp.ContentLength <= maxBodySlurpSize { - io.CopyN(ioutil.Discard, resp.Body, maxBodySlurpSize) - } - resp.Body.Close() - - if err != nil { - ue := uerr(err) - ue.(*url.Error).URL = loc - return resp, ue - } - } - - reqs = append(reqs, req) - var err error - if resp, err = rd.Next.ProcessRequest(ctx, req); err != nil { - return nil, uerr(err) - } - - var shouldRedirect bool - redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0]) - if !shouldRedirect { - return resp, nil - } - } -} - -// makeHeadersCopier makes a function that copies headers from the -// initial Request, ireq. -func makeHeadersCopier(ireq *http.Request) func(*http.Request) { - var ( - ireqhdr = cloneHeader(ireq.Header) - icookies map[string][]*http.Cookie - ) - if ireq.Header.Get("Cookie") != "" { - icookies = make(map[string][]*http.Cookie) - for _, c := range ireq.Cookies() { - icookies[c.Name] = append(icookies[c.Name], c) - } - } - preq := ireq - return func(req *http.Request) { - if icookies != nil { - var changed bool - resp := req.Response // The response that caused the upcoming redirect - for _, c := range resp.Cookies() { - if _, ok := icookies[c.Name]; ok { - delete(icookies, c.Name) - changed = true - } - } - if changed { - ireqhdr.Del("Cookie") - var ss []string - for _, cs := range icookies { - for _, c := range cs { - ss = append(ss, c.Name+"="+c.Value) - } - } - sort.Strings(ss) - ireqhdr.Set("Cookie", strings.Join(ss, "; ")) - } - } - // Copy the initial request's Header values - // (at least the safe ones). 
- for k, vv := range ireqhdr { - if shouldCopyHeaderOnRedirect(k, preq.URL, req.URL) { - req.Header[k] = vv - } - } - - preq = req // Update previous Request with the current request - } -} - -func cloneHeader(h http.Header) http.Header { - h2 := make(http.Header, len(h)) - for k, vv := range h { - vv2 := make([]string, len(vv)) - copy(vv2, vv) - h2[k] = vv2 - } - return h2 -} - -// redirectBehavior describes what should happen when the -// client encounters a 3xx status code from the server -func redirectBehavior(reqMethod string, resp *http.Response, ireq *http.Request) (redirectMethod string, shouldRedirect, includeBody bool) { - switch resp.StatusCode { - case 301, 302, 303: - redirectMethod = reqMethod - shouldRedirect = true - includeBody = false - - if reqMethod != "GET" && reqMethod != "HEAD" { - redirectMethod = "GET" - } - case 307, 308: - redirectMethod = reqMethod - shouldRedirect = true - includeBody = true - - if resp.Header.Get("Location") == "" { - shouldRedirect = false - break - } - if ireq.GetBody == nil && ireq.Body != nil && ireq.Body != http.NoBody { - shouldRedirect = false - } - } - return redirectMethod, shouldRedirect, includeBody -} - -func refererForURL(lastReq, newReq *url.URL) string { - if lastReq.Scheme == "https" && newReq.Scheme == "http" { - return "" - } - referer := lastReq.String() - if lastReq.User != nil { - auth := lastReq.User.String() + "@" - referer = strings.Replace(referer, auth, "", 1) - } - return referer -} - -func shouldCopyHeaderOnRedirect(headerKey string, initial, dest *url.URL) bool { - switch http.CanonicalHeaderKey(headerKey) { - case "Authorization", "Www-Authenticate", "Cookie", "Cookie2": - ihost := strings.ToLower(initial.Host) - dhost := strings.ToLower(dest.Host) - return isDomainOrSubdomain(dhost, ihost) - } - // All other headers are copied: - return true -} - -func isDomainOrSubdomain(sub, parent string) bool { - if sub == parent { - return true - } - // If sub is "foo.example.com" and parent is "example.com", - // that means sub must end in "."+parent. - // Do it without allocating. - if !strings.HasSuffix(sub, parent) { - return false - } - return sub[len(sub)-len(parent)-1] == '.' 
-} diff --git a/middleware/redirect_test.go b/middleware/redirect_test.go deleted file mode 100644 index e8c2e1a..0000000 --- a/middleware/redirect_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package middleware - -import ( - "context" - "fmt" - "net/http" - "net/http/httptest" - "testing" - - "github.com/antchfx/antch" -) - -func TestRedirectMiddleware(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - http.Redirect(w, r, "/", http.StatusTemporaryRedirect) - })) - defer ts.Close() - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - - ds := &antch.DownloaderStack{} - fn := antch.DownloaderFunc(func(ctx context.Context, req *http.Request) (*http.Response, error) { - return ds.ProcessRequest(ctx, req) - }) - - req, _ := http.NewRequest("GET", ts.URL, nil) - - mid := &Redirect{Next: fn} - _, err := mid.ProcessRequest(context.Background(), req) - if e, g := "Get /: stopped after 10 redirects", fmt.Sprintf("%v", err); e != g { - t.Errorf("expected error %q, got %q", e, g) - } -} diff --git a/middleware/robotstxt.go b/middleware/robotstxt.go deleted file mode 100644 index 15ce791..0000000 --- a/middleware/robotstxt.go +++ /dev/null @@ -1,95 +0,0 @@ -package middleware - -import ( - "context" - "errors" - "fmt" - "net/http" - "net/url" - "sync" - "time" - - "github.com/antchfx/antch" - "github.com/temoto/robotstxt" -) - -// Robotstxt is a middleware of Downloader for robots.txt. -type Robotstxt struct { - // UserAgent specifies a user-agent value for robots.txt. - // If No specifies value then take User-Agent of the request - // header value as default UserAgent value. - UserAgent string - // - Next antch.Downloader - - m map[string]*robotsEntry - mu sync.RWMutex -} - -func (rt *Robotstxt) get(u *url.URL) *robotstxt.RobotsData { - rt.mu.RLock() - value, ok := rt.m[u.Host] - rt.mu.RUnlock() - if ok { - value.update() - return value.data - } - - rt.mu.Lock() - defer rt.mu.Unlock() - - if rt.m == nil { - rt.m = make(map[string]*robotsEntry) - } - addr := fmt.Sprintf("%s://%s/robots.txt", u.Scheme, u.Host) - entry := &robotsEntry{ - url: addr, - } - entry.update() - rt.m[u.Host] = entry - return entry.data -} - -func (rt *Robotstxt) useragent(req *http.Request) string { - if v := rt.UserAgent; v != "" { - return v - } - return req.Header.Get("User-Agent") -} - -func (rt *Robotstxt) ProcessRequest(ctx context.Context, req *http.Request) (*http.Response, error) { - robots := rt.get(req.URL) - if !robots.TestAgent(req.URL.Path, rt.useragent(req)) { - return nil, errors.New("robotstxt: request was denied") - } - return rt.Next.ProcessRequest(ctx, req) -} - -type robotsEntry struct { - data *robotstxt.RobotsData - url string - time time.Time - state int32 -} - -func (e *robotsEntry) update() { - if time.Now().Sub(e.time).Hours() <= 24 { - return - } - defer func() { e.time = time.Now() }() - - resp, err := http.Get(e.url) - if err != nil { - // If receive a robots.txt got error, - // make all request to allowed. 
- e.data = &robotstxt.RobotsData{} - return - } - defer resp.Body.Close() - data, err := robotstxt.FromResponse(resp) - if err != nil { - e.data = &robotstxt.RobotsData{} - return - } - e.data = data -} diff --git a/middleware/robotstxt_test.go b/middleware/robotstxt_test.go deleted file mode 100644 index 0f8d591..0000000 --- a/middleware/robotstxt_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package middleware - -import ( - "context" - "fmt" - "net/http" - "net/http/httptest" - "testing" - - "github.com/antchfx/antch" -) - -var robotstxtTestBody = `User-agent: testbot -Disallow: - -User-agent: * -Disallow: /` - -func TestRobotstxtMiddlewareAllow(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/robots.txt" { - w.Write([]byte(robotstxtTestBody)) - w.Header().Set("Content-Type", "text/plain") - return - } - w.WriteHeader(200) - })) - defer ts.Close() - - ds := &antch.DownloaderStack{} - fn := antch.DownloaderFunc(func(ctx context.Context, req *http.Request) (*http.Response, error) { - return ds.ProcessRequest(ctx, req) - }) - - req, _ := http.NewRequest("GET", ts.URL, nil) - - mid := Robotstxt{Next: fn} - - mid.UserAgent = "testbot" - _, err := mid.ProcessRequest(context.Background(), req) - if err != nil { - t.Fatalf("expected nil error but got %v", err) - } -} - -func TestRobotstxtMiddlewareDisallow(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/robots.txt" { - w.Write([]byte(robotstxtTestBody)) - w.Header().Set("Content-Type", "text/plain") - return - } - w.WriteHeader(200) - })) - defer ts.Close() - - ds := &antch.DownloaderStack{} - fn := antch.DownloaderFunc(func(ctx context.Context, req *http.Request) (*http.Response, error) { - return ds.ProcessRequest(ctx, req) - }) - - req, _ := http.NewRequest("GET", ts.URL, nil) - - mid := &Robotstxt{Next: fn} - _, err := mid.ProcessRequest(context.Background(), req) - if e, g := "robotstxt: request was denied", fmt.Sprintf("%v", err); e != g { - t.Errorf("expected error %q, got %q", e, g) - } -} diff --git a/middleware/urldupe_filter.go b/middleware/urldupe_filter.go deleted file mode 100644 index 16b0976..0000000 --- a/middleware/urldupe_filter.go +++ /dev/null @@ -1,52 +0,0 @@ -package middleware - -import ( - "context" - "errors" - "net/http" - "sync" - - "github.com/antchfx/antch" - "github.com/tylertreat/BoomFilters" -) - -// UrlDupeFilter is a middleware of Downloader is simple to filter -// duplicate URLs that had already been seen before start request. -type UrlDupeFilter struct { - Next antch.Downloader - - mu sync.RWMutex - once sync.Once - boom boom.Filter -} - -type UrlDupeIgnoredKey struct{} - -func (f *UrlDupeFilter) ProcessRequest(ctx context.Context, req *http.Request) (*http.Response, error) { - f.once.Do(func() { - f.boom = boom.NewDefaultScalableBloomFilter(0.01) - }) - - // If request context has `UrlDupeIgnoredKey` means current request - // ignored to process by this middleware. 
-	if v := ctx.Value(UrlDupeIgnoredKey{}); v != nil {
-		return f.Next.ProcessRequest(ctx, req)
-	}
-	key := []byte(req.URL.String())
-
-	f.mu.RLock()
-	seen := f.boom.Test(key)
-	f.mu.RUnlock()
-
-	if seen {
-		return nil, errors.New("urldupefilter: request was denied")
-	}
-
-	res, err := f.Next.ProcessRequest(ctx, req)
-	if err == nil {
-		f.mu.Lock()
-		f.boom.Add(key)
-		f.mu.Unlock()
-	}
-	return res, err
-}
diff --git a/middleware/urldupe_filter_test.go b/middleware/urldupe_filter_test.go
deleted file mode 100644
index 8ec7e30..0000000
--- a/middleware/urldupe_filter_test.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package middleware
-
-import (
-	"context"
-	"fmt"
-	"net/http"
-	"net/http/httptest"
-	"testing"
-
-	"github.com/antchfx/antch"
-)
-
-func TestUrlDupeFilterMiddleware(t *testing.T) {
-	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		w.WriteHeader(200)
-	}))
-	defer ts.Close()
-
-	ds := &antch.DownloaderStack{}
-	fn := antch.DownloaderFunc(func(ctx context.Context, req *http.Request) (*http.Response, error) {
-		return ds.ProcessRequest(ctx, req)
-	})
-
-	req, _ := http.NewRequest("GET", ts.URL, nil)
-	mid := &UrlDupeFilter{Next: fn}
-
-	_, err := mid.ProcessRequest(context.Background(), req)
-	// first request.
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// second request.
-	_, err = mid.ProcessRequest(context.Background(), req)
-	if e, g := "urldupefilter: request was denied", fmt.Sprintf("%v", err); e != g {
-		t.Errorf("expected error %q, got %q", e, g)
-	}
-}
diff --git a/pipeline.go b/pipeline.go
new file mode 100644
index 0000000..651df6f
--- /dev/null
+++ b/pipeline.go
@@ -0,0 +1,19 @@
+package antch
+
+// PipelineHandler is an interface for a handler in the item pipeline.
+type PipelineHandler interface {
+	ServePipeline(Item)
+}
+
+// PipelineHandlerFunc is an adapter to allow the use of ordinary
+// functions as a PipelineHandler.
+type PipelineHandlerFunc func(Item)
+
+// ServePipeline processes the given Item value.
+func (f PipelineHandlerFunc) ServePipeline(v Item) {
+	f(v)
+}
+
+// Pipeline passes an Item value from one PipelineHandler
+// to the next PipelineHandler in the chain.
+type Pipeline func(PipelineHandler) PipelineHandler
diff --git a/middleware/proxy.go b/proxy.go
similarity index 50%
rename from middleware/proxy.go
rename to proxy.go
index 7d5b888..6227a30 100644
--- a/middleware/proxy.go
+++ b/proxy.go
@@ -1,4 +1,4 @@
-package middleware
+package antch
 
 import (
 	"bufio"
@@ -12,34 +12,31 @@ import (
 	"net/http"
 	"net/url"
 	"strings"
-	"sync"
 
-	"github.com/antchfx/antch"
 	"golang.org/x/net/proxy"
 )
 
-// Proxy is a middleware of Downloader allows all HTTP requests
-// access internet via proxy server.
-// Proxy middleware supports the following proxy protocols:
-// HTTP, HTTPS, SOCKS5.
-//
-type Proxy struct {
-	// ProxyURL specifies a function to return a proxy for a given
-	// Request.
-	ProxyURL func(*http.Request) *url.URL
+// ProxyKey is a context key for the proxy URL used by Crawler.
+type ProxyKey struct{}
 
-	Next antch.Downloader
+func proxyHandler(f func(*http.Request) (*url.URL, error), next HttpMessageHandler) HttpMessageHandler {
+	// Register dialer types for the HTTP and HTTPS proxy protocols.
+	proxy.RegisterDialerType("http", httpProxy)
+	proxy.RegisterDialerType("https", httpProxy)
 
-	once sync.Once
+	return HttpMessageHandlerFunc(func(req *http.Request) (*http.Response, error) {
+		proxyURL, err := f(req)
+		if err != nil {
+			return nil, err
+		}
+		ctx := context.WithValue(req.Context(), ProxyKey{}, proxyURL)
+		return next.Send(req.WithContext(ctx))
+	})
 }
 
-// ProxyKey is a proxy key for HTTP request context that used by
-// proxy middleware.
-type ProxyKey struct{}
+var zeroDialer net.Dialer
 
-var zero net.Dialer
-
-func (p *Proxy) dial(ctx context.Context, network, address string) (net.Conn, error) {
+func proxyDialContext(ctx context.Context, network, address string) (net.Conn, error) {
 	if v := ctx.Value(ProxyKey{}); v != nil {
 		dialer, err := proxy.FromURL(v.(*url.URL), proxy.Direct)
 		if err != nil {
@@ -47,41 +44,40 @@ func (p *Proxy) dial(ctx context.Context, network, address string) (net.Conn, er
 		}
 		return dialer.Dial(network, address)
 	}
-	return zero.DialContext(ctx, network, address)
+	return zeroDialer.DialContext(ctx, network, address)
 }
 
-func (p *Proxy) ProcessRequest(ctx context.Context, req *http.Request) (*http.Response, error) {
-	p.once.Do(func() {
-		// Change antch default dialer handler for proxy.
-		antch.DefaultDialer = p.dial
-	})
-
-	if p.ProxyURL != nil {
-		if proxyURL := p.ProxyURL(req); proxyURL != nil {
-			req = req.WithContext(context.WithValue(req.Context(), ProxyKey{}, proxyURL))
-		}
+func httpProxy(u *url.URL, forward proxy.Dialer) (proxy.Dialer, error) {
+	h := &httpDialer{
+		host:    u.Host,
+		forward: forward,
+	}
+	if u.User != nil {
+		h.shouldAuth = true
+		h.username = u.User.Username()
+		h.password, _ = u.User.Password()
 	}
-	return p.Next.ProcessRequest(ctx, req)
+	return h, nil
 }
 
-type httpProxyDialer struct {
+type httpDialer struct {
 	host               string
-	haveAuth           bool
+	shouldAuth         bool
 	username, password string
 	forward            proxy.Dialer
 }
 
-func (p *httpProxyDialer) auth() string {
-	if p.haveAuth {
-		auth := p.username + ":" + p.password
+func (d *httpDialer) auth() string {
+	if d.shouldAuth {
+		auth := d.username + ":" + d.password
 		return "Basic " + base64.StdEncoding.EncodeToString([]byte(auth))
 	}
 	return ""
 }
 
-func (p *httpProxyDialer) Dial(network, addr string) (net.Conn, error) {
-	conn, err := p.forward.Dial("tcp", p.host)
+func (d *httpDialer) Dial(network, addr string) (net.Conn, error) {
+	conn, err := d.forward.Dial("tcp", d.host)
 	if err != nil {
 		return nil, err
 	}
@@ -93,7 +89,7 @@ func (p *httpProxyDialer) Dial(network, addr string) (net.Conn, error) {
 		Header: make(http.Header),
 		Close:  false,
 	}
-	if pa := p.auth(); pa != "" {
+	if pa := d.auth(); pa != "" {
 		connectReq.Header.Set("Proxy-Authorization", pa)
 	}
 	connectReq.Write(conn)
@@ -117,21 +113,13 @@ func (p *httpProxyDialer) Dial(network, addr string) (net.Conn, error) {
 	return conn, nil
 }
 
-func httpProxy(u *url.URL, forward proxy.Dialer) (proxy.Dialer, error) {
-	h := &httpProxyDialer{
-		host:    u.Host,
-		forward: forward,
-	}
-	if u.User != nil {
-		h.haveAuth = true
-		h.username = u.User.Username()
-		h.password, _ = u.User.Password()
+// ProxyMiddleware is an HTTP proxy middleware that routes HTTP requests
+// through a proxy server to access remote sites.
+//
+// ProxyMiddleware supports the HTTP, HTTPS and SOCKS5 proxy protocols,
+// e.g. http://127.0.0.1:8080, https://127.0.0.1:8080 or socks5://127.0.0.1:1080.
+func ProxyMiddleware(f func(*http.Request) (*url.URL, error)) Middleware {
+	return func(next HttpMessageHandler) HttpMessageHandler {
+		return proxyHandler(f, next)
 	}
-	return h, nil
-}
-
-func init() {
-	// Registers proxy protocol type with associate with handler.
-	proxy.RegisterDialerType("http", httpProxy)
-	proxy.RegisterDialerType("https", httpProxy)
 }
diff --git a/queue.go b/queue.go
deleted file mode 100644
index 522aded..0000000
--- a/queue.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package antch
-
-import (
-	"io"
-	"sync"
-)
-
-// Queue is a URLs list manager.
-type Queue interface {
-	Enqueue(string)
-	Dequeue() (string, error)
-}
-
-// SimpleHeapQueue is a simple FIFO URLs queue.
-type SimpleHeapQueue struct {
-	mu   sync.Mutex
-	data []string
-}
-
-func (q *SimpleHeapQueue) Enqueue(urlStr string) {
-	q.mu.Lock()
-	defer q.mu.Unlock()
-	q.data = append(q.data, urlStr)
-}
-
-func (q *SimpleHeapQueue) Dequeue() (string, error) {
-	q.mu.Lock()
-	defer q.mu.Unlock()
-
-	if len(q.data) == 0 {
-		return "", io.EOF
-	}
-
-	var urlStr string
-	urlStr, q.data = q.data[0], q.data[1:]
-	return urlStr, nil
-}
diff --git a/robotstxt.go b/robotstxt.go
new file mode 100644
index 0000000..f9412a2
--- /dev/null
+++ b/robotstxt.go
@@ -0,0 +1,84 @@
+package antch
+
+import (
+	"errors"
+	"fmt"
+	"net/http"
+	"net/url"
+	"sync"
+	"time"
+
+	"github.com/temoto/robotstxt"
+)
+
+type robotsEntry struct {
+	data *robotstxt.RobotsData
+	url  string
+	last time.Time
+}
+
+func (e *robotsEntry) update() {
+	e.last = time.Now()
+	allAllowed := func() *robotstxt.RobotsData {
+		return &robotstxt.RobotsData{}
+	}
+
+	resp, err := http.Get(e.url)
+	if err != nil {
+		e.data = allAllowed()
+		return
+	}
+	defer resp.Body.Close()
+	data, err := robotstxt.FromResponse(resp)
+	if err == nil {
+		e.data = data
+	} else {
+		e.data = allAllowed()
+	}
+}
+
+func robotstxtHandler(next HttpMessageHandler) HttpMessageHandler {
+	var (
+		mu sync.RWMutex
+		m  = make(map[string]*robotsEntry)
+	)
+
+	get := func(URL string) *robotstxt.RobotsData {
+		mu.RLock()
+		e := m[URL]
+		mu.RUnlock()
+
+		if e == nil {
+			mu.Lock()
+			defer mu.Unlock()
+			e = &robotsEntry{url: URL}
+			e.update()
+			m[URL] = e
+			return e.data
+		}
+
+		if (time.Now().Sub(e.last).Hours()) >= 24 {
+			go e.update()
+		}
+		return e.data
+	}
+
+	return HttpMessageHandlerFunc(func(req *http.Request) (*http.Response, error) {
+		r := get(robotstxtURL(req.URL))
+		ua := req.Header.Get("User-Agent")
+		if r.TestAgent(req.URL.Path, ua) {
+			return next.Send(req)
+		}
+		return nil, errors.New("request was denied by robots.txt")
+	})
+}
+
+func robotstxtURL(u *url.URL) string {
+	return fmt.Sprintf("%s://%s/robots.txt", u.Scheme, u.Host)
+}
+
+// RobotstxtMiddleware is a middleware for robots.txt that makes HTTP
+// requests more polite.
+func RobotstxtMiddleware() Middleware {
+	return Middleware(robotstxtHandler)
+}
diff --git a/spider.go b/spider.go
index 560e784..5d76092 100644
--- a/spider.go
+++ b/spider.go
@@ -1,102 +1,33 @@
 package antch
 
 import (
-	"context"
 	"io"
 	"io/ioutil"
 	"net/http"
-	"sync"
 )
 
-// Spider is an interface for handle received HTTP response from
-// remote server that HTTP request send by web crawler.
-type Spider interface {
-	ProcessResponse(context.Context, *http.Response) error
+// Handler is the HTTP response handler interface that defines
+// how to extract scraped items from their pages.
+//
+// ServeSpider should write each scraped Item to the channel.
+type Handler interface {
+	ServeSpider(chan<- Item, *http.Response)
 }
 
-// SpiderFunc is an adapter type to allow the use of ordinary
-// functions as Spider handlers.
-type SpiderFunc func(context.Context, *http.Response) error
+// HandlerFunc is an adapter to allow the use of ordinary
+// functions as spider Handlers.
+type HandlerFunc func(chan<- Item, *http.Response)
 
-func (f SpiderFunc) ProcessResponse(ctx context.Context, res *http.Response) error {
-	return f(ctx, res)
+// ServeSpider extracts data from the received HTTP response and
+// writes it into the channel c.
+func (f HandlerFunc) ServeSpider(c chan<- Item, resp *http.Response) {
+	f(c, resp)
 }
 
-// NoOpSpider returns a Spider object that silently ignores all HTTP response
-// without do anything.
-func NoOpSpider() Spider {
-	return SpiderFunc(func(_ context.Context, res *http.Response) error {
-		// Make HTTP connection reusing for next HTTP request.
-		// (https://stackoverflow.com/questions/17948827/reusing-http-connections-in-golang)
-		io.Copy(ioutil.Discard, res.Body)
-		return nil
+// VoidHandler returns a Handler that does nothing.
+func VoidHandler() Handler {
+	return HandlerFunc(func(_ chan<- Item, resp *http.Response) {
+		// Drain the body so the connection can be reused; see
+		// https://stackoverflow.com/questions/17948827/reusing-http-connections-in-golang
+		io.Copy(ioutil.Discard, resp.Body)
 	})
 }
-
-// SpiderMux is a multiplexer for handle HTTP response.
-// It matches a registered handler based on host name of
-// HTTP response URL to handle.
-type SpiderMux struct {
-	mu sync.Mutex
-	m  map[string]spiderMuxEntry
-}
-
-type spiderMuxEntry struct {
-	pattern  string
-	explicit bool
-	h        Spider
-}
-
-func (mux *SpiderMux) match(host string) (h Spider, pattern string) {
-	if entry, ok := mux.m[host]; ok {
-		h, pattern = entry.h, entry.pattern
-	}
-	return
-}
-
-func (mux *SpiderMux) handler(host string) (h Spider, pattern string) {
-	mux.mu.Lock()
-	defer mux.mu.Unlock()
-
-	h, pattern = mux.match(host)
-	if h == nil {
-		h, pattern = NoOpSpider(), ""
-	}
-	return
-}
-
-// Handler returns the handler to use for the given host name.
-func (mux *SpiderMux) Handler(host string) (h Spider, pattern string) {
-	return mux.handler(host)
-}
-
-// Handle registers the handler for the given host name.
-func (mux *SpiderMux) Handle(pattern string, handler Spider) {
-	mux.mu.Lock()
-	defer mux.mu.Unlock()
-
-	if pattern == "" {
-		panic("antch: invalid host")
-	}
-	if handler == nil {
-		panic("antch: handler is nil")
-	}
-	if mux.m[pattern].explicit == true {
-		panic("antch: multiple registrations for " + pattern)
-	}
-
-	if mux.m == nil {
-		mux.m = make(map[string]spiderMuxEntry)
-	}
-	mux.m[pattern] = spiderMuxEntry{explicit: true, pattern: pattern, h: handler}
-}
-
-// HandleFunc registers the handler function for the given host name.
-func (mux *SpiderMux) HandleFunc(host string, handler func(context.Context, *http.Response) error) {
-	mux.Handle(host, SpiderFunc(handler))
-}
-
-func (mux *SpiderMux) ProcessResponse(ctx context.Context, res *http.Response) error {
-	h, _ := mux.Handler(res.Request.URL.Host)
-	return h.ProcessResponse(ctx, res)
-}
diff --git a/xml.go b/xml.go
new file mode 100644
index 0000000..6271957
--- /dev/null
+++ b/xml.go
@@ -0,0 +1,12 @@
+package antch
+
+import (
+	"net/http"
+
+	"github.com/antchfx/xquery/xml"
+)
+
+// ParseXML parses an HTTP response body as an XML document.
+func ParseXML(resp *http.Response) (*xmlquery.Node, error) {
+	return xmlquery.Parse(resp.Body)
+}
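
Below is a minimal usage sketch (not part of the patch) showing how the new Middleware and spider Handler pieces introduced above could be wired together by hand. It assumes the exported names from this rewrite (HttpMessageHandler, Middleware, ProxyMiddleware, RobotstxtMiddleware, HandlerFunc, Item); the baseTransport type and the hard-coded proxy address are illustrative only, since a real Crawler supplies its own transport and configuration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"

	"github.com/antchfx/antch"
)

// baseTransport is a stand-in for the crawler's own transport; it simply
// executes the request with the default HTTP client.
type baseTransport struct{}

func (baseTransport) Send(req *http.Request) (*http.Response, error) {
	return http.DefaultClient.Do(req)
}

func main() {
	// Wrap the base handler with the built-in middlewares from this patch.
	var h antch.HttpMessageHandler = baseTransport{}
	proxyURL := func(*http.Request) (*url.URL, error) {
		return url.Parse("socks5://127.0.0.1:1080") // illustrative proxy address
	}
	for _, m := range []antch.Middleware{
		antch.ProxyMiddleware(proxyURL),
		antch.RobotstxtMiddleware(),
	} {
		h = m(h)
	}

	req, _ := http.NewRequest("GET", "https://example.com/", nil)
	resp, err := h.Send(req)
	if err != nil {
		fmt.Println("fetch failed:", err)
		return
	}
	defer resp.Body.Close()

	// A spider Handler extracts Items from the response and writes them to
	// the channel; here it just emits one zero-value Item for illustration.
	items := make(chan antch.Item, 1)
	spider := antch.HandlerFunc(func(c chan<- antch.Item, resp *http.Response) {
		var it antch.Item
		c <- it
	})
	spider.ServeSpider(items, resp)
	fmt.Println("scraped:", <-items)
}
```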
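A similar sketch for the new item Pipeline types. The logStage and printItems names are illustrative, and the zero-value Item stands in for real scraped data because Item's concrete shape is defined elsewhere in the rewrite.

```go
package main

import (
	"fmt"

	"github.com/antchfx/antch"
)

func main() {
	// Terminal handler: print every Item that reaches the end of the chain.
	printItems := antch.PipelineHandlerFunc(func(v antch.Item) {
		fmt.Println("item:", v)
	})

	// A Pipeline stage that logs each Item before handing it to the next handler.
	logStage := antch.Pipeline(func(next antch.PipelineHandler) antch.PipelineHandler {
		return antch.PipelineHandlerFunc(func(v antch.Item) {
			fmt.Println("pipeline received an item")
			next.ServePipeline(v)
		})
	})

	// Compose the stage with the terminal handler and feed it one Item.
	handler := logStage(printItems)

	var it antch.Item // zero value; the crawler would supply scraped data here
	handler.ServePipeline(it)
}
```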