/
util.go
174 lines (141 loc) · 4.39 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package util
import (
"context"
"fmt"
"io"
"os"
"runtime/debug"
"strings"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"go.etcd.io/bbolt"
"github.com/credativ/vali/pkg/chunkenc"
)
type StorageClient interface {
GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error)
}
// GetFileFromStorage downloads a file from storage to given location.
func GetFileFromStorage(ctx context.Context, storageClient StorageClient, objectKey, destination string) error {
readCloser, err := storageClient.GetObject(ctx, objectKey)
if err != nil {
return err
}
defer func() {
if err := readCloser.Close(); err != nil {
level.Error(util_log.Logger)
}
}()
f, err := os.Create(destination)
if err != nil {
return err
}
var objectReader io.Reader = readCloser
if strings.HasSuffix(objectKey, ".gz") {
decompressedReader := chunkenc.Gzip.GetReader(readCloser)
defer chunkenc.Gzip.PutReader(decompressedReader)
objectReader = decompressedReader
}
_, err = io.Copy(f, objectReader)
if err != nil {
return err
}
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("downloaded file %s", objectKey))
return f.Sync()
}
func GetDBNameFromObjectKey(objectKey string) (string, error) {
ss := strings.Split(objectKey, "/")
if len(ss) != 2 {
return "", fmt.Errorf("invalid object key: %v", objectKey)
}
if ss[1] == "" {
return "", fmt.Errorf("empty db name, object key: %v", objectKey)
}
return ss[1], nil
}
func BuildObjectKey(tableName, uploader, dbName string) string {
// Files are stored with <table-name>/<uploader>-<db-name>
objectKey := fmt.Sprintf("%s/%s-%s", tableName, uploader, dbName)
// if the file is a migrated one then don't add its name to the object key otherwise we would re-upload them again here with a different name.
if tableName == dbName {
objectKey = fmt.Sprintf("%s/%s", tableName, uploader)
}
return objectKey
}
func CompressFile(src, dest string) error {
level.Info(util_log.Logger).Log("msg", "compressing the file", "src", src, "dest", dest)
uncompressedFile, err := os.Open(src)
if err != nil {
return err
}
defer func() {
if err := uncompressedFile.Close(); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to close uncompressed file", "path", src, "err", err)
}
}()
compressedFile, err := os.Create(dest)
if err != nil {
return err
}
defer func() {
if err := compressedFile.Close(); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to close compressed file", "path", dest, "err", err)
}
}()
compressedWriter := chunkenc.Gzip.GetWriter(compressedFile)
defer chunkenc.Gzip.PutWriter(compressedWriter)
_, err = io.Copy(compressedWriter, uncompressedFile)
if err != nil {
return err
}
err = compressedWriter.Close()
if err == nil {
return err
}
return compressedFile.Sync()
}
type result struct {
boltdb *bbolt.DB
err error
}
// SafeOpenBoltdbFile will recover from a panic opening a DB file, and return the panic message in the err return object.
func SafeOpenBoltdbFile(path string) (*bbolt.DB, error) {
result := make(chan *result)
// Open the file in a separate goroutine because we want to change
// the behavior of a Fault for just this operation and not for the
// calling goroutine
go safeOpenBoltDbFile(path, result)
res := <-result
return res.boltdb, res.err
}
func safeOpenBoltDbFile(path string, ret chan *result) {
// boltdb can throw faults which are not caught by recover unless we turn them into panics
debug.SetPanicOnFault(true)
res := &result{}
defer func() {
if r := recover(); r != nil {
res.err = fmt.Errorf("recovered from panic opening boltdb file: %v", r)
}
// Return the result object on the channel to unblock the calling thread
ret <- res
}()
b, err := local.OpenBoltdbFile(path)
res.boltdb = b
res.err = err
}
// RemoveDirectories will return a new slice with any StorageObjects identified as directories removed.
func RemoveDirectories(incoming []chunk.StorageObject) []chunk.StorageObject {
outgoing := make([]chunk.StorageObject, 0, len(incoming))
for _, o := range incoming {
if IsDirectory(o.Key) {
continue
}
outgoing = append(outgoing, o)
}
return outgoing
}
// IsDirectory will return true if the string ends in a forward slash
func IsDirectory(key string) bool {
return strings.HasSuffix(key, "/")
}