Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Control memory use in memcache #3495

Merged
merged 3 commits into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bin/deaddisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,10 @@ func addCommonPermissions(config_obj *config_proto.Config) {
addPermission(config_obj, "READ_RESULTS")
addPermission(config_obj, "MACHINE_STATE")
addPermission(config_obj, "SERVER_ADMIN")

// For http_client
addPermission(config_obj, "COLLECT_SERVER")
addPermission(config_obj, "EXECVE")
}

func addWindowsPartition(
Expand Down
1,061 changes: 537 additions & 524 deletions config/proto/config.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions config/proto/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,9 @@ message DatastoreConfig {
// filesystems (default 100).
int64 memcache_write_mutation_writers = 6;

// Aim to not exceed this memory limit.
int64 memcache_write_max_memory = 17; // Default 100Mb

// How long to delay writes so they can be combined. This applies
// for writing result sets - we keep the writes in memory for
// min_age seconds in order to combine further writes. If another
Expand Down
102 changes: 79 additions & 23 deletions file_store/memcache/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/alitto/pond"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
Expand Down Expand Up @@ -43,6 +45,12 @@ var (
Help: "Total number of current writers flushing (capped at concurrency)",
})

metricTotalSyncWrites = promauto.NewCounter(
prometheus.CounterOpts{
Name: "memcache_filestore_total_sync_writes",
Help: "Total number of syncronous writer operations done on the memcache filestore",
})

metricTotalWrites = promauto.NewCounter(
prometheus.CounterOpts{
Name: "memcache_filestore_total_writes",
Expand Down Expand Up @@ -172,6 +180,7 @@ func (self *MemcacheFileWriter) Write(data []byte) (int, error) {

self.written_size += int64(len(data))

self.owner.ChargeBytes(int64(len(data)))
metricCachedBytes.Add(float64(len(data)))
metricTotalWrites.Inc()
metricTotalWritesBytes.Add(float64(len(data)))
Expand Down Expand Up @@ -239,6 +248,7 @@ func (self *MemcacheFileWriter) Close() error {
// flush cycle, instead force a flush now and wait for it to
// complete.
if sync_call {
metricTotalSyncWrites.Inc()
return self.Flush()
}

Expand Down Expand Up @@ -270,6 +280,12 @@ func (self *MemcacheFileWriter) timeToExpire() bool {
return utils.GetTime().Now().Sub(self.last_close_time) > self.max_age
}

// callCompletions invokes every queued completion callback, in the
// order they were registered.
func (self *MemcacheFileWriter) callCompletions(completions []func()) {
	for i := range completions {
		completions[i]()
	}
}

// Begin the flush cycle
func (self *MemcacheFileWriter) Flush() error {
// While the file is flushed it blocks other writers to the same
Expand All @@ -278,32 +294,34 @@ func (self *MemcacheFileWriter) Flush() error {
self.mu.Lock()
defer self.mu.Unlock()

// Copy the completions and buffer so we can serve writes while
// flushing.
var completions []func()

// Only send completions once the file is actually closed. It
// is possible for the file to flush many times before it is
// being closed but this does not count as a completion.
if self.closed {
completions = append(completions, self.completions...)
self.completions = nil
}

// Nothing to do
if self.last_flush.IsZero() {
self.callCompletions(completions)
return nil
}

// Not really an error but we can not flush while we are already
// flushing - will try again in the next flush cycle.
if self.flushing {
self.callCompletions(completions)
return currentlyFlushingError
}

// Will be cleared when the flush is done and we can flush again.
self.flushing = true

// Copy the completions and buffer so we can serve writes while
// flushing.
var completions []func()

// Only send completions once the file is actually closed. It
// is possible for the file to flush many times before it is
// being closed but this does not count as a completion.
if self.closed {
completions = append(completions, self.completions...)
self.completions = nil
}

truncated := self.truncated
self.truncated = false

Expand All @@ -322,8 +340,11 @@ func (self *MemcacheFileWriter) Flush() error {
// Flush in the background and return immediately. We can collect
// writes into memory in the meantime.
self.wg.Add(1)
go self._FlushInBackground(
buffer.Bytes(), truncated, completions, self.wg)
self.owner.pool.Submit(func() {
defer self.wg.Done()

self._FlushInBackground(buffer.Bytes(), truncated, completions)
})

return nil
}
Expand All @@ -347,20 +368,24 @@ func (self *MemcacheFileWriter) Size() (int64, error) {

// Flush the data in the background.
func (self *MemcacheFileWriter) _FlushInBackground(
data []byte, truncate bool, completions []func(),
wg *sync.WaitGroup) {
data []byte, truncate bool, completions []func()) {

defer func() {
for _, c := range completions {
c()
}
// We guarantee to call the completions after the data has been
// flushed, but we cannot wait for them to finish before we
// release the writer. Therefore we invoke them in the
// background so we can return immediately and release our
// concurrency slot.
go func() {
for _, c := range completions {
c()
}
}()

// We are ready to flush again!
self.mu.Lock()
self.flushing = false
self.mu.Unlock()

wg.Done()
}()

// The below is covered by concurrency control - will wait here
Expand Down Expand Up @@ -393,6 +418,7 @@ func (self *MemcacheFileWriter) _FlushInBackground(
writer.Truncate()
}

self.owner.ChargeBytes(-int64((len(data))))
metricCachedBytes.Sub(float64(len(data)))
_, err = writer.Write(data)
if err != nil {
Expand Down Expand Up @@ -426,6 +452,16 @@ type MemcacheFileStore struct {
max_age time.Duration

closed bool

// Total number of bytes in flight right now. If this gets too
// large we start making writes synchronous to push back
// against writers and protect our memory usage.
total_cached_bytes int64

// Pool of flusher workers
pool *pond.WorkerPool

target_memory_use int64
}

func NewMemcacheFileStore(
Expand All @@ -448,6 +484,11 @@ func NewMemcacheFileStore(
max_writers = 200
}

target_memory_use := config_obj.Datastore.MemcacheWriteMaxMemory
if target_memory_use == 0 {
target_memory_use = 100 * 1024 * 1024
}

result := &MemcacheFileStore{
id: utils.GetId(),
ctx: ctx,
Expand All @@ -457,15 +498,21 @@ func NewMemcacheFileStore(
data_cache: make(map[string]*MemcacheFileWriter),
concurrency: utils.NewConcurrencyControl(
int(max_writers), time.Hour),
max_age: time.Duration(max_age) * time.Millisecond,
min_age: time.Duration(ttl) * time.Millisecond,
max_age: time.Duration(max_age) * time.Millisecond,
min_age: time.Duration(ttl) * time.Millisecond,
pool: pond.New(int(max_writers), int(max_writers*10)),
target_memory_use: target_memory_use,
}

go result.Start(ctx)

return result
}

// ChargeBytes atomically adjusts the running total of cached,
// not-yet-flushed bytes. Pass a negative delta to release bytes once
// they have been written out.
func (self *MemcacheFileStore) ChargeBytes(delta int64) {
	atomic.AddInt64(&self.total_cached_bytes, delta)
}

func (self *MemcacheFileStore) FlushCycle(ctx context.Context) {
writers := []*MemcacheFileWriter{}
self.mu.Lock()
Expand Down Expand Up @@ -567,6 +614,11 @@ func (self *MemcacheFileStore) WriteFileWithCompletion(
result.AddCompletion(completion)
}

// Make the call synchronous if our memory limit is exceeded.
if atomic.LoadInt64(&self.total_cached_bytes) > self.target_memory_use {
result.AddCompletion(utils.SyncCompleter)
}

return result, nil
}

Expand All @@ -589,6 +641,10 @@ func (self *MemcacheFileStore) Flush() {
writer.Flush()
}

logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent)
logger.Info("<red>MemcacheFileStore</>: Shutdown: Waiting to flush %v bytes",
atomic.LoadInt64(&self.total_cached_bytes))

// Wait for all the flushers to finish
self.wg.Wait()

Expand Down
28 changes: 23 additions & 5 deletions file_store/memcache/memcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ type MemcacheTestSuite struct {
}

func (self *MemcacheTestSuite) TestWriterExpiry() {
self.config_obj.Datastore.MemcacheWriteMutationMaxAge = 1 // 100 Ms
file_store := NewTestMemcacheFilestore(self.config_obj)
config_obj := config.GetDefaultConfig()
config_obj.Datastore.Implementation = "MemcacheFileDataStore"
config_obj.Datastore.MemcacheWriteMutationBuffer = 100
config_obj.Datastore.MemcacheWriteMutationMaxAge = 1 // 100 Ms

file_store := NewTestMemcacheFilestore(config_obj)

data := []byte("Hello")

Expand All @@ -44,14 +48,18 @@ func (self *MemcacheTestSuite) TestWriterExpiry() {
file_store.FlushCycle(context.Background())

// Still there.
file_store.mu.Lock()
assert.Equal(self.T(), 1, len(file_store.data_cache))
file_store.mu.Unlock()

time.Sleep(100 * time.Millisecond)

file_store.FlushCycle(context.Background())

// Old writers are cleared after max_age
file_store.mu.Lock()
assert.Equal(self.T(), 0, len(file_store.data_cache))
file_store.mu.Unlock()
}

// Size reporting is very important to keep track of the result set
Expand Down Expand Up @@ -146,12 +154,15 @@ func (self *MemcacheTestSuite) TestFileAsyncWrite() {
}

func (self *MemcacheTestSuite) TestFileSyncWrite() {
self.config_obj.Datastore.MemcacheWriteMutationMinAge = 0
config_obj := config.GetDefaultConfig()
config_obj.Datastore.Implementation = "MemcacheFileDataStore"
config_obj.Datastore.MemcacheWriteMutationBuffer = 100
config_obj.Datastore.MemcacheWriteMutationMinAge = 0

// Stop automatic flushing
self.config_obj.Datastore.MemcacheWriteMutationMaxAge = 40000000000
config_obj.Datastore.MemcacheWriteMutationMaxAge = 40000000000

file_store := NewTestMemcacheFilestore(self.config_obj)
file_store := NewTestMemcacheFilestore(config_obj)

filename := path_specs.NewSafeFilestorePath("test", "sync")
fd, err := file_store.WriteFileWithCompletion(filename, utils.SyncCompleter)
Expand Down Expand Up @@ -226,6 +237,13 @@ func (self *MemcacheTestSuite) TestFileWriteCompletions() {
file_store.Flush()

// Both completions are fired.
vtesting.WaitUntil(time.Second, self.T(), func() bool {
mu.Lock()
defer mu.Unlock()

return len(result) == 2
})

mu.Lock()
assert.Equal(self.T(), len(result), 2)
assert.Equal(self.T(), "Done", result[0])
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ require (
github.com/alecthomas/colour v0.1.0 // indirect
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/alitto/pond v1.8.3 // indirect
github.com/andybalholm/cascadia v1.3.2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAu
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/alexmullins/zip v0.0.0-20180717182244-4affb64b04d0 h1:BVts5dexXf4i+JX8tXlKT0aKoi38JwTXSe+3WUneX0k=
github.com/alexmullins/zip v0.0.0-20180717182244-4affb64b04d0/go.mod h1:FDIQmoMNJJl5/k7upZEnGvgWVZfFeE6qHeN7iCMbCsA=
github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs=
github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q=
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
Expand Down
Loading
Loading