/
host_workers.go
81 lines (70 loc) · 1.72 KB
/
host_workers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package crawler
import (
"fmt"
"log"
"net/url"
"sync"
"time"
"github.com/ReanGD/go-web-search/content"
"github.com/ReanGD/go-web-search/proxy"
"github.com/uber-go/zap"
)
// hostWorker crawls a batch of URL tasks belonging to a single host and
// streams each processed page to the database writer channel.
type hostWorker struct {
// Request performs the fetch/parse of one URL (per-host state lives here).
Request *request
// Tasks is the batch of new URLs this worker must process.
Tasks []content.URL
// ChDB receives processed page data for persistence; send-only from here.
ChDB chan<- *proxy.PageData
}
// Start processes all queued tasks sequentially and signals wgParent when
// finished. Each task URL is parsed, fetched through the worker's request,
// tagged with its parent task ID, and sent to the database channel.
// URLs that fail to parse are logged and skipped.
func (w *hostWorker) Start(wgParent *sync.WaitGroup) {
	defer wgParent.Done()
	cnt := len(w.Tasks)
	for i := 0; i < cnt; i++ {
		parsed, err := url.Parse(w.Tasks[i].URL)
		if err != nil {
			log.Printf("ERROR: Worker query. Parse URL %s, message: %s", w.Tasks[i].URL, err)
			continue
		}
		result, workDuration := w.Request.Process(parsed)
		result.SetParentURL(w.Tasks[i].ID)
		w.ChDB <- result
		// Progress indicator: one dot per processed page.
		fmt.Printf(".")
		// Polite throttling: keep at least ~1s between requests to the same
		// host. Skip the pause after the last task, and never compute a
		// negative duration when the request itself took a second or more.
		if result.GetMeta().NeedWaitAfterRequest() && i != cnt-1 && workDuration < 1000 {
			time.Sleep(time.Duration(1000-workDuration) * time.Millisecond)
		}
	}
}
// hostWorkers owns one hostWorker per crawled host and runs them in parallel.
type hostWorkers struct {
// workers holds one worker per host discovered by the hosts manager.
workers []*hostWorker
}
// Init builds one worker per known host. It registers baseHosts through the
// hosts manager, distributes the total task budget cnt evenly across hosts
// (at least one task per host), and loads each worker's URL batch from the
// database. Returns the first error encountered; on success w.workers is
// ready for Start.
func (w *hostWorkers) Init(db *content.DBrw, logger zap.Logger, baseHosts []string, cnt int) error {
	hostMng := &hostsManager{}
	err := hostMng.Init(db, baseHosts)
	if err != nil {
		return err
	}
	hosts := hostMng.GetHosts()
	w.workers = make([]*hostWorker, 0, len(hosts))
	if len(hosts) == 0 {
		// Nothing to crawl — and dividing cnt by zero below would panic.
		return nil
	}
	cntPerHost := cnt / len(hosts)
	if cntPerHost < 1 {
		cntPerHost = 1
	}
	for hostName, hostID := range hosts {
		worker := &hostWorker{Request: &request{hostMng: hostMng}}
		worker.Request.Init(logger.With(zap.String("host", hostName)))
		worker.Tasks, err = db.GetNewURLs(hostID, cntPerHost)
		if err != nil {
			return err
		}
		w.workers = append(w.workers, worker)
	}
	return nil
}
// Start launches every worker in its own goroutine, wiring each one to the
// shared database channel, and blocks until all of them have completed.
func (w *hostWorkers) Start(chDB chan<- *proxy.PageData) {
	var group sync.WaitGroup
	for _, hw := range w.workers {
		hw.ChDB = chDB
		group.Add(1)
		go hw.Start(&group)
	}
	group.Wait()
}