Skip to content

Commit

Permalink
feat: publish instanceType, capacityType, availabilityZone in a k8s e…
Browse files Browse the repository at this point in the history
…vent when we receive a IsUnfulfillableCapacity error while trying to create fleet (#3999)

Co-authored-by: Jonathan Innis <joinnis@amazon.com>
  • Loading branch information
tasdikrahman and jonathan-innis committed Jun 27, 2023
1 parent 49642f0 commit 82f68a5
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 8 deletions.
32 changes: 32 additions & 0 deletions pkg/cache/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"fmt"

v1 "k8s.io/api/core/v1"

"github.com/aws/karpenter-core/pkg/events"
)

func UnavailableOfferingEvent(instanceType, availabilityZone, capacityType string) events.Event {
return events.Event{
Type: v1.EventTypeWarning,
Reason: "UnavailableOffering",
Message: fmt.Sprintf(`UnavailableOffering for {"instanceType": %q, "availabilityZone": %q, "capacityType": %q}`, instanceType, availabilityZone, capacityType),
DedupeValues: []string{instanceType, availabilityZone, capacityType},
}
}
33 changes: 33 additions & 0 deletions pkg/cache/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache_test

import (
"context"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
. "knative.dev/pkg/logging/testing"
_ "knative.dev/pkg/system/testing"
)

var ctx context.Context

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
RegisterFailHandler(Fail)
RunSpecs(t, "Cache")
}
17 changes: 12 additions & 5 deletions pkg/cache/unavailableofferings.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,25 @@ import (
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/patrickmn/go-cache"
"knative.dev/pkg/logging"

"github.com/aws/karpenter-core/pkg/events"
)

// UnavailableOfferings stores any offerings that return ICE (insufficient capacity errors) when
// attempting to launch the capacity. These offerings are ignored as long as they are in the cache on
// GetInstanceTypes responses
type UnavailableOfferings struct {
// key: <capacityType>:<instanceType>:<zone>, value: struct{}{}
cache *cache.Cache
SeqNum uint64
cache *cache.Cache
SeqNum uint64
recorder events.Recorder
}

func NewUnavailableOfferings() *UnavailableOfferings {
func NewUnavailableOfferings(recorder events.Recorder) *UnavailableOfferings {
return &UnavailableOfferings{
cache: cache.New(UnavailableOfferingsTTL, DefaultCleanupInterval),
SeqNum: 0,
cache: cache.New(UnavailableOfferingsTTL, DefaultCleanupInterval),
SeqNum: 0,
recorder: recorder,
}
}

Expand All @@ -58,6 +62,9 @@ func (u *UnavailableOfferings) MarkUnavailable(ctx context.Context, unavailableR
"ttl", UnavailableOfferingsTTL).Debugf("removing offering from offerings")
u.cache.SetDefault(u.key(instanceType, zone, capacityType), struct{}{})
atomic.AddUint64(&u.SeqNum, 1)

// Add a k8s event for the instance type and zone without the involved object which has an ICE error
u.recorder.Publish(UnavailableOfferingEvent(instanceType, zone, capacityType))
}

