This repository has been archived by the owner on Sep 4, 2021. It is now read-only.
/
gcs.go
145 lines (124 loc) · 4.21 KB
/
gcs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package backend
import (
"context"
"crypto"
"crypto/rand"
"crypto/rsa"
"crypto/sha256"
"crypto/x509"
"encoding/pem"
"fmt"
"io"
"time"
"cloud.google.com/go/storage"
"github.com/flynn/flynn/pkg/postgres"
"github.com/flynn/flynn/pkg/random"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
)
// init registers the Google Cloud Storage constructor in the Backends table
// so that it can be looked up under the "gcs" key.
func init() {
	const backendKey = "gcs"
	Backends[backendKey] = NewGCS
}
// NewGCS builds a Backend that stores blobs in a Google Cloud Storage bucket.
//
// name is the backend name reported by Name. info must contain a "bucket"
// entry naming the bucket and a "key" entry holding the service account JSON
// key; an error is returned if either is missing.
//
// Before returning, NewGCS eagerly verifies the configuration: it fetches an
// OAuth token, decodes the RSA private key used to sign download URLs and
// reads the bucket attributes. Any failure is reported as an error prefixed
// with "blobstore:".
func NewGCS(name string, info map[string]string) (Backend, error) {
	b := &gcsBackend{
		name:       name,
		bucketName: info["bucket"],
	}
	keyJSON := []byte(info["key"])

	if b.bucketName == "" {
		return nil, fmt.Errorf("blobstore: missing Google Cloud Storage bucket param for %s", name)
	}
	if len(keyJSON) == 0 {
		return nil, fmt.Errorf("blobstore: missing Google Cloud Storage key JSON param for %s", name)
	}

	jwtToken, err := google.JWTConfigFromJSON(keyJSON, "https://www.googleapis.com/auth/devstorage.read_write")
	if err != nil {
		return nil, fmt.Errorf("blobstore: error loading Google Cloud Storage JSON key: %s", err)
	}

	ctx := context.Background()
	tokenSource := jwtToken.TokenSource(ctx)

	// Test getting an OAuth token so we can disambiguate an issue with the
	// token and an issue with the bucket permissions below.
	if _, err := tokenSource.Token(); err != nil {
		return nil, fmt.Errorf("blobstore: error getting Google Cloud Storage OAuth token: %s", err)
	}

	// pem.Decode returns a nil block when the input contains no PEM data, so
	// guard against it rather than dereferencing a nil pointer.
	pemBlock, _ := pem.Decode(jwtToken.PrivateKey)
	if pemBlock == nil {
		return nil, fmt.Errorf("blobstore: error decoding Google Cloud Storage private key: no PEM data found")
	}
	privateKey, err := x509.ParsePKCS8PrivateKey(pemBlock.Bytes)
	if err != nil {
		return nil, fmt.Errorf("blobstore: error decoding Google Cloud Storage private key: %s", err)
	}
	rsaPrivateKey, ok := privateKey.(*rsa.PrivateKey)
	if !ok {
		return nil, fmt.Errorf("blobstore: unexpected Google Cloud Storage key type: %T", privateKey)
	}

	// signOpts is a function rather than a value so that every signed URL
	// gets an expiry computed relative to the time it is requested.
	b.signOpts = func() *storage.SignedURLOptions {
		return &storage.SignedURLOptions{
			GoogleAccessID: jwtToken.Email,
			SignBytes: func(data []byte) ([]byte, error) {
				digest := sha256.Sum256(data)
				return rsa.SignPKCS1v15(rand.Reader, rsaPrivateKey, crypto.SHA256, digest[:])
			},
			Method:  "GET",
			Expires: time.Now().Add(10 * time.Minute),
		}
	}

	client, err := storage.NewClient(ctx, option.WithTokenSource(tokenSource))
	if err != nil {
		return nil, fmt.Errorf("blobstore: error creating Google Cloud Storage client: %s", err)
	}
	b.bucket = client.Bucket(b.bucketName)

	// Reading the bucket attributes confirms up front that the bucket exists
	// and is accessible with the supplied credentials.
	if _, err := b.bucket.Attrs(ctx); err != nil {
		return nil, fmt.Errorf("blobstore: error checking Google Cloud Storage bucket %q existence, ensure that it exists and Owner access for %s is included the bucket ACL: %q", b.bucketName, jwtToken.Email, err)
	}
	return b, nil
}
// gcsBackend is the Google Cloud Storage backend constructed by NewGCS.
type gcsBackend struct {
// name is the backend name passed to NewGCS and returned by Name.
name string
// bucket is the handle used to write, copy and delete objects.
bucket *storage.BucketHandle
// Parameters used for URL signing: bucketName is passed to
// storage.SignedURL, and signOpts builds a new set of signing options each
// time it is called.
bucketName string
signOpts func() *storage.SignedURLOptions
}
// Name returns the name this backend was created with.
func (b *gcsBackend) Name() string {
	backendName := b.name
	return backendName
}
// Put uploads the contents of r to a new bucket object and records the
// object's identifier as the file's external_id within tx.
//
// A fresh random external ID is generated on every call, so an upload never
// overwrites an existing object. When append is true, the current contents of
// the file (as returned by Open) are streamed ahead of r into the new object.
//
// If reading from r or writing to the bucket fails, the upload is aborted so
// that no truncated object is saved, and the error is returned.
func (b *gcsBackend) Put(tx *postgres.DBTx, info FileInfo, r io.Reader, append bool) error {
	if append {
		// TODO(titanous): This is a hack, we should use resumable uploads.
		existing, err := b.Open(tx, info, false)
		if err != nil {
			return err
		}
		r = io.MultiReader(existing, r)
	}

	info.ExternalID = random.UUID()
	if err := tx.Exec("UPDATE files SET external_id = $2 WHERE file_id = $1", info.ID, info.ExternalID); err != nil {
		return err
	}

	// The writer only stops without saving data when its context is
	// cancelled; Close on its own would complete the upload with whatever was
	// written so far.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	w := b.bucket.Object(info.ExternalID).NewWriter(ctx)
	w.ContentType = info.Type
	if _, err := io.Copy(w, r); err != nil {
		// Abort the upload first, then close to release the writer. The
		// close error is ignored because the copy error is the root cause.
		cancel()
		w.Close()
		return err
	}
	return w.Close()
}
// Copy duplicates the object backing src into a new object for dst. A fresh
// random external ID is generated for dst and stored against dst.ID within tx
// before the bucket copy is started.
func (b *gcsBackend) Copy(tx *postgres.DBTx, dst, src FileInfo) error {
	dst.ExternalID = random.UUID()

	err := tx.Exec("UPDATE files SET external_id = $2 WHERE file_id = $1", dst.ID, dst.ExternalID)
	if err != nil {
		return err
	}

	target := b.bucket.Object(dst.ExternalID)
	source := b.bucket.Object(src.ExternalID)
	_, err = target.CopierFrom(source).Run(context.Background())
	return err
}
// Delete removes the bucket object identified by info.ExternalID. The
// transaction argument is not used.
func (b *gcsBackend) Delete(tx *postgres.DBTx, info FileInfo) error {
	object := b.bucket.Object(info.ExternalID)
	return object.Delete(context.Background())
}
// Open returns a FileStream that redirects to a signed URL for the object
// identified by info.ExternalID. The URL is signed with the options produced
// by the backend's signOpts function.
//
// When txControl is true the transaction is rolled back first, since serving
// the redirect does not need it. If signing fails, a nil FileStream is
// returned together with the error.
func (b *gcsBackend) Open(tx *postgres.DBTx, info FileInfo, txControl bool) (FileStream, error) {
	if txControl {
		// We don't need the database transaction, so clean it up
		tx.Rollback()
	}

	signedURL, err := storage.SignedURL(b.bucketName, info.ExternalID, b.signOpts())
	if err != nil {
		// Don't hand back a stream built from an empty URL.
		return nil, err
	}
	return newRedirectFileStream(signedURL), nil
}