forked from asonawalla/gazette
/
stores.go
154 lines (133 loc) · 4.72 KB
/
stores.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package fragment
import (
"context"
"fmt"
"io"
"net/url"
"strings"
"time"
"github.com/LiveRamp/gazette/v2/pkg/metrics"
pb "github.com/LiveRamp/gazette/v2/pkg/protocol"
"github.com/gorilla/schema"
)
// backend is the set of operations a fragment store implementation provides to
// this package. The package-level helpers in this file (SignGetURL, Open,
// Persist, List, Remove) pick an implementation via getBackend and forward to
// the matching method, passing along the store endpoint URL |ep| where the
// method accepts one.
type backend interface {
// Provider returns the store's provider name. Within this file it is used as
// the provider label value of store metrics.
Provider() string
// SignGet backs SignGetURL: it returns a URL for fetching |fragment| which
// is intended to be valid for duration |d|.
SignGet(ep *url.URL, fragment pb.Fragment, d time.Duration) (string, error)
// Exists reports whether |fragment| is present in the store. Persist uses
// it to skip Fragments which were already written.
Exists(ctx context.Context, ep *url.URL, fragment pb.Fragment) (bool, error)
// Open backs the package-level Open: it returns a reader of |fragment|.
Open(ctx context.Context, ep *url.URL, fragment pb.Fragment) (io.ReadCloser, error)
// Persist backs the package-level Persist: it writes |spool| to the store.
Persist(ctx context.Context, ep *url.URL, spool Spool) error
// List backs the package-level List: it passes Fragments of journal |name|
// found in |store| to |callback|.
List(ctx context.Context, store pb.FragmentStore, ep *url.URL, name pb.Journal, callback func(pb.Fragment)) error
// Remove backs the package-level Remove: it removes |fragment| from the store.
Remove(ctx context.Context, fragment pb.Fragment) error
}
// sharedStores holds one backend instance per supported store type. The
// instances are built once, when this package-level variable is initialized,
// and getBackend hands out these same instances on every call.
var sharedStores = struct {
s3 *s3Backend
gcs *gcsBackend
fs *fsBackend
}{
s3: newS3Backend(),
gcs: &gcsBackend{},
fs: &fsBackend{},
}
// getBackend maps a store URL scheme ("s3", "gs" or "file") to the shared
// backend instance which serves it. Any other scheme panics.
func getBackend(scheme string) backend {
	if scheme == "s3" {
		return sharedStores.s3
	}
	if scheme == "gs" {
		return sharedStores.gcs
	}
	if scheme == "file" {
		return sharedStores.fs
	}
	panic("unsupported scheme: " + scheme)
}
// SignGetURL produces a signed URL which lets its bearer GET the Fragment from
// its backing store for Duration |d|, measured from the current time. The
// outcome is recorded as a "get_signed_url" store operation.
func SignGetURL(fragment pb.Fragment, d time.Duration) (string, error) {
	endpoint := fragment.BackingStore.URL()
	store := getBackend(endpoint.Scheme)

	signed, err := store.SignGet(endpoint, fragment, d)
	instrumentStoreOp(store.Provider(), "get_signed_url", err)

	return signed, err
}
// Open returns a ReadCloser over the Fragment's content in its backing store.
// No client-side decompression is applied to the returned stream; server-side
// decompression is requested in the case of GZIP_OFFLOAD_DECOMPRESSION.
// The outcome is recorded as an "open" store operation.
func Open(ctx context.Context, fragment pb.Fragment) (io.ReadCloser, error) {
	endpoint := fragment.BackingStore.URL()
	store := getBackend(endpoint.Scheme)

	reader, err := store.Open(ctx, endpoint, fragment)
	instrumentStoreOp(store.Provider(), "open", err)

	return reader, err
}
// Persist a Spool to its store. If the Spool Fragment is already present,
// this is a no-op. If the Spool has not been compressed incrementally,
// it will be compressed before being persisted.
//
// Both the existence check and the write are recorded as store operations
// ("exist" and "persist"). The persisted-bytes counter is advanced only when
// the write succeeds, so that failed attempts do not inflate the total.
// The first error encountered is returned.
func Persist(ctx context.Context, spool Spool) error {
	var ep = spool.Fragment.BackingStore.URL()
	var b = getBackend(ep.Scheme)

	var exists, err = b.Exists(ctx, ep, spool.Fragment.Fragment)
	instrumentStoreOp(b.Provider(), "exist", err)

	if err != nil {
		return err
	} else if exists {
		return nil // All done.
	}

	// Ensure |compressedFile| is ready. This is a no-op if compressed incrementally.
	if spool.CompressionCodec != pb.CompressionCodec_NONE {
		spool.finishCompression()
	}

	err = b.Persist(ctx, ep, spool)
	instrumentStoreOp(b.Provider(), "persist", err)

	if err != nil {
		// Nothing was durably written: leave the persisted-bytes counter alone.
		return err
	}
	metrics.StorePersistedBytesTotal.WithLabelValues(b.Provider()).Add(float64(spool.ContentLength()))
	return nil
}
// List enumerates the Fragments of journal |name| held by FragmentStore
// |store|, handing each listed Fragment to |callback|. |callback| has no
// return value; the error reported by the store's listing is returned to the
// caller. The outcome is recorded as a "list" store operation.
func List(ctx context.Context, store pb.FragmentStore, name pb.Journal, callback func(pb.Fragment)) error {
	endpoint := store.URL()
	impl := getBackend(endpoint.Scheme)

	err := impl.List(ctx, store, endpoint, name, callback)
	instrumentStoreOp(impl.Provider(), "list", err)

	return err
}
// Remove deletes |fragment| from its BackingStore. The outcome is recorded as
// a "remove" store operation.
func Remove(ctx context.Context, fragment pb.Fragment) error {
	endpoint := fragment.BackingStore.URL()
	store := getBackend(endpoint.Scheme)

	err := store.Remove(ctx, fragment)
	instrumentStoreOp(store.Provider(), "remove", err)

	return err
}
// parseStoreArgs decodes the query parameters of store URL |ep| into |args|
// using a schema decoder configured not to ignore unknown keys. A malformed
// query string is returned as-is from url.ParseQuery; a decoding failure is
// returned with a "parsing store URL arguments" prefix.
// NOTE(review): |args| is presumably a pointer to a config struct — confirm
// against callers.
func parseStoreArgs(ep *url.URL, args interface{}) error {
	var dec = schema.NewDecoder()
	dec.IgnoreUnknownKeys(false)

	query, err := url.ParseQuery(ep.RawQuery)
	if err != nil {
		return err
	}
	if err = dec.Decode(args, query); err != nil {
		return fmt.Errorf("parsing store URL arguments: %s", err)
	}
	return nil
}
// instrumentStoreOp increments the store request counter for the given
// |provider| and |op|, labelled metrics.Fail when |err| is non-nil and
// metrics.Ok otherwise.
func instrumentStoreOp(provider, op string, err error) {
	var status = metrics.Ok
	if err != nil {
		status = metrics.Fail
	}
	metrics.StoreRequestTotal.WithLabelValues(provider, op, status).Inc()
}
// rewriterCfg is a find/replace pair used to rewrite journal paths. It is
// often populated by parseStoreArgs(), and is meant to be embedded by other
// backend store configs.
type rewriterCfg struct {
	// Find is the substring to look for in a journal path. When empty, no
	// rewriting takes place.
	Find string
	// Replace is substituted for the first occurrence of Find.
	Replace string
}

// rewritePath returns fragment store path |s| followed by journal path |j|,
// where the first occurrence of Find within |j| has been swapped for Replace.
// If Find is empty or does not occur in |j|, then |j| is appended unchanged.
func (cfg rewriterCfg) rewritePath(s, j string) string {
	var journalPath = j
	if cfg.Find != "" {
		journalPath = strings.Replace(j, cfg.Find, cfg.Replace, 1)
	}
	return s + journalPath
}