-
Notifications
You must be signed in to change notification settings - Fork 1
/
health_blobstore.go
153 lines (142 loc) · 4.5 KB
/
health_blobstore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/*
* Copyright 2020 The Magma Authors.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storage
import (
"fmt"
"github.com/go-magma/magma/lib/go/protos"
fegprotos "github.com/go-magma/magma/modules/feg/cloud/go/protos"
"github.com/go-magma/magma/modules/feg/cloud/go/services/health"
"github.com/go-magma/magma/orc8r/cloud/go/blobstore"
"github.com/go-magma/magma/orc8r/cloud/go/storage"
"github.com/golang/glog"
)
type healthBlobstore struct {
factory blobstore.BlobStorageFactory
}
// NewHealthBlobstore creates a new HealthBlobstore using the provided
// blobstore factory for the underlying storage functionality.
func NewHealthBlobstore(factory blobstore.BlobStorageFactory) (HealthBlobstore, error) {
if factory == nil {
return nil, fmt.Errorf("Storage factory is nil")
}
return &healthBlobstore{
factory,
}, nil
}
// GetHealth fetches health status for the given networkID and gatewayID from
// the TransactionalBlobStorage.
func (h *healthBlobstore) GetHealth(networkID string, gatewayID string) (*fegprotos.HealthStats, error) {
store, err := h.factory.StartTransaction(nil)
if err != nil {
return nil, err
}
healthTypeAndKey := storage.TypeAndKey{
Type: health.HealthStatusType,
Key: gatewayID,
}
healthBlob, err := store.Get(networkID, healthTypeAndKey)
if err != nil {
store.Rollback()
return nil, err
}
retHealth := &fegprotos.HealthStats{}
err = protos.Unmarshal(healthBlob.Value, retHealth)
if err != nil {
store.Rollback()
return retHealth, err
}
return retHealth, store.Commit()
}
// UpdateHealth updates the given gateway's health status in the
// TransactionalBlobStorage.
func (h *healthBlobstore) UpdateHealth(networkID string, gatewayID string, healthStats *fegprotos.HealthStats) error {
healthBlob, err := HealthToBlob(gatewayID, healthStats)
if err != nil {
return err
}
store, err := h.factory.StartTransaction(nil)
if err != nil {
return err
}
err = store.CreateOrUpdate(networkID, []blobstore.Blob{healthBlob})
if err != nil {
store.Rollback()
return err
}
return store.Commit()
}
// UpdateClusterState updates the given cluster's state in the
// TransactionalBlobStorage.
func (h *healthBlobstore) UpdateClusterState(networkID string, clusterID string, logicalID string) error {
clusterBlob, err := ClusterToBlob(clusterID, logicalID)
if err != nil {
return err
}
store, err := h.factory.StartTransaction(nil)
if err != nil {
return err
}
err = store.CreateOrUpdate(networkID, []blobstore.Blob{clusterBlob})
if err != nil {
store.Rollback()
return err
}
return store.Commit()
}
// GetClusterState retrieves the stored clusterState for the provided networkID
// and logicalID from the TransactionalBlobStorage. The clusterState is
// initialized if it doesn't already exist.
func (h *healthBlobstore) GetClusterState(networkID string, logicalID string) (*fegprotos.ClusterState, error) {
keys := []string{networkID}
filter := blobstore.SearchFilter{
NetworkID: &networkID,
}
store, err := h.factory.StartTransaction(nil)
foundKeys, err := store.GetExistingKeys(keys, filter)
if err != nil {
store.Rollback()
return nil, err
}
if len(foundKeys) == 0 {
err = h.initializeCluster(store, networkID, networkID, logicalID)
if err != nil {
store.Rollback()
return nil, err
}
}
clusterID := networkID
clusterTypeAndKey := storage.TypeAndKey{
Type: health.ClusterStatusType,
Key: clusterID,
}
clusterBlob, err := store.Get(networkID, clusterTypeAndKey)
if err != nil {
store.Rollback()
return nil, err
}
retClusterState := &fegprotos.ClusterState{}
err = protos.Unmarshal(clusterBlob.Value, retClusterState)
if err != nil {
store.Rollback()
return retClusterState, err
}
return retClusterState, store.Commit()
}
func (h *healthBlobstore) initializeCluster(store blobstore.TransactionalBlobStorage, networkID string, clusterID string, logicalID string) error {
glog.V(2).Infof("Initializing clusterState for networkID: %s with active: %s", networkID, logicalID)
clusterBlob, err := ClusterToBlob(networkID, logicalID)
if err != nil {
return err
}
return store.CreateOrUpdate(networkID, []blobstore.Blob{clusterBlob})
}