func (u *UnavailableOfferings) MarkUnavailableForFleetErr(ctx context.Context, fleetErr *ec2.CreateFleetError, capacityType string) {
Expand Down
97 changes: 97 additions & 0 deletions pkg/cache/unavailableofferings_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache_test

import (
"context"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
_ "knative.dev/pkg/system/testing"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/test"
"github.com/aws/karpenter/pkg/cache"
)

var unavailableOfferingsCache *cache.UnavailableOfferings
var recorder *test.EventRecorder

var _ = Describe("UnavailableOfferings", func() {
BeforeEach(func() {
ctx = context.Background()
recorder = test.NewEventRecorder()
unavailableOfferingsCache = cache.NewUnavailableOfferings(recorder)
})

AfterEach(func() {
recorder.Reset()
})

It("should create an UnavailableOfferingEvent when receiving a CreateFleet error", func() {
unavailableOfferingsCache.MarkUnavailableForFleetErr(ctx, &ec2.CreateFleetError{
LaunchTemplateAndOverrides: &ec2.LaunchTemplateAndOverridesResponse{
Overrides: &ec2.FleetLaunchTemplateOverrides{
InstanceType: aws.String("c5.large"),
AvailabilityZone: aws.String("test-zone-1a"),
},
},
}, v1alpha5.CapacityTypeSpot)
Expect(recorder.Calls("UnavailableOffering")).To(BeNumerically("==", 1))
Expect(recorder.DetectedEvent(`UnavailableOffering for {"instanceType": "c5.large", "availabilityZone": "test-zone-1a", "capacityType": "spot"}`))
})
It("should create an UnavailableOfferingEvent when marking an offering as unavailable", func() {
unavailableOfferingsCache.MarkUnavailable(ctx, "offering is unavailable", "c5.large", "test-zone-1a", v1alpha5.CapacityTypeSpot)
Expect(recorder.Calls("UnavailableOffering")).To(BeNumerically("==", 1))
Expect(recorder.DetectedEvent(`UnavailableOffering for {"instanceType": "c5.large", "availabilityZone": "test-zone-1a", "capacityType": "spot"}`))
})
It("should create multiple UnavailableOfferingEvent when marking multiple offerings as unavailable", func() {
type offering struct {
instanceType string
availabilityZone string
capacityType string
}

offerings := []offering{
{
instanceType: "c5.large",
availabilityZone: "test-zone-1a",
capacityType: v1alpha5.CapacityTypeSpot,
},
{
instanceType: "g4dn.xlarge",
availabilityZone: "test-zone-1b",
capacityType: v1alpha5.CapacityTypeOnDemand,
},
{
instanceType: "inf1.24xlarge",
availabilityZone: "test-zone-1d",
capacityType: v1alpha5.CapacityTypeSpot,
},
{
instanceType: "t3.nano",
availabilityZone: "test-zone-1b",
capacityType: v1alpha5.CapacityTypeOnDemand,
},
}

for _, of := range offerings {
unavailableOfferingsCache.MarkUnavailable(ctx, "offering is unavailable", of.instanceType, of.availabilityZone, of.capacityType)
}
Expect(recorder.Calls("UnavailableOffering")).To(BeNumerically("==", len(offerings)))
})
})
2 changes: 1 addition & 1 deletion pkg/controllers/interruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestAPIs(t *testing.T) {
var _ = BeforeSuite(func() {
env = coretest.NewEnvironment(scheme.Scheme, coretest.WithCRDs(apis.CRDs...))
fakeClock = &clock.FakeClock{}
unavailableOfferingsCache = awscache.NewUnavailableOfferings()
unavailableOfferingsCache = awscache.NewUnavailableOfferings(events.NewRecorder(&record.FakeRecorder{}))
sqsapi = &fake.SQSAPI{}
sqsProvider = interruption.NewSQSProvider(sqsapi)
controller = interruption.NewController(env.Client, fakeClock, events.NewRecorder(&record.FakeRecorder{}), sqsProvider, unavailableOfferingsCache)
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
logging.FromContext(ctx).With("kube-dns-ip", kubeDNSIP).Debugf("discovered kube dns")
}

unavailableOfferingsCache := awscache.NewUnavailableOfferings()
unavailableOfferingsCache := awscache.NewUnavailableOfferings(operator.EventRecorder)
subnetProvider := subnet.NewProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
securityGroupProvider := securitygroup.NewProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
pricingProvider := pricing.NewProvider(
Expand Down
4 changes: 3 additions & 1 deletion pkg/test/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"net"

"k8s.io/client-go/tools/record"
"knative.dev/pkg/ptr"

"github.com/patrickmn/go-cache"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/aws/karpenter/pkg/providers/securitygroup"
"github.com/aws/karpenter/pkg/providers/subnet"

"github.com/aws/karpenter-core/pkg/events"
coretest "github.com/aws/karpenter-core/pkg/test"

crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
Expand Down Expand Up @@ -74,7 +76,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment
ec2Cache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
kubernetesVersionCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
instanceTypeCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
unavailableOfferingsCache := awscache.NewUnavailableOfferings()
unavailableOfferingsCache := awscache.NewUnavailableOfferings(events.NewRecorder(&record.FakeRecorder{}))
launchTemplateCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
subnetCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
securityGroupCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
Expand Down

0 comments on commit 82f68a5

Please sign in to comment.