Skip to content

Commit

Permalink
Fixed a bug where spot capacity was not prioritized if flexible to bo…
Browse files Browse the repository at this point in the history
…th spot and on demand capacity (#893)

* Fixed a bug where spot capacity was not prioritized if flexible to both spot and on demand capacity

* Fixed an issue where zones were not taken into account

* Reduce pod retry

* Added a test

* PR Comments
  • Loading branch information
ellistarn committed Dec 3, 2021
1 parent 2b8abcd commit 4daa335
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 37 deletions.
20 changes: 15 additions & 5 deletions pkg/cloudprovider/aws/fake/ec2api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1"
"github.com/aws/karpenter/pkg/utils/functional"
set "github.com/deckarep/golang-set"
)

type CapacityPool struct {
CapacityType string
InstanceType string
Zone string
}
Expand Down Expand Up @@ -80,11 +82,18 @@ func (e *EC2API) CreateFleetWithContext(_ context.Context, input *ec2.CreateFlee
instances := []*ec2.Instance{}
instanceIds := []*string{}
skippedPools := []CapacityPool{}
var spotInstanceRequestID *string

if aws.StringValue(input.TargetCapacitySpecification.DefaultTargetCapacityType) == v1alpha1.CapacityTypeSpot {
spotInstanceRequestID = aws.String(randomdata.SillyName())
}

for i := 0; i < int(*input.TargetCapacitySpecification.TotalTargetCapacity); i++ {
skipInstance := false
for _, pool := range e.InsufficientCapacityPools {
if pool.InstanceType == aws.StringValue(input.LaunchTemplateConfigs[0].Overrides[0].InstanceType) &&
pool.Zone == aws.StringValue(input.LaunchTemplateConfigs[0].Overrides[0].AvailabilityZone) {
pool.Zone == aws.StringValue(input.LaunchTemplateConfigs[0].Overrides[0].AvailabilityZone) &&
pool.CapacityType == aws.StringValue(input.TargetCapacitySpecification.DefaultTargetCapacityType) {
skippedPools = append(skippedPools, pool)
skipInstance = true
break
Expand All @@ -94,10 +103,11 @@ func (e *EC2API) CreateFleetWithContext(_ context.Context, input *ec2.CreateFlee
continue
}
instances = append(instances, &ec2.Instance{
InstanceId: aws.String(randomdata.SillyName()),
Placement: &ec2.Placement{AvailabilityZone: input.LaunchTemplateConfigs[0].Overrides[0].AvailabilityZone},
PrivateDnsName: aws.String(randomdata.IpV4Address()),
InstanceType: input.LaunchTemplateConfigs[0].Overrides[0].InstanceType,
InstanceId: aws.String(randomdata.SillyName()),
Placement: &ec2.Placement{AvailabilityZone: input.LaunchTemplateConfigs[0].Overrides[0].AvailabilityZone},
PrivateDnsName: aws.String(randomdata.IpV4Address()),
InstanceType: input.LaunchTemplateConfigs[0].Overrides[0].InstanceType,
SpotInstanceRequestId: spotInstanceRequestID,
})
e.Instances.Store(*instances[i].InstanceId, instances[i])
instanceIds = append(instanceIds, instances[i].InstanceId)
Expand Down
34 changes: 20 additions & 14 deletions pkg/cloudprovider/aws/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,8 @@ func (p *InstanceProvider) Terminate(ctx context.Context, node *v1.Node) error {
}

func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1alpha1.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int) ([]*string, error) {
// Default to on-demand unless constrained otherwise or if flexible to spot and
// on-demand. This code assumes two options: {spot, on-demand}, which is enforced
// by constraints.Constrain(). Spot may be selected by constraining the provisioner,
// or using nodeSelectors, required node affinity, or preferred node affinity.
capacityType := v1alpha1.CapacityTypeOnDemand
if capacityTypes := constraints.Requirements.CapacityTypes(); len(capacityTypes) == 0 {
return nil, fmt.Errorf("invariant violated, must contain at least one capacity type")
} else if len(capacityTypes) == 1 {
capacityType = capacityTypes.UnsortedList()[0]
}
capacityType := p.getCapacityType(constraints, instanceTypes)

// Get Launch Template Configs, which may differ due to GPU or Architecture requirements
launchTemplateConfigs, err := p.getLaunchTemplateConfigs(ctx, constraints, instanceTypes, capacityType)
if err != nil {
Expand Down Expand Up @@ -183,7 +175,6 @@ func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.Inst
var overrides []*ec2.FleetLaunchTemplateOverridesRequest
for i, instanceType := range instanceTypeOptions {
for _, offering := range instanceType.Offerings() {
// we can't assume that all zones will be available for all capacity types, hence this check
if capacityType != offering.CapacityType {
continue
}
Expand Down Expand Up @@ -285,6 +276,22 @@ func (p *InstanceProvider) updateUnavailableOfferingsCache(ctx context.Context,
}
}

// getCapacityType selects spot if both constraints are flexible and there is an
// available offering. The AWS Cloud Provider defaults to [ on-demand ], so spot
// must be explicitly included in capacity type requirements.
func (p *InstanceProvider) getCapacityType(constraints *v1alpha1.Constraints, instanceTypes []cloudprovider.InstanceType) string {
if constraints.Requirements.CapacityTypes().Has(v1alpha1.CapacityTypeSpot) {
for _, instanceType := range instanceTypes {
for _, offering := range instanceType.Offerings() {
if constraints.Requirements.Zones().Has(offering.Zone) && offering.CapacityType == v1alpha1.CapacityTypeSpot {
return v1alpha1.CapacityTypeSpot
}
}
}
}
return v1alpha1.CapacityTypeOnDemand
}

func getInstanceID(node *v1.Node) (*string, error) {
id := strings.Split(node.Spec.ProviderID, "/")
if len(id) < 5 {
Expand All @@ -305,11 +312,10 @@ func combineFleetErrors(errors []*ec2.CreateFleetError) (errs error) {
}

func getCapacityType(instance *ec2.Instance) string {
capacityType := v1alpha1.CapacityTypeOnDemand
if instance.SpotInstanceRequestId != nil {
capacityType = v1alpha1.CapacityTypeSpot
return v1alpha1.CapacityTypeSpot
}
return capacityType
return v1alpha1.CapacityTypeOnDemand
}

func combineFleetInstances(createFleetOutput ec2.CreateFleetOutput) []*string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudprovider/aws/instancetypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (p *InstanceTypeProvider) CacheUnavailable(ctx context.Context, instanceTyp
capacityType,
InsufficientCapacityErrorCacheTTL)
// even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL
p.unavailableOfferings.SetDefault(UnavailableOfferingsCacheKey(capacityType, instanceType, zone), sets.Empty{})
p.unavailableOfferings.SetDefault(UnavailableOfferingsCacheKey(capacityType, instanceType, zone), struct{}{})
}

func UnavailableOfferingsCacheKey(capacityType string, instanceType string, zone string) string {
Expand Down
39 changes: 23 additions & 16 deletions pkg/cloudprovider/aws/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ var _ = Describe("Allocation", func() {
})
Context("Insufficient Capacity Error Cache", func() {
It("should launch instances of different type on second reconciliation attempt with Insufficient Capacity Error Cache fallback", func() {
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}}
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{CapacityType: v1alpha1.CapacityTypeOnDemand, InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}}
pods := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-1a"},
Expand Down Expand Up @@ -213,7 +213,7 @@ var _ = Describe("Allocation", func() {
Expect(nodeNames.Len()).To(Equal(2))
})
It("should launch instances in a different zone on second reconciliation attempt with Insufficient Capacity Error Cache fallback", func() {
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{InstanceType: "p3.8xlarge", Zone: "test-zone-1a"}}
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{CapacityType: v1alpha1.CapacityTypeOnDemand, InstanceType: "p3.8xlarge", Zone: "test-zone-1a"}}
pod := test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")},
Expand All @@ -238,7 +238,7 @@ var _ = Describe("Allocation", func() {
HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-1b")))
})
It("should launch instances on later reconciliation attempt with Insufficient Capacity Error Cache expiry", func() {
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}}
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{CapacityType: v1alpha1.CapacityTypeOnDemand, InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}}
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
NodeSelector: map[string]string{v1.LabelInstanceTypeStable: "inf1.6xlarge"},
Expand All @@ -256,26 +256,33 @@ var _ = Describe("Allocation", func() {
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "inf1.6xlarge"))
})
It("should launch on-demand capacity if flexible to both spot and on demand, but spot if unavailable", func() {
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{CapacityType: v1alpha1.CapacityTypeSpot, InstanceType: "m5.large", Zone: "test-zone-1a"}}
provisioner.Spec.Requirements = v1alpha5.Requirements{
{Key: v1alpha5.LabelCapacityType, Operator: v1.NodeSelectorOpIn, Values: []string{v1alpha1.CapacityTypeSpot, v1alpha1.CapacityTypeOnDemand}},
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}},
{Key: v1.LabelInstanceTypeStable, Operator: v1.NodeSelectorOpIn, Values: []string{"m5.large"}},
}
// Spot Unavailable
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod())[0]
ExpectNotScheduled(ctx, env.Client, pod)
// Fallback to OD
pod = ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod())[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1alpha5.LabelCapacityType, v1alpha1.CapacityTypeOnDemand))
})
})
Context("CapacityType", func() {
It("should default to on demand", func() {
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod())[0]
ExpectScheduled(ctx, env.Client, pod)
Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1))
input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput)
Expect(input.LaunchTemplateConfigs).To(HaveLen(1))
Expect(*input.TargetCapacitySpecification.DefaultTargetCapacityType).To(Equal(v1alpha1.CapacityTypeOnDemand))
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1alpha5.LabelCapacityType, v1alpha1.CapacityTypeOnDemand))
})
It("should launch spot capacity if flexible to both spot and on demand", func() {
provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1alpha5.LabelCapacityType, Operator: v1.NodeSelectorOpIn, Values: []string{v1alpha1.CapacityTypeSpot, v1alpha1.CapacityTypeOnDemand}}}
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1alpha5.LabelCapacityType: v1alpha1.CapacityTypeSpot}}),
)[0]
ExpectScheduled(ctx, env.Client, pod)
Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1))
input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput)
Expect(input.LaunchTemplateConfigs).To(HaveLen(1))
Expect(*input.TargetCapacitySpecification.DefaultTargetCapacityType).To(Equal(v1alpha1.CapacityTypeSpot))
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod())[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1alpha5.LabelCapacityType, v1alpha1.CapacityTypeSpot))
})
})
Context("LaunchTemplates", func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/scheduling/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
return reconcile.Result{}, err
}
provisioner.Add(ctx, pod)
return reconcile.Result{RequeueAfter: time.Second * 5}, nil
return reconcile.Result{RequeueAfter: time.Second * 1}, nil
}

func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (*provisioning.Provisioner, error) {
Expand Down

0 comments on commit 4daa335

Please sign in to comment.