-
Notifications
You must be signed in to change notification settings - Fork 0
/
dissector.go
191 lines (173 loc) · 6.3 KB
/
dissector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package beacon
import (
"sync"
"github.com/activecm/rita/config"
"github.com/activecm/rita/database"
"github.com/activecm/rita/pkg/uconn"
"github.com/globalsign/mgo/bson"
)
type (
	// dissector gathers uconn pair data for beacon analysis. Pairs are fed
	// in via collect(), re-read from MongoDB by the goroutine launched in
	// start(), and qualifying results are handed to dissectedCallback.
	dissector struct {
		connLimit         int64              // limit for strobe classification
		db                *database.DB       // provides access to MongoDB
		conf              *config.Config     // contains details needed to access MongoDB
		dissectedCallback func(*uconn.Pair)  // called on each analyzed result
		closedCallback    func()             // called when .close() is called and no more calls to dissectedCallback will be made
		dissectChannel    chan *uconn.Pair   // holds unanalyzed data
		dissectWg         sync.WaitGroup     // wait for analysis to finish
	}
)
// newDissector creates a new dissector for gathering uconn data.
// connLimit is the connection-count threshold for strobe classification;
// dissectedCallback receives each qualifying result and closedCallback is
// invoked once close() has drained all pending work.
func newDissector(connLimit int64, db *database.DB, conf *config.Config, dissectedCallback func(*uconn.Pair), closedCallback func()) *dissector {
	d := &dissector{
		connLimit:         connLimit,
		db:                db,
		conf:              conf,
		dissectedCallback: dissectedCallback,
		closedCallback:    closedCallback,
	}
	d.dissectChannel = make(chan *uconn.Pair)
	return d
}
// collect sends a chunk of data to be analyzed. The channel is unbuffered,
// so this blocks until the goroutine launched by start() receives the pair.
func (d *dissector) collect(data *uconn.Pair) {
	d.dissectChannel <- data
}
// close waits for the collector to finish. The ordering here matters:
// closing the channel lets the analysis goroutine's range loop terminate,
// the WaitGroup blocks until that goroutine is done, and only then is the
// downstream closedCallback fired (guaranteeing no further results).
func (d *dissector) close() {
	close(d.dissectChannel)
	d.dissectWg.Wait()
	d.closedCallback()
}
// start kicks off a new analysis thread. The goroutine ranges over
// dissectChannel, re-reads each src-dst pair's full uconn record through an
// aggregation pipeline, and forwards results that pass the connection
// threshold (and, for non-strobes, the unique-timestamp minimum) to
// dissectedCallback. It exits when close() closes the channel.
func (d *dissector) start() {
	d.dissectWg.Add(1)
	go func() {
		// defer ensures close() is never deadlocked on Wait(), even if
		// this goroutine exits via a panic mid-loop
		defer d.dissectWg.Done()
		ssn := d.db.Session.Copy()
		defer ssn.Close()

		for data := range d.dissectChannel {
			// This will work for both updating and inserting completely new Beacons
			// for every new uconn record we have, we will check the uconns table. This
			// will always return a result because even with a brand new database, we already
			// created the uconns table. It will only continue and analyze if the connection
			// meets the required specs, again working for both an update and a new src-dst pair.
			// We would have to perform this check regardless if we want the rolling update
			// option to remain, and this gets us the vetting for both situations, and Only
			// works on the current entries - not a re-aggregation on the whole collection,
			// and individual lookups like this are really fast. This also ensures a unique
			// set of timestamps for analysis.
			uconnFindQuery := []bson.M{
				// select this exact pair, skipping entries already flagged as strobes
				{"$match": bson.M{"$and": []bson.M{
					{"src": data.Src},
					{"dst": data.Dst},
					{"strobe": bson.M{"$ne": true}},
				}}},
				{"$limit": 1},
				// flatten the chunked "dat" subdocuments into top-level arrays
				{"$project": bson.M{
					"src":    "$src",
					"dst":    "$dst",
					"ts":     "$dat.ts",
					"bytes":  "$dat.bytes",
					"count":  "$dat.count",
					"tbytes": "$dat.tbytes",
					"icerts": "$dat.icerts",
				}},
				// sum the per-chunk connection counts
				{"$unwind": "$count"},
				{"$group": bson.M{
					"_id":    "$_id",
					"src":    bson.M{"$first": "$src"},
					"dst":    bson.M{"$first": "$dst"},
					"ts":     bson.M{"$first": "$ts"},
					"bytes":  bson.M{"$first": "$bytes"},
					"count":  bson.M{"$sum": "$count"},
					"tbytes": bson.M{"$first": "$tbytes"},
					"icerts": bson.M{"$first": "$icerts"},
				}},
				// drop pairs below the configured connection threshold early
				{"$match": bson.M{"count": bson.M{"$gt": d.conf.S.Beacon.DefaultConnectionThresh}}},
				// sum the per-chunk total byte counts
				{"$unwind": "$tbytes"},
				{"$group": bson.M{
					"_id":    "$_id",
					"src":    bson.M{"$first": "$src"},
					"dst":    bson.M{"$first": "$dst"},
					"ts":     bson.M{"$first": "$ts"},
					"bytes":  bson.M{"$first": "$bytes"},
					"count":  bson.M{"$first": "$count"},
					"tbytes": bson.M{"$sum": "$tbytes"},
					"icerts": bson.M{"$first": "$icerts"},
				}},
				// double $unwind flattens the nested ts arrays; $addToSet
				// dedupes them so analysis sees unique timestamps only
				{"$unwind": "$ts"},
				{"$unwind": "$ts"},
				{"$group": bson.M{
					"_id":    "$_id",
					"src":    bson.M{"$first": "$src"},
					"dst":    bson.M{"$first": "$dst"},
					"ts":     bson.M{"$addToSet": "$ts"},
					"bytes":  bson.M{"$first": "$bytes"},
					"count":  bson.M{"$first": "$count"},
					"tbytes": bson.M{"$first": "$tbytes"},
					"icerts": bson.M{"$first": "$icerts"},
				}},
				// flatten the nested bytes arrays; $push keeps duplicates
				{"$unwind": "$bytes"},
				{"$unwind": "$bytes"},
				{"$group": bson.M{
					"_id":    "$_id",
					"src":    bson.M{"$first": "$src"},
					"dst":    bson.M{"$first": "$dst"},
					"ts":     bson.M{"$first": "$ts"},
					"bytes":  bson.M{"$push": "$bytes"},
					"count":  bson.M{"$first": "$count"},
					"tbytes": bson.M{"$first": "$tbytes"},
					"icerts": bson.M{"$first": "$icerts"},
				}},
				// collapse the per-chunk invalid-cert flags to a single bool
				{"$unwind": "$icerts"},
				{"$group": bson.M{
					"_id":    "$_id",
					"src":    bson.M{"$first": "$src"},
					"dst":    bson.M{"$first": "$dst"},
					"ts":     bson.M{"$first": "$ts"},
					"bytes":  bson.M{"$first": "$bytes"},
					"count":  bson.M{"$first": "$count"},
					"tbytes": bson.M{"$first": "$tbytes"},
					"icerts": bson.M{"$push": "$icerts"},
				}},
				{"$project": bson.M{
					"_id":    "$_id",
					"src":    1,
					"dst":    1,
					"ts":     1,
					"bytes":  1,
					"count":  1,
					"tbytes": 1,
					"icerts": bson.M{"$anyElementTrue": []interface{}{"$icerts"}},
				}},
			}

			var res struct {
				Src    string  `bson:"src"`
				Dst    string  `bson:"dst"`
				Count  int64   `bson:"count"`
				Ts     []int64 `bson:"ts"`
				Bytes  []int64 `bson:"bytes"`
				TBytes int64   `bson:"tbytes"`
				ICerts bool    `bson:"icerts"`
			}

			// The error is deliberately discarded: an empty result leaves res
			// zero-valued (Count == 0), which the check below filters out.
			// NOTE(review): genuine query/connectivity errors are silently
			// dropped here as well — confirm best-effort is acceptable.
			_ = ssn.DB(d.db.GetSelectedDB()).C(d.conf.T.Structure.UniqueConnTable).Pipe(uconnFindQuery).AllowDiskUse().One(&res)

			// Check for errors and parse results
			// this is here because it will still return an empty document even if there are no results
			if res.Count > 0 {
				analysisInput := &uconn.Pair{Src: res.Src, Dst: res.Dst, ConnectionCount: res.Count, TotalBytes: res.TBytes, InvalidCertFlag: res.ICerts}

				// check if uconn has become a strobe
				if analysisInput.ConnectionCount > d.connLimit {
					// set to writer channel
					d.dissectedCallback(analysisInput)
				} else { // otherwise, parse timestamps and orig ip bytes
					analysisInput.TsList = res.Ts
					analysisInput.OrigBytesList = res.Bytes
					// send to writer channel if we have over UNIQUE 3 timestamps (analysis needs this verification)
					if len(analysisInput.TsList) > 3 {
						d.dissectedCallback(analysisInput)
					}
				}
			}
		}
	}()
}