Skip to content

Commit

Permalink
datamanager: check wal previouswalsequence is correct in initEtcd
Browse files Browse the repository at this point in the history
  • Loading branch information
sgotti committed Nov 7, 2019
1 parent 33e2d50 commit 9c1f3b2
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 2 deletions.
132 changes: 132 additions & 0 deletions internal/datamanager/datamanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"io"
"io/ioutil"
"os"
"path"
"reflect"
"sort"
"strings"
"testing"
"time"

Expand All @@ -33,6 +35,7 @@ import (
ostypes "agola.io/agola/internal/objectstorage/types"
"agola.io/agola/internal/testutil"
"github.com/google/go-cmp/cmp"
errors "golang.org/x/xerrors"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -171,6 +174,135 @@ func TestEtcdReset(t *testing.T) {
}
}

func TestEtcdResetWalsGap(t *testing.T) {
dir, err := ioutil.TempDir("", "agola")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)

etcdDir, err := ioutil.TempDir(dir, "etcd")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
tetcd := setupEtcd(t, etcdDir)
defer shutdownEtcd(tetcd)

ctx, cancel := context.WithCancel(context.Background())

ostDir, err := ioutil.TempDir(dir, "ost")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}

ost, err := posix.New(ostDir)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}

dmConfig := &DataManagerConfig{
BasePath: "basepath",
E: tetcd.TestEtcd.Store,
OST: objectstorage.NewObjStorage(ost, "/"),
EtcdWalsKeepNum: 10,
DataTypes: []string{"datatype01"},
}
dm, err := NewDataManager(ctx, logger, dmConfig)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
dmReadyCh := make(chan struct{})

t.Logf("starting datamanager")
go func() { _ = dm.Run(ctx, dmReadyCh) }()
<-dmReadyCh

actions := []*Action{
{
ActionType: ActionTypePut,
DataType: "datatype01",
Data: []byte("{}"),
},
}

for i := 0; i < 20; i++ {
objectID := fmt.Sprintf("object%02d", i)
actions[0].ID = objectID
if _, err := dm.WriteWal(ctx, actions, nil); err != nil {
t.Fatalf("unexpected err: %v", err)
}
}

// wait for wal to be committed storage
time.Sleep(5 * time.Second)

t.Logf("stopping datamanager")
cancel()

t.Logf("stopping etcd")
// Reset etcd
shutdownEtcd(tetcd)
if err := tetcd.WaitDown(10 * time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
}
t.Logf("resetting etcd")
os.RemoveAll(etcdDir)
t.Logf("starting etcd")
tetcd = setupEtcd(t, etcdDir)
if err := tetcd.Start(); err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer shutdownEtcd(tetcd)

// Remove a wal in the middle
doneCh := make(chan struct{})
defer close(doneCh)

walStatusFiles := []string{}
for object := range dm.ost.List(path.Join(dm.basePath, storageWalsStatusDir)+"/", "", true, doneCh) {
if object.Err != nil {
t.Fatalf("unexpected err: %v", err)
}

walStatusFiles = append(walStatusFiles, object.Path)
}
if len(walStatusFiles) < 20 {
t.Fatalf("exptected at least 20 wals, got: %d wals", len(walStatusFiles))
}

removeIndex := 10
if err := dm.ost.DeleteObject(walStatusFiles[removeIndex]); err != nil {
t.Fatalf("unexpected err: %v", err)
}
errorWalSequence := strings.TrimSuffix(path.Base(walStatusFiles[removeIndex+1]), path.Ext(walStatusFiles[removeIndex+1]))
prevWalSequence := strings.TrimSuffix(path.Base(walStatusFiles[removeIndex]), path.Ext(walStatusFiles[removeIndex]))
expectedPrevWalSequence := strings.TrimSuffix(path.Base(walStatusFiles[removeIndex-1]), path.Ext(walStatusFiles[removeIndex-1]))

ctx, cancel = context.WithCancel(context.Background())
defer cancel()
dmConfig = &DataManagerConfig{
BasePath: "basepath",
E: tetcd.TestEtcd.Store,
OST: objectstorage.NewObjStorage(ost, "/"),
EtcdWalsKeepNum: 10,
DataTypes: []string{"datatype01"},
}
dm, err = NewDataManager(ctx, logger, dmConfig)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
dmReadyCh = make(chan struct{})

expectedErr := errors.Errorf("wal %q previousWalSequence %q is different than expected walSequence %q", errorWalSequence, prevWalSequence, expectedPrevWalSequence)
err = dm.InitEtcd(ctx, nil)
if err == nil {
t.Fatalf("expected err: %q, got nil error", expectedErr)
}
if expectedErr.Error() != err.Error() {
t.Fatalf("expected err: %q, got err %q", expectedErr, err)
}
}

func TestConcurrentUpdate(t *testing.T) {
dir, err := ioutil.TempDir("", "agola")
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions internal/datamanager/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ func (d *DataManager) etcdPinger(ctx context.Context) error {
}

func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) error {
writeWal := func(wal *WalFile) error {
writeWal := func(wal *WalFile, prevWalSequence string) error {
walFile, err := d.ost.ReadObject(d.storageWalStatusFile(wal.WalSequence) + ".committed")
if err != nil {
return err
Expand All @@ -936,6 +936,12 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro
}
walFile.Close()

if prevWalSequence != "" {
if header.PreviousWalSequence != "" && header.PreviousWalSequence != prevWalSequence {
return errors.Errorf("wal %q previousWalSequence %q is different than expected walSequence %q", wal.WalSequence, header.PreviousWalSequence, prevWalSequence)
}
}

walData := &WalData{
WalSequence: wal.WalSequence,
WalDataFileID: header.WalDataFileID,
Expand Down Expand Up @@ -1034,6 +1040,7 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro
// So take all the wals in committed or checkpointed state starting from the
// first not checkpointed wal and put them in etcd
lastCommittedStorageWalSequence := ""
previousWalSequence := ""
wroteWals := 0
for wal := range d.ListOSTWals("") {
// if there're wals in ost but not a datastatus return an error
Expand All @@ -1051,9 +1058,10 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro

lastCommittedStorageWalSequence = wal.WalSequence

if err := writeWal(wal); err != nil {
if err := writeWal(wal, previousWalSequence); err != nil {
return err
}
previousWalSequence = wal.WalSequence
wroteWals++

}
Expand Down

0 comments on commit 9c1f3b2

Please sign in to comment.