Skip to content

Commit

Permalink
fix: Ensure shallow copy of data when returning back cached data (#6167)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis committed May 15, 2024
1 parent cf4b13d commit 16bbca7
Show file tree
Hide file tree
Showing 11 changed files with 628 additions and 254 deletions.
268 changes: 160 additions & 108 deletions pkg/apis/crds/karpenter.k8s.aws_awsnodetemplates.yaml

Large diffs are not rendered by default.

322 changes: 186 additions & 136 deletions pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions pkg/providers/amifamily/ami.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ func (p *Provider) Get(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, opt

func (p *Provider) getDefaultAMIs(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, options *Options) (res AMIs, err error) {
if images, ok := p.cache.Get(lo.FromPtr(nodeClass.Spec.AMIFamily)); ok {
return images.(AMIs), nil
// Ensure what's returned from this function is a deep-copy of AMIs so alterations
// to the data don't affect the original
return append(AMIs{}, images.(AMIs)...), nil
}
amiFamily := GetAMIFamily(nodeClass.Spec.AMIFamily, options)
kubernetesVersion, err := p.versionProvider.Get(ctx)
Expand Down Expand Up @@ -189,8 +191,10 @@ func (p *Provider) getAMIs(ctx context.Context, terms []v1beta1.AMISelectorTerm,
if err != nil {
return nil, err
}
if images, ok := p.cache.Get(fmt.Sprintf("%t/%d", isNodeTemplate, hash)); ok {
return images.(AMIs), nil
if images, ok := p.cache.Get(fmt.Sprintf("%d", hash)); ok {
// Ensure what's returned from this function is a deep-copy of AMIs so alterations
// to the data don't affect the original
return append(AMIs{}, images.(AMIs)...), nil
}
images := map[uint64]AMI{}
for _, filtersAndOwners := range filterAndOwnerSets {
Expand Down
53 changes: 49 additions & 4 deletions pkg/providers/amifamily/ami_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"sort"
"sync"
"testing"
"time"

Expand All @@ -30,6 +31,7 @@ import (
. "knative.dev/pkg/logging/testing"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
corev1beta1 "github.com/aws/karpenter-core/pkg/apis/v1beta1"
coreoptions "github.com/aws/karpenter-core/pkg/operator/options"
"github.com/aws/karpenter-core/pkg/operator/scheme"
"github.com/aws/karpenter-core/pkg/scheduling"
Expand Down Expand Up @@ -75,7 +77,7 @@ var _ = BeforeEach(func() {
{
Name: aws.String(amd64AMI),
ImageId: aws.String("amd64-ami-id"),
CreationDate: aws.String(time.Now().Format(time.RFC3339)),
CreationDate: aws.String(time.Time{}.Format(time.RFC3339)),
Architecture: aws.String("x86_64"),
Tags: []*ec2.Tag{
{Key: aws.String("Name"), Value: aws.String(amd64AMI)},
Expand All @@ -85,7 +87,7 @@ var _ = BeforeEach(func() {
{
Name: aws.String(arm64AMI),
ImageId: aws.String("arm64-ami-id"),
CreationDate: aws.String(time.Now().Add(time.Minute).Format(time.RFC3339)),
CreationDate: aws.String(time.Time{}.Add(time.Minute).Format(time.RFC3339)),
Architecture: aws.String("arm64"),
Tags: []*ec2.Tag{
{Key: aws.String("Name"), Value: aws.String(arm64AMI)},
Expand All @@ -95,7 +97,7 @@ var _ = BeforeEach(func() {
{
Name: aws.String(amd64NvidiaAMI),
ImageId: aws.String("amd64-nvidia-ami-id"),
CreationDate: aws.String(time.Now().Add(2 * time.Minute).Format(time.RFC3339)),
CreationDate: aws.String(time.Time{}.Add(2 * time.Minute).Format(time.RFC3339)),
Architecture: aws.String("x86_64"),
Tags: []*ec2.Tag{
{Key: aws.String("Name"), Value: aws.String(amd64NvidiaAMI)},
Expand All @@ -105,7 +107,7 @@ var _ = BeforeEach(func() {
{
Name: aws.String(arm64NvidiaAMI),
ImageId: aws.String("arm64-nvidia-ami-id"),
CreationDate: aws.String(time.Now().Add(2 * time.Minute).Format(time.RFC3339)),
CreationDate: aws.String(time.Time{}.Add(2 * time.Minute).Format(time.RFC3339)),
Architecture: aws.String("arm64"),
Tags: []*ec2.Tag{
{Key: aws.String("Name"), Value: aws.String(arm64NvidiaAMI)},
Expand Down Expand Up @@ -187,6 +189,49 @@ var _ = Describe("AMIProvider", func() {
Expect(err).ToNot(HaveOccurred())
Expect(amis).To(HaveLen(0))
})
It("should not cause data races when calling Get() simultaneously", func() {
nodeClass.Spec.AMISelectorTerms = []v1beta1.AMISelectorTerm{
{
ID: "amd64-ami-id",
},
{
ID: "arm64-ami-id",
},
}
wg := sync.WaitGroup{}
for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
defer GinkgoRecover()
images, err := awsEnv.AMIProvider.Get(ctx, nodeClass, &amifamily.Options{})
Expect(err).ToNot(HaveOccurred())

Expect(images).To(HaveLen(2))
// Sort everything in parallel and ensure that we don't get data races
images.Sort()
Expect(images).To(BeEquivalentTo([]amifamily.AMI{
{
Name: arm64AMI,
AmiID: "arm64-ami-id",
CreationDate: time.Time{}.Add(time.Minute).Format(time.RFC3339),
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.LabelArchStable: corev1beta1.ArchitectureArm64,
}),
},
{
Name: amd64AMI,
AmiID: "amd64-ami-id",
CreationDate: time.Time{}.Format(time.RFC3339),
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.LabelArchStable: corev1beta1.ArchitectureAmd64,
}),
},
}))
}()
}
wg.Wait()
})
Context("SSM Alias Missing", func() {
It("should succeed to partially resolve AMIs if all SSM aliases don't exist (Al2)", func() {
nodeClass.Spec.AMIFamily = &v1beta1.AMIFamilyAL2
Expand Down
4 changes: 3 additions & 1 deletion pkg/providers/instancetype/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ func (p *Provider) List(ctx context.Context, kc *corev1beta1.KubeletConfiguratio
systemReservedHash,
)
if item, ok := p.cache.Get(key); ok {
return item.([]*cloudprovider.InstanceType), nil
// Ensure what's returned from this function is a shallow-copy of the slice (not a deep-copy of the data itself)
// so that modifications to the ordering of the data don't affect the original
return append([]*cloudprovider.InstanceType{}, item.([]*cloudprovider.InstanceType)...), nil
}
result := lo.Map(instanceTypes, func(i *ec2.InstanceTypeInfo, _ int) *cloudprovider.InstanceType {
return NewInstanceType(ctx, i, kc, p.region, nodeClass, p.createOfferings(ctx, i, instanceTypeOfferings[aws.StringValue(i.InstanceType)], zones, subnetZones))
Expand Down
36 changes: 36 additions & 0 deletions pkg/providers/instancetype/nodeclass_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"math"
"sort"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -1493,4 +1494,39 @@ var _ = Describe("NodeClass/InstanceTypes", func() {
})
})
})
It("should not cause data races when calling List() simultaneously", func() {
mu := sync.RWMutex{}
var instanceTypeOrder []string
wg := sync.WaitGroup{}
for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
defer GinkgoRecover()
instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{})
Expect(err).ToNot(HaveOccurred())

// Sort everything in parallel and ensure that we don't get data races
sort.Slice(instanceTypes, func(i, j int) bool {
return instanceTypes[i].Name < instanceTypes[j].Name
})
// Get the ordering of the instance types based on name
tempInstanceTypeOrder := lo.Map(instanceTypes, func(i *corecloudprovider.InstanceType, _ int) string {
return i.Name
})
// Expect that all the elements in the instance type list are unique
Expect(lo.Uniq(tempInstanceTypeOrder)).To(HaveLen(len(tempInstanceTypeOrder)))

// We have to lock since we are doing simultaneous access to this value
mu.Lock()
if len(instanceTypeOrder) == 0 {
instanceTypeOrder = tempInstanceTypeOrder
} else {
Expect(tempInstanceTypeOrder).To(BeEquivalentTo(instanceTypeOrder))
}
mu.Unlock()
}()
}
wg.Wait()
})
})
36 changes: 36 additions & 0 deletions pkg/providers/instancetype/nodetemplate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"math"
"sort"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -1523,4 +1524,39 @@ var _ = Describe("NodeTemplate/InstanceTypes", func() {
})
})
})
It("should not cause data races when calling List() simultaneously", func() {
mu := sync.RWMutex{}
var instanceTypeOrder []string
wg := sync.WaitGroup{}
for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
defer GinkgoRecover()
instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, nodepoolutil.NewKubeletConfiguration(&v1alpha5.KubeletConfiguration{}), nodeclassutil.New(&v1alpha1.AWSNodeTemplate{}))
Expect(err).ToNot(HaveOccurred())

// Sort everything in parallel and ensure that we don't get data races
sort.Slice(instanceTypes, func(i, j int) bool {
return instanceTypes[i].Name < instanceTypes[j].Name
})
// Get the ordering of the instance types based on name
tempInstanceTypeOrder := lo.Map(instanceTypes, func(i *corecloudprovider.InstanceType, _ int) string {
return i.Name
})
// Expect that all the elements in the instance type list are unique
Expect(lo.Uniq(tempInstanceTypeOrder)).To(HaveLen(len(tempInstanceTypeOrder)))

// We have to lock since we are doing simultaneous access to this value
mu.Lock()
if len(instanceTypeOrder) == 0 {
instanceTypeOrder = tempInstanceTypeOrder
} else {
Expect(tempInstanceTypeOrder).To(BeEquivalentTo(instanceTypeOrder))
}
mu.Unlock()
}()
}
wg.Wait()
})
})
4 changes: 3 additions & 1 deletion pkg/providers/securitygroup/securitygroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func (p *Provider) getSecurityGroups(ctx context.Context, filterSets [][]*ec2.Fi
return nil, err
}
if sg, ok := p.cache.Get(fmt.Sprint(hash)); ok {
return sg.([]*ec2.SecurityGroup), nil
// Ensure what's returned from this function is a shallow-copy of the slice (not a deep-copy of the data itself)
// so that modifications to the ordering of the data don't affect the original
return append([]*ec2.SecurityGroup{}, sg.([]*ec2.SecurityGroup)...), nil
}
securityGroups := map[string]*ec2.SecurityGroup{}
for _, filters := range filterSets {
Expand Down
68 changes: 68 additions & 0 deletions pkg/providers/securitygroup/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package securitygroup_test

import (
"context"
"sort"
"sync"
"testing"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -267,6 +269,72 @@ var _ = Describe("SecurityGroupProvider", func() {
},
}, securityGroups)
})
It("should not cause data races when calling List() simultaneously", func() {
wg := sync.WaitGroup{}
for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
defer GinkgoRecover()
securityGroups, err := awsEnv.SecurityGroupProvider.List(ctx, nodeClass)
Expect(err).ToNot(HaveOccurred())

Expect(securityGroups).To(HaveLen(3))
// Sort everything in parallel and ensure that we don't get data races
sort.Slice(securityGroups, func(i, j int) bool {
return *securityGroups[i].GroupId < *securityGroups[j].GroupId
})
Expect(securityGroups).To(BeEquivalentTo([]*ec2.SecurityGroup{
{
GroupId: lo.ToPtr("sg-test1"),
GroupName: lo.ToPtr("securityGroup-test1"),
Tags: []*ec2.Tag{
{
Key: lo.ToPtr("Name"),
Value: lo.ToPtr("test-security-group-1"),
},
{
Key: lo.ToPtr("foo"),
Value: lo.ToPtr("bar"),
},
},
},
{
GroupId: lo.ToPtr("sg-test2"),
GroupName: lo.ToPtr("securityGroup-test2"),
Tags: []*ec2.Tag{
{
Key: lo.ToPtr("Name"),
Value: lo.ToPtr("test-security-group-2"),
},
{
Key: lo.ToPtr("foo"),
Value: lo.ToPtr("bar"),
},
},
},
{
GroupId: lo.ToPtr("sg-test3"),
GroupName: lo.ToPtr("securityGroup-test3"),
Tags: []*ec2.Tag{
{
Key: lo.ToPtr("Name"),
Value: lo.ToPtr("test-security-group-3"),
},
{
Key: lo.ToPtr("TestTag"),
},
{
Key: lo.ToPtr("foo"),
Value: lo.ToPtr("bar"),
},
},
},
}))
}()
}
wg.Wait()
})
})

func ExpectConsistsOfSecurityGroups(expected, actual []*ec2.SecurityGroup) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/providers/subnet/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ func (p *Provider) List(ctx context.Context, nodeClass *v1beta1.EC2NodeClass) ([
return nil, err
}
if subnets, ok := p.cache.Get(fmt.Sprint(hash)); ok {
return subnets.([]*ec2.Subnet), nil
// Ensure what's returned from this function is a shallow-copy of the slice (not a deep-copy of the data itself)
// so that modifications to the ordering of the data don't affect the original
return append([]*ec2.Subnet{}, subnets.([]*ec2.Subnet)...), nil
}

// Ensure that all the subnets that are returned here are unique
Expand Down

0 comments on commit 16bbca7

Please sign in to comment.