Skip to content

Commit

Permalink
implement multi region support for cpa (#289)
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Liu <alliu@vmware.com>
Signed-off-by: alliu <alliu@vmware.com>
  • Loading branch information
shenmo3 committed Dec 1, 2023
1 parent 07ffbf7 commit 9faeba2
Show file tree
Hide file tree
Showing 44 changed files with 1,638 additions and 438 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute v1.0.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork v1.1.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resourcegraph/armresourcegraph v0.6.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/subscription/armsubscription v1.1.0
github.com/Azure/go-autorest/autorest/to v0.4.0
github.com/aws/aws-sdk-go v1.44.201
github.com/cenkalti/backoff/v4 v4.2.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resourcegraph/armresourceg
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resourcegraph/armresourcegraph v0.6.0/go.mod h1:KKrvyReEXgIA2D4ez2Jq5dRynJW4bOjRDkONdze2qjs=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.0.0 h1:ECsQtyERDVz3NP3kvDOTLvbQhqWp/x9EsGKtb4ogUr8=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.0.0/go.mod h1:s1tW/At+xHqjNFvWU4G0c0Qv33KOhvbGNj0RCTQDV8s=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/subscription/armsubscription v1.1.0 h1:pYhaMoTHP/zYIJGDA1sWsfyTDjdglaoYjIFMOEcL+/U=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/subscription/armsubscription v1.1.0/go.mod h1:iLq8GwpQhj09gpI4EdELwifR9kHrb/Q0LThq6iQq9yY=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk=
Expand Down
2 changes: 1 addition & 1 deletion pkg/accountmanager/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (p *accountPoller) doAccountPolling() {
}

// TODO: Remove this when event based push is implemented in the plugin.
cloudInventory, err := p.cloudInterface.GetCloudInventory(p.accountNamespacedName)
cloudInventory, err := p.cloudInterface.GetAccountCloudInventory(p.accountNamespacedName)
if err != nil {
// Chances are while polling was happening, account is removed.
return
Expand Down
10 changes: 5 additions & 5 deletions pkg/accountmanager/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,21 +217,21 @@ var _ = Describe("Account Poller", func() {
CloudProviderAccountStatus{}, nil).AnyTimes()

// Invalid VPC.
mockCloudInterface.EXPECT().GetCloudInventory(&testAccountNamespacedName).Return(nil,
mockCloudInterface.EXPECT().GetAccountCloudInventory(&testAccountNamespacedName).Return(nil,
fmt.Errorf("error")).Times(1)
accountPollerObj.doAccountPolling()
Expect(len(accountPollerObj.inventory.GetAllVpcs())).To(Equal(0))

// Empty VPC.
cloudInventory := nephetypes.CloudInventory{}
mockCloudInterface.EXPECT().GetCloudInventory(&testAccountNamespacedName).Return(&cloudInventory,
mockCloudInterface.EXPECT().GetAccountCloudInventory(&testAccountNamespacedName).Return(&cloudInventory,
nil).Times(1)
accountPollerObj.doAccountPolling()
Expect(len(accountPollerObj.inventory.GetAllVpcs())).To(Equal(0))

// Valid VPC.
cloudInventory = nephetypes.CloudInventory{VpcMap: vpcList}
mockCloudInterface.EXPECT().GetCloudInventory(&testAccountNamespacedName).Return(&cloudInventory,
mockCloudInterface.EXPECT().GetAccountCloudInventory(&testAccountNamespacedName).Return(&cloudInventory,
nil).Times(1)
accountPollerObj.doAccountPolling()
Expect(len(accountPollerObj.inventory.GetAllVpcs())).To(Equal(len(vpcList)))
Expand All @@ -253,7 +253,7 @@ var _ = Describe("Account Poller", func() {

cloudInventory = nephetypes.CloudInventory{VpcMap: vpcList}
// Invalid VMs.
mockCloudInterface.EXPECT().GetCloudInventory(&testAccountNamespacedName).Return(&cloudInventory,
mockCloudInterface.EXPECT().GetAccountCloudInventory(&testAccountNamespacedName).Return(&cloudInventory,
nil).Times(1)
accountPollerObj.doAccountPolling()
Expect(len(accountPollerObj.inventory.GetAllVms())).To(Equal(0))
Expand All @@ -262,7 +262,7 @@ var _ = Describe("Account Poller", func() {
vmMap := make(map[types.NamespacedName]map[string]*runtimev1alpha1.VirtualMachine)
vmMap[testCesNamespacedName] = vmList
cloudInventory = nephetypes.CloudInventory{VpcMap: vpcList, VmMap: vmMap}
mockCloudInterface.EXPECT().GetCloudInventory(&testAccountNamespacedName).Return(&cloudInventory,
mockCloudInterface.EXPECT().GetAccountCloudInventory(&testAccountNamespacedName).Return(&cloudInventory,
nil).Times(1)
accountPollerObj.doAccountPolling()
Expect(len(accountPollerObj.inventory.GetAllVms())).To(Equal(len(vmList)))
Expand Down
27 changes: 16 additions & 11 deletions pkg/apiserver/webhook/cloudprovideraccount_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (

const MinPollInterval = 30

var (
const (
errorMsgSecretNotConfigured = "unable to get secret"
errorMsgMinPollInterval = "pollIntervalInSeconds should be >= 30. If not specified, defaults to 60"
errorMsgMissingAwsCredential = "must specify either credentials or role arn, both cannot be empty"
Expand Down Expand Up @@ -248,20 +248,21 @@ func (v *CPAValidator) validateAWSAccount(account *crdv1alpha1.CloudProviderAcco
return fmt.Errorf(errorMsgMissingAwsCredential)
}

if len(awsConfig.Region) == 0 || len(strings.TrimSpace(awsConfig.Region[0])) == 0 {
if len(awsConfig.Region) == 0 {
return fmt.Errorf(errorMsgMissingRegion)
}

// NOTE: currently only AWS standard partition regions supported (aws-cn, aws-us-gov etc are not
// supported). As we add support for other partitions, validation needs to be updated.
regions := endpoints.AwsPartition().Regions()
_, found := regions[awsConfig.Region[0]]
if !found {
var supportedRegions []string
for key := range regions {
supportedRegions = append(supportedRegions, key)
awsRegionMap := endpoints.AwsPartition().Regions()
for _, region := range awsConfig.Region {
if _, found := awsRegionMap[strings.ToLower(region)]; !found {
var supportedRegions []string
for key := range awsRegionMap {
supportedRegions = append(supportedRegions, key)
}
return fmt.Errorf("%v %s [%v]", region, errorMsgInvalidRegion, supportedRegions)
}
return fmt.Errorf("%v %s [%v]", awsConfig.Region, errorMsgInvalidRegion, supportedRegions)
}

return nil
Expand Down Expand Up @@ -310,11 +311,15 @@ func (v *CPAValidator) validateAzureAccount(account *crdv1alpha1.CloudProviderAc
if len(strings.TrimSpace(azureCredential.ClientKey)) == 0 && len(strings.TrimSpace(azureCredential.SessionToken)) == 0 {
return fmt.Errorf(errorMsgMissingAzureCredential)
}

// validate region
if len(azureConfig.Region) == 0 || len(strings.TrimSpace(azureConfig.Region[0])) == 0 {
if len(azureConfig.Region) == 0 {
return fmt.Errorf(errorMsgMissingRegion)
}
for _, region := range azureConfig.Region {
if len(strings.TrimSpace(region)) == 0 {
return fmt.Errorf(errorMsgMissingRegion)
}
}

return nil
}
4 changes: 2 additions & 2 deletions pkg/cloudprovider/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ type AccountMgmtInterface interface {

// ComputeInterface is an abstract providing set of methods to get inventory details to be implemented by cloud providers.
type ComputeInterface interface {
// GetCloudInventory gets VPC and VM inventory from plugin snapshot for a given cloud provider account.
GetCloudInventory(accountNamespacedName *types.NamespacedName) (*nephetypes.CloudInventory, error)
// GetAccountCloudInventory gets VPC and VM inventory from plugin snapshot for a given cloud provider account.
GetAccountCloudInventory(accountNamespacedName *types.NamespacedName) (*nephetypes.CloudInventory, error)
}

type SecurityInterface interface {
Expand Down
5 changes: 5 additions & 0 deletions pkg/cloudprovider/plugins/aws/aws_account_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package aws

import (
"strings"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -23,6 +25,9 @@ import (

// AddProviderAccount adds and initializes given account of a cloud provider.
func (c *awsCloud) AddProviderAccount(client client.Client, account *crdv1alpha1.CloudProviderAccount) error {
for idx := range account.Spec.AWSConfig.Region {
account.Spec.AWSConfig.Region[idx] = strings.ToLower(account.Spec.AWSConfig.Region[idx])
}
return c.cloudCommon.AddCloudAccount(client, account, account.Spec.AWSConfig)
}

Expand Down
42 changes: 23 additions & 19 deletions pkg/cloudprovider/plugins/aws/aws_account_mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"reflect"
"sort"
"strings"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -32,15 +34,15 @@ import (

type awsAccountConfig struct {
crdv1alpha1.AwsAccountCredential
region string
regions []string
endpoint string
}

// setAccountCredentials sets account credentials.
func setAccountCredentials(client client.Client, credentials interface{}) (interface{}, error) {
awsProviderConfig := credentials.(*crdv1alpha1.CloudProviderAccountAWSConfig)
// setAccountConfig sets account config.
func setAccountConfig(client client.Client, config interface{}) (interface{}, error) {
awsProviderConfig := config.(*crdv1alpha1.CloudProviderAccountAWSConfig)
awsConfig := &awsAccountConfig{
region: strings.TrimSpace(awsProviderConfig.Region[0]),
regions: awsProviderConfig.Region,
endpoint: strings.TrimSpace(awsProviderConfig.Endpoint),
}
accCred, err := extractSecret(client, awsProviderConfig.SecretRef)
Expand All @@ -53,12 +55,12 @@ func setAccountCredentials(client client.Client, credentials interface{}) (inter
accCred.ExternalID = internal.AccountCredentialsDefault
}

// As only single region is supported right now, use 0th index in awsProviderConfig.Region as the configured region.
awsConfig.AwsAccountCredential = *accCred
return awsConfig, err
}

func compareAccountCredentials(accountName string, existing interface{}, new interface{}) bool {
// compareAccountConfig compares two account configs and returns they are different or not.
func compareAccountConfig(accountName string, existing interface{}, new interface{}) bool {
existingConfig := existing.(*awsAccountConfig)
newConfig := new.(*awsAccountConfig)

Expand All @@ -67,36 +69,38 @@ func compareAccountCredentials(accountName string, existing interface{}, new int
return true
}

credsChanged := false
configChanged := false
if strings.Compare(existingConfig.AccessKeyID, newConfig.AccessKeyID) != 0 {
credsChanged = true
configChanged = true
awsPluginLogger().Info("Account access key ID updated", "account", accountName)
}
if strings.Compare(existingConfig.AccessKeySecret, newConfig.AccessKeySecret) != 0 {
credsChanged = true
configChanged = true
awsPluginLogger().Info("Account access key secret updated", "account", accountName)
}
if strings.Compare(existingConfig.SessionToken, newConfig.SessionToken) != 0 {
credsChanged = true
configChanged = true
awsPluginLogger().Info("Account session token updated", "account", accountName)
}
if strings.Compare(existingConfig.RoleArn, newConfig.RoleArn) != 0 {
credsChanged = true
configChanged = true
awsPluginLogger().Info("Account IAM role updated", "account", accountName)
}
if strings.Compare(existingConfig.ExternalID, newConfig.ExternalID) != 0 {
credsChanged = true
configChanged = true
awsPluginLogger().Info("Account IAM external id updated", "account", accountName)
}
if strings.Compare(existingConfig.region, newConfig.region) != 0 {
credsChanged = true
awsPluginLogger().Info("Account region updated", "account", accountName)
}
if strings.Compare(existingConfig.endpoint, newConfig.endpoint) != 0 {
credsChanged = true
configChanged = true
awsPluginLogger().Info("Endpoint url updated", "account", accountName)
}
return credsChanged
sort.Strings(existingConfig.regions)
sort.Strings(newConfig.regions)
if !reflect.DeepEqual(existingConfig.regions, newConfig.regions) {
configChanged = true
awsPluginLogger().Info("Account regions updated", "account", accountName)
}
return configChanged
}

// extractSecret extracts credentials from a Kubernetes secret.
Expand Down
14 changes: 14 additions & 0 deletions pkg/cloudprovider/plugins/aws/aws_api_wrappers-mock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 22 additions & 8 deletions pkg/cloudprovider/plugins/aws/aws_api_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ import (

// awsEC2Wrapper is layer above aws EC2 sdk apis to allow for unit-testing.
type awsEC2Wrapper interface {
// instances
// instances.
pagedDescribeInstancesWrapper(input *ec2.DescribeInstancesInput) ([]*ec2.Instance, error)

// network interfaces
// network interfaces.
pagedDescribeNetworkInterfaces(input *ec2.DescribeNetworkInterfacesInput) ([]*ec2.NetworkInterface, error)
modifyNetworkInterfaceAttribute(input *ec2.ModifyNetworkInterfaceAttributeInput) (*ec2.ModifyNetworkInterfaceAttributeOutput, error)

// security groups/rules
// security groups/rules.
createSecurityGroup(input *ec2.CreateSecurityGroupInput) (*ec2.CreateSecurityGroupOutput, error)
describeSecurityGroups(input *ec2.DescribeSecurityGroupsInput) (*ec2.DescribeSecurityGroupsOutput, error)
deleteSecurityGroup(input *ec2.DeleteSecurityGroupInput) (*ec2.DeleteSecurityGroupOutput, error)
Expand All @@ -39,23 +39,27 @@ type awsEC2Wrapper interface {
revokeSecurityGroupEgress(input *ec2.RevokeSecurityGroupEgressInput) (*ec2.RevokeSecurityGroupEgressOutput, error)
revokeSecurityGroupIngress(input *ec2.RevokeSecurityGroupIngressInput) (*ec2.RevokeSecurityGroupIngressOutput, error)

// vpcs
// vpcs.
describeVpcsWrapper(input *ec2.DescribeVpcsInput) (*ec2.DescribeVpcsOutput, error)

// peer connections
// peer connections.
describeVpcPeeringConnectionsWrapper(input *ec2.DescribeVpcPeeringConnectionsInput) (*ec2.DescribeVpcPeeringConnectionsOutput, error)

// helper function.
getRegion() string
}
type awsEC2WrapperImpl struct {
ec2 *ec2.EC2
}

// pagedDescribeInstancesWrapper returns ec2 instances that matches the input criteria.
func (ec2Wrapper *awsEC2WrapperImpl) pagedDescribeInstancesWrapper(input *ec2.DescribeInstancesInput) ([]*ec2.Instance, error) {
var instances []*ec2.Instance
var nextToken *string
for {
response, err := ec2Wrapper.ec2.DescribeInstances(input)
if err != nil {
return nil, fmt.Errorf("error describing ec2 instances: %q", err)
return nil, fmt.Errorf("error describing ec2 instances for region %s: %q", ec2Wrapper.getRegion(), err)
}

reservations := response.Reservations
Expand All @@ -72,14 +76,15 @@ func (ec2Wrapper *awsEC2WrapperImpl) pagedDescribeInstancesWrapper(input *ec2.De
return instances, nil
}

// pagedDescribeNetworkInterfaces returns network interfaces that matches the input criteria.
func (ec2Wrapper *awsEC2WrapperImpl) pagedDescribeNetworkInterfaces(input *ec2.DescribeNetworkInterfacesInput) ([]*ec2.NetworkInterface,
error) {
var networkInterfaces []*ec2.NetworkInterface
var nextToken *string
for {
response, err := ec2Wrapper.ec2.DescribeNetworkInterfaces(input)
if err != nil {
return nil, fmt.Errorf("error describing ec2 network interfaces: %q", err)
return nil, fmt.Errorf("error describing ec2 network interfaces for region %s: %q", ec2Wrapper.getRegion(), err)
}

interfaces := response.NetworkInterfaces
Expand Down Expand Up @@ -132,10 +137,11 @@ func (ec2Wrapper *awsEC2WrapperImpl) revokeSecurityGroupIngress(input *ec2.Revok
return ec2Wrapper.ec2.RevokeSecurityGroupIngress(input)
}

// describeVpcsWrapper returns vpcs that matches the input criteria.
func (ec2Wrapper *awsEC2WrapperImpl) describeVpcsWrapper(input *ec2.DescribeVpcsInput) (*ec2.DescribeVpcsOutput, error) {
vpcs, err := ec2Wrapper.ec2.DescribeVpcs(input)
if err != nil {
return nil, fmt.Errorf("error describing ec2 vpcs: %q", err)
return nil, fmt.Errorf("error describing ec2 vpcs for region %s: %q", ec2Wrapper.getRegion(), err)
}
return vpcs, nil
}
Expand All @@ -144,3 +150,11 @@ func (ec2Wrapper *awsEC2WrapperImpl) describeVpcPeeringConnectionsWrapper(input
*ec2.DescribeVpcPeeringConnectionsOutput, error) {
return ec2Wrapper.ec2.DescribeVpcPeeringConnections(input)
}

// getRegion returns the region of the ec2 client.
func (ec2Wrapper *awsEC2WrapperImpl) getRegion() string {
if ec2Wrapper.ec2.Config.Region == nil {
return ""
}
return *ec2Wrapper.ec2.Config.Region
}
12 changes: 8 additions & 4 deletions pkg/cloudprovider/plugins/aws/aws_cloudcommonhelper_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ func (h *awsCloudCommonHelperImpl) GetCloudServicesCreateFunc() internal.CloudSe
return newAwsServiceConfigs
}

func (h *awsCloudCommonHelperImpl) SetAccountCredentialsFunc() internal.CloudCredentialValidatorFunc {
return setAccountCredentials
func (h *awsCloudCommonHelperImpl) GetCloudServicesUpdateFunc() internal.CloudServiceConfigUpdateFunc {
return updateAwsServiceConfigs
}

func (h *awsCloudCommonHelperImpl) GetCloudCredentialsComparatorFunc() internal.CloudCredentialComparatorFunc {
return compareAccountCredentials
func (h *awsCloudCommonHelperImpl) SetAccountConfigFunc() internal.CloudConfigValidatorFunc {
return setAccountConfig
}

func (h *awsCloudCommonHelperImpl) GetCloudConfigComparatorFunc() internal.CloudConfigComparatorFunc {
return compareAccountConfig
}
6 changes: 3 additions & 3 deletions pkg/cloudprovider/plugins/aws/aws_compute_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
nephetypes "antrea.io/nephe/pkg/types"
)

// GetCloudInventory pulls cloud vpc and vm inventory from internal snapshot.
func (c *awsCloud) GetCloudInventory(accountNamespacedName *types.NamespacedName) (*nephetypes.CloudInventory, error) {
return c.cloudCommon.GetCloudInventory(accountNamespacedName)
// GetAccountCloudInventory pulls cloud vpc and vm inventory from internal snapshot.
func (c *awsCloud) GetAccountCloudInventory(accountNamespacedName *types.NamespacedName) (*nephetypes.CloudInventory, error) {
return c.cloudCommon.GetAccountCloudInventory(accountNamespacedName)
}
Loading

0 comments on commit 9faeba2

Please sign in to comment.