Skip to content

Commit

Permalink
Added automated backup and restore capability (#3346)
Browse files Browse the repository at this point in the history
This backs up volatile but critical information like hunts and client
info records.
  • Loading branch information
scudette committed Mar 16, 2024
1 parent d5765b0 commit bcec615
Show file tree
Hide file tree
Showing 23 changed files with 1,451 additions and 349 deletions.
631 changes: 327 additions & 304 deletions config/proto/config.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions config/proto/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,7 @@ message ServerServicesConfig {
bool launcher = 23;
bool notebook_service = 24;
bool scheduler_service = 29;
bool backup_service = 30;

// Client services
bool http_communicator = 27;
Expand Down Expand Up @@ -1163,6 +1164,9 @@ message Defaults {

// Maximum length of the line that will be parsed (16kb)
int64 watch_plugin_buffer_size = 45;

// Period in seconds when to produce a backup
int64 backup_period_seconds = 47;
}

// Configures crypto preferences
Expand Down
28 changes: 28 additions & 0 deletions paths/backups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package paths

import (
"strings"
"time"

"www.velocidex.com/golang/velociraptor/file_store/api"
"www.velocidex.com/golang/velociraptor/utils"
)

// BackupsPathManager builds filestore paths for backup containers
// under BACKUPS_ROOT.
type BackupsPathManager struct{}

// CustomBackup returns the filestore path for a caller-named backup
// zip. The name is added as an unsafe child because it may contain
// characters that need escaping in the filestore.
func (self BackupsPathManager) CustomBackup(
	name string) api.FSPathSpec {
	return BACKUPS_ROOT.AddUnsafeChild(name).
		SetType(api.PATH_TYPE_FILESTORE_DOWNLOAD_ZIP)
}

// BackupFile returns the filestore path for a new timestamped backup
// zip (e.g. backup_2024-03-16T03_04_05Z). Colons in the RFC3339
// timestamp are replaced with underscores since they are not portable
// in filenames.
func (self BackupsPathManager) BackupFile() api.FSPathSpec {
	now := utils.GetTime().Now().UTC()
	return BACKUPS_ROOT.AddChild("backup_" +
		strings.ReplaceAll(now.Format(time.RFC3339), ":", "_")).
		SetType(api.PATH_TYPE_FILESTORE_DOWNLOAD_ZIP)
}

// NewBackupPathManager returns a path manager for the backups area of
// the filestore.
func NewBackupPathManager() BackupsPathManager {
	var pm BackupsPathManager
	return pm
}
3 changes: 3 additions & 0 deletions paths/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ var (
NOTEBOOK_ROOT = path_specs.NewSafeDatastorePath("notebooks").
SetType(api.PATH_TYPE_DATASTORE_JSON)

BACKUPS_ROOT = path_specs.NewSafeFilestorePath("backups").
SetType(api.PATH_TYPE_FILESTORE_ANY)

DOWNLOADS_ROOT = path_specs.NewUnsafeFilestorePath("downloads").
SetType(api.PATH_TYPE_FILESTORE_DOWNLOAD_ZIP)

Expand Down
56 changes: 56 additions & 0 deletions services/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package services

import (
"context"
"sync"

config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/file_store/api"
"www.velocidex.com/golang/vfilter"
)

// Callers may register a backup provider to be included in the backup.
type BackupProvider interface {
	// The name of this provider, used for logging and stats.
	ProviderName() string

	// The name of the result saved in the container, as path
	// components (joined with "/" to form the member name).
	Name() []string

	// Providers may write result sets into the backup. This will be
	// called by the backup service to obtain a channel over which we
	// can write the backup file (named in Name() above).
	BackupResults(
		ctx context.Context,
		wg *sync.WaitGroup) (<-chan vfilter.Row, error)

	// This is the opposite of backup - it allows a provider to
	// recover from an existing backup. Typically providers need to
	// clear their data and read new data from this channel. The
	// provider may return stats about its operation.
	Restore(ctx context.Context, in <-chan vfilter.Row) (BackupStat, error)
}

// Allows each provider to report the stats of the most recent
// operation.
type BackupStat struct {
	// Name of provider.
	Name string

	// Set when the operation failed for this provider.
	Error error

	// Human readable summary of the outcome.
	Message string
}

// BackupService writes the data of all registered providers into a
// zip container in the filestore, and can restore it again.
type BackupService interface {
	// Register adds a provider whose data will be included in
	// subsequent backups and restores.
	Register(provider BackupProvider)

	// RestoreBackup reads the container at export_path and feeds each
	// provider its saved data, returning per-provider stats.
	RestoreBackup(export_path api.FSPathSpec) ([]BackupStat, error)

	// CreateBackup writes a new container to export_path, returning
	// per-provider stats.
	CreateBackup(export_path api.FSPathSpec) ([]BackupStat, error)
}

// GetBackupService returns the backup service for the org named in
// config_obj.
func GetBackupService(config_obj *config_proto.Config) (BackupService, error) {
	manager, err := GetOrgManager()
	if err != nil {
		return nil, err
	}

	org_services := manager.Services(config_obj.OrgId)
	return org_services.BackupService()
}
269 changes: 269 additions & 0 deletions services/backup/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
package backup

import (
"archive/zip"
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"sync"
"time"

"github.com/Velocidex/ordereddict"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/file_store"
"www.velocidex.com/golang/velociraptor/file_store/api"
"www.velocidex.com/golang/velociraptor/logging"
"www.velocidex.com/golang/velociraptor/paths"
"www.velocidex.com/golang/velociraptor/reporting"
"www.velocidex.com/golang/velociraptor/services"
"www.velocidex.com/golang/velociraptor/utils"
vql_subsystem "www.velocidex.com/golang/velociraptor/vql"
"www.velocidex.com/golang/vfilter"
)

// BackupService implements services.BackupService. It collects the
// result sets of all registered providers into a zip container in the
// filestore and can replay such a container back into the providers.
type BackupService struct {
	mu         sync.Mutex
	ctx        context.Context
	wg         *sync.WaitGroup
	config_obj *config_proto.Config

	// Providers added via Register(). Appended under mu; NOTE(review):
	// RestoreBackup reads this without taking mu — confirm intended.
	registrations []services.BackupProvider
}

// CreateBackup writes a new backup container to export_path. Each
// registered provider contributes one JSON result set, stored in the
// zip under the path built from its Name(). Per-provider failures are
// recorded in the returned stats and do not abort the remaining
// providers.
func (self *BackupService) CreateBackup(
	export_path api.FSPathSpec) (stats []services.BackupStat, err error) {

	// Serializes backups and protects self.registrations.
	self.mu.Lock()
	defer self.mu.Unlock()

	logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent)
	start := utils.GetTime().Now()

	// Create a container to hold the backup
	file_store_factory := file_store.GetFileStore(self.config_obj)

	// Delay shutdown until the file actually hits the disk
	self.wg.Add(1)
	fd, err := file_store_factory.WriteFileWithCompletion(
		export_path, self.wg.Done)
	if err != nil {
		return nil, err
	}

	// Start from an empty file in case one already exists at this
	// path. NOTE(review): the Truncate error is ignored — confirm
	// that is acceptable for this filestore implementation.
	fd.Truncate()

	// Create a container with the file.
	container, err := reporting.NewContainerFromWriter(
		self.config_obj, fd, "", 5, nil)
	if err != nil {
		fd.Close()
		return nil, err
	}

	// Report the overall result. Defers run LIFO, so this executes
	// after container.Close() below and therefore sees the final
	// compressed size. Because stats is a named result, the appended
	// summary is included in the returned slice.
	defer func() {
		zip_stats := container.Stats()

		logger.Info("BackupService: <green>Completed Backup to %v (size %v) in %v</>",
			export_path.String(), zip_stats.TotalCompressedBytes,
			utils.GetTime().Now().Sub(start))

		stats = append(stats, services.BackupStat{
			Name: "BackupService",
			Message: fmt.Sprintf("Completed Backup to %v (size %v) in %v",
				export_path.String(), zip_stats.TotalCompressedBytes,
				utils.GetTime().Now().Sub(start)),
		})

	}()

	defer container.Close()

	// Now we can dump all providers into the file.
	scope := vql_subsystem.MakeScope()

	for _, provider := range self.registrations {
		// The member name inside the zip.
		dest := strings.Join(provider.Name(), "/")
		stat := services.BackupStat{
			Name: provider.ProviderName(),
		}

		rows, err := provider.BackupResults(self.ctx, self.wg)
		if err != nil {
			logger.Info("BackupService: <red>Error writing to %v: %v",
				dest, err)

			stat.Error = err
			stats = append(stats, stat)
			continue
		}

		// Write the results to the container now
		total_rows, err := container.WriteResultSet(self.ctx, self.config_obj,
			scope, reporting.ContainerFormatJson, dest, rows)
		if err != nil {
			logger.Info("BackupService: <red>Error writing to %v: %v",
				dest, err)
			stat.Error = err
			stats = append(stats, stat)
			continue
		}

		stat.Message = fmt.Sprintf("Wrote %v rows", total_rows)
		stats = append(stats, stat)
	}

	return stats, nil
}

// RestoreBackup opens a previously created backup container and feeds
// each registered provider its saved result set so it can rebuild its
// state. Per-provider failures are logged and reported through the
// returned stats; they do not abort the restore of other providers.
func (self *BackupService) RestoreBackup(
	export_path api.FSPathSpec) (stats []services.BackupStat, err error) {

	// Hold the lock so the provider list can not change under us and
	// a restore can not race a concurrent CreateBackup. Register and
	// CreateBackup take the same lock; reading self.registrations
	// without it would be a data race.
	self.mu.Lock()
	defer self.mu.Unlock()

	// Open the existing backup container for reading.
	file_store_factory := file_store.GetFileStore(self.config_obj)

	fd, err := file_store_factory.ReadFile(export_path)
	if err != nil {
		return nil, err
	}
	defer fd.Close()

	// The zip reader needs the total size up front.
	fd_stats, err := fd.Stat()
	if err != nil {
		return nil, err
	}

	zip_reader, err := zip.NewReader(
		utils.MakeReaderAtter(fd), fd_stats.Size())
	if err != nil {
		return nil, err
	}

	logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent)

	for _, provider := range self.registrations {
		stat, err := self.feedProvider(provider, zip_reader)
		if err != nil {
			dest := strings.Join(provider.Name(), "/")
			logger.Info("BackupService: <red>Error restoring to %v: %v",
				dest, err)
		}
		stats = append(stats, stat)
	}

	return stats, nil
}

// feedProvider streams the provider's saved result set out of the
// backup zip and into the provider's Restore() method one JSON row at
// a time.
//
// stat and err are named results on purpose: the goroutine below
// writes them after Restore() returns, and the deferred wg.Wait()
// guarantees that write happens before feedProvider actually returns.
// A provider error can therefore override the `return stat, nil` in
// the row loop below.
func (self *BackupService) feedProvider(
	provider services.BackupProvider,
	container *zip.Reader) (stat services.BackupStat, err error) {
	// The provider's data lives under the member name built from
	// Name().
	dest := strings.Join(provider.Name(), "/")
	member, err := container.Open(dest)
	if err != nil {
		return stat, err
	}
	defer member.Close()

	reader := bufio.NewReader(member)

	// Wait for the provider to finish before we go to the next
	// provider. Defers run LIFO: close(output) signals end-of-data
	// to the provider, then wg.Wait() waits for it to drain.
	wg := &sync.WaitGroup{}
	defer wg.Wait()

	output := make(chan vfilter.Row)
	defer close(output)

	sub_ctx, cancel := context.WithCancel(self.ctx)

	// Feed the provider in the background. cancel() on exit lets the
	// row loop below stop early once the provider has returned.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer cancel()

		// Preserve the provider error as our return
		stat, err = provider.Restore(sub_ctx, output)
		stat.Name = provider.ProviderName()
		if err != nil {
			stat.Error = err
		}
	}()

	// Now dump the rows into the provider.
	for {
		// err here shadows the named result deliberately — read
		// errors are returned explicitly below.
		row_data, err := reader.ReadBytes('\n')
		// NOTE(review): a final line with data but no trailing
		// newline is returned together with io.EOF and is dropped
		// here — confirm backup members are always newline-terminated.
		if len(row_data) == 0 || errors.Is(err, io.EOF) {
			return stat, nil
		}
		if err != nil {
			return stat, err
		}

		row := ordereddict.NewDict()
		err = json.Unmarshal(row_data, &row)
		if err != nil {
			return stat, err
		}

		// Stop early if the provider has already given up (it
		// cancels sub_ctx when Restore returns).
		select {
		case <-sub_ctx.Done():
			return stat, nil
		case output <- row:
		}
	}
}

