/
mongodb.go
107 lines (89 loc) · 2.42 KB
/
mongodb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package useragent
import (
"runtime"
"time"
"github.com/activecm/rita/resources"
"github.com/activecm/rita/util"
"github.com/globalsign/mgo"
"github.com/vbauerster/mpb"
"github.com/vbauerster/mpb/decor"
)
type repo struct {
res *resources.Resources
}
//NewMongoRepository create new repository
func NewMongoRepository(res *resources.Resources) Repository {
return &repo{
res: res,
}
}
func (r *repo) CreateIndexes() error {
session := r.res.DB.Session.Copy()
defer session.Close()
// set collection name
collectionName := r.res.Config.T.UserAgent.UserAgentTable
// check if collection already exists
names, _ := session.DB(r.res.DB.GetSelectedDB()).CollectionNames()
// if collection exists, we don't need to do anything else
for _, name := range names {
if name == collectionName {
return nil
}
}
// set desired indexes
indexes := []mgo.Index{
{Key: []string{"user_agent"}, Unique: true},
{Key: []string{"dat.seen"}},
}
// create collection
err := r.res.DB.CreateCollection(collectionName, indexes)
if err != nil {
return err
}
return nil
}
func (r *repo) Upsert(userAgentMap map[string]*Input) {
//Create the workers
writerWorker := newWriter(
r.res.DB,
r.res.Config,
r.res.Log,
)
analyzerWorker := newAnalyzer(
r.res.Config.S.Rolling.CurrentChunk,
r.res.DB,
r.res.Config,
writerWorker.collect,
writerWorker.close,
)
//kick off the threaded goroutines
for i := 0; i < util.Max(1, runtime.NumCPU()/2); i++ {
analyzerWorker.start()
writerWorker.start()
}
// progress bar for troubleshooting
p := mpb.New(mpb.WithWidth(20))
bar := p.AddBar(int64(len(userAgentMap)),
mpb.PrependDecorators(
decor.Name("\t[-] UserAgent Analysis:", decor.WC{W: 30, C: decor.DidentRight}),
decor.CountersNoUnit(" %d / %d ", decor.WCSyncWidth),
),
mpb.AppendDecorators(decor.Percentage()),
)
// loop over map entries
for key, value := range userAgentMap {
start := time.Now()
//Mongo Index key is limited to a size of 1024 https://docs.mongodb.com/v3.4/reference/limits/#index-limitations
// so if the key is too large, we should cut it back, this is rough but
// works. Figured 800 allows some wiggle room, while also not being too large
if len(key) > 1024 {
key = key[:800]
}
value.name = key
analyzerWorker.collect(value)
bar.IncrBy(1, time.Since(start))
}
p.Wait()
// start the closing cascade (this will also close the other channels)
analyzerWorker.close()
}