Revert "don't pre-bind pods to nodes (#1773)"
This reverts commit 12336f5.
tzneal committed May 18, 2022
1 parent 12336f5 commit f8cd031
Showing 22 changed files with 201 additions and 455 deletions.
27 changes: 12 additions & 15 deletions pkg/apis/provisioning/v1alpha5/constraints.go
@@ -70,28 +70,25 @@ func (c *Constraints) ToNode() *v1.Node {

// both the taints and startup taints are applied to nodes we create
taints := append(c.Taints, c.StartupTaints...)
taints = append(taints, v1.Taint{
Key: v1.TaintNodeNotReady,
Effect: v1.TaintEffectNoSchedule,
})

// Taint karpenter.sh/not-ready=NoSchedule to prevent the kube scheduler
// from scheduling pods before we're able to bind them ourselves. The kube
// scheduler has an eventually consistent cache of nodes and pods, so it's
// possible for it to see a provisioned node before it sees the pods bound
// to it. This creates an edge case where other pending pods may be bound to
// the node by the kube scheduler, causing OutOfCPU errors when the
// binpacked pods race to bind to the same node. The system eventually
// heals, but causes delays from additional provisioning (thrash). This
// taint will be removed by the node controller when a node is marked ready.
taints = append(taints, v1.Taint{Key: NotReadyTaintKey, Effect: v1.TaintEffectNoSchedule})

return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
Annotations: map[string]string{
NotReadyAnnotationKey: "true",
},
Labels: labels,
Finalizers: []string{TerminationFinalizer},
},
Spec: v1.NodeSpec{
Taints: taints,
},
Status: v1.NodeStatus{Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
Reason: "KubeletNotReady",
},
}},
}
}
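
The karpenter.sh/not-ready taint restored above is half of a handshake: a node controller later strips it once kubelet reports Ready, letting the kube scheduler resume placing pods. A minimal sketch of that removal, assuming only the helpers visible in this diff (GetCondition, NotReadyTaintKey); the function name and wiring are illustrative, since the node controller itself is not part of this commit:

package sketch

import (
	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
)

// clearNotReadyTaint (illustrative name) drops the karpenter.sh/not-ready
// taint once kubelet reports Ready, unblocking the kube scheduler.
func clearNotReadyTaint(node *v1.Node) {
	if v1alpha5.GetCondition(node.Status.Conditions, v1.NodeReady).Status != v1.ConditionTrue {
		return // still initializing: keep the kube scheduler blocked
	}
	kept := make([]v1.Taint, 0, len(node.Spec.Taints))
	for _, taint := range node.Spec.Taints {
		if taint.Key != v1alpha5.NotReadyTaintKey {
			kept = append(kept, taint)
		}
	}
	node.Spec.Taints = kept
}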
4 changes: 0 additions & 4 deletions pkg/apis/provisioning/v1alpha5/labels.go
@@ -34,10 +34,6 @@ var (
KarpenterLabelDomain = "karpenter.sh"
LabelCapacityType = KarpenterLabelDomain + "/capacity-type"

// AnnotationExtendedResources records the extended resources expected to appear on a node
// once its device plugins have finished initializing
AnnotationExtendedResources = KarpenterLabelDomain + "/extended-resources"

// RestrictedLabelDomains are either prohibited by the kubelet or reserved by karpenter
RestrictedLabelDomains = stringsets.NewString(
"kubernetes.io",
2 changes: 1 addition & 1 deletion pkg/apis/provisioning/v1alpha5/register.go
@@ -41,7 +41,7 @@ var (
return nil
})
ProvisionerNameLabelKey = Group + "/provisioner-name"
NotReadyAnnotationKey = Group + "/not-ready"
NotReadyTaintKey = Group + "/not-ready"
DoNotEvictPodAnnotationKey = Group + "/do-not-evict"
EmptinessTimestampAnnotationKey = Group + "/emptiness-timestamp"
TerminationFinalizer = Group + "/termination"
67 changes: 11 additions & 56 deletions pkg/apis/provisioning/v1alpha5/util.go
@@ -15,26 +15,21 @@ limitations under the License.
package v1alpha5

import (
"context"
"encoding/json"

v1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"

"github.com/aws/karpenter/pkg/utils/resources"
)

// NodeIsReady returns true if:
// a) its current status is set to Ready
// b) all the startup taints have been removed from the node
// c) all extended resources have been registered
// This method handles both nil provisioners and nodes without extended resources gracefully.
func NodeIsReady(ctx context.Context, node *v1.Node, provisioner *Provisioner) bool {
// fast checks first
if GetCondition(node.Status.Conditions, v1.NodeReady).Status != v1.ConditionTrue {
return false
// NodeIsReady returns true if all the startup taints have been removed from the node and its current status
// is set to Ready
func NodeIsReady(node *v1.Node, provisioner *Provisioner) bool {
for _, startupTaint := range provisioner.Spec.StartupTaints {
for i := 0; i < len(node.Spec.Taints); i++ {
// if the node still has a startup taint applied, it's not ready
if startupTaint.MatchTaint(&node.Spec.Taints[i]) {
return false
}
}
}
return isStartupTaintRemoved(node, provisioner) && isExtendedResourceRegistered(ctx, node)
return GetCondition(node.Status.Conditions, v1.NodeReady).Status == v1.ConditionTrue
}
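
To make the restored contract concrete, a short illustrative snippet (not part of this commit): the startup taint key is made up, and StartupTaints is reached through the provisioner spec exactly as in the loop above.

package sketch

import (
	"fmt"

	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
)

func nodeIsReadyDemo() {
	prov := &v1alpha5.Provisioner{}
	prov.Spec.StartupTaints = []v1.Taint{{Key: "example.com/agent-setup", Effect: v1.TaintEffectNoSchedule}}

	node := &v1.Node{
		Spec:   v1.NodeSpec{Taints: []v1.Taint{{Key: "example.com/agent-setup", Effect: v1.TaintEffectNoSchedule}}},
		Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}},
	}

	fmt.Println(v1alpha5.NodeIsReady(node, prov)) // false: the startup taint is still applied
	node.Spec.Taints = nil
	fmt.Println(v1alpha5.NodeIsReady(node, prov)) // true: taint removed and NodeReady is True
}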

func GetCondition(conditions []v1.NodeCondition, match v1.NodeConditionType) v1.NodeCondition {
@@ -45,43 +45,3 @@ func GetCondition(conditions []v1.NodeCondition, match v1.NodeConditionType) v1.NodeCondition {
}
return v1.NodeCondition{}
}

// isStartupTaintRemoved returns true if there are no startup taints registered for the provisioner, or if all startup
// taints have been removed from the node
func isStartupTaintRemoved(node *v1.Node, provisioner *Provisioner) bool {
if provisioner != nil {
for _, startupTaint := range provisioner.Spec.StartupTaints {
for i := 0; i < len(node.Spec.Taints); i++ {
// if the node still has a startup taint applied, it's not ready
if startupTaint.MatchTaint(&node.Spec.Taints[i]) {
return false
}
}
}
}
return true
}

// isExtendedResourceRegistered returns true if there are no extended resources on the node, or they have all been
// registered by device plugins
func isExtendedResourceRegistered(ctx context.Context, node *v1.Node) bool {
if extendedResourcesStr, ok := node.Annotations[AnnotationExtendedResources]; ok {
extendedResources := v1.ResourceList{}
if err := json.Unmarshal([]byte(extendedResourcesStr), &extendedResources); err != nil {
logging.FromContext(ctx).Errorf("unmarshalling extended resource information, %s", err)
return false
}

for resourceName, quantity := range extendedResources {
// kubelet will zero out both the capacity and allocatable for an extended resource on startup, so if our
// annotation says the resource should be there, but it's zero'd in both then the device plugin hasn't
// registered it yet
if resources.IsZero(node.Status.Capacity[resourceName]) &&
resources.IsZero(node.Status.Allocatable[resourceName]) &&
!quantity.IsZero() {
return false
}
}
}
return true
}
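
For reference, the karpenter.sh/extended-resources annotation parsed by the deleted isExtendedResourceRegistered above held a JSON-encoded v1.ResourceList. A self-contained round-trip sketch; the GPU name and quantity are illustrative:

package sketch

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// annotationRoundTrip shows the JSON shape the deleted code expected to find
// in the annotation value.
func annotationRoundTrip() {
	extended := v1.ResourceList{"nvidia.com/gpu": resource.MustParse("4")}
	raw, _ := json.Marshal(extended)
	fmt.Println(string(raw)) // {"nvidia.com/gpu":"4"}

	parsed := v1.ResourceList{}
	_ = json.Unmarshal(raw, &parsed) // what the deleted code did with the annotation value
}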
14 changes: 10 additions & 4 deletions pkg/cloudprovider/aws/instance.go
@@ -29,6 +29,7 @@ import (
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/logging"
@@ -261,11 +262,16 @@ func (p *InstanceProvider) instanceToNode(ctx context.Context, instance *ec2.Ins
nodeName = aws.StringValue(instance.InstanceId)
}

// Since we no longer pre-bind, we need to ensure that all resources that were possibly considered for scheduling
// purposes are placed on the node. The extended resources will be moved to an annotation, while the
// non-extended will be left on the node.
resources := v1.ResourceList{}
for resourceName, quantity := range instanceType.Resources() {
for resourceName, quantity := range map[v1.ResourceName]resource.Quantity{
v1.ResourcePods: instanceType.Resources()[v1.ResourcePods],
v1.ResourceCPU: instanceType.Resources()[v1.ResourceCPU],
v1.ResourceMemory: instanceType.Resources()[v1.ResourceMemory],
v1.ResourceEphemeralStorage: instanceType.Resources()[v1.ResourceEphemeralStorage],
v1alpha1.ResourceNVIDIAGPU: instanceType.Resources()[v1alpha1.ResourceNVIDIAGPU],
v1alpha1.ResourceAMDGPU: instanceType.Resources()[v1alpha1.ResourceAMDGPU],
v1alpha1.ResourceAWSNeuron: instanceType.Resources()[v1alpha1.ResourceAWSNeuron],
} {
if !quantity.IsZero() {
resources[resourceName] = quantity
}
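
The deleted comment above refers to a write path outside this hunk that serialized the extended resources into the annotation. A hedged reconstruction for orientation only; the wellKnown set and function name are assumptions, and the real (now deleted) code may have differed:

package sketch

import (
	"encoding/json"

	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
)

var wellKnown = map[v1.ResourceName]bool{
	v1.ResourceCPU:              true,
	v1.ResourceMemory:           true,
	v1.ResourcePods:             true,
	v1.ResourceEphemeralStorage: true,
}

// annotateExtendedResources (hypothetical) stashes everything that isn't a
// well-known resource into the karpenter.sh/extended-resources annotation.
func annotateExtendedResources(node *v1.Node, all v1.ResourceList) error {
	extended := v1.ResourceList{}
	for name, quantity := range all {
		if !wellKnown[name] {
			extended[name] = quantity
		}
	}
	raw, err := json.Marshal(extended)
	if err != nil {
		return err
	}
	if node.Annotations == nil {
		node.Annotations = map[string]string{}
	}
	node.Annotations[v1alpha5.AnnotationExtendedResources] = string(raw)
	return nil
}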
12 changes: 6 additions & 6 deletions pkg/cloudprovider/aws/instancetype.go
@@ -41,9 +41,9 @@ type InstanceType struct {
overhead v1.ResourceList
}

func newInstanceType(info ec2.InstanceTypeInfo, includePodENI bool) *InstanceType {
func newInstanceType(info ec2.InstanceTypeInfo) *InstanceType {
it := &InstanceType{InstanceTypeInfo: info}
it.resources = it.computeResources(includePodENI)
it.resources = it.computeResources()
it.overhead = it.computeOverhead()
return it
}
@@ -73,13 +73,13 @@ func (i *InstanceType) Resources() v1.ResourceList {
return i.resources
}

func (i *InstanceType) computeResources(includePodENI bool) v1.ResourceList {
func (i *InstanceType) computeResources() v1.ResourceList {
return v1.ResourceList{
v1.ResourceCPU: i.cpu(),
v1.ResourceMemory: i.memory(),
v1.ResourceEphemeralStorage: i.ephemeralStorage(),
v1.ResourcePods: i.pods(),
v1alpha1.ResourceAWSPodENI: i.awsPodENI(includePodENI),
v1alpha1.ResourceAWSPodENI: i.awsPodENI(),
v1alpha1.ResourceNVIDIAGPU: i.nvidiaGPUs(),
v1alpha1.ResourceAMDGPU: i.amdGPUs(),
v1alpha1.ResourceAWSNeuron: i.awsNeurons(),
@@ -147,10 +147,10 @@ func (i *InstanceType) pods() resource.Quantity {
return *resources.Quantity(fmt.Sprint(i.eniLimitedPods()))
}

func (i *InstanceType) awsPodENI(includePodENI bool) resource.Quantity {
func (i *InstanceType) awsPodENI() resource.Quantity {
// https://docs.aws.amazon.com/eks/latest/userguide/security-groups-for-pods.html#supported-instance-types
limits, ok := vpc.Limits[aws.StringValue(i.InstanceType)]
if includePodENI && ok && limits.IsTrunkingCompatible {
if ok && limits.IsTrunkingCompatible {
return *resources.Quantity(fmt.Sprint(limits.BranchInterface))
}
return *resources.Quantity("0")
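
The trunking check above reads the static limits table shipped with amazon-vpc-resource-controller-k8s. A small usage sketch; the int field type is assumed from the calls visible above, and the instance type is just an example:

package sketch

import (
	"fmt"

	"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/vpc"
)

// podENICapacity mirrors the restored awsPodENI: a trunking-compatible
// instance type always advertises its branch-interface count.
func podENICapacity(instanceType string) int {
	if limits, ok := vpc.Limits[instanceType]; ok && limits.IsTrunkingCompatible {
		return limits.BranchInterface
	}
	return 0
}

func demo() {
	fmt.Println(podENICapacity("m5.large")) // branch ENI count, or 0 if not trunking-compatible
}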
3 changes: 1 addition & 2 deletions pkg/cloudprovider/aws/instancetypes.go
@@ -130,7 +130,6 @@ func (p *InstanceTypeProvider) getInstanceTypes(ctx context.Context) (map[string
return cached.(map[string]*InstanceType), nil
}
instanceTypes := map[string]*InstanceType{}
enablePodENI := injection.GetOptions(ctx).AWSEnablePodENI
if err := p.ec2api.DescribeInstanceTypesPagesWithContext(ctx, &ec2.DescribeInstanceTypesInput{
Filters: []*ec2.Filter{
{
@@ -145,7 +144,7 @@ func (p *InstanceTypeProvider) getInstanceTypes(ctx context.Context) (map[string
}, func(page *ec2.DescribeInstanceTypesOutput, lastPage bool) bool {
for _, instanceType := range page.InstanceTypes {
if p.filter(instanceType) {
instanceTypes[aws.StringValue(instanceType.InstanceType)] = newInstanceType(*instanceType, enablePodENI)
instanceTypes[aws.StringValue(instanceType.InstanceType)] = newInstanceType(*instanceType)
}
}
return true
53 changes: 10 additions & 43 deletions pkg/cloudprovider/aws/suite_test.go
@@ -17,15 +17,11 @@ package aws
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"math"
"testing"

"github.com/Pallinder/go-randomdata"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/vpc"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
"github.com/aws/karpenter/pkg/cloudprovider/aws/amifamily"
@@ -35,20 +31,23 @@ import (
"github.com/aws/karpenter/pkg/controllers/provisioning"
"github.com/aws/karpenter/pkg/controllers/state"
"github.com/aws/karpenter/pkg/test"
. "github.com/aws/karpenter/pkg/test/expectations"
"github.com/aws/karpenter/pkg/utils/injection"
"github.com/aws/karpenter/pkg/utils/options"

"github.com/Pallinder/go-randomdata"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/patrickmn/go-cache"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/ptr"

. "github.com/aws/karpenter/pkg/test/expectations"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "knative.dev/pkg/logging/testing"
"knative.dev/pkg/ptr"
)

var ctx context.Context
@@ -79,7 +78,6 @@ var _ = BeforeSuite(func() {
ClusterEndpoint: "https://test-cluster",
AWSNodeNameConvention: string(options.IPName),
AWSENILimitedPodDensity: true,
AWSEnablePodENI: true,
AWSDefaultInstanceProfile: "test-instance-profile",
}
Expect(opts.Validate()).To(Succeed(), "Failed to validate options")
@@ -187,29 +185,6 @@ var _ = Describe("Allocation", func() {
ExpectScheduled(ctx, env.Client, pod)
}
})
It("should fail to launch AWS Pod ENI if the command line option enabling it isn't set", func() {
// ensure the pod ENI option is off
optsCopy := opts
optsCopy.AWSEnablePodENI = false
cancelCtx, cancelFunc := context.WithCancel(injection.WithOptions(ctx, optsCopy))
// ensure the provisioner is shut down at the end of this test
defer cancelFunc()
// clear any cached instance types
cloudProvider.(*CloudProvider).instanceTypeProvider.cache = cache.New(InstanceTypesAndZonesCacheTTL, CacheCleanupInterval)
provisionController := provisioning.NewController(cancelCtx, env.Client, clientSet.CoreV1(), recorder, cloudProvider, cluster)
ExpectApplied(ctx, env.Client, provisioner)
for _, pod := range ExpectProvisioned(cancelCtx, env.Client, provisionController,
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{v1alpha1.ResourceAWSPodENI: resource.MustParse("1")},
Limits: v1.ResourceList{v1alpha1.ResourceAWSPodENI: resource.MustParse("1")},
},
})) {
ExpectNotScheduled(cancelCtx, env.Client, pod)
}
// and ensure no one gets our no-ENI instance types
cloudProvider.(*CloudProvider).instanceTypeProvider.cache = cache.New(InstanceTypesAndZonesCacheTTL, CacheCleanupInterval)
})
It("should launch AWS Pod ENI on a compatible instance type", func() {
ExpectApplied(ctx, env.Client, provisioner)
for _, pod := range ExpectProvisioned(ctx, env.Client, controller,
@@ -254,8 +229,7 @@ var _ = Describe("Allocation", func() {
})) {
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "p3.8xlarge"))
extendedResources := ExpectExtendedResources(node)
Expect(extendedResources).To(HaveKeyWithValue(v1alpha1.ResourceNVIDIAGPU, resource.MustParse("4")))
Expect(node.Status.Capacity).To(HaveKeyWithValue(v1alpha1.ResourceNVIDIAGPU, resource.MustParse("4")))
nodeNames.Insert(node.Name)
}
Expect(nodeNames.Len()).To(Equal(2))
@@ -287,8 +261,7 @@
) {
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "inf1.6xlarge"))
extendedResources := ExpectExtendedResources(node)
Expect(extendedResources).To(HaveKeyWithValue(v1alpha1.ResourceAWSNeuron, resource.MustParse("4")))
Expect(node.Status.Capacity).To(HaveKeyWithValue(v1alpha1.ResourceAWSNeuron, resource.MustParse("4")))
nodeNames.Insert(node.Name)
}
Expect(nodeNames.Len()).To(Equal(2))
@@ -1236,12 +1209,6 @@ var _ = Describe("Allocation", func() {
})
})

func ExpectExtendedResources(node *v1.Node) v1.ResourceList {
extended := v1.ResourceList{}
Expect(json.Unmarshal([]byte(node.Annotations[v1alpha5.AnnotationExtendedResources]), &extended)).To(Succeed())
return extended
}

// ExpectTags verifies that the expected tags are a subset of the tags found
func ExpectTags(tags []*ec2.Tag, expected map[string]string) {
existingTags := map[string]string{}
16 changes: 7 additions & 9 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -67,7 +67,7 @@ func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.N
break
}
}
n := &v1.Node{
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{
@@ -84,15 +84,13 @@
Architecture: instance.Architecture(),
OperatingSystem: v1alpha5.OperatingSystemLinux,
},
Allocatable: v1.ResourceList{},
Capacity: v1.ResourceList{},
Allocatable: v1.ResourceList{
v1.ResourcePods: instance.Resources()[v1.ResourcePods],
v1.ResourceCPU: instance.Resources()[v1.ResourceCPU],
v1.ResourceMemory: instance.Resources()[v1.ResourceMemory],
},
},
}
for k, v := range instance.Resources() {
n.Status.Capacity[k] = v
n.Status.Allocatable[k] = v
}
return n, nil
}, nil
}

func (c *CloudProvider) GetInstanceTypes(_ context.Context) ([]cloudprovider.InstanceType, error) {
