Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented enrollment rate #1003

Merged
merged 4 commits into from
Apr 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ name: Linux Build All Arches
on:
pull_request:
types: [closed]
branches:
- master
# branches:
# - master

jobs:
build:
Expand Down
97 changes: 83 additions & 14 deletions actions/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,44 @@ type EventTable struct {

config_obj *config_proto.Config

// This will be closed to signal we need to abort the current
// event queries.
// This will be closed to signal that we need to abort the
// current event queries.
Done chan bool
wg sync.WaitGroup
}

func (self *EventTable) equal(events []*actions_proto.VQLCollectorArgs) bool {
if len(events) != len(self.Events) {
return false
}

for i := range self.Events {
lhs := self.Events[i]
rhs := events[i]

if len(lhs.Query) != len(rhs.Query) {
return false
}

for j := range lhs.Query {
if !proto.Equal(lhs.Query[j], rhs.Query[j]) {
return false
}
}

if len(lhs.Env) != len(rhs.Env) {
return false
}

for j := range lhs.Env {
if !proto.Equal(lhs.Env[j], rhs.Env[j]) {
return false
}
}
}
return true
}

func (self *EventTable) Close() {
logger := logging.GetLogger(self.config_obj, &logging.ClientComponent)
logger.Info("Closing EventTable\n")
Expand All @@ -74,14 +106,28 @@ func GlobalEventTableVersion() uint64 {
func update(
config_obj *config_proto.Config,
responder *responder.Responder,
table *actions_proto.VQLEventTable) (*EventTable, error) {
table *actions_proto.VQLEventTable) (*EventTable, error, bool) {

mu.Lock()
defer mu.Unlock()

// Only update the event table if we need to.
if table.Version <= GlobalEventTable.version {
return GlobalEventTable, nil
return GlobalEventTable, nil, false
}

// If the new update is identical to the old queries we wont
// restart. This can happen e.g. if the server changes label
// groups and recaculates the table version but the actual
// queries dont end up changing.
if GlobalEventTable.equal(table.Event) {
logger := logging.GetLogger(config_obj, &logging.ClientComponent)
logger.Info("Client event query update %v did not "+
"change queries, skipping", table.Version)

// Update the version only but keep queries the same.
GlobalEventTable.version = table.Version
return GlobalEventTable, nil, false
}

// Close the old table.
Expand All @@ -90,10 +136,9 @@ func update(
}

// Make a new table.
GlobalEventTable = NewEventTable(
config_obj, responder, table)
GlobalEventTable = NewEventTable(config_obj, responder, table)

return GlobalEventTable, nil
return GlobalEventTable, nil, true /* changed */
}

func NewEventTable(
Expand All @@ -119,9 +164,24 @@ func (self UpdateEventTable) Run(
arg *actions_proto.VQLEventTable) {

// Make a new table.
table, err := update(config_obj, responder, arg)
table, err, changed := update(config_obj, responder, arg)
if err != nil {
responder.Log(ctx, "Error updating global event table: %v", err)
responder.RaiseError(ctx, fmt.Sprintf(
"Error updating global event table: %v", err))
return
}

// No change required, skip it.
if !changed {
// We still need to write the new version
err = update_writeback(config_obj, arg)
if err != nil {
responder.RaiseError(ctx, fmt.Sprintf(
"Unable to write events to writeback: %v", err))
} else {
responder.Return(ctx)
}
return
}

logger := logging.GetLogger(config_obj, &logging.ClientComponent)
Expand Down Expand Up @@ -178,15 +238,24 @@ func (self UpdateEventTable) Run(
}(event)
}

// Store the event table in the Writeback file.
config_copy := proto.Clone(config_obj).(*config_proto.Config)
event_copy := proto.Clone(arg).(*actions_proto.VQLEventTable)
config_copy.Writeback.EventQueries = event_copy
err = config.UpdateWriteback(config_copy)
err = update_writeback(config_obj, arg)
if err != nil {
responder.RaiseError(ctx, fmt.Sprintf(
"Unable to write events to writeback: %v", err))
return
}

responder.Return(ctx)
}

func update_writeback(
config_obj *config_proto.Config,
event_table *actions_proto.VQLEventTable) error {

// Store the event table in the Writeback file.
config_copy := proto.Clone(config_obj).(*config_proto.Config)
event_copy := proto.Clone(event_table).(*actions_proto.VQLEventTable)
config_copy.Writeback.EventQueries = event_copy

return config.UpdateWriteback(config_copy)
}
199 changes: 100 additions & 99 deletions actions/proto/vql.pb.go

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions actions/proto/vql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ message VQLEnv {
string value = 2;
}

// This is the most common type of message - it specifies a query to
// run on the endpoint.
message VQLCollectorArgs {
option (flow_metadata) = {
category: "Generic";
};

// If this is specified we run this query first and if it returns
// any rows we continue with the real query.
string precondition = 29;
Expand Down
2 changes: 1 addition & 1 deletion actions/vql.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (self VQLClientAction) StartQuery(
)
}
response.Columns = result.Columns
responder.AddResponse(ctx, &crypto_proto.GrrMessage{
responder.AddResponse(ctx, &crypto_proto.VeloMessage{
VQLResponse: response})
}
}
Expand Down
26 changes: 13 additions & 13 deletions api/proto/api.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.