/
inventory.go
129 lines (114 loc) · 4.59 KB
/
inventory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright 2022 Antrea Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package inventory
import (
"context"
"fmt"
"reflect"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
antreastorage "antrea.io/antrea/pkg/apiserver/storage"
runtimev1alpha1 "antrea.io/nephe/apis/runtime/v1alpha1"
"antrea.io/nephe/pkg/controllers/inventory/common"
"antrea.io/nephe/pkg/controllers/inventory/store"
"antrea.io/nephe/pkg/logging"
)
type Inventory struct {
log logr.Logger
vpcStore antreastorage.Interface
}
// InitInventory creates an instance of Inventory struct and initializes inventory with cache indexers.
func InitInventory() *Inventory {
inventory := &Inventory{
log: logging.GetLogger("inventory").WithName("Cloud"),
}
inventory.vpcStore = store.NewVPCInventoryStore()
return inventory
}
// BuildVpcCache using vpc list from cloud, update vpc cache(with vpcs applicable for the current cloud account).
func (inventory *Inventory) BuildVpcCache(discoveredVpcMap map[string]*runtimev1alpha1.Vpc, namespacedName *types.NamespacedName) error {
var numVpcsToAdd, numVpcsToUpdate, numVpcsToDelete int
vpcsInCache, _ := inventory.vpcStore.GetByIndex(common.VpcIndexerByAccountNameSpacedName, namespacedName.String())
// Remove vpcs in vpc cache which are not found in vpc list fetched from cloud.
for _, i := range vpcsInCache {
vpc := i.(*runtimev1alpha1.Vpc)
if _, found := discoveredVpcMap[vpc.Status.Id]; !found {
if err := inventory.vpcStore.Delete(fmt.Sprintf("%v/%v-%v", vpc.Namespace, vpc.Labels[common.VpcLabelAccountName],
vpc.Status.Id)); err != nil {
inventory.log.Error(err, "failed to delete vpc from vpc cache", "vpc id", vpc.Status.Id, "account",
namespacedName.String())
} else {
numVpcsToDelete++
}
}
}
for _, discoveredVpc := range discoveredVpcMap {
var err error
key := fmt.Sprintf("%v/%v-%v", discoveredVpc.Namespace, discoveredVpc.Labels[common.VpcLabelAccountName], discoveredVpc.Status.Id)
if cachedObj, found, _ := inventory.vpcStore.Get(key); !found {
err = inventory.vpcStore.Create(discoveredVpc)
if err == nil {
numVpcsToAdd++
}
} else {
cachedVpc := cachedObj.(*runtimev1alpha1.Vpc)
if !reflect.DeepEqual(cachedVpc.Status, discoveredVpc.Status) {
err = inventory.vpcStore.Update(discoveredVpc)
if err == nil {
numVpcsToUpdate++
}
}
}
if err != nil {
return fmt.Errorf("failed to add vpc into vpc cache, vpc id: %s, error: %v",
discoveredVpc.Status.Id, err)
}
}
if numVpcsToAdd != 0 || numVpcsToUpdate != 0 || numVpcsToDelete != 0 {
inventory.log.Info("Vpc poll statistics", "account", namespacedName, "added", numVpcsToAdd,
"update", numVpcsToUpdate, "delete", numVpcsToDelete)
}
return nil
}
// DeleteVpcCache deletes all entries from vpc cache.
func (inventory *Inventory) DeleteVpcCache(namespacedName *types.NamespacedName) error {
vpcsInCache, err := inventory.vpcStore.GetByIndex(common.VpcIndexerByAccountNameSpacedName, namespacedName.String())
if err != nil {
return err
}
for _, i := range vpcsInCache {
vpc := i.(*runtimev1alpha1.Vpc)
if err := inventory.vpcStore.Delete(fmt.Sprintf("%v/%v-%v", vpc.Namespace, vpc.Labels[common.VpcLabelAccountName],
vpc.Status.Id)); err != nil {
return fmt.Errorf("failed to delete entry from VpcIndexer, indexer %v, error %v", *namespacedName, err)
}
}
return nil
}
// GetVpcsFromIndexer returns vpcs matching the indexedValue for the requested index.
func (inventory *Inventory) GetVpcsFromIndexer(indexName string, indexedValue string) ([]interface{}, error) {
return inventory.vpcStore.GetByIndex(indexName, indexedValue)
}
// GetAllVpcs returns all vpcs from the indexer.
func (inventory *Inventory) GetAllVpcs() []interface{} {
return inventory.vpcStore.List()
}
// Watch returns a Watch interface of VPC.
func (inventory *Inventory) Watch(ctx context.Context, key string, labelSelector labels.Selector,
fieldSelector fields.Selector) (watch.Interface, error) {
return inventory.vpcStore.Watch(ctx, key, labelSelector, fieldSelector)
}