Skip to content

Commit

Permalink
fix: create offerings regardless of subnets (#4857)
Browse files Browse the repository at this point in the history
  • Loading branch information
njtran committed Oct 23, 2023
1 parent 67ead12 commit 5d73e22
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 47 deletions.
6 changes: 3 additions & 3 deletions pkg/fake/ec2api.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,9 +483,9 @@ func (e *EC2API) DescribeAvailabilityZonesWithContext(context.Context, *ec2.Desc
return e.DescribeAvailabilityZonesOutput.Clone(), nil
}
return &ec2.DescribeAvailabilityZonesOutput{AvailabilityZones: []*ec2.AvailabilityZone{
{ZoneName: aws.String("test-zone-1a"), ZoneId: aws.String("testzone1a")},
{ZoneName: aws.String("test-zone-1b"), ZoneId: aws.String("testzone1b")},
{ZoneName: aws.String("test-zone-1c"), ZoneId: aws.String("testzone1c")},
{ZoneName: aws.String("test-zone-1a"), ZoneId: aws.String("testzone1a"), ZoneType: aws.String("availability-zone")},
{ZoneName: aws.String("test-zone-1b"), ZoneId: aws.String("testzone1b"), ZoneType: aws.String("availability-zone")},
{ZoneName: aws.String("test-zone-1c"), ZoneId: aws.String("testzone1c"), ZoneType: aws.String("availability-zone")},
}}, nil
}

Expand Down
113 changes: 69 additions & 44 deletions pkg/providers/instancetype/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"

Expand All @@ -44,8 +45,9 @@ import (
)

const (
InstanceTypesCacheKey = "types"
InstanceTypeZonesCacheKeyPrefix = "zones:"
InstanceTypesCacheKey = "types"
InstanceTypeOfferingsCacheKey = "offerings"
AvailabilityZonesCacheKey = "zones"
)

