Bugfix: Removed global filestore flush
The memcache filestore delays writes in order to batch them. This can
cause race conditions in some cases. In particular, when creating a
download export zip, the GUI tries to access the file immediately;
however, if the writes are still in flight, it will not be able to
find it.
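
To make the race concrete, here is a minimal, self-contained sketch (hypothetical types and names, not the actual filestore code): a write-behind store only publishes data on a later, batched flush, so a reader that checks for the file immediately after writing lands inside the batching window and misses it.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// delayedStore mimics a write-behind filestore: Write only buffers the
// data; it becomes visible to readers after a later, batched Flush.
type delayedStore struct {
    mu      sync.Mutex
    pending map[string][]byte // in-flight writes
    visible map[string][]byte // what readers can actually see
}

func (s *delayedStore) Write(path string, data []byte) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.pending[path] = data
}

func (s *delayedStore) Flush() {
    s.mu.Lock()
    defer s.mu.Unlock()
    for k, v := range s.pending {
        s.visible[k] = v
        delete(s.pending, k)
    }
}

func (s *delayedStore) Exists(path string) bool {
    s.mu.Lock()
    defer s.mu.Unlock()
    _, ok := s.visible[path]
    return ok
}

func main() {
    s := &delayedStore{pending: map[string][]byte{}, visible: map[string][]byte{}}

    // A background flusher batches writes once per second.
    go func() {
        for {
            time.Sleep(time.Second)
            s.Flush()
        }
    }()

    s.Write("downloads/export.zip", []byte("zip data"))

    // The GUI-style immediate lookup lands inside the batching window.
    fmt.Println("visible immediately?", s.Exists("downloads/export.zip")) // false
}
```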

Previously this ordering was ensured by calling FlushFilestore, which
is a global flush of all in-flight writes. However, for the memcache
filestore on EFS this global flush is a performance killer, since the
filestore is locked for the duration of the flush. This appears as a
deadlock where the entire process is blocked while an export is
underway.
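
A simplified illustration of that cost, with hypothetical types and a sleep standing in for an EFS round trip: while a FlushFilestore-style global flush holds the store lock, even an unrelated write blocks until every in-flight file has been synced.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

type globalStore struct {
    mu      sync.Mutex
    pending map[string][]byte
}

// syncToEFS stands in for one slow network write per pending file.
func syncToEFS(path string, data []byte) { time.Sleep(200 * time.Millisecond) }

// FlushAll models the old global flush: the store-wide lock is held
// while every pending file is synced to the backend.
func (s *globalStore) FlushAll() {
    s.mu.Lock()
    defer s.mu.Unlock()
    for k, v := range s.pending {
        syncToEFS(k, v)
    }
}

func (s *globalStore) Write(path string, data []byte) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.pending[path] = data
}

func main() {
    s := &globalStore{pending: map[string][]byte{}}
    for i := 0; i < 10; i++ {
        s.Write(fmt.Sprintf("results/%d.json", i), []byte("{}"))
    }

    go s.FlushAll() // e.g. triggered by a download export

    time.Sleep(10 * time.Millisecond) // let the flush grab the lock first
    start := time.Now()
    s.Write("unrelated/file", []byte("x")) // stalls behind the global flush
    fmt.Println("unrelated write blocked for", time.Since(start)) // ~2s
}
```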

This PR improves things in a number of ways:
1. The flushes are now done in the background - ensuring the filestore
   is not locked for too long.

2. We avoid calling the global FlushFilestore by installing
   appropriate completion functions in the export functions.

3. The TTL cache is updated to allow overriding the expiry
   behavior. This makes it simpler to avoid expiring files that are
   still open (see the sketch after this list).
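
Putting points 1 and 3 together, here is a condensed sketch of the intended flow, based only on the ttlcache calls visible in the diff below (the fileWriter type and flushToDisk are placeholders, and ttlcache.NewCache is assumed to be the usual constructor of the Velocidex ttlcache fork):

```go
package main

import (
    "sync/atomic"
    "time"

    "github.com/Velocidex/ttlcache/v2"
)

// fileWriter stands in for MemcacheFileWriter: Close() only sets a flag,
// and the real flush happens when the cache decides to expire the entry.
type fileWriter struct {
    key    string
    closed int32
}

func (w *fileWriter) Close()         { atomic.StoreInt32(&w.closed, 1) }
func (w *fileWriter) IsClosed() bool { return atomic.LoadInt32(&w.closed) > 0 }

// flushToDisk is a placeholder for the real write to the backing store.
func (w *fileWriter) flushToDisk() { time.Sleep(100 * time.Millisecond) }

func main() {
    cache := ttlcache.NewCache()
    cache.SetTTL(time.Minute)
    cache.SkipTTLExtensionOnHit(true)

    cache.SetExpirationCallback(func(key string, value interface{}) error {
        w, ok := value.(*fileWriter)
        if !ok {
            return nil
        }

        // Point 3: a writer that is still open vetoes its own expiry
        // instead of being flushed and dropped while the file is in use.
        if !w.IsClosed() {
            return ttlcache.NoExpireError
        }

        // Point 1: the potentially slow flush runs in the background so
        // the cache lock is not held for its duration.
        go w.flushToDisk()
        return nil
    })

    w := &fileWriter{key: "downloads/export.zip"}
    _ = cache.Set(w.key, w)
    w.Close() // the next expiry cycle flushes it in the background
}
```

Point 2 is visible directly in the diff: export code passes a completion function (or the utils.SyncCompleter sentinel) to WriteFileWithCompletion, and the writer fires those completions once its final flush lands, so no global FlushFilestore call is needed.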

This PR also fixes a GUI issue where JSON parsing raised an exception
for invalid data. The GUI now falls back to a suitable default object
instead.
scudette committed May 17, 2024
1 parent e1cdaf8 commit 61b3060
Showing 31 changed files with 180 additions and 129 deletions.
3 changes: 2 additions & 1 deletion accessors/smb/cache.go
@@ -163,11 +163,12 @@ func NewSMBMountCache(scope vfilter.Scope) *SMBMountCache {
}
result.lru.SetTTL(time.Hour)
result.lru.SetExpirationCallback(
func(key string, value interface{}) {
func(key string, value interface{}) error {
ctx, ok := value.(*SMBConnectionContext)
if ok {
ctx.Close()
}
return nil
})

vql_subsystem.GetRootScope(scope).AddDestructor(func() {
2 changes: 1 addition & 1 deletion file_store/directory/directory.go
@@ -180,7 +180,7 @@ func (self *DirectoryFileStore) StatFile(

func (self *DirectoryFileStore) WriteFile(
filename api.FSPathSpec) (api.FileWriter, error) {
return self.WriteFileWithCompletion(filename, nil)
return self.WriteFileWithCompletion(filename, utils.SyncCompleter)
}

func (self *DirectoryFileStore) WriteFileWithCompletion(
92 changes: 55 additions & 37 deletions file_store/memcache/memcache.go
@@ -9,6 +9,7 @@ package memcache
import (
"bytes"
"sync"
"sync/atomic"
"time"

"github.com/Velocidex/ttlcache/v2"
@@ -41,8 +42,10 @@ type MemcacheFileWriter struct {
// Is the writer currently closed? NOTE!!! There is an implicit
// assumption that there is only one concurrent writer to the same
// result set! Writers are all cached in the same data_cache keyed
// by the same key and are flushed separately.
closed bool
// by the same key and are flushed separately. Writers may be
// closed at any time but this does not mean they get
// flushed. Flushing is delayed until the lru flush cycle.
closed int32
buffer bytes.Buffer
size int64

@@ -59,6 +62,11 @@ type MemcacheFileWriter struct {
// to combine writes to the underlying storage, but if a file is
// opened, closed then opened again, we need to fire all the
// completions without losing any.

// NOTE: When the file is closed, this just sets the closed flag
// but does not lead to immediate flushing. A synchronous write
// must have a completion function here to wait for the ultimate
// flush.
completions []func()
}

@@ -121,25 +129,37 @@ func (self *MemcacheFileWriter) Truncate() error {
return nil
}

// Lock free to avoid deadlocks
func (self *MemcacheFileWriter) IsClosed() bool {
return atomic.LoadInt32(&self.closed) > 0
}

func (self *MemcacheFileWriter) AddCompletion(cb func()) {
self.mu.Lock()
defer self.mu.Unlock()

self.completions = append(self.completions, cb)
}

// Closing the file does not trigger a flush - we just return a
// success status and wait for the file to be written asynchronously.
// We assume no concurrent writes to the same file but closing and
// opening the same path (quicker than cache expiration time) will
// usually give the same writer.
func (self *MemcacheFileWriter) Close() error {
self.mu.Lock()
self.closed = true
atomic.StoreInt32(&self.closed, 1)

// Convert all utils.SyncCompleter calls to sync waits on return
// from Close(). The writer pool will release us when done.
wg := sync.WaitGroup{}
defer wg.Wait()

sync_call := false
for idx, c := range self.completions {
if utils.CompareFuncs(c, utils.SyncCompleter) {
wg.Add(1)

// Wait for the flusher to close us.
defer wg.Wait()
self.completions[idx] = wg.Done
sync_call = true
}
@@ -148,8 +168,9 @@ func (self *MemcacheFileWriter) Close() error {
// Release the lock before we wait for the flusher.
self.mu.Unlock()

// If any of the calls were synchronous do not wait - just write
// them now.
// If any of the calls were synchronous do not wait for the cache
// expiry - just write them now immediately. We will wait here for
// them to complete.
if sync_call {
return self.Flush()
}
@@ -158,7 +179,6 @@ func (self *MemcacheFileWriter) Flush() error {
}

func (self *MemcacheFileWriter) Flush() error {

// While the file is flushed it blocks other writers to the same
// file (which will be blocked on the mutex. This ensures writes
// to the underlying filestore occur in order).
@@ -168,12 +188,13 @@ func (self *MemcacheFileWriter) _Flush() error {
return self._Flush()
}

// This function has lock held
func (self *MemcacheFileWriter) _Flush() error {
defer func() {
// Only send completions once the file is actually closed. It
// is possible for the file to flush many times before it is
// being closed but this does not count as a completion.
if self.closed {
if atomic.LoadInt32(&self.closed) > 0 {
for _, c := range self.completions {
c()
}
@@ -218,7 +239,7 @@ type MemcacheFileStore struct {
min_age time.Duration
max_age time.Duration

closed bool
closed int32
}

func NewMemcacheFileStore(config_obj *config_proto.Config) *MemcacheFileStore {
@@ -248,26 +269,31 @@ func NewMemcacheFileStore(config_obj *config_proto.Config) *MemcacheFileStore {
result.data_cache.SetTTL(result.min_age)
result.data_cache.SkipTTLExtensionOnHit(true)
result.data_cache.SetCacheSizeLimit(int(max_writers))
result.data_cache.SetNewItemCallback(func(key string, value interface{}) {
result.data_cache.SetNewItemCallback(func(key string, value interface{}) error {
metricDataLRU.Inc()
return nil
})

result.data_cache.SetExpirationCallback(func(key string, value interface{}) {
writer, ok := value.(*MemcacheFileWriter)
if ok {
writer.mu.Lock()
defer writer.mu.Unlock()

writer._Flush()

// We are not done with it yet - return it to the cache.
if !result.IsClosed() && !writer.closed {
result.data_cache.Set(writer.key, writer)
result.data_cache.SetExpirationCallback(
func(key string, value interface{}) error {
writer, ok := value.(*MemcacheFileWriter)
if ok {
// We are not done with it yet - return it to the cache.
if !result.IsClosed() && !writer.IsClosed() {
return ttlcache.NoExpireError
}

// This callback happens under global cache lock. The
// below Flush operation may take an unspecified
// amount of time, so we need to do it in a goroutine
// so as not to hold the cache locked.
go writer.Flush()
}
}

metricDataLRU.Dec()
})
// The cache will now expire this item.
metricDataLRU.Dec()
return nil
})

return result
}
@@ -287,9 +313,6 @@ func (self *MemcacheFileStore) WriteFileWithCompletion(
path api.FSPathSpec, completion func()) (api.FileWriter, error) {
defer api.Instrument("write_open", "MemcacheFileStore", path)()

self.mu.Lock()
defer self.mu.Unlock()

key := path.AsClientPath()

var result *MemcacheFileWriter
@@ -309,7 +332,7 @@ func (self *MemcacheFileStore) WriteFileWithCompletion(

} else {
result = result_any.(*MemcacheFileWriter)
result.closed = false
atomic.StoreInt32(&result.closed, 0)

// If we have more time until the max_age, re-set it into the
// cache and extend ttl, otherwise, let it expire normally
@@ -321,7 +344,7 @@ func (self *MemcacheFileStore) WriteFileWithCompletion(

// Add the completion to the writer.
if completion != nil {
result.completions = append(result.completions, completion)
result.AddCompletion(completion)
}

return result, nil
@@ -339,16 +362,11 @@ func (self *MemcacheFileStore) Flush() {
}

func (self *MemcacheFileStore) IsClosed() bool {
self.mu.Lock()
defer self.mu.Unlock()

return self.closed
return atomic.LoadInt32(&self.closed) > 0
}

func (self *MemcacheFileStore) Close() error {
self.mu.Lock()
self.closed = true
self.mu.Unlock()
atomic.StoreInt32(&self.closed, 1)

self.Flush()
return nil
3 changes: 3 additions & 0 deletions file_store/memcache/memcache_test.go
@@ -144,6 +144,9 @@ func (self *MemcacheTestSuite) TestDelayedWrites() {
}
file_store.Flush()

mu.Lock()
defer mu.Unlock()

return write_times
}

2 changes: 1 addition & 1 deletion file_store/memory/memory.go
@@ -274,7 +274,7 @@ func (self *MemoryFileStore) ReadFile(path api.FSPathSpec) (api.FileReader, erro
}

func (self *MemoryFileStore) WriteFile(path api.FSPathSpec) (api.FileWriter, error) {
return self.WriteFileWithCompletion(path, nil)
return self.WriteFileWithCompletion(path, utils.SyncCompleter)
}

func (self *MemoryFileStore) WriteFileWithCompletion(
3 changes: 2 additions & 1 deletion flows/artifacts_test.go
@@ -596,6 +596,7 @@ func (self *TestSuite) testCollectionCompletion(
"System.Flow.Completion", "", func(ctx context.Context,
config_obj *config_proto.Config,
row *ordereddict.Dict) error {

mu.Lock()
defer mu.Unlock()

@@ -609,7 +610,7 @@ func (self *TestSuite) testCollectionCompletion(
}
runner.Close(self.Ctx)

vtesting.WaitUntil(time.Second, self.T(), func() bool {
vtesting.WaitUntil(10*time.Second, self.T(), func() bool {
mu.Lock()
defer mu.Unlock()

4 changes: 2 additions & 2 deletions go.mod
@@ -15,7 +15,7 @@ require (
github.com/Velocidex/json v0.0.0-20220224052537-92f3c0326e5a
github.com/Velocidex/pkcs7 v0.0.0-20230220112103-d4ed02e1862a
github.com/Velocidex/sflags v0.3.1-0.20231011011525-620ab7ca8617
github.com/Velocidex/ttlcache/v2 v2.9.1-0.20230724083715-1eb048b1f6d6
github.com/Velocidex/ttlcache/v2 v2.9.1-0.20240517145123-a3f45e86e130
github.com/Velocidex/yaml/v2 v2.2.8
github.com/Velocidex/zip v0.0.0-20210101070220-e7ecefb7aad7
github.com/alecthomas/assert v1.0.0
@@ -233,7 +233,7 @@ require (
go.opencensus.io v0.24.0 // indirect
go.uber.org/goleak v1.2.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.8 // indirect
8 changes: 4 additions & 4 deletions go.sum
@@ -141,8 +141,8 @@ github.com/Velocidex/sflags v0.3.1-0.20231011011525-620ab7ca8617 h1:pxAOaYTYwbWh
github.com/Velocidex/sflags v0.3.1-0.20231011011525-620ab7ca8617/go.mod h1:TTYBEgQFkjJvyBOC2s7G1+mNPZ3IHuqLZmVFRzyPfq4=
github.com/Velocidex/sigma-go v0.0.0-20240505024531-e8ce54ec3aed h1:zqhuWeg6oqO3jNabjKJaGO7DreiGhbVfeyqleICMAZk=
github.com/Velocidex/sigma-go v0.0.0-20240505024531-e8ce54ec3aed/go.mod h1:fHCN8y8cC1l5CYY7oOhPIznHmj/yeGxUvU+vAV7alr4=
github.com/Velocidex/ttlcache/v2 v2.9.1-0.20230724083715-1eb048b1f6d6 h1:3HSrLwAt64gM7FNTPRXYMszpS5bRijQ6xaPVq2vsbuI=
github.com/Velocidex/ttlcache/v2 v2.9.1-0.20230724083715-1eb048b1f6d6/go.mod h1:3/pI9BBAF7gydBWvMVtV7W1qRwshEG9lBwed/d8xfFg=
github.com/Velocidex/ttlcache/v2 v2.9.1-0.20240517145123-a3f45e86e130 h1:+QujZ0D7KSy3WJVchkOhMkvAUab6/CIisO5LCoN48q4=
github.com/Velocidex/ttlcache/v2 v2.9.1-0.20240517145123-a3f45e86e130/go.mod h1:3/pI9BBAF7gydBWvMVtV7W1qRwshEG9lBwed/d8xfFg=
github.com/Velocidex/yaml/v2 v2.2.8 h1:GUrSy4SBJ6RjGt43k6MeBKtw2z/27gh4A3hfFmFY3No=
github.com/Velocidex/yaml/v2 v2.2.8/go.mod h1:PlXIg/Pxmoja48C1vMHo7C5pauAZvLq/UEPOQ3DsjS4=
github.com/Velocidex/zip v0.0.0-20210101070220-e7ecefb7aad7 h1:IAry9WUMrVYA+XPvMF5UMN56ya5II/hoUOtqaHKOHrs=
@@ -907,8 +907,8 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
20 changes: 11 additions & 9 deletions gui/velociraptor/src/components/artifacts/reporting.jsx
@@ -18,10 +18,11 @@ import { NotebookLineChart, NotebookTimeChart,
} from '../notebooks/notebook-chart-renderer.jsx';

import VeloValueRenderer from '../utils/value.jsx';
import { JSONparse } from '../utils/json_parse.jsx';

// Renders a report in the DOM.
const parse_param = domNode=>JSON.parse(decodeURIComponent(
domNode.attribs.params || "{}"));
const parse_param = domNode=>JSONparse(decodeURIComponent(
domNode.attribs.params, {}));

// NOTE: The server sanitizes reports using the bluemonday
// sanitization. We therefore trust the output and are allowed to
@@ -111,7 +112,7 @@ export default class VeloReportViewer extends React.Component {
let new_state = {
template: response.data.template || "No Reports",
messages: response.data.messages || [],
data: JSON.parse(response.data.data),
data: JSONparse(response.data.data),
loading: false,
version: this.state.version + 1,
};
@@ -155,7 +156,7 @@ export default class VeloReportViewer extends React.Component {
let data = this.state.data;
let value = decodeURIComponent(domNode.attribs.value || "");
let response = data[value] || {};
let rows = JSON.parse(response.Response);
let rows = JSONparse(response.Response, []);
return (
<VeloTable
rows={rows}
@@ -172,7 +173,7 @@ export default class VeloReportViewer extends React.Component {
if (value) {
let match = re.exec(value);
let data = this.state.data[match[1]];
let rows = JSON.parse(data.Response);
let rows = JSONparse(data.Response, []);

return (
<VeloTable rows={rows} columns={data.Columns} />
@@ -223,8 +224,8 @@ export default class VeloReportViewer extends React.Component {
let match = re.exec(value);
let data = this.state.data[match[1]];
try {
let rows = JSON.parse(data.Response);
let params = JSON.parse(decodeURIComponent(domNode.attribs.params));
let rows = JSONparse(data.Response, []);
let params = JSONparse(decodeURIComponent(domNode.attribs.params), {});
return (
<VeloLineChart data={rows}
columns={data.Columns}
@@ -244,8 +245,9 @@ export default class VeloReportViewer extends React.Component {
let match = re.exec(value);
let data = this.state.data[match[1]];
try {
let rows = JSON.parse(data.Response);
let params = JSON.parse(decodeURIComponent(domNode.attribs.params));
let rows = JSONparse(data.Response, []);
let params = JSONparse(
decodeURIComponent(domNode.attribs.params), {});
return (
<VeloTimeChart data={rows}
columns={data.Columns}
7 changes: 2 additions & 5 deletions gui/velociraptor/src/components/clients/shell-viewer.jsx
@@ -19,6 +19,7 @@ import Completer from '../artifacts/syntax.jsx';
import { DeleteFlowDialog } from "../flows/flows-list.jsx";
import Button from 'react-bootstrap/Button';
import { withRouter } from "react-router-dom";
import { JSONparse } from '../utils/json_parse.jsx';

// Refresh every 5 seconds
const SHELL_POLL_TIME = 5000;
@@ -620,11 +621,7 @@ class ShellViewer extends Component {
// Column 8 is the _Flow column;
let flow_json = rows[i] && rows[i].cell &&
rows[i].cell[column_idx];
try {
var flow = JSON.parse(flow_json);
} catch(e) {
continue;
}
let flow = JSONparse(flow_json);
if (!flow || !flow.request) {
continue;
}