forked from thanos-io/thanos
/
block.go
138 lines (114 loc) · 4.62 KB
/
block.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Package block contains common functionality for interacting with TSDB blocks
// in the context of Thanos.
package block
import (
"context"
"encoding/json"
"os"
"path"
"path/filepath"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"fmt"
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
)
const (
// MetaFilename is the name of the JSON file holding a block's meta information.
MetaFilename = "meta.json"
// IndexFilename is the name of the block's index file.
IndexFilename = "index"
// ChunksDirname is the name of the directory holding chunks with compressed samples.
ChunksDirname = "chunks"
// DebugMetas is the bucket directory to which Upload copies each block's
// meta file as "<block id>.json". Useful for debugging.
DebugMetas = "debug/metas"
)
// Download downloads the bucket directory named after the block ID into dst
// via objstore.DownloadDir and then makes sure dst contains a chunks
// directory, creating an empty one if it is missing.
func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string) error {
	downloadErr := objstore.DownloadDir(ctx, logger, bucket, id.String(), dst)
	if downloadErr != nil {
		return downloadErr
	}

	chunksPath := filepath.Join(dst, ChunksDirname)
	_, statErr := os.Stat(chunksPath)
	switch {
	case statErr == nil:
		return nil
	case os.IsNotExist(statErr):
		// An empty block has no chunks. An empty directory cannot easily be
		// uploaded, so it is recreated locally instead.
		return os.Mkdir(chunksPath, os.ModePerm)
	default:
		return errors.Wrapf(statErr, "stat %s", chunksPath)
	}
}
// Upload uploads the block stored in the local directory bdir to the bucket.
// The base name of bdir must be the block's ULID.
//
// Before uploading anything it verifies that bdir is a directory named after
// a valid ULID, that its meta file can be read, and that the meta carries at
// least one external label. A copy of the meta file is uploaded under
// DebugMetas first; then the chunks directory, the index and finally the meta
// file are uploaded under the block ID. If uploading the chunks, the index or
// the meta file fails, the block is deleted from the bucket again (see
// cleanUp) to avoid leaving a partial block behind.
// TODO(bplotka): Ensure bucket operations have reasonable backoff retries.
func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string) error {
	df, err := os.Stat(bdir)
	if err != nil {
		return errors.Wrap(err, "stat bdir")
	}
	if !df.IsDir() {
		return errors.Errorf("%s is not a directory", bdir)
	}

	// Verify dir.
	id, err := ulid.Parse(df.Name())
	if err != nil {
		return errors.Wrap(err, "not a block dir")
	}

	meta, err := metadata.Read(bdir)
	if err != nil {
		// No meta or broken meta file.
		return errors.Wrap(err, "read meta")
	}

	// len is 0 for a nil value as well, so no separate nil check is needed.
	if len(meta.Thanos.Labels) == 0 {
		return errors.Errorf("empty external labels are not allowed for Thanos block.")
	}

	// Local paths are OS-specific and are built with filepath.Join; bucket
	// object names are always slash-separated and are built with path.Join.
	localMeta := filepath.Join(bdir, MetaFilename)

	if err := objstore.UploadFile(ctx, logger, bkt, localMeta, path.Join(DebugMetas, fmt.Sprintf("%s.json", id))); err != nil {
		return errors.Wrap(err, "upload meta file to debug dir")
	}

	if err := objstore.UploadDir(ctx, logger, bkt, filepath.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname)); err != nil {
		return cleanUp(bkt, id, errors.Wrap(err, "upload chunks"))
	}

	if err := objstore.UploadFile(ctx, logger, bkt, filepath.Join(bdir, IndexFilename), path.Join(id.String(), IndexFilename)); err != nil {
		return cleanUp(bkt, id, errors.Wrap(err, "upload index"))
	}

	// Meta.json always needs to be uploaded as the last item. This allows block
	// directories without a meta file to be treated as pending uploads.
	if err := objstore.UploadFile(ctx, logger, bkt, localMeta, path.Join(id.String(), MetaFilename)); err != nil {
		return cleanUp(bkt, id, errors.Wrap(err, "upload meta file"))
	}

	return nil
}
// cleanUp deletes the block with the given id from the bucket after a failed
// upload and returns the upload error err.
//
// The delete runs with context.Background() so that it is not affected by
// cancellation of the upload's context. If the delete itself fails, err is
// returned wrapped with a message that carries the cleanup error, so the
// caller learns that a partial block was left in the bucket and why.
func cleanUp(bkt objstore.Bucket, id ulid.ULID, err error) error {
	// Cleanup the dir with an uncancelable context.
	cleanErr := Delete(context.Background(), bkt, id)
	if cleanErr != nil {
		// Report the cleanup failure in the message; err is already kept as
		// the wrapped cause.
		return errors.Wrapf(err, "failed to clean block after upload issue. Partial block in system. Err: %s", cleanErr.Error())
	}
	return err
}
// Delete removes the bucket directory that is meant to be a block directory,
// identified by the block ID.
// NOTE: Prefer this method instead of objstore.Delete to avoid deleting empty dir (whole bucket) by mistake.
func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error {
	blockDir := id.String()
	return objstore.DeleteDir(ctx, bucket, blockDir)
}
// DownloadMeta fetches only the meta file of the block with the given ID from
// the bucket and decodes it from JSON. On any failure it returns a zero
// metadata.Meta together with the wrapped error.
func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) {
	blockID := id.String()

	reader, getErr := bkt.Get(ctx, path.Join(blockID, MetaFilename))
	if getErr != nil {
		return metadata.Meta{}, errors.Wrapf(getErr, "meta.json bkt get for %s", blockID)
	}
	// Close the reader when done; a close error is only logged.
	defer runutil.CloseWithLogOnErr(logger, reader, "download meta bucket client")

	var decoded metadata.Meta
	decodeErr := json.NewDecoder(reader).Decode(&decoded)
	if decodeErr != nil {
		return metadata.Meta{}, errors.Wrapf(decodeErr, "decode meta.json for block %s", blockID)
	}
	return decoded, nil
}
// IsBlockDir reports whether the last element of path parses as a ULID.
// It returns the result of parsing that element as id, and ok is true only
// when parsing succeeded.
func IsBlockDir(path string) (id ulid.ULID, ok bool) {
	parsed, parseErr := ulid.Parse(filepath.Base(path))
	if parseErr != nil {
		return parsed, false
	}
	return parsed, true
}