Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use AWS SDK for OnDemand Nodes #5

Merged
merged 13 commits into from
Jan 23, 2019
3,093 changes: 1,551 additions & 1,542 deletions cluster-autoscaler/Godeps/Godeps.json

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/api/UPDATE_TEST_DATA.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Update AWS test data
=====

Install AWS CLI Tools

Update

```bash
cd ${GOPATH}/src/k8s.io/autoscaler

# update on demand data
aws pricing get-products \
--region=us-east-1 \
--service-code=AmazonEC2 \
--filter Type=TERM_MATCH,Field=capacitystatus,Value=Used \
Type=TERM_MATCH,Field=preInstalledSw,Value=NA \
Type=TERM_MATCH,Field=location,Value="EU (Ireland)" \
Type=TERM_MATCH,Field=instanceType,Value="m4.xlarge" \
> ./cluster-autoscaler/cloudprovider/aws/api/pricing_ondemand_eu-west-1.json

# update spot data

```
214 changes: 153 additions & 61 deletions cluster-autoscaler/cloudprovider/aws/api/instance_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ package api
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/golang/glog"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/private/protocol"
"github.com/aws/aws-sdk-go/service/pricing"
"github.com/pkg/errors"
"k8s.io/klog"
)

const (
Expand All @@ -37,6 +40,33 @@ const (
instanceTenancyShared = "Shared"
)

// TODO <ylallemant> find some API for this map - support case opened
var (
regionNameMap = map[string]string{
"us-east-2": "USA East (Ohio)",
"us-east-1": "USA East (N. Virginia)",
"us-west-1": "USA West (N. California)",
"us-west-2": "USA West (Oregon)",
"ap-south-1": "Asia Pacific (Mumbai)",
"ap-northeast-3": "Asia Pacific (Osaka-Local)",
"ap-northeast-2": "Asia Pacific (Seoul)",
"ap-southeast-1": "Asia Pacific (Singapore)",
"ap-southeast-2": "Asia Pacific (Sydney)",
"ap-northeast-1": "Asia Pacific (Tokyo)",
"ca-central-1": "Canada (Central)",
"cn-north-1": "China (Beijing)",
"cn-northwest-1": "China (Ningxia)",
"eu-central-1": "EU (Frankfurt)",
"eu-west-1": "EU (Ireland)",
"eu-west-2": "EU (London)",
"eu-west-3": "EU (Paris)",
"eu-north-1": "EU (Stockholm)",
"sa-east-1": "South America (São Paulo)",
"us-gov-east-1": "AWS GovCloud (US-East)",
"us-gov-west-1": "AWS GovCloud (USA)",
}
)

// InstanceInfo holds AWS EC2 instance information
type InstanceInfo struct {
// InstanceType of the described instance
Expand All @@ -51,71 +81,71 @@ type InstanceInfo struct {
GPU int64
}

type httpClient interface {
Do(req *http.Request) (*http.Response, error)
type awsClient interface {
GetProducts(input *pricing.GetProductsInput) (*pricing.GetProductsOutput, error)
}

// NewEC2InstanceInfoService is the constructor of instanceInfoService which is a wrapper for AWS Pricing API.
func NewEC2InstanceInfoService(client httpClient) *instanceInfoService {
func NewEC2InstanceInfoService(client awsClient) *instanceInfoService {
return &instanceInfoService{
client: client,
cache: make(instanceInfoCache),
}
}

type instanceInfoService struct {
client httpClient
client awsClient
cache instanceInfoCache
sync.RWMutex
}

// DescribeInstanceInfo returns the corresponding aws instance info by given instance type and availability zone.
func (s *instanceInfoService) DescribeInstanceInfo(instanceType string, availabilityZone string) (*InstanceInfo, error) {
if s.shouldSync(availabilityZone) {
if err := s.sync(availabilityZone); err != nil {
func (s *instanceInfoService) DescribeInstanceInfo(instanceType string, region string) (*InstanceInfo, error) {
if s.shouldSync(region) {
if err := s.sync(region); err != nil {
// TODO <mrcrgl> may this be tolerated for resilience
return nil, fmt.Errorf("failed to sync aws product and price information: %v", err)
}
}

if bucket, found := s.cache[availabilityZone]; found {
if bucket, found := s.cache[region]; found {
for _, info := range bucket.info {
if info.InstanceType == instanceType {
return &info, nil
}
}
}
return nil, fmt.Errorf("instance info not available for instance type %s in zone %s", instanceType, availabilityZone)
return nil, fmt.Errorf("instance info not available for instance type %s region %s", instanceType, region)
}

func (s *instanceInfoService) shouldSync(availabilityZone string) bool {
bucket, found := s.cache[availabilityZone]
func (s *instanceInfoService) shouldSync(region string) bool {
bucket, found := s.cache[region]
if !found {
return true
}

return bucket.LastSync().Before(time.Now().Truncate(instanceInfoCacheMaxAge))
}

func (s *instanceInfoService) sync(availabilityZone string) error {
func (s *instanceInfoService) sync(region string) error {
s.Lock()
defer s.Unlock()

start := time.Now()

bucket, found := s.cache[availabilityZone]
bucket, found := s.cache[region]
if !found {
bucket = new(regionalInstanceInfoBucket)
s.cache[availabilityZone] = bucket
s.cache[region] = bucket
}

response, err := s.fetch(availabilityZone, bucket.ETag)
response, err := s.fetch(region, bucket.ETag)
if err != nil {
return err
}

defer func() {
glog.V(4).Infof("Synchronized aws ec2 instance information for availability zone %s - took %s", availabilityZone, time.Now().Sub(start).String())
klog.V(4).Infof("Synchronized aws ec2 instance information for region %s - took %s", region, time.Now().Sub(start).String())
}()

if response == nil {
Expand Down Expand Up @@ -152,51 +182,49 @@ func (s *instanceInfoService) sync(availabilityZone string) error {
var err error
if attr.Memory != "" && attr.Memory != "NA" {
if i.MemoryMb, err = parseMemory(attr.Memory); err != nil {
return fmt.Errorf("parser error %v", err)
return errors.Wrapf(err, "error parsing memory for SKU %s [%s]", sku, attr.Memory)
}
}

if attr.VCPU != "" {
if i.VCPU, err = parseCPU(attr.VCPU); err != nil {
return fmt.Errorf("parser error %v", err)
return errors.Wrapf(err, "error parsing VCPU for SKU %s [%s]", sku, attr.VCPU)
}
}
if attr.GPU != "" {
if i.GPU, err = parseCPU(attr.GPU); err != nil {
return fmt.Errorf("parser error %v", err)
return errors.Wrapf(err, "error parsing GPU for SKU %s [%s]", sku, attr.GPU)
}
}

for priceSKU, offers := range response.Terms.OnDemand {
for priceSKU, offer := range response.Terms.OnDemand {
if priceSKU != sku {
continue
}

var lastOfferTime time.Time
var lastOfferPrice float64

for _, offer := range offers {
if offer.EffectiveDate.After(now) {
if offer.EffectiveDate.After(now) {
continue
}

for _, price := range offer.PriceDimensions {
if price.EndRange != "Inf" || price.Unit != "Hrs" {
continue
}
p, err := strconv.ParseFloat(price.PricePerUnit.USD, 64)
if err != nil {
return errors.Wrapf(err, "error parsing price for SKU %s [%s]", sku, price.PricePerUnit.USD)
}

if p == 0.0 {
continue
}

for _, price := range offer.PriceDimensions {
if price.EndRange != "Inf" || price.Unit != "Hrs" {
continue
}
p, err := strconv.ParseFloat(price.PricePerUnit.USD, 64)
if err != nil {
return fmt.Errorf("error parsing price for SKU %s [%s] %v", sku, price.PricePerUnit.USD, err)
}

if p == 0.0 {
continue
}

if lastOfferTime.IsZero() || lastOfferTime.After(offer.EffectiveDate) {
lastOfferTime = offer.EffectiveDate
lastOfferPrice = p
}
if lastOfferTime.IsZero() || lastOfferTime.After(offer.EffectiveDate) {
lastOfferTime = offer.EffectiveDate
lastOfferPrice = p
}
}

Expand All @@ -213,38 +241,91 @@ func (s *instanceInfoService) sync(availabilityZone string) error {
return nil
}

func (s *instanceInfoService) fetch(availabilityZone string, etag string) (*response, error) {
url := fmt.Sprintf(awsPricingAPIURLTemplate, availabilityZone)
func (s *instanceInfoService) fetch(region string, etag string) (*response, error) {
url := fmt.Sprintf(awsPricingAPIURLTemplate, region)
regionName, err := regionFullName(region)
if err != nil {
return nil, err
}

req, err := http.NewRequest("GET", url, nil)

if len(etag) != 0 {
req.Header.Add("If-None-Match", etag)
}

res, err := s.client.Do(req)
input := &pricing.GetProductsInput{
ServiceCode: aws.String("AmazonEC2"),
Filters: []*pricing.Filter{
{
Type: aws.String("TERM_MATCH"),
Field: aws.String("location"),
Value: aws.String(regionName),
},
{
Type: aws.String("TERM_MATCH"),
Field: aws.String("operatingSystem"),
Value: aws.String("Linux"),
},
{
Type: aws.String("TERM_MATCH"),
Field: aws.String("capacitystatus"),
Value: aws.String("Used"),
},
{
Type: aws.String("TERM_MATCH"),
Field: aws.String("tenancy"),
Value: aws.String("Shared"),
},
{
Type: aws.String("TERM_MATCH"),
Field: aws.String("preInstalledSw"),
Value: aws.String("NA"),
},
},
}

output, err := s.client.GetProducts(input)
if err != nil {
return nil, fmt.Errorf("error fetching [%s]", url)
return nil, errors.Wrapf(err, "could not fetch products for region %s", region)
}

defer res.Body.Close()
var data = new(response)
data.Products = make(map[string]product, 0)
data.Terms.OnDemand = make(map[string]productOffer, 0)

if res.StatusCode == 304 {
return nil, nil
}
for _, entry := range output.PriceList {
raw, err := protocol.EncodeJSONValue(entry, protocol.NoEscape)
if err != nil {
return nil, errors.Wrap(err, "could not encode back aws sdk pricing response")
}

var body []byte
if body, err = ioutil.ReadAll(res.Body); err != nil {
return nil, fmt.Errorf("error loading content of %s", url)
}
var entry = new(priceListEntry)
if err := json.Unmarshal([]byte(raw), entry); err != nil {
return nil, errors.Wrapf(err, "error unmarshaling pricing list entry: %s", raw)
}

var validTerm productOffer
for _, term := range entry.Terms.OnDemand {
for _, priceDimension := range term.PriceDimensions {
if priceDimension.BeginRange == "0" && priceDimension.EndRange == "Inf" && !strings.HasPrefix(priceDimension.PricePerUnit.USD, "0.000000") {
validTerm = term
}
}
}

if validTerm.SKU == "" {
klog.Warningf("no on demand price was not found for instance type %s in region %s", entry.Product.Attributes.InstanceType, region)
continue
}

data.Products[entry.Product.SKU] = entry.Product
data.Terms.OnDemand[entry.Product.SKU] = validTerm

if res.StatusCode != 200 {
return nil, fmt.Errorf("got unexpected http status code %d with body [%s]", res.StatusCode, string(body))
}

var data = new(response)
if err := json.Unmarshal(body, data); err != nil {
return nil, fmt.Errorf("error unmarshaling %s with body [%s]", url, string(body))
if len(data.Products) == 0 {
return nil, fmt.Errorf("no price information found for region %s", region)
}

return data, nil
Expand Down Expand Up @@ -287,17 +368,20 @@ func (b *regionalInstanceInfoBucket) Add(info ...InstanceInfo) {
b.info = append(b.info, info...)
}

type priceListEntry struct {
Product product `json:"product"`
Terms terms `json:"terms"`
}

type response struct {
Products map[string]product `json:"products"`
Terms terms `json:"terms"`
}

type terms struct {
OnDemand map[string]productOffers `json:"OnDemand"`
OnDemand map[string]productOffer `json:"OnDemand"`
}

type productOffers map[string]productOffer

type productOffer struct {
OfferTermCode string `json:"offerTermCode"`
EffectiveDate time.Time `json:"effectiveDate"`
Expand Down Expand Up @@ -354,3 +438,11 @@ func parseCPU(cpu string) (int64, error) {
}
return i, nil
}

func regionFullName(region string) (string, error) {
if fullName, ok := regionNameMap[region]; ok {
return fullName, nil
}

return "", errors.New(fmt.Sprintf("region full name not found for region: %s", region))
}