Skip to content

Commit

Permalink
Tune the hunt dispatcher API to provide hints for hunt searches. (#3462)
Browse files Browse the repository at this point in the history
  • Loading branch information
scudette committed Apr 29, 2024
1 parent ae71d77 commit a67c0b4
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 57 deletions.
33 changes: 17 additions & 16 deletions flows/housekeeping.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,26 +104,27 @@ func CheckClientStatus(
// Take a snapshot of the hunts that we need to run on this
// client to reduce the time under lock.
hunts := make([]*api_proto.Hunt, 0)
err = dispatcher.ApplyFuncOnHunts(ctx, func(hunt *api_proto.Hunt) error {
// Hunt is stopped we dont care about it.
if hunt.State != api_proto.Hunt_RUNNING {
return nil
}
err = dispatcher.ApplyFuncOnHunts(ctx, services.OnlyRunningHunts,
func(hunt *api_proto.Hunt) error {
// Hunt is stopped we dont care about it.
if hunt.State != api_proto.Hunt_RUNNING {
return nil
}

// This hunt is not relevant to this client.
if hunt.StartTime <= stats.LastHuntTimestamp {
return nil
}

// Take a snapshot of the hunt id and start time.
hunts = append(hunts, &api_proto.Hunt{
HuntId: hunt.HuntId,
StartTime: hunt.StartTime,
})

// This hunt is not relevant to this client.
if hunt.StartTime <= stats.LastHuntTimestamp {
return nil
}

// Take a snapshot of the hunt id and start time.
hunts = append(hunts, &api_proto.Hunt{
HuntId: hunt.HuntId,
StartTime: hunt.StartTime,
})

return nil
})

if err != nil {
return err
}
Expand Down
11 changes: 10 additions & 1 deletion services/hunt_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,20 @@ const (
HuntFlushToDatastoreAsync
)

type HuntSearchOptions int

const (
AllHunts HuntSearchOptions = iota

// Only visit non expired hunts
OnlyRunningHunts
)

type IHuntDispatcher interface {
// Applies the function on all the hunts. Functions may not
// modify the hunt but will have read only access to the hunt
// objects under lock.
ApplyFuncOnHunts(ctx context.Context,
ApplyFuncOnHunts(ctx context.Context, options HuntSearchOptions,
cb func(hunt *api_proto.Hunt) error) error

// As an optimization callers may get the latest hunt's
Expand Down
40 changes: 24 additions & 16 deletions services/hunt_dispatcher/hunt_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,21 +191,28 @@ func (self *HuntDispatcher) ProcessUpdate(
// Applies a callback on all hunts. The callback is not allowed to
// modify the hunts since it is getting a copy of the hunt object.
func (self *HuntDispatcher) ApplyFuncOnHunts(
ctx context.Context,
ctx context.Context, options services.HuntSearchOptions,
cb func(hunt *api_proto.Hunt) error) (res_error error) {

now := uint64(utils.GetTime().Now().UnixNano() / 1000)

// Page through the hunts table and apply the function on each
// page.
var offset, length int64
length = 1000
options := result_sets.ResultSetOptions{}
rs_options := result_sets.ResultSetOptions{}
for {
hunts, total, err := self.Store.ListHunts(ctx, options, offset, length)
hunts, total, err := self.Store.ListHunts(ctx, rs_options, offset, length)
if err != nil {
return err
}

for _, hunt := range hunts {
if options == services.OnlyRunningHunts &&
(hunt.State != api_proto.Hunt_RUNNING || now > hunt.Expires) {
continue
}

err := cb(hunt)
if err != nil {
res_error = err
Expand Down Expand Up @@ -343,19 +350,20 @@ func (self *HuntDispatcher) checkForExpiry(
// Check if the hunt is expired and adjust its state if so
now := uint64(utils.GetTime().Now().UnixNano() / 1000)

self.ApplyFuncOnHunts(ctx, func(hunt_obj *api_proto.Hunt) error {
if hunt_obj.State == api_proto.Hunt_RUNNING &&
now > hunt_obj.Expires {

self.MutateHunt(ctx, config_obj,
&api_proto.HuntMutation{
HuntId: hunt_obj.HuntId,
State: api_proto.Hunt_STOPPED,
Stats: &api_proto.HuntStats{Stopped: true},
})
}
return nil
})
self.ApplyFuncOnHunts(ctx, services.OnlyRunningHunts,
func(hunt_obj *api_proto.Hunt) error {
if hunt_obj.State == api_proto.Hunt_RUNNING &&
now > hunt_obj.Expires {

self.MutateHunt(ctx, config_obj,
&api_proto.HuntMutation{
HuntId: hunt_obj.HuntId,
State: api_proto.Hunt_STOPPED,
Stats: &api_proto.HuntStats{Stopped: true},
})
}
return nil
})
}
}

Expand Down
3 changes: 2 additions & 1 deletion services/hunt_dispatcher/hunt_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ func (self *HuntDispatcherTestSuite) TestModifyingHuntPropagateChanges() {
func (self *HuntDispatcherTestSuite) getAllHunts() []*api_proto.Hunt {
// Get the list of all hunts
hunts := []*api_proto.Hunt{}
err := self.master_dispatcher.ApplyFuncOnHunts(self.Ctx,
err := self.master_dispatcher.ApplyFuncOnHunts(
self.Ctx, services.AllHunts,
func(hunt *api_proto.Hunt) error {
hunts = append(hunts, hunt)
return nil
Expand Down
3 changes: 2 additions & 1 deletion services/hunt_dispatcher/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/paths"
"www.velocidex.com/golang/velociraptor/reporting"
"www.velocidex.com/golang/velociraptor/services"
)

// Backwards compatibility: Figure out the list of collected hunts
Expand Down Expand Up @@ -53,7 +54,7 @@ func (self *HuntDispatcher) ListHunts(
// creation time. This should be very fast because all hunts
// are kept in memory inside the hunt dispatcher.
items := make([]*api_proto.Hunt, 0, end)
err := self.ApplyFuncOnHunts(ctx,
err := self.ApplyFuncOnHunts(ctx, services.AllHunts,
func(hunt *api_proto.Hunt) error {
if in.UserFilter != "" &&
in.UserFilter != hunt.Creator {
Expand Down
27 changes: 14 additions & 13 deletions services/hunt_manager/hunt_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (self *HuntManager) ProcessInterrogation(
return errors.New("ClientId not found")
}

return self.participateInAllHunts(ctx, config_obj, client_id,
return self.participateInRunningHunts(ctx, config_obj, client_id,
// When a new client is interrogated, it can only really
// affect hunts with OS conditions.
func(hunt *api_proto.Hunt) bool {
Expand Down Expand Up @@ -446,7 +446,7 @@ func (self *HuntManager) ProcessLabelChange(
return nil
}

return self.participateInAllHunts(ctx, config_obj, client_id,
return self.participateInRunningHunts(ctx, config_obj, client_id,
// When a label changes it can only really affect hunts with
// include label conditions.
func(hunt *api_proto.Hunt) bool {
Expand All @@ -455,7 +455,7 @@ func (self *HuntManager) ProcessLabelChange(
})
}

func (self *HuntManager) participateInAllHunts(ctx context.Context,
func (self *HuntManager) participateInRunningHunts(ctx context.Context,
config_obj *config_proto.Config, client_id string,
should_participate_cb func(hunt *api_proto.Hunt) bool) error {

Expand All @@ -470,18 +470,19 @@ func (self *HuntManager) participateInAllHunts(ctx context.Context,
return err
}

return dispatcher.ApplyFuncOnHunts(ctx, func(hunt *api_proto.Hunt) error {
if !should_participate_cb(hunt) {
return nil
}
return dispatcher.ApplyFuncOnHunts(ctx, services.OnlyRunningHunts,
func(hunt *api_proto.Hunt) error {
if !should_participate_cb(hunt) {
return nil
}

journal.PushRowsToArtifactAsync(ctx, config_obj,
ordereddict.NewDict().
Set("HuntId", hunt.HuntId).
Set("ClientId", client_id), "System.Hunt.Participation")
journal.PushRowsToArtifactAsync(ctx, config_obj,
ordereddict.NewDict().
Set("HuntId", hunt.HuntId).
Set("ClientId", client_id), "System.Hunt.Participation")

return nil
})
return nil
})
}

// When a client is found to be missing a hunt, the foreman sends the
Expand Down
6 changes: 0 additions & 6 deletions services/notebook/storage_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
package notebook_test

import (
"time"

"github.com/alecthomas/assert"
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
"www.velocidex.com/golang/velociraptor/services"
"www.velocidex.com/golang/velociraptor/utils"
)

func (self *NotebookManagerTestSuite) TestNotebookStorage() {
closer := utils.MockTime(utils.NewMockClock(time.Unix(10, 10)))
defer closer()

notebook_manager, err := services.GetNotebookManager(self.ConfigObj)
assert.NoError(self.T(), err)

Expand Down
3 changes: 0 additions & 3 deletions vql/server/notebooks/notebooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ func (self *NotebookTestSuite) SetupTest() {
}

func (self *NotebookTestSuite) TestCreateNotebook() {
closer := utils.MockTime(utils.NewMockClock(time.Unix(100, 10)))
defer closer()

repository := self.LoadArtifacts(testArtifacts...)
builder := services.ScopeBuilder{
Config: self.ConfigObj,
Expand Down

0 comments on commit a67c0b4

Please sign in to comment.