// Register adds a provider to be included in future backups and
// restores. Safe for concurrent use.
func (self *BackupService) Register(provider services.BackupProvider) {
	self.mu.Lock()
	defer self.mu.Unlock()

	self.registrations = append(self.registrations, provider)
}

// NewBackupService creates the backup service for this org and starts
// a background goroutine that writes a timestamped backup on a
// configurable period (Defaults.backup_period_seconds, default one
// day). The goroutine stops when ctx is cancelled and is tracked by
// wg.
func NewBackupService(
	ctx context.Context,
	wg *sync.WaitGroup,
	config_obj *config_proto.Config) services.BackupService {

	result := &BackupService{
		ctx:        ctx,
		wg:         wg,
		config_obj: config_obj,
	}

	// Every day unless overridden in the config.
	delay := time.Hour * 24
	if config_obj.Defaults != nil &&
		config_obj.Defaults.BackupPeriodSeconds > 0 {
		delay = time.Duration(
			config_obj.Defaults.BackupPeriodSeconds) * time.Second
	}

	logger := logging.GetLogger(config_obj, &logging.FrontendComponent)
	logger.Info("Starting <green>Backup Services</> for %v",
		services.GetOrgName(config_obj))

	wg.Add(1)
	go func() {
		defer wg.Done()

		for {
			select {
			case <-ctx.Done():
				return

			case <-utils.GetTime().After(delay):
				export_path := paths.NewBackupPathManager().
					BackupFile()
				// The periodic backup is best effort and must not
				// stop the loop, but failures should not be silent
				// either - log them.
				_, err := result.CreateBackup(export_path)
				if err != nil {
					logger.Error("BackupService: CreateBackup: %v", err)
				}
			}
		}
	}()

	return result
}
Loading

0 comments on commit bcec615

Please sign in to comment.