Skip to content

Commit

Permalink
use AWS SDK for OnDemand Nodes (#5)
Browse files Browse the repository at this point in the history
* added aws-pricing to godeps
* updated kubernetes.sync
* replaced glog with klog
* using pricing SDK to list OnDemand prices
* moved aws session creation to price/descriptor
* fixed ondemand api test
* updated test data for better test coverage
  • Loading branch information
ylallemant authored and TorstenW committed Jan 23, 2019
1 parent afe8f3c commit 6f34ce7
Show file tree
Hide file tree
Showing 79 changed files with 5,637 additions and 2,472 deletions.
3,093 changes: 1,551 additions & 1,542 deletions cluster-autoscaler/Godeps/Godeps.json

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/api/UPDATE_TEST_DATA.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Update AWS test data
=====

Install AWS CLI Tools

Update

```bash
cd ${GOPATH}/src/k8s.io/autoscaler

# update on demand data
aws pricing get-products \
--region=us-east-1 \
--service-code=AmazonEC2 \
--filter Type=TERM_MATCH,Field=capacitystatus,Value=Used \
Type=TERM_MATCH,Field=preInstalledSw,Value=NA \
Type=TERM_MATCH,Field=location,Value="EU (Ireland)" \
Type=TERM_MATCH,Field=instanceType,Value="m4.xlarge" \
> ./cluster-autoscaler/cloudprovider/aws/api/pricing_ondemand_eu-west-1.json

# update spot data

```
214 changes: 153 additions & 61 deletions cluster-autoscaler/cloudprovider/aws/api/instance_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ package api
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/golang/glog"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/private/protocol"
"github.com/aws/aws-sdk-go/service/pricing"
"github.com/pkg/errors"
"k8s.io/klog"
)