type Provider struct {
Expand All @@ -66,6 +68,8 @@ type Provider struct {
cm *pretty.ChangeMonitor
// instanceTypesSeqNum is a monotonically increasing change counter used to avoid the expensive hashing operation on instance types
instanceTypesSeqNum uint64
// instanceTypeOfferingsSeqNum is a monotonically increasing change counter used to avoid the expensive hashing operation on instance type offerings
instanceTypeOfferingsSeqNum uint64
}

func NewProvider(region string, cache *cache.Cache, ec2api ec2iface.EC2API, subnetProvider *subnet.Provider,
Expand All @@ -88,25 +92,35 @@ func (p *Provider) List(ctx context.Context, kc *corev1beta1.KubeletConfiguratio
if err != nil {
return nil, err
}
// Get Viable EC2 Purchase offerings
instanceTypeZones, err := p.getInstanceTypeZones(ctx, nodeClass)
// Get InstanceTypeOfferings from EC2
instanceTypeOfferings, err := p.getInstanceTypeOfferings(ctx)
if err != nil {
return nil, err
}
// Get AvailabilityZones from EC2
availabilityZones, err := p.getAvailabilityZones(ctx)
if err != nil {
return nil, err
}
// Constrain AZs from subnets
subnets, err := p.subnetProvider.List(ctx, nodeClass)
if err != nil {
return nil, err
}
subnetZones := sets.New[string](lo.Map(subnets, func(s *ec2.Subnet, _ int) string {
return aws.StringValue(s.AvailabilityZone)
})...)

// Compute fully initialized instance types hash key
instanceTypeZonesHash, _ := hashstructure.Hash(instanceTypeZones, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
subnetHash, _ := hashstructure.Hash(subnets, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
kcHash, _ := hashstructure.Hash(kc, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
key := fmt.Sprintf("%d-%d-%s-%016x-%016x", p.instanceTypesSeqNum, p.unavailableOfferings.SeqNum, nodeClass.UID, instanceTypeZonesHash, kcHash)
key := fmt.Sprintf("%d-%d-%d-%s-%016x-%016x", p.instanceTypesSeqNum, p.instanceTypeOfferingsSeqNum, p.unavailableOfferings.SeqNum, nodeClass.UID, subnetHash, kcHash)

if item, ok := p.cache.Get(key); ok {
return item.([]*cloudprovider.InstanceType), nil
}
// Reject any instance types that don't have any offerings due to zone
result := lo.Reject(lo.Map(instanceTypes, func(i *ec2.InstanceTypeInfo, _ int) *cloudprovider.InstanceType {
return NewInstanceType(ctx, i, kc, p.region, nodeClass, p.createOfferings(ctx, i, instanceTypeZones[aws.StringValue(i.InstanceType)]))
}), func(i *cloudprovider.InstanceType, _ int) bool {
return len(i.Offerings) == 0
result := lo.Map(instanceTypes, func(i *ec2.InstanceTypeInfo, _ int) *cloudprovider.InstanceType {
return NewInstanceType(ctx, i, kc, p.region, nodeClass, p.createOfferings(ctx, i, instanceTypeOfferings[aws.StringValue(i.InstanceType)], availabilityZones, subnetZones))
})
for _, instanceType := range instanceTypes {
InstanceTypeVCPU.With(prometheus.Labels{
Expand All @@ -127,27 +141,27 @@ func (p *Provider) LivenessProbe(req *http.Request) error {
return p.pricingProvider.LivenessProbe(req)
}

func (p *Provider) createOfferings(ctx context.Context, instanceType *ec2.InstanceTypeInfo, zones sets.Set[string]) []cloudprovider.Offering {
func (p *Provider) createOfferings(ctx context.Context, instanceType *ec2.InstanceTypeInfo, instanceTypeZones, availabilityZones, subnetZones sets.Set[string]) []cloudprovider.Offering {
var offerings []cloudprovider.Offering
for zone := range zones {
for az := range availabilityZones {
// while usage classes should be a distinct set, there's no guarantee of that
for capacityType := range sets.NewString(aws.StringValueSlice(instanceType.SupportedUsageClasses)...) {
// exclude any offerings that have recently seen an insufficient capacity error from EC2
isUnavailable := p.unavailableOfferings.IsUnavailable(*instanceType.InstanceType, zone, capacityType)
isUnavailable := p.unavailableOfferings.IsUnavailable(*instanceType.InstanceType, az, capacityType)
var price float64
var ok bool
switch capacityType {
case ec2.UsageClassTypeSpot:
price, ok = p.pricingProvider.SpotPrice(*instanceType.InstanceType, zone)
price, ok = p.pricingProvider.SpotPrice(*instanceType.InstanceType, az)
case ec2.UsageClassTypeOnDemand:
price, ok = p.pricingProvider.OnDemandPrice(*instanceType.InstanceType)
default:
logging.FromContext(ctx).Errorf("Received unknown capacity type %s for instance type %s", capacityType, *instanceType.InstanceType)
continue
}
available := !isUnavailable && ok
available := !isUnavailable && ok && instanceTypeZones.Has(az) && subnetZones.Has(az)
offerings = append(offerings, cloudprovider.Offering{
Zone: zone,
Zone: az,
CapacityType: capacityType,
Price: price,
Available: available,
Expand All @@ -157,56 +171,67 @@ func (p *Provider) createOfferings(ctx context.Context, instanceType *ec2.Instan
return offerings
}

func (p *Provider) getInstanceTypeZones(ctx context.Context, nodeClass *v1beta1.EC2NodeClass) (map[string]sets.Set[string], error) {
func (p *Provider) getAvailabilityZones(ctx context.Context) (sets.Set[string], error) {
// DO NOT REMOVE THIS LOCK ----------------------------------------------------------------------------
// We lock here so that multiple callers to getInstanceTypeZones do not result in cache misses and multiple
// We lock here so that multiple callers to getAvailabilityZones do not result in cache misses and multiple
// calls to EC2 when we could have just made one call.
// TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache
p.mu.Lock()
defer p.mu.Unlock()
if cached, ok := p.cache.Get(AvailabilityZonesCacheKey); ok {
return cached.(sets.Set[string]), nil
}

subnetSelectorHash, err := hashstructure.Hash(nodeClass.Spec.SubnetSelectorTerms, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
// Get zones from EC2
instanceTypeZones := sets.Set[string]{}
output, err := p.ec2api.DescribeAvailabilityZonesWithContext(ctx, &ec2.DescribeAvailabilityZonesInput{})
if err != nil {
return nil, fmt.Errorf("failed to hash the subnet selector: %w", err)
return nil, fmt.Errorf("describing availability zones, %w", err)
}
cacheKey := fmt.Sprintf("%s%016x", InstanceTypeZonesCacheKeyPrefix, subnetSelectorHash)
if cached, ok := p.cache.Get(cacheKey); ok {
return cached.(map[string]sets.Set[string]), nil
for i := range output.AvailabilityZones {
zone := output.AvailabilityZones[i]
if aws.StringValue(zone.ZoneType) == "availability-zone" {
instanceTypeZones.Insert(aws.StringValue(zone.ZoneName))
}
}

// Constrain AZs from subnets
subnets, err := p.subnetProvider.List(ctx, nodeClass)
if err != nil {
return nil, err
if p.cm.HasChanged("zones", instanceTypeZones) {
logging.FromContext(ctx).With("zones", instanceTypeZones.UnsortedList()).Debugf("discovered availability zones")
}
if len(subnets) == 0 {
return nil, nil
p.cache.Set(AvailabilityZonesCacheKey, instanceTypeZones, 24*time.Hour)
return instanceTypeZones, nil
}

func (p *Provider) getInstanceTypeOfferings(ctx context.Context) (map[string]sets.Set[string], error) {
// DO NOT REMOVE THIS LOCK ----------------------------------------------------------------------------
// We lock here so that multiple callers to getInstanceTypeOfferings do not result in cache misses and multiple
// calls to EC2 when we could have just made one call.
// TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache
p.mu.Lock()
defer p.mu.Unlock()
if cached, ok := p.cache.Get(InstanceTypeOfferingsCacheKey); ok {
return cached.(map[string]sets.Set[string]), nil
}
zones := sets.NewString(lo.Map(subnets, func(subnet *ec2.Subnet, _ int) string {
return aws.StringValue(subnet.AvailabilityZone)
})...)

// Get offerings from EC2
instanceTypeZones := map[string]sets.Set[string]{}
instanceTypeOfferings := map[string]sets.Set[string]{}
if err := p.ec2api.DescribeInstanceTypeOfferingsPagesWithContext(ctx, &ec2.DescribeInstanceTypeOfferingsInput{LocationType: aws.String("availability-zone")},
func(output *ec2.DescribeInstanceTypeOfferingsOutput, lastPage bool) bool {
for _, offering := range output.InstanceTypeOfferings {
if zones.Has(aws.StringValue(offering.Location)) {
if _, ok := instanceTypeZones[aws.StringValue(offering.InstanceType)]; !ok {
instanceTypeZones[aws.StringValue(offering.InstanceType)] = sets.New[string]()
}
instanceTypeZones[aws.StringValue(offering.InstanceType)].Insert(aws.StringValue(offering.Location))
if _, ok := instanceTypeOfferings[aws.StringValue(offering.InstanceType)]; !ok {
instanceTypeOfferings[aws.StringValue(offering.InstanceType)] = sets.New[string]()
}
instanceTypeOfferings[aws.StringValue(offering.InstanceType)].Insert(aws.StringValue(offering.Location))
}
return true
}); err != nil {
return nil, fmt.Errorf("describing instance type zone offerings, %w", err)
}
if p.cm.HasChanged("zonal-offerings", nodeClass.Spec.SubnetSelectorTerms) {
logging.FromContext(ctx).With("zones", zones.List(), "instance-type-count", len(instanceTypeZones), "node-template", nodeClass.Name).Debugf("discovered offerings for instance types")
if p.cm.HasChanged("instance-type-count", len(instanceTypeOfferings)) {
logging.FromContext(ctx).With("instance-type-count", len(instanceTypeOfferings)).Debugf("discovered offerings for instance types")
}
p.cache.SetDefault(cacheKey, instanceTypeZones)
return instanceTypeZones, nil
atomic.AddUint64(&p.instanceTypeOfferingsSeqNum, 1)
p.cache.SetDefault(InstanceTypeOfferingsCacheKey, instanceTypeOfferings)
return instanceTypeOfferings, nil
}

// GetInstanceTypes retrieves all instance types from the ec2 DescribeInstanceTypes API using some opinionated filters
Expand Down

0 comments on commit 5d73e22

Please sign in to comment.