Skip to content

Commit

Permalink
Implemented pprof (#1908)
Browse files Browse the repository at this point in the history
* Implemented pprof

* Compress instance types

* Control profiling via flag

* Fixed hypervisor test
  • Loading branch information
ellistarn committed Jun 20, 2022
1 parent aa9777b commit de03b6f
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 35 deletions.
42 changes: 33 additions & 9 deletions cmd/controller/main.go
Expand Up @@ -17,15 +17,8 @@ package main
import (
"context"
"fmt"

"knative.dev/pkg/system"

"github.com/aws/karpenter/pkg/config"

"github.com/aws/karpenter/pkg/events"

"github.com/aws/karpenter/pkg/controllers/state"
"github.com/aws/karpenter/pkg/utils/project"
"net/http"
"net/http/pprof"

"github.com/go-logr/zapr"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -40,12 +33,14 @@ import (
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/logging"
"knative.dev/pkg/signals"
"knative.dev/pkg/system"
controllerruntime "sigs.k8s.io/controller-runtime"

"github.com/aws/karpenter/pkg/apis"
"github.com/aws/karpenter/pkg/cloudprovider"
cloudprovidermetrics "github.com/aws/karpenter/pkg/cloudprovider/metrics"
"github.com/aws/karpenter/pkg/cloudprovider/registry"
"github.com/aws/karpenter/pkg/config"
"github.com/aws/karpenter/pkg/controllers"
"github.com/aws/karpenter/pkg/controllers/counter"
metricsnode "github.com/aws/karpenter/pkg/controllers/metrics/node"
Expand All @@ -54,9 +49,12 @@ import (
"github.com/aws/karpenter/pkg/controllers/node"
"github.com/aws/karpenter/pkg/controllers/persistentvolumeclaim"
"github.com/aws/karpenter/pkg/controllers/provisioning"
"github.com/aws/karpenter/pkg/controllers/state"
"github.com/aws/karpenter/pkg/controllers/termination"
"github.com/aws/karpenter/pkg/events"
"github.com/aws/karpenter/pkg/utils/injection"
"github.com/aws/karpenter/pkg/utils/options"
"github.com/aws/karpenter/pkg/utils/project"
)

var (
Expand Down Expand Up @@ -94,6 +92,11 @@ func main() {
MetricsBindAddress: fmt.Sprintf(":%d", opts.MetricsPort),
HealthProbeBindAddress: fmt.Sprintf(":%d", opts.HealthProbePort),
})

if opts.EnableProfiling {
utilruntime.Must(registerPprof(manager))
}

cloudProvider := registry.NewCloudProvider(ctx, cloudprovider.Options{ClientSet: clientSet, KubeClient: manager.GetClient()})
cloudProvider = cloudprovidermetrics.Decorate(cloudProvider)

Expand Down Expand Up @@ -125,6 +128,27 @@ func main() {
}
}

// registerPprof attaches the net/http/pprof handlers to the manager by calling
// AddMetricsExtraHandler once per path. Because the handlers live in a map,
// the order in which they are registered is unspecified.
//
// It stops at the first failed registration and returns that error annotated
// with the path that could not be registered; it returns nil when every
// handler was added.
func registerPprof(manager controllers.Manager) error {
	for path, handler := range map[string]http.Handler{
		"/debug/pprof/":             http.HandlerFunc(pprof.Index),
		"/debug/pprof/cmdline":      http.HandlerFunc(pprof.Cmdline),
		"/debug/pprof/profile":      http.HandlerFunc(pprof.Profile),
		"/debug/pprof/symbol":       http.HandlerFunc(pprof.Symbol),
		"/debug/pprof/trace":        http.HandlerFunc(pprof.Trace),
		"/debug/pprof/allocs":       pprof.Handler("allocs"),
		"/debug/pprof/heap":         pprof.Handler("heap"),
		"/debug/pprof/block":        pprof.Handler("block"),
		"/debug/pprof/goroutine":    pprof.Handler("goroutine"),
		"/debug/pprof/threadcreate": pprof.Handler("threadcreate"),
	} {
		if err := manager.AddMetricsExtraHandler(path, handler); err != nil {
			// Include the path so the caller can tell which handler failed.
			return fmt.Errorf("registering pprof handler %q: %w", path, err)
		}
	}
	return nil
}

// LoggingContextOrDie injects a logger into the returned context. The logger is
// configured by the ConfigMap `config-logging` and live updates the level.
func LoggingContextOrDie(config *rest.Config, cmw *informer.InformedWatcher) context.Context {
Expand Down
18 changes: 18 additions & 0 deletions pkg/cloudprovider/aws/instancetype.go
Expand Up @@ -15,6 +15,7 @@ limitations under the License.
package aws

import (
"context"
"fmt"
"math"
"strings"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/aws/karpenter/pkg/cloudprovider/aws/amifamily"
"github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1"
"github.com/aws/karpenter/pkg/scheduling"
"github.com/aws/karpenter/pkg/utils/injection"
"github.com/aws/karpenter/pkg/utils/resources"
"github.com/aws/karpenter/pkg/utils/sets"
)
Expand All @@ -46,6 +48,22 @@ type InstanceType struct {
maxPods *int32
}

// NewInstanceType builds an InstanceType from the given EC2 instance type
// info, AWS provider and offerings, and eagerly fills in its resources,
// overhead and requirements using the options carried by ctx.
//
// When the AWSENILimitedPodDensity option is false, maxPods is set to 110;
// otherwise maxPods is left unset.
func NewInstanceType(ctx context.Context, info *ec2.InstanceTypeInfo, provider *v1alpha1.AWS, offerings []cloudprovider.Offering) *InstanceType {
	opts := injection.GetOptions(ctx)
	it := &InstanceType{InstanceTypeInfo: info, provider: provider, offerings: offerings}

	// Derived values are computed once up front rather than on every access.
	it.resources = it.computeResources(opts.AWSEnablePodENI)
	it.overhead = it.computeOverhead(opts.VMMemoryOverhead)
	it.requirements = it.computeRequirements()

	if opts.AWSENILimitedPodDensity {
		return it
	}
	it.maxPods = ptr.Int32(110)
	return it
}

// Name returns the instance type's name as a plain string, obtained by
// passing the InstanceType pointer field through aws.StringValue.
func (i *InstanceType) Name() string {
	name := aws.StringValue(i.InstanceType)
	return name
}
Expand Down
40 changes: 20 additions & 20 deletions pkg/cloudprovider/aws/instancetypes.go
Expand Up @@ -27,12 +27,10 @@ import (
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/logging"
"knative.dev/pkg/ptr"

"github.com/aws/karpenter/pkg/cloudprovider"
"github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1"
"github.com/aws/karpenter/pkg/utils/functional"
"github.com/aws/karpenter/pkg/utils/injection"
)

const (
Expand Down Expand Up @@ -78,27 +76,11 @@ func (p *InstanceTypeProvider) Get(ctx context.Context, provider *v1alpha1.AWS)
}
var result []cloudprovider.InstanceType
for _, i := range instanceTypes {
result = append(result, p.newInstanceType(ctx, i, provider, p.createOfferings(i, instanceTypeZones[aws.StringValue(i.InstanceType)])))
result = append(result, NewInstanceType(ctx, i, provider, p.createOfferings(i, instanceTypeZones[aws.StringValue(i.InstanceType)])))
}
return result, nil
}

// newInstanceType assembles an InstanceType from the EC2 info, provider and
// offerings, then precomputes its resources, overhead and requirements from
// the options stored in ctx. maxPods is pinned to 110 only when the
// AWSENILimitedPodDensity option is false.
func (p *InstanceTypeProvider) newInstanceType(ctx context.Context, info *ec2.InstanceTypeInfo, provider *v1alpha1.AWS, offerings []cloudprovider.Offering) *InstanceType {
	options := injection.GetOptions(ctx)
	result := &InstanceType{InstanceTypeInfo: info, provider: provider, offerings: offerings}

	// Compute derived fields a single time so later reads are cheap.
	result.resources = result.computeResources(options.AWSEnablePodENI)
	result.overhead = result.computeOverhead(options.VMMemoryOverhead)
	result.requirements = result.computeRequirements()

	if eniLimited := options.AWSENILimitedPodDensity; !eniLimited {
		result.maxPods = ptr.Int32(110)
	}
	return result
}

func (p *InstanceTypeProvider) createOfferings(instanceType *ec2.InstanceTypeInfo, zones sets.String) []cloudprovider.Offering {
offerings := []cloudprovider.Offering{}
for zone := range zones {
Expand Down Expand Up @@ -168,7 +150,7 @@ func (p *InstanceTypeProvider) getInstanceTypes(ctx context.Context, provider *v
}, func(page *ec2.DescribeInstanceTypesOutput, lastPage bool) bool {
for _, instanceType := range page.InstanceTypes {
if p.filter(instanceType) {
instanceTypes[aws.StringValue(instanceType.InstanceType)] = instanceType
instanceTypes[aws.StringValue(instanceType.InstanceType)] = compressInstanceType(instanceType)
}
}
return true
Expand Down Expand Up @@ -210,6 +192,24 @@ func (p *InstanceTypeProvider) CacheUnavailable(ctx context.Context, fleetErr *e
p.unavailableOfferings.SetDefault(UnavailableOfferingsCacheKey(instanceType, zone, capacityType), struct{}{})
}

// compressInstanceType returns a new ec2.InstanceTypeInfo that carries over
// only a subset of the input's fields: InstanceType, Hypervisor,
// SupportedUsageClasses, GpuInfo, InferenceAcceleratorInfo,
// InstanceStorageInfo, the default vCPU count, the memory size, the supported
// architectures, and the per-interface IPv4 address and maximum network
// interface counts. Every other field is dropped.
// NOTE(review): presumably this exists to shrink the memory held by cached
// instance types — confirm against the caller's caching behavior.
//
// VCpuInfo, MemoryInfo, ProcessorInfo and NetworkInfo are always non-nil in
// the result. If the corresponding section is nil on the input, the result's
// section is left empty instead of dereferencing a nil pointer.
func compressInstanceType(instanceType *ec2.InstanceTypeInfo) *ec2.InstanceTypeInfo {
	compressed := &ec2.InstanceTypeInfo{
		InstanceType:             instanceType.InstanceType,
		Hypervisor:               instanceType.Hypervisor,
		SupportedUsageClasses:    instanceType.SupportedUsageClasses,
		GpuInfo:                  instanceType.GpuInfo,
		InferenceAcceleratorInfo: instanceType.InferenceAcceleratorInfo,
		InstanceStorageInfo:      instanceType.InstanceStorageInfo,
		VCpuInfo:                 &ec2.VCpuInfo{},
		MemoryInfo:               &ec2.MemoryInfo{},
		ProcessorInfo:            &ec2.ProcessorInfo{},
		NetworkInfo:              &ec2.NetworkInfo{},
	}
	// Each nested section is an optional pointer; copy from it only if present.
	if vcpu := instanceType.VCpuInfo; vcpu != nil {
		compressed.VCpuInfo.DefaultVCpus = vcpu.DefaultVCpus
	}
	if memory := instanceType.MemoryInfo; memory != nil {
		compressed.MemoryInfo.SizeInMiB = memory.SizeInMiB
	}
	if processor := instanceType.ProcessorInfo; processor != nil {
		compressed.ProcessorInfo.SupportedArchitectures = processor.SupportedArchitectures
	}
	if network := instanceType.NetworkInfo; network != nil {
		compressed.NetworkInfo.Ipv4AddressesPerInterface = network.Ipv4AddressesPerInterface
		compressed.NetworkInfo.MaximumNetworkInterfaces = network.MaximumNetworkInterfaces
	}
	return compressed
}

// UnavailableOfferingsCacheKey builds the cache key for an offering by joining
// the capacity type, instance type and zone with colons, in that order. Note
// that the key order differs from the parameter order.
func UnavailableOfferingsCacheKey(instanceType string, zone string, capacityType string) string {
	const keyFormat = "%s:%s:%s"
	return fmt.Sprintf(keyFormat, capacityType, instanceType, zone)
}
21 changes: 15 additions & 6 deletions pkg/utils/options/options.go
Expand Up @@ -36,12 +36,15 @@ const (
// Options for running this binary
type Options struct {
*flag.FlagSet
// Vendor Neutral
MetricsPort int
HealthProbePort int
KubeClientQPS int
KubeClientBurst int
EnableProfiling bool
// AWS Specific
ClusterName string
ClusterEndpoint string
MetricsPort int
HealthProbePort int
KubeClientQPS int
KubeClientBurst int
VMMemoryOverhead float64
AWSNodeNameConvention string
AWSENILimitedPodDensity bool
Expand All @@ -54,17 +57,23 @@ func New() Options {
opts := Options{}
f := flag.NewFlagSet("karpenter", flag.ContinueOnError)
opts.FlagSet = f
f.StringVar(&opts.ClusterName, "cluster-name", env.WithDefaultString("CLUSTER_NAME", ""), "The kubernetes cluster name for resource discovery")
f.StringVar(&opts.ClusterEndpoint, "cluster-endpoint", env.WithDefaultString("CLUSTER_ENDPOINT", ""), "The external kubernetes cluster endpoint for new nodes to connect with")

// Vendor Neutral
f.IntVar(&opts.MetricsPort, "metrics-port", env.WithDefaultInt("METRICS_PORT", 8080), "The port the metric endpoint binds to for operating metrics about the controller itself")
f.IntVar(&opts.HealthProbePort, "health-probe-port", env.WithDefaultInt("HEALTH_PROBE_PORT", 8081), "The port the health probe endpoint binds to for reporting controller health")
f.IntVar(&opts.KubeClientQPS, "kube-client-qps", env.WithDefaultInt("KUBE_CLIENT_QPS", 200), "The smoothed rate of qps to kube-apiserver")
f.IntVar(&opts.KubeClientBurst, "kube-client-burst", env.WithDefaultInt("KUBE_CLIENT_BURST", 300), "The maximum allowed burst of queries to the kube-apiserver")
f.BoolVar(&opts.EnableProfiling, "enable-profiling", env.WithDefaultBool("ENABLE_PROFILING", false), "Enable the profiling on the metric endpoint")

// AWS Specific
f.StringVar(&opts.ClusterName, "cluster-name", env.WithDefaultString("CLUSTER_NAME", ""), "The kubernetes cluster name for resource discovery")
f.StringVar(&opts.ClusterEndpoint, "cluster-endpoint", env.WithDefaultString("CLUSTER_ENDPOINT", ""), "The external kubernetes cluster endpoint for new nodes to connect with")
f.Float64Var(&opts.VMMemoryOverhead, "vm-memory-overhead", env.WithDefaultFloat64("VM_MEMORY_OVERHEAD", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types")
f.StringVar(&opts.AWSNodeNameConvention, "aws-node-name-convention", env.WithDefaultString("AWS_NODE_NAME_CONVENTION", string(IPName)), "The node naming convention used by the AWS cloud provider. DEPRECATION WARNING: this field may be deprecated at any time")
f.BoolVar(&opts.AWSENILimitedPodDensity, "aws-eni-limited-pod-density", env.WithDefaultBool("AWS_ENI_LIMITED_POD_DENSITY", true), "Indicates whether new nodes should use ENI-based pod density")
f.StringVar(&opts.AWSDefaultInstanceProfile, "aws-default-instance-profile", env.WithDefaultString("AWS_DEFAULT_INSTANCE_PROFILE", ""), "The default instance profile to use when provisioning nodes in AWS")
f.BoolVar(&opts.AWSEnablePodENI, "aws-enable-pod-eni", env.WithDefaultBool("AWS_ENABLE_POD_ENI", false), "If true then instances that support pod ENI will report a vpc.amazonaws.com/pod-eni resource")

return opts
}

Expand Down
20 changes: 20 additions & 0 deletions website/content/en/preview/development-guide.md
Expand Up @@ -114,3 +114,23 @@ Once you have your ECR repository provisioned, configure your Docker daemon to a
export KO_DOCKER_REPO="${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_DEFAULT_REGION}.amazonaws.com/karpenter"
aws ecr get-login-password --region "${AWS_DEFAULT_REGION}" | docker login --username AWS --password-stdin "${KO_DOCKER_REPO}"
```

## Profiling memory
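Karpenter exposes a pprof endpoint on its metrics port when profiling is enabled. Profiling is disabled by default; enable it with the `--enable-profiling` flag or by setting the `ENABLE_PROFILING=true` environment variable.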

Learn about profiling with pprof: https://jvns.ca/blog/2017/09/24/profiling-go-with-pprof/

### Prerequisites
```
brew install graphviz
go install github.com/google/pprof@latest
```

### Get a profile
```
# Connect to the metrics endpoint
kubectl port-forward service/karpenter -n karpenter 8080
open http://localhost:8080/debug/pprof/
# Visualize heap memory usage in the pprof web UI
go tool pprof -http 0.0.0.0:9000 localhost:8080/debug/pprof/heap
```

0 comments on commit de03b6f

Please sign in to comment.