Skip to content

Commit

Permalink
Merge pull request #164 from sgotti/datamanager_dont_create_ost_check…
Browse files Browse the repository at this point in the history
…pointed_files

datamanager: don't create ost wal checkpointed files
  • Loading branch information
sgotti committed Nov 7, 2019
2 parents ae8eec9 + acd62a3 commit 33e2d50
Showing 1 changed file with 10 additions and 60 deletions.
70 changes: 10 additions & 60 deletions internal/datamanager/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package datamanager

import (
"bytes"
"container/ring"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -151,10 +150,8 @@ func (d *DataManager) ReadWalData(walFileID string) (io.ReadCloser, error) {
}

type WalFile struct {
WalSequence string
Err error
Committed bool
Checkpointed bool
WalSequence string
Err error
}

func (d *DataManager) ListOSTWals(start string) <-chan *WalFile {
Expand All @@ -181,36 +178,25 @@ func (d *DataManager) ListOSTWals(start string) <-chan *WalFile {

name := path.Base(object.Path)
ext := path.Ext(name)
// accept only ".committed" files (skip old files that had ".checkpointed" extensions)
if ext != ".committed" {
continue
}

walSequence := strings.TrimSuffix(name, ext)

// wal file refers to another wal, so return the current one
if curWal.WalSequence != walSequence {
// if this happen something is wrong on the objectstorage
if !curWal.Committed && curWal.Checkpointed {
walCh <- &WalFile{
Err: errors.Errorf("wal is checkpointed but not committed. this should never happen"),
}
return
}

if curWal.WalSequence != "" {
// skip not committed wals
if curWal.Committed {
walCh <- curWal
}
walCh <- curWal
}

curWal = &WalFile{
WalSequence: walSequence,
}
}

if ext == ".committed" {
curWal.Committed = true
}
if ext == ".checkpointed" {
curWal.Checkpointed = true
}
}

if curWal.WalSequence != "" {
walCh <- curWal
}
Expand Down Expand Up @@ -627,13 +613,6 @@ func (d *DataManager) sync(ctx context.Context) error {
if !tresp.Succeeded {
return errors.Errorf("failed to write committedstorage wal: concurrent update")
}
case WalStatusCheckpointed:
walFilePath := d.storageWalStatusFile(walData.WalSequence)
d.log.Debugf("checkpointing committed wal to storage")
walFileCheckpointedPath := walFilePath + ".checkpointed"
if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{}), 0, true); err != nil {
return err
}
}
}
return nil
Expand Down Expand Up @@ -963,9 +942,6 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro
WalStatus: WalStatusCommittedStorage,
PreviousWalSequence: header.PreviousWalSequence,
}
if wal.Checkpointed {
walData.WalStatus = WalStatusCheckpointed
}
walDataj, err := json.Marshal(walData)
if err != nil {
return err
Expand Down Expand Up @@ -1057,8 +1033,6 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro
// if there're some wals in the objectstorage this means etcd has been reset.
// So take all the wals in committed or checkpointed state starting from the
// first not checkpointed wal and put them in etcd
lastCommittedStorageWalsRing := ring.New(100)
lastCommittedStorageWalElem := lastCommittedStorageWalsRing
lastCommittedStorageWalSequence := ""
wroteWals := 0
for wal := range d.ListOSTWals("") {
Expand All @@ -1075,37 +1049,13 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro
continue
}

lastCommittedStorageWalElem.Value = wal
lastCommittedStorageWalElem = lastCommittedStorageWalElem.Next()
lastCommittedStorageWalSequence = wal.WalSequence
if wal.Checkpointed {
continue
}

if err := writeWal(wal); err != nil {
return err
}
wroteWals++
}

// if no wal has been written (because all are checkpointed), write at least
// the ones in the ring
if wroteWals == 0 {
var err error
lastCommittedStorageWalsRing.Do(func(e interface{}) {
if e == nil {
return
}
wal := e.(*WalFile)
err = writeWal(wal)
if err != nil {
return
}
lastCommittedStorageWalSequence = wal.WalSequence
})
if err != nil {
return err
}
}

// insert an empty wal and make it already committedstorage
Expand Down

0 comments on commit 33e2d50

Please sign in to comment.