forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
157 lines (129 loc) · 4.15 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package tar
import (
"archive/tar"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
)
// Stream is a convenience function for creating a tar of a shard dir. It walks over dir and its
// subdirs and hands every entry, except the root directory itself, to writeFunc along with a tar
// writer that wraps w.
//
// relativePath is joined with each entry's sub-directory (relative to dir) to form the
// shardRelativePath argument passed to writeFunc. If writeFunc is nil, StreamFile is used, which
// results in all entries being written. A custom writeFunc can be passed so that each file may be
// written, modified+written, or skipped depending on the custom logic.
//
// The tar writer is always closed before Stream returns. An error from the walk (including any
// error returned by writeFunc) takes precedence; otherwise the error from closing the tar writer,
// if any, is returned so that a failed flush of the archive is not silently lost.
func Stream(w io.Writer, dir, relativePath string, writeFunc func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error) (err error) {
	tw := tar.NewWriter(w)
	defer func() {
		// Close flushes pending data and writes the archive footer, so its
		// error matters. Only report it if nothing failed earlier.
		if cerr := tw.Close(); err == nil {
			err = cerr
		}
	}()

	if writeFunc == nil {
		writeFunc = StreamFile
	}

	return filepath.Walk(dir, func(path string, f os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}

		// Skip adding an entry for the root dir
		if dir == path && f.IsDir() {
			return nil
		}

		// Figure out the full relative path including any sub-dirs
		subDir, relErr := filepath.Rel(dir, filepath.Dir(path))
		if relErr != nil {
			return relErr
		}

		return writeFunc(f, filepath.Join(relativePath, subDir), path, tw)
	})
}
// SinceFilterTarFile returns a write function for Stream that only writes a file to the tar
// stream when its modification time is strictly after since; all other entries are skipped
// without error. Example: to tar only files newer than a certain datetime, use
// tar.Stream(w, dir, relativePath, SinceFilterTarFile(datetime))
func SinceFilterTarFile(since time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
	filter := func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
		// Not modified after the cutoff: leave it out of the archive.
		if !f.ModTime().After(since) {
			return nil
		}
		return StreamFile(f, shardRelativePath, fullPath, tw)
	}
	return filter
}
// StreamFile writes a single entry to tw. The tar header is derived from f, and the header name
// is set to shardRelativePath joined with f.Name(), using forward slashes. For regular files the
// contents are read from fullPath and exactly the header's size in bytes is copied into the
// archive; for anything else only the header is written.
func StreamFile(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
	name := f.Name()

	hdr, err := tar.FileInfoHeader(f, name)
	if err != nil {
		return err
	}
	// Extend the entry name with the shard-relative path; tar names always use '/'.
	hdr.Name = filepath.ToSlash(filepath.Join(shardRelativePath, name))

	err = tw.WriteHeader(hdr)
	if err != nil {
		return err
	}

	// Only regular files carry a payload.
	if !f.Mode().IsRegular() {
		return nil
	}

	src, err := os.Open(fullPath)
	if err != nil {
		return err
	}
	defer src.Close()

	// Copy exactly the size announced in the header.
	_, err = io.CopyN(tw, src, hdr.Size)
	return err
}
// Restore reads a tar archive from r and extracts its entries into dir by calling extractFile
// repeatedly until the archive is exhausted (io.EOF). Any other error aborts the restore and is
// returned as-is. Once every entry has been extracted, the result of syncDir(dir) is returned.
func Restore(r io.Reader, dir string) error {
	tr := tar.NewReader(r)

	for {
		err := extractFile(tr, dir)
		if err == io.EOF {
			// End of archive: everything was extracted, finish with a directory sync.
			return syncDir(dir)
		}
		if err != nil {
			return err
		}
	}
}
// extractFile copies the next entry from tr into dir.
//
// The entry's header name is expected to be a path relative to the root data dir
// (e.g. db/rp/1/xxxxx.tsm or db/rp/1/index/xxxxxx.tsi). The first three path sections are
// dropped and the remainder is used as the path below dir. File contents are first written to
// "<dest>.tmp", synced and closed, and then moved into place with renameFile.
//
// It returns io.EOF (from tr.Next) when the archive has no more entries, and an
// "invalid archive path" error when the header name has fewer than three sections, when the
// remaining path would escape dir, or when a non-directory entry has no path left after the
// first three sections are dropped. If writing the temporary file fails, the temporary file is
// removed (best effort) and the write error is returned.
func extractFile(tr *tar.Reader, dir string) error {
	// Read next archive file.
	hdr, err := tr.Next()
	if err != nil {
		return err
	}

	// The hdr.Name is the relative path of the file from the root data dir.
	// e.g (db/rp/1/xxxxx.tsm or db/rp/1/index/xxxxxx.tsi)
	sections := strings.Split(filepath.FromSlash(hdr.Name), string(filepath.Separator))
	if len(sections) < 3 {
		return fmt.Errorf("invalid archive path: %s", hdr.Name)
	}

	// filepath.Join cleans the result, so any ".." elements that would leave dir
	// end up at the front of relativePath.
	relativePath := filepath.Join(sections[3:]...)
	if relativePath == ".." || strings.HasPrefix(relativePath, ".."+string(filepath.Separator)) {
		// Never write outside of dir, whatever the archive says.
		return fmt.Errorf("invalid archive path: %s", hdr.Name)
	}

	subDir, _ := filepath.Split(relativePath)

	// If this is a directory entry (usually just `index` for tsi), create it and move on.
	// Note that subDir is the parent of the entry; the entry's own directory is created
	// below when a file inside it is extracted.
	if hdr.Typeflag == tar.TypeDir {
		return os.MkdirAll(filepath.Join(dir, subDir), os.FileMode(hdr.Mode).Perm())
	}

	// A file entry needs a name below dir; without one destPath would be dir itself
	// and the rename below would target the directory.
	if relativePath == "" || relativePath == "." {
		return fmt.Errorf("invalid archive path: %s", hdr.Name)
	}

	// Make sure the dir we need to write into exists. It should, but just double check in
	// case we get a slightly invalid tarball.
	if subDir != "" {
		if err := os.MkdirAll(filepath.Join(dir, subDir), 0755); err != nil {
			return err
		}
	}

	destPath := filepath.Join(dir, relativePath)
	tmp := destPath + ".tmp"

	// Create new file on disk. O_TRUNC makes sure a stale temp file left behind by an
	// earlier, interrupted restore cannot leave trailing bytes in the new file.
	f, err := os.OpenFile(tmp, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.FileMode(hdr.Mode).Perm())
	if err != nil {
		return err
	}

	// Copy from archive to the file, then sync to disk. The file is closed exactly once,
	// whether or not the copy/sync succeeded, and the first error encountered wins.
	_, err = io.CopyN(f, tr, hdr.Size)
	if err == nil {
		err = f.Sync()
	}
	if cerr := f.Close(); err == nil {
		err = cerr
	}
	if err != nil {
		// Best effort cleanup of the partial temp file; the write error is what matters.
		_ = os.Remove(tmp)
		return err
	}

	return renameFile(tmp, destPath)
}