forked from longhorn/sparse-tools
/
sfold.go
139 lines (117 loc) · 4.01 KB
/
sfold.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package sparse
import (
"context"
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
const (
	// defaultCoalesceWorkerCount is the number of concurrent workers that
	// copy child data segments into the parent during coalesce.
	defaultCoalesceWorkerCount = 4
)
// FoldFile folds child snapshot data into its parent.
//
// Both paths must refer to regular files. If the child is larger than the
// parent (e.g. after a volume expansion) the parent is grown to match; a
// parent larger than the child is rejected. Progress and completion are
// reported through ops.
func FoldFile(childFileName, parentFileName string, ops FileHandlingOperations) error {
	childInfo, err := os.Stat(childFileName)
	if err != nil {
		return errors.Wrapf(err, "failed to get file info of child file %v", childFileName)
	}
	parentInfo, err := os.Stat(parentFileName)
	if err != nil {
		return errors.Wrapf(err, "failed to get file info of parent file %v", parentFileName)
	}

	// ensure no directory
	if childInfo.IsDir() || parentInfo.IsDir() {
		return fmt.Errorf("at least one file is directory, not a normal file")
	}

	// Size mismatch may be caused by the expansion.
	switch {
	case childInfo.Size() < parentInfo.Size():
		return fmt.Errorf("file sizes are not equal and the parent file is larger than the child file")
	case childInfo.Size() > parentInfo.Size():
		// Grow the parent so every child offset is writable.
		if err := os.Truncate(parentFileName, childInfo.Size()); err != nil {
			return errors.Wrap(err, "failed to expand the parent file size before coalesce")
		}
	}

	// Open the child read-only and the parent write-only for the copy.
	childIo, err := NewDirectFileIoProcessor(childFileName, os.O_RDONLY, 0)
	if err != nil {
		return errors.Wrap(err, "failed to open childFile")
	}
	defer childIo.Close()

	parentIo, err := NewDirectFileIoProcessor(parentFileName, os.O_WRONLY, 0)
	if err != nil {
		return errors.Wrap(err, "failed to open parentFile")
	}
	defer parentIo.Close()

	return coalesce(parentIo, childIo, childInfo.Size(), ops)
}
// coalesce copies every allocated (SparseData) extent of the child file into
// the parent at the same offset, using defaultCoalesceWorkerCount concurrent
// workers. fileSize is the child's size and is used only for progress
// reporting. The final progress update (complete or failed) is always emitted
// exactly once via the deferred handler, regardless of the error path taken.
func coalesce(parentFileIo, childFileIo FileIoProcessor, fileSize int64, ops FileHandlingOperations) (err error) {
	progress := new(uint32)
	progressMutex := &sync.Mutex{}
	defer func() {
		if err != nil {
			// Pass the error as a value, not a format string: a '%' inside a
			// wrapped path or message must not be interpreted by the logger.
			log.Error(err)
			updateProgress(progress, atomic.LoadUint32(progress), true, err, progressMutex, ops)
		} else {
			updateProgress(progress, progressComplete, true, nil, progressMutex, ops)
		}
	}()

	blockSize, err := getFileSystemBlockSize(childFileIo)
	if err != nil {
		return errors.Wrap(err, "can't get FS block size")
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	syncStartTime := time.Now()

	out, errc, err := GetFileLayout(ctx, childFileIo)
	if err != nil {
		return errors.Wrapf(err, "failed to retrieve file layout for file %v", childFileIo.Name())
	}

	// processSegment copies one child extent into the parent in batch-sized
	// chunks. Each invocation allocates its own aligned buffer, so concurrent
	// workers never share scratch memory.
	processSegment := func(segment FileInterval) error {
		batch := batchBlockCount * blockSize
		buffer := AllocateAligned(batch)
		if segment.Kind == SparseData {
			for offset := segment.Begin; offset < segment.End; {
				size := batch
				if offset+int64(size) > segment.End {
					size = int(segment.End - offset)
				}
				// Read a batch from the child. io.ReaderAt guarantees a full
				// read unless err is non-nil, so the count can be discarded.
				if _, err := childFileIo.ReadAt(buffer[:size], offset); err != nil {
					return errors.Wrapf(err, "failed to read childFile filename: %v, size: %v, at: %v",
						childFileIo.Name(), size, offset)
				}
				// Write the batch to the parent at the same offset.
				n, err := parentFileIo.WriteAt(buffer[:size], offset)
				if err != nil {
					return errors.Wrapf(err, "failed to write to parentFile filename: %v, size: %v, at: %v",
						parentFileIo.Name(), size, offset)
				}
				offset += int64(n)
			}
			newProgress := uint32(float64(segment.End) / float64(fileSize) * 100)
			updateProgress(progress, newProgress, false, nil, progressMutex, ops)
		}
		return nil
	}

	errorChannels := []<-chan error{errc}
	for i := 0; i < defaultCoalesceWorkerCount; i++ {
		errorChannels = append(errorChannels, processFileIntervals(ctx, out, processSegment))
	}

	// The merged channel yields once all worker channels close, or as soon as
	// a single worker fails (which cancels ctx). A plain receive replaces the
	// original single-case select (staticcheck S1000).
	err = <-mergedErrc(ctx, errorChannels)

	log.Debugf("Finished fold for parent %v, child %v, size %v, elapsed %.2fs",
		parentFileIo.Name(), childFileIo.Name(), fileSize,
		time.Since(syncStartTime).Seconds())
	return err
}

// mergedErrc merges the worker error channels; split out so the receive above
// reads as a single expression.
func mergedErrc(ctx context.Context, errorChannels []<-chan error) <-chan error {
	return mergeErrorChannels(ctx, errorChannels...)
}