Skip to content

Commit

Permalink
datamanager: add data sequence to data file name
Browse files Browse the repository at this point in the history
When creating a datafile name make it start with the current data sequence. This
is useful in future to know which data sequence created a new data file.
  • Loading branch information
sgotti committed Nov 4, 2019
1 parent e034661 commit 4b4416f
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
18 changes: 11 additions & 7 deletions internal/datamanager/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func (w walActions) Len() int { return len(w) }
func (w walActions) Less(i, j int) bool { return w[i].ID < w[j].ID }
func (w walActions) Swap(i, j int) { w[i], w[j] = w[j], w[i] }

func (d *DataManager) dataFileID(dataSequence *sequence.Sequence, next string) string {
return fmt.Sprintf("%s-%s", dataSequence.String(), next)
}

func (d *DataManager) walIndex(ctx context.Context, wals []*WalData) (walIndex, error) {
wimap := map[string]map[string]*Action{}

Expand Down Expand Up @@ -168,7 +172,7 @@ func (d *DataManager) writeDataSnapshot(ctx context.Context, wals []*WalData) er
if curDataStatus != nil {
curDataStatusFiles = curDataStatus.Files[dataType]
}
dataStatusFiles, err := d.writeDataType(ctx, wi, dataType, curDataStatusFiles)
dataStatusFiles, err := d.writeDataType(ctx, wi, dataType, dataSequence, curDataStatusFiles)
if err != nil {
return err
}
Expand All @@ -179,7 +183,7 @@ func (d *DataManager) writeDataSnapshot(ctx context.Context, wals []*WalData) er
if err != nil {
return err
}
if err := d.ost.WriteObject(d.dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil {
if err := d.ost.WriteObject(d.dataStatusPath(dataSequence), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil {
return err
}

Expand Down Expand Up @@ -285,7 +289,7 @@ func (d *DataManager) actionGroups(ctx context.Context, wi walIndex, dataType st
return actionGroups, remainingDataStatusFiles
}

func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType string, curDataStatusFiles []*DataStatusFile) ([]*DataStatusFile, error) {
func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType string, dataSequence *sequence.Sequence, curDataStatusFiles []*DataStatusFile) ([]*DataStatusFile, error) {
type SplitPoint struct {
pos int64
lastEntryID string
Expand Down Expand Up @@ -443,7 +447,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s
}
dataFileIndexes = append(dataFileIndexes, dataFileIndex)
for i, sp := range splitPoints {
curDataFileID := uuid.NewV4().String()
curDataFileID := d.dataFileID(dataSequence, uuid.NewV4().String())
if err := d.writeDataFile(ctx, &buf, sp.pos-curPos, dataFileIndexes[i], curDataFileID, dataType); err != nil {
return nil, err
}
Expand Down Expand Up @@ -629,7 +633,7 @@ func (d *DataManager) Import(ctx context.Context, r io.Reader) error {

err := dec.Decode(&de)
if err == io.EOF {
dataFileID := uuid.NewV4().String()
dataFileID := d.dataFileID(dataSequence, uuid.NewV4().String())
if err := d.writeDataFile(ctx, &buf, int64(buf.Len()), dataFileIndex, dataFileID, curDataType); err != nil {
return err
}
Expand Down Expand Up @@ -663,7 +667,7 @@ func (d *DataManager) Import(ctx context.Context, r io.Reader) error {
}

if mustWrite {
dataFileID := uuid.NewV4().String()
dataFileID := d.dataFileID(dataSequence, uuid.NewV4().String())
if err := d.writeDataFile(ctx, &buf, int64(buf.Len()), dataFileIndex, dataFileID, curDataType); err != nil {
return err
}
Expand Down Expand Up @@ -709,7 +713,7 @@ func (d *DataManager) Import(ctx context.Context, r io.Reader) error {
if err != nil {
return err
}
if err := d.ost.WriteObject(d.dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil {
if err := d.ost.WriteObject(d.dataStatusPath(dataSequence), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil {
return err
}

Expand Down
19 changes: 14 additions & 5 deletions internal/datamanager/datamanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"agola.io/agola/internal/etcd"
"agola.io/agola/internal/objectstorage"
"agola.io/agola/internal/sequence"

"go.uber.org/zap"
errors "golang.org/x/xerrors"
Expand Down Expand Up @@ -158,16 +159,24 @@ func (d *DataManager) storageDataDir() string {
return path.Join(d.basePath, storageDataDir)
}

func (d *DataManager) dataStatusPath(sequence string) string {
func (d *DataManager) dataStatusPath(sequence *sequence.Sequence) string {
return fmt.Sprintf("%s/%s.status", d.storageDataDir(), sequence)
}

func (d *DataManager) DataFileIndexPath(dataType, id string) string {
return fmt.Sprintf("%s/%s/%s.index", d.storageDataDir(), dataType, id)
func (d *DataManager) DataTypeDir(dataType string) string {
return fmt.Sprintf("%s/%s", d.storageDataDir(), dataType)
}

func (d *DataManager) DataFilePath(dataType, id string) string {
return fmt.Sprintf("%s/%s/%s.data", d.storageDataDir(), dataType, id)
func (d *DataManager) DataFileBasePath(dataType, name string) string {
return fmt.Sprintf("%s/%s", d.DataTypeDir(dataType), name)
}

func (d *DataManager) DataFileIndexPath(dataType, name string) string {
return fmt.Sprintf("%s.index", d.DataFileBasePath(dataType, name))
}

func (d *DataManager) DataFilePath(dataType, name string) string {
return fmt.Sprintf("%s.data", d.DataFileBasePath(dataType, name))
}

func etcdWalKey(walSeq string) string {
Expand Down

0 comments on commit 4b4416f

Please sign in to comment.