Skip to content

Commit

Permalink
Compaction fix and gather optimisation
Browse files Browse the repository at this point in the history
* Compact was previously broken, because the compacted wals were being
  merged back into the full log in the Replay step. This change has
  pulled the compaction call into the same thread as the Replay calls,
  meaning we are able to purge the local log prior to replay (so only
  the compacted wal is persisted).

* Compaction now also incorporates sort and dedup to cater for any
  historically broken wals (more of a cleanup task rather than an
  ongoing catch).

* Fixed compaction to cater for new changes to Note handling (as we no
  longer use pointers to byte slices).

* Large optimisation to gather algorithm, and more centralising
  (previously it wasn't using the `pull` function, but now it is).
  • Loading branch information
Sambigeara committed Jan 3, 2022
1 parent 3cc82fa commit 154bb2c
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 164 deletions.
25 changes: 20 additions & 5 deletions pkg/service/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func (r *DBListRepo) Start(client Client) error {
}()
}

walChan := make(chan []EventLog)
replayChan := make(chan []EventLog)
compactChan := make(chan []EventLog)

// We need atomicity between wal pull/replays and handling of keypress events, as we need
// events to operate on a predictable state (rather than a keypress being applied to state
Expand All @@ -37,8 +38,22 @@ func (r *DBListRepo) Start(client Client) error {
go func() {
for {
select {
case partialWal := <-walChan:
if err := r.Replay(partialWal); err != nil {
case wal := <-compactChan:
// merge in any events added between the wal being added to compactChan and now
wal = merge(wal, r.log)
wal, _ = compact(wal)
// r.log will contain the uncompressed log, and r.Replay attempts to merge the new
// wal with r.log (which will render the compaction pointless, as we just merge the
// full log back in), therefore set the eventlog to a new empty one.
// This is acceptable given that mutations to r.log only occur within this thread
r.log = []EventLog{}
if err := r.Replay(wal); err != nil {
errChan <- err
return
}
scheduleRefresh()
case wal := <-replayChan:
if err := r.Replay(wal); err != nil {
errChan <- err
return
}
Expand Down Expand Up @@ -66,8 +81,8 @@ func (r *DBListRepo) Start(client Client) error {

// To avoid blocking key presses on the main processing loop, run heavy sync ops in a separate
// loop, and only add to channel for processing if there's any changes that need syncing
// This is run after the goroutine above is triggered to ensure a thread is consuming from walChan
err := r.startSync(ctx, walChan)
// This is run after the goroutine above is triggered to ensure a thread is consuming from replayChan
err := r.startSync(ctx, replayChan, compactChan)
if err != nil {
return err
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/service/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,17 @@ func (wf *s3WalFile) RemoveWals(ctx context.Context, fileNames []string) error {

func (wf *s3WalFile) Flush(ctx context.Context, b *bytes.Buffer, randomUUID string) error {
fileName := fmt.Sprintf(path.Join(wf.GetRoot(), walFilePattern), randomUUID)
// IMPORTANT we need to take a copy here, because passing the bytes.Buffer as the Body io.Reader in
// UploadInput means that the buffer is emptied on read.
// In `gather`, we pass a single Buffer pointer to numerous `push` calls for efficiency - if this
// s3 method is called before other walFile Flush calls, the buffer will be empty
// TODO think about a less risky implementation of this...
bCopy := *b
_, err := wf.uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(wf.bucket),
Key: aws.String(fileName),
Body: b,
//Body: b,
Body: &bCopy,
})
if err != nil {
//exitErrorf("Unable to upload %q to %q, %v", fileName, wf.bucket, err)
Expand Down
Loading

0 comments on commit 154bb2c

Please sign in to comment.