Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "don't pre-bind pods to nodes" #1826

Merged
merged 1 commit into from May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 12 additions & 15 deletions pkg/apis/provisioning/v1alpha5/constraints.go
Expand Up @@ -70,28 +70,25 @@ func (c *Constraints) ToNode() *v1.Node {

// both the taints and startup taints are applied to nodes we create
taints := append(c.Taints, c.StartupTaints...)
taints = append(taints, v1.Taint{
Key: v1.TaintNodeNotReady,
Effect: v1.TaintEffectNoSchedule,
})

// Taint karpenter.sh/not-ready=NoSchedule to prevent the kube scheduler
// from scheduling pods before we're able to bind them ourselves. The kube
// scheduler has an eventually consistent cache of nodes and pods, so it's
// possible for it to see a provisioned node before it sees the pods bound
// to it. This creates an edge case where other pending pods may be bound to
// the node by the kube scheduler, causing OutOfCPU errors when the
// binpacked pods race to bind to the same node. The system eventually
// heals, but causes delays from additional provisioning (thrash). This
// taint will be removed by the node controller when a node is marked ready.
taints = append(taints, v1.Taint{Key: NotReadyTaintKey, Effect: v1.TaintEffectNoSchedule})

return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
Annotations: map[string]string{
NotReadyAnnotationKey: "true",
},
Labels: labels,
Finalizers: []string{TerminationFinalizer},
},
Spec: v1.NodeSpec{
Taints: taints,
},
Status: v1.NodeStatus{Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
Reason: "KubletNotReady",
},
}},
}
}
4 changes: 0 additions & 4 deletions pkg/apis/provisioning/v1alpha5/labels.go
Expand Up @@ -34,10 +34,6 @@ var (
KarpenterLabelDomain = "karpenter.sh"
LabelCapacityType = KarpenterLabelDomain + "/capacity-type"

// AnnotationExtendedResources is used to record the expected extended resources on a node that will be created when
// device plugins have finished initializing
AnnotationExtendedResources = KarpenterLabelDomain + "/extended-resources"

// RestrictedLabelDomains are either prohibited by the kubelet or reserved by karpenter
RestrictedLabelDomains = stringsets.NewString(
"kubernetes.io",
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/provisioning/v1alpha5/register.go
Expand Up @@ -41,7 +41,7 @@ var (
return nil
})
ProvisionerNameLabelKey = Group + "/provisioner-name"
NotReadyAnnotationKey = Group + "/not-ready"
NotReadyTaintKey = Group + "/not-ready"
DoNotEvictPodAnnotationKey = Group + "/do-not-evict"
EmptinessTimestampAnnotationKey = Group + "/emptiness-timestamp"
TerminationFinalizer = Group + "/termination"
Expand Down
67 changes: 11 additions & 56 deletions pkg/apis/provisioning/v1alpha5/util.go
Expand Up @@ -15,26 +15,21 @@ limitations under the License.
package v1alpha5

import (
"context"
"encoding/json"

v1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"

"github.com/aws/karpenter/pkg/utils/resources"
)

// NodeIsReady returns true if:
// a) its current status is set to Ready
// b) all the startup taints have been removed from the node
// c) all extended resources have been registered
// This method handles both nil provisioners and nodes without extended resources gracefully.
func NodeIsReady(ctx context.Context, node *v1.Node, provisioner *Provisioner) bool {
// fast checks first
if GetCondition(node.Status.Conditions, v1.NodeReady).Status != v1.ConditionTrue {
return false
// NodeIsReady returns true if all the startup taints have been removed from the node and its current status
// is set to Ready
func NodeIsReady(node *v1.Node, provisioner *Provisioner) bool {
for _, startupTaint := range provisioner.Spec.StartupTaints {
for i := 0; i < len(node.Spec.Taints); i++ {
// if the node still has a startup taint applied, it's not ready
if startupTaint.MatchTaint(&node.Spec.Taints[i]) {
return false
}
}
}
return isStartupTaintRemoved(node, provisioner) && isExtendedResourceRegistered(ctx, node)
return GetCondition(node.Status.Conditions, v1.NodeReady).Status == v1.ConditionTrue
}

func GetCondition(conditions []v1.NodeCondition, match v1.NodeConditionType) v1.NodeCondition {
Expand All @@ -45,43 +40,3 @@ func GetCondition(conditions []v1.NodeCondition, match v1.NodeConditionType) v1.
}
return v1.NodeCondition{}
}

// isStartupTaintRemoved returns true if there are no startup taints registered for the provisioner, or if all startup
// taints have been removed from the node
func isStartupTaintRemoved(node *v1.Node, provisioner *Provisioner) bool {
if provisioner != nil {
for _, startupTaint := range provisioner.Spec.StartupTaints {
for i := 0; i < len(node.Spec.Taints); i++ {
// if the node still has a startup taint applied, it's not ready
if startupTaint.MatchTaint(&node.Spec.Taints[i]) {
return false
}
}
}
}
return true
}

// isExtendedResourceRegistered returns true if there are no extended resources on the node, or they have all been
// registered by device plugins
func isExtendedResourceRegistered(ctx context.Context, node *v1.Node) bool {
if extendedResourcesStr, ok := node.Annotations[AnnotationExtendedResources]; ok {
extendedResources := v1.ResourceList{}
if err := json.Unmarshal([]byte(extendedResourcesStr), &extendedResources); err != nil {
logging.FromContext(ctx).Errorf("unmarshalling extended resource information, %s", err)
return false
}

for resourceName, quantity := range extendedResources {
// kubelet will zero out both the capacity and allocatable for an extended resource on startup, so if our
// annotation says the resource should be there, but it's zero'd in both then the device plugin hasn't
// registered it yet
if resources.IsZero(node.Status.Capacity[resourceName]) &&
resources.IsZero(node.Status.Allocatable[resourceName]) &&
!quantity.IsZero() {
return false
}
}
}
return true
}
14 changes: 10 additions & 4 deletions pkg/cloudprovider/aws/instance.go
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -261,11 +262,16 @@ func (p *InstanceProvider) instanceToNode(ctx context.Context, instance *ec2.Ins
nodeName = aws.StringValue(instance.InstanceId)
}

// Since we no longer pre-bind, we need to ensure that all resources that were possibly considered for scheduling
// purposes are placed on the node. The extended resources will be moved to an annotation, while the
// non-extended will be left on the node.
resources := v1.ResourceList{}
for resourceName, quantity := range instanceType.Resources() {
for resourceName, quantity := range map[v1.ResourceName]resource.Quantity{
v1.ResourcePods: instanceType.Resources()[v1.ResourcePods],
v1.ResourceCPU: instanceType.Resources()[v1.ResourceCPU],
v1.ResourceMemory: instanceType.Resources()[v1.ResourceMemory],
v1.ResourceEphemeralStorage: instanceType.Resources()[v1.ResourceEphemeralStorage],
v1alpha1.ResourceNVIDIAGPU: instanceType.Resources()[v1alpha1.ResourceNVIDIAGPU],
v1alpha1.ResourceAMDGPU: instanceType.Resources()[v1alpha1.ResourceAMDGPU],
v1alpha1.ResourceAWSNeuron: instanceType.Resources()[v1alpha1.ResourceAWSNeuron],
} {
if !quantity.IsZero() {
resources[resourceName] = quantity
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/cloudprovider/aws/instancetype.go
Expand Up @@ -41,9 +41,9 @@ type InstanceType struct {
overhead v1.ResourceList
}

func newInstanceType(info ec2.InstanceTypeInfo, includePodENI bool) *InstanceType {
func newInstanceType(info ec2.InstanceTypeInfo) *InstanceType {
it := &InstanceType{InstanceTypeInfo: info}
it.resources = it.computeResources(includePodENI)
it.resources = it.computeResources()
it.overhead = it.computeOverhead()
return it
}
Expand Down Expand Up @@ -73,13 +73,13 @@ func (i *InstanceType) Resources() v1.ResourceList {
return i.resources
}

func (i *InstanceType) computeResources(includePodENI bool) v1.ResourceList {
func (i *InstanceType) computeResources() v1.ResourceList {
return v1.ResourceList{
v1.ResourceCPU: i.cpu(),
v1.ResourceMemory: i.memory(),
v1.ResourceEphemeralStorage: i.ephemeralStorage(),
v1.ResourcePods: i.pods(),
v1alpha1.ResourceAWSPodENI: i.awsPodENI(includePodENI),
v1alpha1.ResourceAWSPodENI: i.awsPodENI(),
v1alpha1.ResourceNVIDIAGPU: i.nvidiaGPUs(),
v1alpha1.ResourceAMDGPU: i.amdGPUs(),
v1alpha1.ResourceAWSNeuron: i.awsNeurons(),
Expand Down Expand Up @@ -147,10 +147,10 @@ func (i *InstanceType) pods() resource.Quantity {
return *resources.Quantity(fmt.Sprint(i.eniLimitedPods()))
}

func (i *InstanceType) awsPodENI(includePodENI bool) resource.Quantity {
func (i *InstanceType) awsPodENI() resource.Quantity {
// https://docs.aws.amazon.com/eks/latest/userguide/security-groups-for-pods.html#supported-instance-types
limits, ok := vpc.Limits[aws.StringValue(i.InstanceType)]
if includePodENI && ok && limits.IsTrunkingCompatible {
if ok && limits.IsTrunkingCompatible {
return *resources.Quantity(fmt.Sprint(limits.BranchInterface))
}
return *resources.Quantity("0")
Expand Down
3 changes: 1 addition & 2 deletions pkg/cloudprovider/aws/instancetypes.go
Expand Up @@ -130,7 +130,6 @@ func (p *InstanceTypeProvider) getInstanceTypes(ctx context.Context) (map[string
return cached.(map[string]*InstanceType), nil
}
instanceTypes := map[string]*InstanceType{}
enablePodENI := injection.GetOptions(ctx).AWSEnablePodENI
if err := p.ec2api.DescribeInstanceTypesPagesWithContext(ctx, &ec2.DescribeInstanceTypesInput{
Filters: []*ec2.Filter{
{
Expand All @@ -145,7 +144,7 @@ func (p *InstanceTypeProvider) getInstanceTypes(ctx context.Context) (map[string
}, func(page *ec2.DescribeInstanceTypesOutput, lastPage bool) bool {
for _, instanceType := range page.InstanceTypes {
if p.filter(instanceType) {
instanceTypes[aws.StringValue(instanceType.InstanceType)] = newInstanceType(*instanceType, enablePodENI)
instanceTypes[aws.StringValue(instanceType.InstanceType)] = newInstanceType(*instanceType)
}
}
return true
Expand Down
53 changes: 10 additions & 43 deletions pkg/cloudprovider/aws/suite_test.go
Expand Up @@ -17,15 +17,11 @@ package aws
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"math"
"testing"

"github.com/Pallinder/go-randomdata"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/vpc"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
"github.com/aws/karpenter/pkg/cloudprovider/aws/amifamily"
Expand All @@ -35,20 +31,23 @@ import (
"github.com/aws/karpenter/pkg/controllers/provisioning"
"github.com/aws/karpenter/pkg/controllers/state"
"github.com/aws/karpenter/pkg/test"
. "github.com/aws/karpenter/pkg/test/expectations"
"github.com/aws/karpenter/pkg/utils/injection"
"github.com/aws/karpenter/pkg/utils/options"

"github.com/Pallinder/go-randomdata"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/patrickmn/go-cache"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/ptr"

. "github.com/aws/karpenter/pkg/test/expectations"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "knative.dev/pkg/logging/testing"
"knative.dev/pkg/ptr"
)

var ctx context.Context
Expand Down Expand Up @@ -79,7 +78,6 @@ var _ = BeforeSuite(func() {
ClusterEndpoint: "https://test-cluster",
AWSNodeNameConvention: string(options.IPName),
AWSENILimitedPodDensity: true,
AWSEnablePodENI: true,
AWSDefaultInstanceProfile: "test-instance-profile",
}
Expect(opts.Validate()).To(Succeed(), "Failed to validate options")
Expand Down Expand Up @@ -187,29 +185,6 @@ var _ = Describe("Allocation", func() {
ExpectScheduled(ctx, env.Client, pod)
}
})
It("should fail to launch AWS Pod ENI if the command line option enabling it isn't set", func() {
// ensure the pod ENI option is off
optsCopy := opts
optsCopy.AWSEnablePodENI = false
cancelCtx, cancelFunc := context.WithCancel(injection.WithOptions(ctx, optsCopy))
// ensure the provisioner is shut down at the end of this test
defer cancelFunc()
// clear any cached instance types
cloudProvider.(*CloudProvider).instanceTypeProvider.cache = cache.New(InstanceTypesAndZonesCacheTTL, CacheCleanupInterval)
provisionContoller := provisioning.NewController(cancelCtx, env.Client, clientSet.CoreV1(), recorder, cloudProvider, cluster)
ExpectApplied(ctx, env.Client, provisioner)
for _, pod := range ExpectProvisioned(cancelCtx, env.Client, provisionContoller,
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{v1alpha1.ResourceAWSPodENI: resource.MustParse("1")},
Limits: v1.ResourceList{v1alpha1.ResourceAWSPodENI: resource.MustParse("1")},
},
})) {
ExpectNotScheduled(cancelCtx, env.Client, pod)
}
// and ensure no one gets our no-ENI instance types
cloudProvider.(*CloudProvider).instanceTypeProvider.cache = cache.New(InstanceTypesAndZonesCacheTTL, CacheCleanupInterval)
})
It("should launch AWS Pod ENI on a compatible instance type", func() {
ExpectApplied(ctx, env.Client, provisioner)
for _, pod := range ExpectProvisioned(ctx, env.Client, controller,
Expand Down Expand Up @@ -254,8 +229,7 @@ var _ = Describe("Allocation", func() {
})) {
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "p3.8xlarge"))
extendedResources := ExpectExtendedResources(node)
Expect(extendedResources).To(HaveKeyWithValue(v1alpha1.ResourceNVIDIAGPU, resource.MustParse("4")))
Expect(node.Status.Capacity).To(HaveKeyWithValue(v1alpha1.ResourceNVIDIAGPU, resource.MustParse("4")))
nodeNames.Insert(node.Name)
}
Expect(nodeNames.Len()).To(Equal(2))
Expand Down Expand Up @@ -287,8 +261,7 @@ var _ = Describe("Allocation", func() {
) {
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "inf1.6xlarge"))
extendedResources := ExpectExtendedResources(node)
Expect(extendedResources).To(HaveKeyWithValue(v1alpha1.ResourceAWSNeuron, resource.MustParse("4")))
Expect(node.Status.Capacity).To(HaveKeyWithValue(v1alpha1.ResourceAWSNeuron, resource.MustParse("4")))
nodeNames.Insert(node.Name)
}
Expect(nodeNames.Len()).To(Equal(2))
Expand Down Expand Up @@ -1236,12 +1209,6 @@ var _ = Describe("Allocation", func() {
})
})

func ExpectExtendedResources(node *v1.Node) v1.ResourceList {
extended := v1.ResourceList{}
Expect(json.Unmarshal([]byte(node.Annotations[v1alpha5.AnnotationExtendedResources]), &extended)).To(Succeed())
return extended
}

// ExpectTags verifies that the expected tags are a subset of the tags found
func ExpectTags(tags []*ec2.Tag, expected map[string]string) {
existingTags := map[string]string{}
Expand Down
16 changes: 7 additions & 9 deletions pkg/cloudprovider/fake/cloudprovider.go
Expand Up @@ -67,7 +67,7 @@ func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.N
break
}
}
n := &v1.Node{
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{
Expand All @@ -84,15 +84,13 @@ func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.N
Architecture: instance.Architecture(),
OperatingSystem: v1alpha5.OperatingSystemLinux,
},
Allocatable: v1.ResourceList{},
Capacity: v1.ResourceList{},
Allocatable: v1.ResourceList{
v1.ResourcePods: instance.Resources()[v1.ResourcePods],
v1.ResourceCPU: instance.Resources()[v1.ResourceCPU],
v1.ResourceMemory: instance.Resources()[v1.ResourceMemory],
},
},
}
for k, v := range instance.Resources() {
n.Status.Capacity[k] = v
n.Status.Allocatable[k] = v
}
return n, nil
}, nil
}

func (c *CloudProvider) GetInstanceTypes(_ context.Context) ([]cloudprovider.InstanceType, error) {
Expand Down