Refactored comms between client and server (#2375)
Client now maintains the flow state, removing the need for the server to
maintain it. This makes the server a lot faster.
scudette committed Jan 10, 2023
1 parent 22c93c4 commit 917be51
Showing 57 changed files with 2,823 additions and 1,815 deletions.
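
The commit message describes the client taking over flow-state bookkeeping from the server. As a rough illustration only (the flowStats type and its methods below are hypothetical, not the project's actual API), the idea is that the client tallies per-query progress locally and sends a compact summary, instead of the server reconstructing that state from every response:

package main

import "fmt"

// flowStats is a hypothetical sketch of client-side flow bookkeeping: the
// client tallies rows per query and builds its own status summary, so the
// server no longer has to track every individual response.
type flowStats struct {
	FlowId      string
	QueryRows   map[uint64]uint64 // rows emitted per query id
	LogMessages uint64
	QueriesDone int
}

// AddRows records rows produced by one query, similar in spirit to the
// row tracking added on the client side in this commit.
func (s *flowStats) AddRows(queryId, rows uint64) {
	if s.QueryRows == nil {
		s.QueryRows = make(map[uint64]uint64)
	}
	s.QueryRows[queryId] += rows
}

// Summary builds the single status update the client would send, instead
// of the server deriving these numbers itself.
func (s *flowStats) Summary() string {
	var total uint64
	for _, rows := range s.QueryRows {
		total += rows
	}
	return fmt.Sprintf("flow %s: %d queries done, %d rows, %d log messages",
		s.FlowId, s.QueriesDone, total, s.LogMessages)
}

func main() {
	stats := &flowStats{FlowId: "F.EXAMPLE"}
	stats.AddRows(1, 500)
	stats.AddRows(1, 250)
	stats.QueriesDone = 1
	fmt.Println(stats.Summary())
}

The real change is spread across the diffs below; for example, the row_tracker.AddRows() call added in actions/vql.go.
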
1 change: 1 addition & 0 deletions .gitattributes
@@ -15,6 +15,7 @@ logs text eol=lf

*.db binary
*.json -text
*.json.db -text
*.csv binary
*.zip binary
*.index binary
17 changes: 14 additions & 3 deletions accessors/file_store/accessor.go
@@ -116,23 +116,34 @@ func (self FileStoreFileSystemAccessor) OpenWithOSPath(filename *accessors.OSPat
return nil, errors.New("Invalid path")
}

var fullpath api.FSPathSpec

// It is a data store path
if filename.Components[0] == "ds:" {
ds_path := getDSPathSpec(filename)
fullpath := ds_path.AsFilestorePath()
fullpath = ds_path.AsFilestorePath()
switch ds_path.Type() {
case api.PATH_TYPE_DATASTORE_JSON:
fullpath = fullpath.SetType(api.PATH_TYPE_FILESTORE_DB_JSON)

case api.PATH_TYPE_DATASTORE_PROTO:
fullpath = fullpath.SetType(api.PATH_TYPE_FILESTORE_DB)
}
} else {
fullpath = path_specs.FromGenericComponentList(filename.Components)
}

fullpath := path_specs.FromGenericComponentList(filename.Components)
file, err := self.file_store.ReadFile(fullpath)
if err != nil {
return nil, err
// Try to open the old protobuf style files as a fallback.
if fullpath.Type() == api.PATH_TYPE_FILESTORE_DB_JSON {
file, err = self.file_store.ReadFile(
fullpath.SetType(api.PATH_TYPE_FILESTORE_DB))
}

if err != nil {
return nil, err
}
}

return file, nil
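
The hunk above maps ds: paths onto the new .json.db filestore type and, when reading a .json.db file fails, falls back to the legacy protobuf-style .db file. A self-contained sketch of that fallback pattern, assuming a simplified read function in place of the real file_store API:

package main

import (
	"errors"
	"fmt"
)

// readFunc stands in for a file-store read; the real accessor works with
// api.FSPathSpec values rather than plain strings.
type readFunc func(path string) ([]byte, error)

// readWithFallback tries the new JSON datastore file first and, if that
// read fails, retries the legacy protobuf-style path before giving up.
func readWithFallback(read readFunc, jsonPath, protoPath string) ([]byte, error) {
	data, err := read(jsonPath)
	if err != nil {
		data, err = read(protoPath)
	}
	if err != nil {
		return nil, err
	}
	return data, nil
}

func main() {
	// Simulated store that only contains the old protobuf-style file.
	store := map[string][]byte{"collections/F.X.db": []byte("old format")}
	read := func(path string) ([]byte, error) {
		if d, ok := store[path]; ok {
			return d, nil
		}
		return nil, errors.New("file not found")
	}

	data, err := readWithFallback(read, "collections/F.X.json.db", "collections/F.X.db")
	fmt.Println(string(data), err)
}

In the accessor above the fallback only fires when the resolved type is PATH_TYPE_FILESTORE_DB_JSON, so older .db datastore files remain readable.
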
2 changes: 1 addition & 1 deletion actions/events_test.go
@@ -71,7 +71,7 @@ func (self *EventsTestSuite) SetupTest() {
self.client_id = "C.2232"
self.Clock = &utils.IncClock{}

self.responder = responder.TestResponder()
self.responder = responder.TestResponder(self.ConfigObj)

actions.GlobalEventTable = actions.NewEventTable(
self.ConfigObj, self.responder,
1 change: 1 addition & 0 deletions actions/proto/vql.proto
@@ -127,6 +127,7 @@ message VQLResponse {
description: "Large VQL responses are split across many parts. "
"This carries the part of this response.",
}];

VQLRequest Query = 3 [(sem_type) = {
description: "The query that was executed.",
}];
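
The VQLResponse description in the context above notes that large responses are split across many parts, each message carrying its part number. A minimal sketch of that chunking, using simplified field names rather than the real protobuf message:

package main

import "fmt"

// part loosely mirrors the part-numbering idea in VQLResponse: a large
// result set for one query is sent as several numbered pieces.
type part struct {
	QueryId uint64
	Part    uint64
	Rows    []string
}

// splitIntoParts chunks rows into fixed-size parts, each carrying its part
// number so the receiver can account for every piece of the response.
func splitIntoParts(queryId uint64, rows []string, partSize int) []part {
	var parts []part
	for i := 0; i < len(rows); i += partSize {
		end := i + partSize
		if end > len(rows) {
			end = len(rows)
		}
		parts = append(parts, part{
			QueryId: queryId,
			Part:    uint64(len(parts)),
			Rows:    rows[i:end],
		})
	}
	return parts
}

func main() {
	rows := []string{"a", "b", "c", "d", "e"}
	for _, p := range splitIntoParts(1, rows, 2) {
		fmt.Printf("query %d part %d: %v\n", p.QueryId, p.Part, p.Rows)
	}
}
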
12 changes: 6 additions & 6 deletions actions/vql.go
@@ -25,7 +25,6 @@ import (
"os"
"runtime"
"runtime/debug"
"strings"
"time"

"github.com/Velocidex/ordereddict"
@@ -257,10 +256,6 @@ func (self VQLClientAction) StartQuery(
query_log.Close()
break run_query
}
// Skip let queries since they never produce results.
if strings.HasPrefix(strings.ToLower(query.VQL), "let") {
continue
}
response := &actions_proto.VQLResponse{
Query: query,
QueryId: uint64(query_idx),
@@ -271,6 +266,11 @@
Timestamp: uint64(time.Now().UTC().UnixNano() / 1000),
}

// Do not send empty responses
if result.TotalRows == 0 {
continue
}

row_tracker.AddRows(query, uint64(result.TotalRows))

// Don't log empty VQL statements.
@@ -287,7 +287,7 @@
))
}
response.Columns = result.Columns
responder.AddResponse(ctx, &crypto_proto.VeloMessage{
responder.AddResponse(&crypto_proto.VeloMessage{
VQLResponse: response})
}
}
10 changes: 5 additions & 5 deletions actions/vql_test.go
@@ -27,22 +27,22 @@ func (self *ClientVQLTestSuite) TestCPUThrottler() {
}

// Query is not limited
resp := responder.TestResponder()
resp := responder.TestResponder(self.ConfigObj)
actions.VQLClientAction{}.StartQuery(self.ConfigObj, self.Sm.Ctx, resp, request)
resp.Close(self.Ctx)
resp.Close()
assert.NotContains(self.T(), getLogs(resp), "Will throttle query")

// Query will now be limited
resp = responder.TestResponder()
resp = responder.TestResponder(self.ConfigObj)
request.CpuLimit = 20
actions.VQLClientAction{}.StartQuery(self.ConfigObj, self.Sm.Ctx, resp, request)
resp.Close(self.Ctx)
resp.Close()
assert.Contains(self.T(), getLogs(resp), "Will throttle query")
}

// Make sure that dependent artifacts are properly used
func (self *ClientVQLTestSuite) TestDependentArtifacts() {
resp := responder.TestResponder()
resp := responder.TestResponder(self.ConfigObj)

actions.VQLClientAction{}.StartQuery(self.ConfigObj, self.Sm.Ctx, resp,
&actions_proto.VQLCollectorArgs{
2 changes: 1 addition & 1 deletion api/api.go
@@ -210,7 +210,7 @@ func (self *ApiServer) CollectArtifact(
result.FlowId = flow_id

// Log this event as an Audit event.
logging.LogAudit(org_config_obj, principal, "CancelFlow",
logging.LogAudit(org_config_obj, principal, "ScheduleFlow",
logrus.Fields{
"client": in.ClientId,
"flow_id": flow_id,
2 changes: 1 addition & 1 deletion api/download.go
@@ -105,7 +105,7 @@ func vfsFileDownloadHandler() http.Handler {
decoder := schema.NewDecoder()
err := decoder.Decode(&request, r.URL.Query())
if err != nil {
returnError(w, 404, err.Error())
returnError(w, 200, "Ok")
return
}

2 changes: 1 addition & 1 deletion artifacts/testdata/server/testcases/filestore.out.yaml
@@ -75,7 +75,7 @@ SELECT FullPath, hash(path=FullPath, accessor="fs") FROM glob( globs='/clients/C
"SHA256": "c337a4c3885ed775b50994f8018bdc7b3fa885f2efb81a2b766f12e81ac85394"
},
"Data": {
"VFSPath": "ds:/clients/C.4f5e52adf0a337a9/collections/F.BN2HP3OSS3LK6.db"
"VFSPath": "ds:/clients/C.4f5e52adf0a337a9/collections/F.BN2HP3OSS3LK6.json.db"
}
},
{
@@ -10,7 +10,7 @@ LET Sanitize(X) = regex_replace(re="[CF]\\.[0-9a-z]+", replace="C.ID", source=X)
"vfs_path": "/clients/C.ID/artifacts/Linux.Search.FileFinder/C.ID.json.index"
},
{
"vfs_path": "/clients/C.ID/collections/C.ID.db"
"vfs_path": "/clients/C.ID/collections/C.ID.json.db"
},
{
"vfs_path": "/clients/C.ID/collections/C.ID/logs.json"
