-
Notifications
You must be signed in to change notification settings - Fork 0
/
mongodb.go
99 lines (80 loc) · 2.22 KB
/
mongodb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package uconn
import (
"runtime"
"time"
"github.com/activecm/rita/resources"
"github.com/activecm/rita/util"
"github.com/globalsign/mgo"
"github.com/vbauerster/mpb"
"github.com/vbauerster/mpb/decor"
)
// repo is the concrete type handed out by NewMongoRepository as a Repository.
// Its methods reach the database, configuration and logger through res.
type repo struct {
// res is the shared resources bundle supplied at construction time.
res *resources.Resources
}
// NewMongoRepository builds a Repository that operates on the supplied
// resources.
func NewMongoRepository(res *resources.Resources) Repository {
	mongoRepo := new(repo)
	mongoRepo.res = res
	return mongoRepo
}
// CreateIndexes creates the unique connections collection together with its
// indexes. If a collection with the configured name already exists in the
// selected database, nothing is changed and nil is returned.
//
// It returns any error encountered while listing the existing collections or
// while creating the collection.
func (r *repo) CreateIndexes() error {
	session := r.res.DB.Session.Copy()
	defer session.Close()

	// set collection name
	collectionName := r.res.Config.T.Structure.UniqueConnTable

	// check if collection already exists; a failed lookup is reported to the
	// caller rather than being treated as "collection does not exist"
	names, err := session.DB(r.res.DB.GetSelectedDB()).CollectionNames()
	if err != nil {
		return err
	}

	// if collection exists, we don't need to do anything else
	for _, name := range names {
		if name == collectionName {
			return nil
		}
	}

	// set desired indexes
	// NOTE(review): "$dat.count" starts with '$' but has no "<kind>:" part,
	// unlike the "$hashed:..." keys above — confirm mgo accepts this key and
	// that "dat.count" was not intended.
	indexes := []mgo.Index{
		{Key: []string{"src", "dst"}, Unique: true},
		{Key: []string{"$hashed:src"}},
		{Key: []string{"$hashed:dst"}},
		{Key: []string{"$dat.count"}},
	}

	// create collection
	return r.res.DB.CreateCollection(collectionName, indexes)
}
// Upsert hands every entry of uconnMap to the analyzer worker. The analyzer
// is constructed with the collect and close functions of a writer worker that
// targets the unique connections collection. A progress bar tracks how many
// entries have been handed over.
func (r *repo) Upsert(uconnMap map[string]*Pair) {
	cfg := r.res.Config

	// build the writer first: the analyzer is given its collect/close hooks
	writer := newWriter(cfg.T.Structure.UniqueConnTable, r.res.DB, cfg, r.res.Log)
	analyzer := newAnalyzer(
		cfg.S.Bro.CurrentChunk,
		int64(cfg.S.Strobe.ConnectionLimit),
		writer.collect,
		writer.close,
	)

	// start one analyzer/writer pair per two CPUs, but never fewer than one
	workerCount := util.Max(1, runtime.NumCPU()/2)
	for i := 0; i < workerCount; i++ {
		analyzer.start()
		writer.start()
	}

	// progress bar for troubleshooting
	progress := mpb.New(mpb.WithWidth(20))
	total := int64(len(uconnMap))
	bar := progress.AddBar(total,
		mpb.PrependDecorators(
			decor.Name("\t[-] Uconn Analysis:", decor.WC{W: 30, C: decor.DidentRight}),
			decor.CountersNoUnit(" %d / %d ", decor.WCSyncWidth),
		),
		mpb.AppendDecorators(decor.Percentage()),
	)

	// feed each map entry to the analyzer, timing the hand-off for the bar
	for _, pair := range uconnMap {
		began := time.Now()
		analyzer.collect(pair)
		bar.IncrBy(1, time.Since(began))
	}
	progress.Wait()

	// start the closing cascade (this will also close the other channels)
	analyzer.close()
}