/
updater.go
185 lines (135 loc) · 4.35 KB
/
updater.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package db
import (
"context"
"fmt"
"os"
"sync"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
// UpdateType selects which operation updaterWorker performs for an
// UpdateableDomain.
type UpdateType uint8

const (
	// InsertNewDomain makes updaterWorker call DomainsInsertWithRecord.
	InsertNewDomain UpdateType = 0
	// UpdateExistingDomain makes updaterWorker call RecordsUpdate.
	UpdateExistingDomain UpdateType = 1
)
// UpdateableDomain is one unit of work sent over UpdaterChan to updaterWorker.
// Its Type tells the worker whether to insert a new domain or refresh the
// records of an existing one. This distinguishes domains submitted through
// /api/insert (presumably sent as InsertNewDomain — verify at the caller) from
// domains queued by the background updater goroutines, which use
// UpdateExistingDomain.
type UpdateableDomain struct {
// Domain is the domain name to insert or refresh.
Domain string
// Type selects the operation updaterWorker performs on Domain.
Type UpdateType
}
// UpdaterChan is the work queue consumed by the updater workers.
// It is nil until RecordsUpdater creates it; a send on a nil channel blocks
// forever, so producers must not run before RecordsUpdater has been called.
var UpdaterChan chan UpdateableDomain

// updaterLimit is the queue length above which the background producers stop
// and poll before sending more entries. RecordsUpdater sets it to half of the
// channel capacity.
var updaterLimit int
// recordsUpdaterRoutine reads from DomainChan and internalDomainChan
// and updates the FQDN coming from the channel.
func updaterWorker(wg *sync.WaitGroup) {
defer wg.Done()
for dom := range UpdaterChan {
var err error
switch dom.Type {
case InsertNewDomain:
fmt.Printf("updater: inserting %s\n", dom.Domain)
err = DomainsInsertWithRecord(dom.Domain, false)
case UpdateExistingDomain:
err = RecordsUpdate(dom.Domain, false)
default:
err = fmt.Errorf("invalid UpdateType: %d", dom.Type)
}
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to update DNS records for %s in updaterWorker(): %s\n", dom.Domain, err)
}
}
}
// oldDomainUpdater runs as a background goroutine and keeps queueing domains
// whose records are stale.
//
// Each round samples up to 1000 documents from Domains whose "updated" field
// is either older than 30 days (compared against a Unix-seconds cutoff) or
// missing. Each one is sent to UpdaterChan as UpdateExistingDomain. While
// UpdaterChan holds more than updaterLimit entries it polls before sending,
// so the workers are not flooded.
//
// If the aggregation fails, or a round queues nothing, it sleeps before the
// next round. This avoids re-running the aggregation against the database in
// a tight loop. The loop never returns, so the deferred wg.Done only runs if
// the goroutine panics.
func oldDomainUpdater(wg *sync.WaitGroup) {
	defer wg.Done()

	const (
		staleAfter   = 720 * time.Hour   // 30 days
		sampleSize   = 1000              // documents per round
		retryDelay   = 600 * time.Second // pause after an error or an empty round
		backoffDelay = 60 * time.Second  // poll interval while the queue is too full
	)

	for {
		cutoff := time.Now().Add(-staleAfter).Unix()
		matchStage := bson.D{
			{Key: "$match", Value: bson.D{{Key: "$or", Value: bson.A{
				bson.M{"updated": bson.M{"$lt": cutoff}},
				bson.M{"updated": bson.M{"$exists": false}},
			}}}},
		}
		sampleStage := bson.D{{Key: "$sample", Value: bson.M{"size": sampleSize}}}

		cursor, err := Domains.Aggregate(context.TODO(), mongo.Pipeline{matchStage, sampleStage})
		if err != nil {
			fmt.Fprintf(os.Stderr, "oldDomainUpdater() failed to aggregate old domains: %s\n", err)
			time.Sleep(retryDelay)
			continue
		}

		queued := 0
		for cursor.Next(context.TODO()) {
			d := new(Domain)
			if err := cursor.Decode(d); err != nil {
				fmt.Fprintf(os.Stderr, "oldDomainUpdater() failed to decode domain: %s\n", err)
				break
			}
			// Back-pressure: let the workers drain the queue before adding more.
			for len(UpdaterChan) > updaterLimit {
				time.Sleep(backoffDelay)
			}
			UpdaterChan <- UpdateableDomain{Domain: d.String(), Type: UpdateExistingDomain}
			queued++
		}
		if err := cursor.Err(); err != nil {
			fmt.Fprintf(os.Stderr, "oldDomainUpdater() cursor failed: %s\n", err)
		}
		if err := cursor.Close(context.TODO()); err != nil {
			fmt.Fprintf(os.Stderr, "oldDomainUpdater() failed to close cursor: %s\n", err)
		}

		// Nothing was queued this round (no stale domains, or the first
		// document failed to decode). Wait before querying again instead of
		// spinning.
		if queued == 0 {
			time.Sleep(retryDelay)
		}
	}
}
// topListUpdater runs as a background goroutine and keeps queueing the
// domains behind randomly sampled TopList entries for a record refresh.
//
// Each round samples up to 1000 TopList documents. Each entry is expanded with
// DomainsLookupFull(entry.Domain, -1), presumably the domain plus its known
// subdomains (verify against DomainsLookupFull). Every returned name is sent
// to UpdaterChan as UpdateExistingDomain, polling while the channel holds more
// than updaterLimit entries. Errors are only printed to stderr; a failed
// lookup skips that entry.
//
// If the aggregation fails, or a round queues nothing, it sleeps before the
// next round so a persistent error or an empty collection does not become a
// tight loop. The loop never returns, so the deferred wg.Done only runs if the
// goroutine panics.
func topListUpdater(wg *sync.WaitGroup) {
	defer wg.Done()

	const (
		sampleSize   = 1000              // documents per round
		retryDelay   = 600 * time.Second // pause after an error or an empty round
		backoffDelay = 60 * time.Second  // poll interval while the queue is too full
	)

	for {
		sampleStage := bson.D{{Key: "$sample", Value: bson.M{"size": sampleSize}}}
		cursor, err := TopList.Aggregate(context.TODO(), mongo.Pipeline{sampleStage})
		if err != nil {
			fmt.Fprintf(os.Stderr, "topListUpdater() failed to find toplist: %s\n", err)
			time.Sleep(retryDelay)
			continue
		}

		queued := 0
		for cursor.Next(context.TODO()) {
			entry := new(TopListSchema)
			if err := cursor.Decode(entry); err != nil {
				fmt.Fprintf(os.Stderr, "topListUpdater() failed to decode toplist entry: %s\n", err)
				break
			}
			names, err := DomainsLookupFull(entry.Domain, -1)
			if err != nil {
				fmt.Fprintf(os.Stderr, "topListUpdater() failed to lookup full for %s: %s\n", entry.Domain, err)
				continue
			}
			for _, name := range names {
				// Back-pressure: let the workers drain the queue before adding more.
				for len(UpdaterChan) > updaterLimit {
					time.Sleep(backoffDelay)
				}
				UpdaterChan <- UpdateableDomain{Domain: name, Type: UpdateExistingDomain}
				queued++
			}
		}
		if err := cursor.Err(); err != nil {
			fmt.Fprintf(os.Stderr, "topListUpdater() cursor failed: %s\n", err)
		}
		if err := cursor.Close(context.TODO()); err != nil {
			fmt.Fprintf(os.Stderr, "topListUpdater() failed to close cursor: %s\n", err)
		}

		// Nothing was queued this round: wait before sampling again instead of
		// spinning.
		if queued == 0 {
			time.Sleep(retryDelay)
		}
	}
}
// RecordsUpdater initializes the updater queue and runs the updater goroutines.
//
// It creates UpdaterChan with a buffer of chanSize and sets updaterLimit to
// chanSize/2. It then starts nworker updaterWorker goroutines, followed by one
// oldDomainUpdater and one topListUpdater, and waits for all of them. The two
// producers loop forever and nothing here closes UpdaterChan, so in practice
// this call blocks indefinitely; callers presumably run it in its own
// goroutine.
func RecordsUpdater(nworker int, chanSize int) {
	UpdaterChan = make(chan UpdateableDomain, chanSize)
	updaterLimit = chanSize / 2

	var group sync.WaitGroup
	start := func(run func(*sync.WaitGroup)) {
		group.Add(1)
		go run(&group)
	}

	for remaining := nworker; remaining > 0; remaining-- {
		start(updaterWorker)
	}
	start(oldDomainUpdater)
	start(topListUpdater)

	group.Wait()
}