const (
Expand All @@ -37,6 +40,33 @@ const (
instanceTenancyShared = "Shared"
)

// TODO <ylallemant> find some API for this map - support case opened
var (
regionNameMap = map[string]string{
"us-east-2": "USA East (Ohio)",
"us-east-1": "USA East (N. Virginia)",
"us-west-1": "USA West (N. California)",
"us-west-2": "USA West (Oregon)",
"ap-south-1": "Asia Pacific (Mumbai)",
"ap-northeast-3": "Asia Pacific (Osaka-Local)",
"ap-northeast-2": "Asia Pacific (Seoul)",
"ap-southeast-1": "Asia Pacific (Singapore)",
"ap-southeast-2": "Asia Pacific (Sydney)",
"ap-northeast-1": "Asia Pacific (Tokyo)",
"ca-central-1": "Canada (Central)",
"cn-north-1": "China (Beijing)",
"cn-northwest-1": "China (Ningxia)",
"eu-central-1": "EU (Frankfurt)",
"eu-west-1": "EU (Ireland)",
"eu-west-2": "EU (London)",
"eu-west-3": "EU (Paris)",
"eu-north-1": "EU (Stockholm)",
"sa-east-1": "South America (São Paulo)",
"us-gov-east-1": "AWS GovCloud (US-East)",
"us-gov-west-1": "AWS GovCloud (USA)",
}
)

// InstanceInfo holds AWS EC2 instance information
type InstanceInfo struct {
// InstanceType of the described instance
Expand All @@ -51,71 +81,71 @@ type InstanceInfo struct {
GPU int64
}

type httpClient interface {
Do(req *http.Request) (*http.Response, error)
type awsClient interface {
GetProducts(input *pricing.GetProductsInput) (*pricing.GetProductsOutput, error)
}

// NewEC2InstanceInfoService is the constructor of instanceInfoService which is a wrapper for AWS Pricing API.
func NewEC2InstanceInfoService(client httpClient) *instanceInfoService {
func NewEC2InstanceInfoService(client awsClient) *instanceInfoService {
return &instanceInfoService{
client: client,
cache: make(instanceInfoCache),
}
}

type instanceInfoService struct {
client httpClient
client awsClient
cache instanceInfoCache
sync.RWMutex
}

// DescribeInstanceInfo returns the corresponding aws instance info by given instance type and availability zone.
func (s *instanceInfoService) DescribeInstanceInfo(instanceType string, availabilityZone string) (*InstanceInfo, error) {
if s.shouldSync(availabilityZone) {
if err := s.sync(availabilityZone); err != nil {
func (s *instanceInfoService) DescribeInstanceInfo(instanceType string, region string) (*InstanceInfo, error) {
if s.shouldSync(region) {
if err := s.sync(region); err != nil {
// TODO <mrcrgl> may this be tolerated for resilience
return nil, fmt.Errorf("failed to sync aws product and price information: %v", err)
}
}

if bucket, found := s.cache[availabilityZone]; found {
if bucket, found := s.cache[region]; found {
for _, info := range bucket.info {
if info.InstanceType == instanceType {
return &info, nil
}
}
}
return nil, fmt.Errorf("instance info not available for instance type %s in zone %s", instanceType, availabilityZone)
return nil, fmt.Errorf("instance info not available for instance type %s region %s", instanceType, region)
}

func (s *instanceInfoService) shouldSync(availabilityZone string) bool {
bucket, found := s.cache[availabilityZone]
func (s *instanceInfoService) shouldSync(region string) bool {
bucket, found := s.cache[region]
if !found {
return true
}

return bucket.LastSync().Before(time.Now().Truncate(instanceInfoCacheMaxAge))
}

func (s *instanceInfoService) sync(availabilityZone string) error {
func (s *instanceInfoService) sync(region string) error {
s.Lock()
defer s.Unlock()

start := time.Now()

bucket, found := s.cache[availabilityZone]
bucket, found := s.cache[region]
if !found {
bucket = new(regionalInstanceInfoBucket)
s.cache[availabilityZone] = bucket
s.cache[region] = bucket
}

response, err := s.fetch(availabilityZone, bucket.ETag)
response, err := s.fetch(region, bucket.ETag)
if err != nil {
return err
}

defer func() {
glog.V(4).Infof("Synchronized aws ec2 instance information for availability zone %s - took %s", availabilityZone, time.Now().Sub(start).String())
klog.V(4).Infof("Synchronized aws ec2 instance information for region %s - took %s", region, time.Now().Sub(start).String())
}()

if response == nil {
Expand Down Expand Up @@ -152,51 +182,49 @@ func (s *instanceInfoService) sync(availabilityZone string) error {
var err error
if attr.Memory != "" && attr.Memory != "NA" {
if i.MemoryMb, err = parseMemory(attr.Memory); err != nil {
return fmt.Errorf("parser error %v", err)
return errors.Wrapf(err, "error parsing memory for SKU %s [%s]", sku, attr.Memory)
}
}

if attr.VCPU != "" {
if i.VCPU, err = parseCPU(attr.VCPU); err != nil {
return fmt.Errorf("parser error %v", err)
return errors.Wrapf(err, "error parsing VCPU for SKU %s [%s]", sku, attr.VCPU)
}
}
if attr.GPU != "" {
if i.GPU, err = parseCPU(attr.GPU); err != nil {
return fmt.Errorf("parser error %v", err)
return errors.Wrapf(err, "error parsing GPU for SKU %s [%s]", sku, attr.GPU)
}
}

for priceSKU, offers := range response.Terms.OnDemand {
for priceSKU, offer := range response.Terms.OnDemand {
if priceSKU != sku {
continue
}

var lastOfferTime time.Time
var lastOfferPrice float64

for _, offer := range offers {
if offer.EffectiveDate.After(now) {
if offer.EffectiveDate.After(now) {
continue
}

for _, price := range offer.PriceDimensions {
if price.EndRange != "Inf" || price.Unit != "Hrs" {
continue
}
p, err := strconv.ParseFloat(price.PricePerUnit.USD, 64)
if err != nil {
return errors.Wrapf(err, "error parsing price for SKU %s [%s]", sku, price.PricePerUnit.USD)
}

if p == 0.0 {
continue
}

for _, price := range offer.PriceDimensions {
if price.EndRange != "Inf" || price.Unit != "Hrs" {
continue
}
p, err := strconv.ParseFloat(price.PricePerUnit.USD, 64)
if err != nil {
return fmt.Errorf("error parsing price for SKU %s [%s] %v", sku, price.PricePerUnit.USD, err)
}

if p == 0.0 {
continue
}

if lastOfferTime.IsZero() || lastOfferTime.After(offer.EffectiveDate) {
lastOfferTime = offer.EffectiveDate
lastOfferPrice = p
}
if lastOfferTime.IsZero() || lastOfferTime.After(offer.EffectiveDate) {
lastOfferTime = offer.EffectiveDate
lastOfferPrice = p
}
}

Expand All @@ -213,38 +241,91 @@ func (s *instanceInfoService) sync(availabilityZone string) error {
return nil
}

func (s *instanceInfoService) fetch(availabilityZone string, etag string) (*response, error) {
url := fmt.Sprintf(awsPricingAPIURLTemplate, availabilityZone)
func (s *instanceInfoService) fetch(region string, etag string) (*response, error) {
url := fmt.Sprintf(awsPricingAPIURLTemplate, region)
regionName, err := regionFullName(region)
if err != nil {
return nil, err
}

req, err := http.NewRequest("GET", url, nil)

if len(etag) != 0 {
req.Header.Add("If-None-Match", etag)
}

res, err := s.client.Do(req)
input := &pricing.GetProductsInput{
ServiceCode: aws.String("AmazonEC2"),
Filters: []*pricing.Filter{
{
Type: aws.String("TERM_MATCH"),
Field: aws.String("location"),
Value: aws.String(regionName),
},
{
Type: aws.String("TERM_MATCH"),
Field: aws.String("operatingSystem"),
Value: aws.String("Linux"),
},
{
Type: aws.String("TERM_MATCH"),
Field: aws.String("capacitystatus"),
Value: aws.String("Used"),
},
{
Type: aws.String("TERM_MATCH"),
Field: aws.String("tenancy"),
Value: aws.String("Shared"),
},
{
Type: aws.String("TERM_MATCH"),
Field: aws.String("preInstalledSw"),
Value: aws.String("NA"),
},
},
}

output, err := s.client.GetProducts(input)
if err != nil {
return nil, fmt.Errorf("error fetching [%s]", url)
return nil, errors.Wrapf(err, "could not fetch products for region %s", region)
}

defer res.Body.Close()
var data = new(response)
data.Products = make(map[string]product, 0)
data.Terms.OnDemand = make(map[string]productOffer, 0)

if res.StatusCode == 304 {
return nil, nil
}
for _, entry := range output.PriceList {
raw, err := protocol.EncodeJSONValue(entry, protocol.NoEscape)
if err != nil {
return nil, errors.Wrap(err, "could not encode back aws sdk pricing response")
}

var body []byte
if body, err = ioutil.ReadAll(res.Body); err != nil {
return nil, fmt.Errorf("error loading content of %s", url)
}
var entry = new(priceListEntry)
if err := json.Unmarshal([]byte(raw), entry); err != nil {
return nil, errors.Wrapf(err, "error unmarshaling pricing list entry: %s", raw)
}

var validTerm productOffer
for _, term := range entry.Terms.OnDemand {
for _, priceDimension := range term.PriceDimensions {
if priceDimension.BeginRange == "0" && priceDimension.EndRange == "Inf" && !strings.HasPrefix(priceDimension.PricePerUnit.USD, "0.000000") {
validTerm = term
}
}
}

if validTerm.SKU == "" {
klog.Warningf("no on demand price was not found for instance type %s in region %s", entry.Product.Attributes.InstanceType, region)
continue
}

data.Products[entry.Product.SKU] = entry.Product
data.Terms.OnDemand[entry.Product.SKU] = validTerm

if res.StatusCode != 200 {
return nil, fmt.Errorf("got unexpected http status code %d with body [%s]", res.StatusCode, string(body))
}

var data = new(response)
if err := json.Unmarshal(body, data); err != nil {
return nil, fmt.Errorf("error unmarshaling %s with body [%s]", url, string(body))
if len(data.Products) == 0 {
return nil, fmt.Errorf("no price information found for region %s", region)
}

return data, nil
Expand Down Expand Up @@ -287,17 +368,20 @@ func (b *regionalInstanceInfoBucket) Add(info ...InstanceInfo) {
b.info = append(b.info, info...)
}

type priceListEntry struct {
Product product `json:"product"`
Terms terms `json:"terms"`
}

type response struct {
Products map[string]product `json:"products"`
Terms terms `json:"terms"`
}

type terms struct {
OnDemand map[string]productOffers `json:"OnDemand"`
OnDemand map[string]productOffer `json:"OnDemand"`
}

type productOffers map[string]productOffer

type productOffer struct {
OfferTermCode string `json:"offerTermCode"`
EffectiveDate time.Time `json:"effectiveDate"`
Expand Down Expand Up @@ -354,3 +438,11 @@ func parseCPU(cpu string) (int64, error) {
}
return i, nil
}

func regionFullName(region string) (string, error) {
if fullName, ok := regionNameMap[region]; ok {
return fullName, nil
}

return "", errors.New(fmt.Sprintf("region full name not found for region: %s", region))
}

0 comments on commit 6f34ce7

Please sign in to comment.