/
storages.go
164 lines (143 loc) · 5.17 KB
/
storages.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package storages
import (
"fmt"
"net/http"
watchdogConfig "github.com/allegro/akubra/internal/akubra/watchdog/config"
"github.com/allegro/akubra/internal/akubra/balancing"
"github.com/allegro/akubra/internal/akubra/httphandler"
"github.com/allegro/akubra/internal/akubra/log"
"github.com/allegro/akubra/internal/akubra/watchdog"
"github.com/allegro/akubra/internal/akubra/storages/auth"
"github.com/allegro/akubra/internal/akubra/storages/config"
"github.com/allegro/akubra/internal/akubra/storages/merger"
)
// ClusterStorage is basic cluster storage interface
type ClusterStorage interface {
	// GetShard returns the shard client registered under name, or an error
	// when no shard with that name is defined.
	GetShard(name string) (NamedShardClient, error)
	// MergeShards returns a shard client composed of the backends of all the
	// given clusters, registering it under name; if a shard with that name
	// already exists it is returned instead.
	MergeShards(name string, clusters ...NamedShardClient) NamedShardClient
}
// Storages config
type Storages struct {
	clustersConf config.ShardsMap              // shard (cluster) definitions from configuration
	storagesMap  config.StoragesMap            // raw storage definitions from configuration
	ShardClients map[string]NamedShardClient   // initialized shard clients, keyed by shard name
	Backends     map[string]*StorageClient     // decorated storage clients, keyed by storage name
	watchdog     watchdog.ConsistencyWatchdog  // consistency watchdog (NOTE(review): never assigned in this file — confirm it is set elsewhere)
	shardFactory *shardFactory                 // factory used to build new shard clients (see MergeShards)
}
// GetShard returns the shard client registered under name. When no shard with
// that name exists it returns an empty ShardClient together with an error.
func (st *Storages) GetShard(name string) (NamedShardClient, error) {
	shard, found := st.ShardClients[name]
	if !found {
		return &ShardClient{}, fmt.Errorf("no such shard defined %q", name)
	}
	return shard, nil
}
// MergeShards extends Clusters list of Storages by cluster made of joined clusters backends and returns it.
// If cluster of given name is already defined returns previously defined cluster instead.
func (st *Storages) MergeShards(name string, clusters ...NamedShardClient) NamedShardClient {
	// Reuse an already-registered shard of the same name.
	if existing, defined := st.ShardClients[name]; defined {
		return existing
	}
	// Collect the backend names of every source cluster.
	names := make([]string, 0)
	for _, shard := range clusters {
		for _, b := range shard.Backends() {
			names = append(names, b.Name)
		}
	}
	log.Debugf("Backend names %v\n", names)
	merged, err := st.shardFactory.newShard(name, names, st.Backends)
	if err != nil {
		// A broken region cluster is unrecoverable at this point.
		log.Fatalf("Initialization of region cluster %s failed reason: %s", name, err)
	}
	st.ShardClients[name] = merged
	return merged
}
// Factory creates storages
type Factory struct {
	transport    http.RoundTripper            // base transport every storage client is decorated around
	watchdog     watchdog.ConsistencyWatchdog // consistency watchdog shared with the shard factory
	shardFactory *shardFactory                // builds shard clients in InitStorages
}
// NewStoragesFactory creates StoragesFactory
func NewStoragesFactory(transport http.RoundTripper, watchdogConfig *watchdogConfig.WatchdogConfig,
	watchdog watchdog.ConsistencyWatchdog, watchdogRequestFactory watchdog.ConsistencyRecordFactory) *Factory {
	// Build the shard factory once; it is shared by every shard the factory creates.
	sf := &shardFactory{
		watchdog:                 watchdog,
		watchdogConfig:           watchdogConfig,
		consistencyRecordFactory: watchdogRequestFactory,
	}
	return &Factory{
		transport:    transport,
		watchdog:     watchdog,
		shardFactory: sf,
	}
}
// InitStorages setups storages: it decorates every configured storage into a
// StorageClient, then builds a shard client (with a priority balancer) for
// every configured cluster. It returns an error when either map is empty or
// when any backend/shard fails to initialize.
func (factory *Factory) InitStorages(clustersConf config.ShardsMap, storagesMap config.StoragesMap, ignoredHeaders map[string]bool) (ClusterStorage, error) {
	if len(storagesMap) == 0 {
		return nil, fmt.Errorf("empty map 'storagesMap' in 'InitStorages'")
	}
	storageClients := make(map[string]*StorageClient, len(storagesMap))
	for name, storage := range storagesMap {
		if storage.Maintenance {
			log.Printf("storage %q in maintenance mode", name)
		}
		decoratedBackend, err := decorateBackend(factory.transport, name, storage, ignoredHeaders)
		if err != nil {
			return nil, err
		}
		storageClients[name] = decoratedBackend
	}
	if len(clustersConf) == 0 {
		return nil, fmt.Errorf("empty map 'clustersConf' in 'InitStorages'")
	}
	shards := make(map[string]NamedShardClient, len(clustersConf))
	for name, clusterConf := range clustersConf {
		cluster, err := factory.shardFactory.newShard(name, storageNames(clusterConf), storageClients)
		// BUG FIX: check err before touching cluster — previously
		// cluster.balancer was assigned before err was inspected, which would
		// panic with a nil-pointer dereference whenever newShard failed.
		if err != nil {
			return nil, err
		}
		cluster.balancer = balancing.NewBalancerPrioritySet(clusterConf.Storages, convertToRoundTrippersMap(storageClients))
		shards[name] = cluster
	}
	return &Storages{
		clustersConf: clustersConf,
		storagesMap:  storagesMap,
		ShardClients: shards,
		Backends:     storageClients,
		shardFactory: factory.shardFactory,
	}, nil
}
// convertToRoundTrippersMap re-types a map of StorageClients as a map of
// http.RoundTripper so it can be handed to the balancer.
func convertToRoundTrippersMap(backends map[string]*StorageClient) map[string]http.RoundTripper {
	// Pre-size: the result always has exactly one entry per backend.
	newMap := make(map[string]http.RoundTripper, len(backends))
	for key, backend := range backends {
		newMap[key] = backend
	}
	return newMap
}
// storageNames returns the names of all storages configured for the shard,
// in the order they appear in the configuration.
func storageNames(conf config.Shard) []string {
	// Pre-size: one name per configured storage.
	names := make([]string, 0, len(conf.Storages))
	for _, storageConfig := range conf.Storages {
		names = append(names, storageConfig.Name)
	}
	return names
}
// decorateBackend builds a StorageClient for the named storage: it looks up
// the auth decorator registered for the storage type, wraps the transport with
// it (plus the ListV2 response merger), and returns the assembled client.
// It fails when no decorator exists for storageDef.Type or the decorator
// factory returns an error.
func decorateBackend(transport http.RoundTripper, name string, storageDef config.Storage, ignoredCanonicalizedHeaders map[string]bool) (*StorageClient, error) {
	errPrefix := fmt.Sprintf("initialization of backend '%s' resulted with error", name)
	decoratorFactory, ok := auth.Decorators[storageDef.Type]
	if !ok {
		return nil, fmt.Errorf("%s: no decorator defined for type '%s'", errPrefix, storageDef.Type)
	}
	decorator, err := decoratorFactory(name, storageDef, ignoredCanonicalizedHeaders)
	if err != nil {
		// FIX: wrap with %w (was %q, which quoted the message and broke the
		// error chain) so callers can use errors.Is/errors.As.
		return nil, fmt.Errorf("%s: %w", errPrefix, err)
	}
	backend := &StorageClient{
		RoundTripper: httphandler.Decorate(transport, decorator, merger.ListV2Interceptor),
		Endpoint:     *storageDef.Backend.URL,
		Storage:      storageDef,
		Name:         name,
	}
	return backend, nil
}