Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tune the hunt dispatcher API to provide hints for hunt searches. #3462

Merged
merged 1 commit into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions flows/housekeeping.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,26 +104,27 @@ func CheckClientStatus(
// Take a snapshot of the hunts that we need to run on this
// client to reduce the time under lock.
hunts := make([]*api_proto.Hunt, 0)
err = dispatcher.ApplyFuncOnHunts(ctx, func(hunt *api_proto.Hunt) error {
// Hunt is stopped we dont care about it.
if hunt.State != api_proto.Hunt_RUNNING {
return nil
}
err = dispatcher.ApplyFuncOnHunts(ctx, services.OnlyRunningHunts,
func(hunt *api_proto.Hunt) error {
// Hunt is stopped we dont care about it.
if hunt.State != api_proto.Hunt_RUNNING {
return nil
}

// This hunt is not relevant to this client.
if hunt.StartTime <= stats.LastHuntTimestamp {
return nil
}

// Take a snapshot of the hunt id and start time.
hunts = append(hunts, &api_proto.Hunt{
HuntId: hunt.HuntId,
StartTime: hunt.StartTime,
})

// This hunt is not relevant to this client.
if hunt.StartTime <= stats.LastHuntTimestamp {
return nil
}

// Take a snapshot of the hunt id and start time.
hunts = append(hunts, &api_proto.Hunt{
HuntId: hunt.HuntId,
StartTime: hunt.StartTime,
})

return nil
})

if err != nil {
return err
}
Expand Down
11 changes: 10 additions & 1 deletion services/hunt_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,20 @@ const (
HuntFlushToDatastoreAsync
)

type HuntSearchOptions int

const (
AllHunts HuntSearchOptions = iota

// Only visit non expired hunts
OnlyRunningHunts
)

type IHuntDispatcher interface {
// Applies the function on all the hunts. Functions may not
// modify the hunt but will have read only access to the hunt
// objects under lock.
ApplyFuncOnHunts(ctx context.Context,
ApplyFuncOnHunts(ctx context.Context, options HuntSearchOptions,
cb func(hunt *api_proto.Hunt) error) error

// As an optimization callers may get the latest hunt's
Expand Down
40 changes: 24 additions & 16 deletions services/hunt_dispatcher/hunt_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,21 +191,28 @@ func (self *HuntDispatcher) ProcessUpdate(
// Applies a callback on all hunts. The callback is not allowed to
// modify the hunts since it is getting a copy of the hunt object.
func (self *HuntDispatcher) ApplyFuncOnHunts(
ctx context.Context,
ctx context.Context, options services.HuntSearchOptions,
cb func(hunt *api_proto.Hunt) error) (res_error error) {

now := uint64(utils.GetTime().Now().UnixNano() / 1000)

// Page through the hunts table and apply the function on each
// page.
var offset, length int64
length = 1000
options := result_sets.ResultSetOptions{}
rs_options := result_sets.ResultSetOptions{}
for {
hunts, total, err := self.Store.ListHunts(ctx, options, offset, length)
hunts, total, err := self.Store.ListHunts(ctx, rs_options, offset, length)
if err != nil {
return err
}

for _, hunt := range hunts {
if options == services.OnlyRunningHunts &&
(hunt.State != api_proto.Hunt_RUNNING || now > hunt.Expires) {
continue
}

err := cb(hunt)
if err != nil {
res_error = err
Expand Down Expand Up @@ -343,19 +350,20 @@ func (self *HuntDispatcher) checkForExpiry(
// Check if the hunt is expired and adjust its state if so
now := uint64(utils.GetTime().Now().UnixNano() / 1000)

self.ApplyFuncOnHunts(ctx, func(hunt_obj *api_proto.Hunt) error {
if hunt_obj.State == api_proto.Hunt_RUNNING &&
now > hunt_obj.Expires {

self.MutateHunt(ctx, config_obj,
&api_proto.HuntMutation{
HuntId: hunt_obj.HuntId,
State: api_proto.Hunt_STOPPED,
Stats: &api_proto.HuntStats{Stopped: true},
})
}
return nil
})
self.ApplyFuncOnHunts(ctx, services.OnlyRunningHunts,
func(hunt_obj *api_proto.Hunt) error {
if hunt_obj.State == api_proto.Hunt_RUNNING &&
now > hunt_obj.Expires {

self.MutateHunt(ctx, config_obj,
&api_proto.HuntMutation{
HuntId: hunt_obj.HuntId,
State: api_proto.Hunt_STOPPED,
Stats: &api_proto.HuntStats{Stopped: true},
})
}
return nil
})
}
}

Expand Down
3 changes: 2 additions & 1 deletion services/hunt_dispatcher/hunt_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ func (self *HuntDispatcherTestSuite) TestModifyingHuntPropagateChanges() {
func (self *HuntDispatcherTestSuite) getAllHunts() []*api_proto.Hunt {
// Get the list of all hunts
hunts := []*api_proto.Hunt{}
err := self.master_dispatcher.ApplyFuncOnHunts(self.Ctx,
err := self.master_dispatcher.ApplyFuncOnHunts(
self.Ctx, services.AllHunts,
func(hunt *api_proto.Hunt) error {
hunts = append(hunts, hunt)
return nil
Expand Down
3 changes: 2 additions & 1 deletion services/hunt_dispatcher/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/paths"
"www.velocidex.com/golang/velociraptor/reporting"
"www.velocidex.com/golang/velociraptor/services"
)

// Backwards compatibility: Figure out the list of collected hunts
Expand Down Expand Up @@ -53,7 +54,7 @@ func (self *HuntDispatcher) ListHunts(
// creation time. This should be very fast because all hunts
// are kept in memory inside the hunt dispatcher.
items := make([]*api_proto.Hunt, 0, end)
err := self.ApplyFuncOnHunts(ctx,
err := self.ApplyFuncOnHunts(ctx, services.AllHunts,
func(hunt *api_proto.Hunt) error {
if in.UserFilter != "" &&
in.UserFilter != hunt.Creator {
Expand Down
27 changes: 14 additions & 13 deletions services/hunt_manager/hunt_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (self *HuntManager) ProcessInterrogation(
return errors.New("ClientId not found")
}

return self.participateInAllHunts(ctx, config_obj, client_id,
return self.participateInRunningHunts(ctx, config_obj, client_id,
// When a new client is interrogated, it can only really
// affect hunts with OS conditions.
func(hunt *api_proto.Hunt) bool {
Expand Down Expand Up @@ -446,7 +446,7 @@ func (self *HuntManager) ProcessLabelChange(
return nil
}

return self.participateInAllHunts(ctx, config_obj, client_id,
return self.participateInRunningHunts(ctx, config_obj, client_id,
// When a label changes it can only really affect hunts with
// include label conditions.
func(hunt *api_proto.Hunt) bool {
Expand All @@ -455,7 +455,7 @@ func (self *HuntManager) ProcessLabelChange(
})
}

func (self *HuntManager) participateInAllHunts(ctx context.Context,
func (self *HuntManager) participateInRunningHunts(ctx context.Context,
config_obj *config_proto.Config, client_id string,
should_participate_cb func(hunt *api_proto.Hunt) bool) error {

Expand All @@ -470,18 +470,19 @@ func (self *HuntManager) participateInAllHunts(ctx context.Context,
return err
}

return dispatcher.ApplyFuncOnHunts(ctx, func(hunt *api_proto.Hunt) error {
if !should_participate_cb(hunt) {
return nil
}
return dispatcher.ApplyFuncOnHunts(ctx, services.OnlyRunningHunts,
func(hunt *api_proto.Hunt) error {
if !should_participate_cb(hunt) {
return nil
}

journal.PushRowsToArtifactAsync(ctx, config_obj,
ordereddict.NewDict().
Set("HuntId", hunt.HuntId).
Set("ClientId", client_id), "System.Hunt.Participation")
journal.PushRowsToArtifactAsync(ctx, config_obj,
ordereddict.NewDict().
Set("HuntId", hunt.HuntId).
Set("ClientId", client_id), "System.Hunt.Participation")

return nil
})
return nil
})
}

// When a client is found to be missing a hunt, the foreman sends the
Expand Down
6 changes: 0 additions & 6 deletions services/notebook/storage_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
package notebook_test

import (
"time"

"github.com/alecthomas/assert"
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
"www.velocidex.com/golang/velociraptor/services"
"www.velocidex.com/golang/velociraptor/utils"
)

func (self *NotebookManagerTestSuite) TestNotebookStorage() {
closer := utils.MockTime(utils.NewMockClock(time.Unix(10, 10)))
defer closer()

notebook_manager, err := services.GetNotebookManager(self.ConfigObj)
assert.NoError(self.T(), err)

Expand Down
3 changes: 0 additions & 3 deletions vql/server/notebooks/notebooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ func (self *NotebookTestSuite) SetupTest() {
}

func (self *NotebookTestSuite) TestCreateNotebook() {
closer := utils.MockTime(utils.NewMockClock(time.Unix(100, 10)))
defer closer()

repository := self.LoadArtifacts(testArtifacts...)
builder := services.ScopeBuilder{
Config: self.ConfigObj,
Expand Down
Loading