This repository has been archived by the owner on Sep 4, 2021. It is now read-only.
/
azure.go
144 lines (122 loc) · 3.7 KB
/
azure.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package backend
import (
"bytes"
"crypto/md5"
"encoding/base64"
"fmt"
"io"
"time"
"github.com/Azure/azure-sdk-for-go/storage"
"github.com/flynn/flynn/pkg/postgres"
"github.com/flynn/flynn/pkg/random"
)
func init() {
Backends["azure"] = NewAzure
}
const azureMaxBlockSize = 4194304 // 4MiB
func NewAzure(name string, info map[string]string) (Backend, error) {
b := &azureBackend{
name: name,
container: info["container"],
}
accountName := info["account_name"]
accountKey := info["account_key"]
if b.container == "" {
return nil, fmt.Errorf("blobstore: missing Azure Storage container param for %s", name)
}
if accountName == "" {
return nil, fmt.Errorf("blobstore: missing Azure Storage account_name param for %s", name)
}
if accountKey == "" {
return nil, fmt.Errorf("blobstore: missing Azure Storage account_key param for %s", name)
}
client, err := storage.NewBasicClient(accountName, accountKey)
if err != nil {
return nil, fmt.Errorf("blobstore: error creating Azure Storage client %s: %s", name, err)
}
b.client = client.GetBlobService()
ok, err := b.client.ContainerExists(b.container)
if err != nil {
return nil, fmt.Errorf("blobstore: error checking if Azure Storage container %q exists for %s: %s", b.container, name, err)
}
if !ok {
return nil, fmt.Errorf("blobstore: Azure Storage container %q does not exists for %s", b.container, name)
}
return b, nil
}
type azureBackend struct {
name string
container string
client storage.BlobStorageClient
}
func (b *azureBackend) Name() string {
return b.name
}
func (b *azureBackend) Put(tx *postgres.DBTx, info FileInfo, r io.Reader, appendBlob bool) error {
if appendBlob {
// TODO(titanous): This is a hack, we should modify the block list.
existing, err := b.Open(tx, info, false)
if err != nil {
return err
}
r = io.MultiReader(existing, r)
}
info.ExternalID = random.UUID()
if err := tx.Exec("UPDATE files SET external_id = $2 WHERE file_id = $1", info.ID, info.ExternalID); err != nil {
return err
}
// Create blob that will be filled with blocks
if err := b.client.CreateBlockBlob(b.container, info.ExternalID); err != nil {
return err
}
var blocks []storage.Block
// Create blocks
buf := make([]byte, azureMaxBlockSize)
for {
n, err := io.ReadFull(r, buf)
if err == io.EOF {
break
}
if err != nil && err != io.ErrUnexpectedEOF {
return err
}
data := buf[:n]
md5sum := md5.Sum(data)
blockID := base64.StdEncoding.EncodeToString(random.Bytes(16))
if err := b.client.PutBlockWithLength(
b.container,
info.ExternalID,
blockID,
uint64(n),
bytes.NewReader(data),
map[string]string{"Content-MD5": base64.StdEncoding.EncodeToString(md5sum[:])},
); err != nil {
return err
}
blocks = append(blocks, storage.Block{ID: blockID, Status: storage.BlockStatusUncommitted})
if err == io.ErrUnexpectedEOF {
break
}
}
// Save the list of blocks to the blob
return b.client.PutBlockList(b.container, info.ExternalID, blocks)
}
func (b *azureBackend) Copy(tx *postgres.DBTx, dst, src FileInfo) error {
// The Copy Blob operation is asynchronous, so we do the copy here instead.
existing, err := b.Open(tx, src, false)
if err != nil {
return err
}
return b.Put(tx, dst, existing, false)
}
func (b *azureBackend) Delete(tx *postgres.DBTx, info FileInfo) error {
return b.client.DeleteBlob(b.container, info.ExternalID, nil)
}
func (b *azureBackend) Open(tx *postgres.DBTx, info FileInfo, txControl bool) (FileStream, error) {
if txControl {
// We don't need the database transaction, so clean it up
tx.Rollback()
}
url, err := b.client.GetBlobSASURI(b.container, info.ExternalID, time.Now().Add(10*time.Minute), "r")
return newRedirectFileStream(url), err
}