From 60afae5afce5a8052aaaa6ac3890ca87ab249361 Mon Sep 17 00:00:00 2001 From: Sander van Harmelen Date: Thu, 3 Dec 2015 12:17:44 +0100 Subject: [PATCH] Add a simple kind of exponential backoff feature for async clients --- cloudstack/AsyncjobService.go | 2 +- cloudstack/cloudstack.go | 11 +- cloudstack43/AsyncjobService.go | 2 +- cloudstack43/cloudstack.go | 11 +- cloudstack44/AsyncjobService.go | 2 +- cloudstack44/cloudstack.go | 11 +- generate/generate.go | 213 +++++++++++++++++--------------- 7 files changed, 144 insertions(+), 108 deletions(-) diff --git a/cloudstack/AsyncjobService.go b/cloudstack/AsyncjobService.go index 348830c..2266be9 100644 --- a/cloudstack/AsyncjobService.go +++ b/cloudstack/AsyncjobService.go @@ -66,7 +66,7 @@ func (s *AsyncjobService) QueryAsyncJobResult(p *QueryAsyncJobResultParams) (*Qu if err == nil { break } - time.Sleep(1 * time.Second) + time.Sleep(500 * time.Millisecond) } if err != nil { return nil, err diff --git a/cloudstack/cloudstack.go b/cloudstack/cloudstack.go index 2744755..20eb3a7 100644 --- a/cloudstack/cloudstack.go +++ b/cloudstack/cloudstack.go @@ -228,7 +228,9 @@ var AsyncTimeoutErr = errors.New("Timeout while waiting for async job to finish" // A helper function that you can use to get the result of a running async job. If the job is not finished within the configured // timeout, the async job returns a AsyncTimeoutErr. func (cs *CloudStackClient) GetAsyncJobResult(jobid string, timeout int64) (json.RawMessage, error) { + var timer time.Duration currentTime := time.Now().Unix() + for { p := cs.Asyncjob.NewQueryAsyncJobResultParams(jobid) r, err := cs.Asyncjob.QueryAsyncJobResult(p) @@ -253,7 +255,14 @@ func (cs *CloudStackClient) GetAsyncJobResult(jobid string, timeout int64) (json if time.Now().Unix()-currentTime > timeout { return nil, AsyncTimeoutErr } - time.Sleep(3 * time.Second) + + // Add an (extremely simple) exponential backoff like feature to prevent + // flooding the CloudStack API + if timer < 15 { + timer++ + } + + time.Sleep(timer * time.Second) } } diff --git a/cloudstack43/AsyncjobService.go b/cloudstack43/AsyncjobService.go index c078ff1..82e73b3 100644 --- a/cloudstack43/AsyncjobService.go +++ b/cloudstack43/AsyncjobService.go @@ -66,7 +66,7 @@ func (s *AsyncjobService) QueryAsyncJobResult(p *QueryAsyncJobResultParams) (*Qu if err == nil { break } - time.Sleep(1 * time.Second) + time.Sleep(500 * time.Millisecond) } if err != nil { return nil, err diff --git a/cloudstack43/cloudstack.go b/cloudstack43/cloudstack.go index bcd4076..466b70d 100644 --- a/cloudstack43/cloudstack.go +++ b/cloudstack43/cloudstack.go @@ -230,7 +230,9 @@ var AsyncTimeoutErr = errors.New("Timeout while waiting for async job to finish" // A helper function that you can use to get the result of a running async job. If the job is not finished within the configured // timeout, the async job returns a AsyncTimeoutErr. func (cs *CloudStackClient) GetAsyncJobResult(jobid string, timeout int64) (json.RawMessage, error) { + var timer time.Duration currentTime := time.Now().Unix() + for { p := cs.Asyncjob.NewQueryAsyncJobResultParams(jobid) r, err := cs.Asyncjob.QueryAsyncJobResult(p) @@ -255,7 +257,14 @@ func (cs *CloudStackClient) GetAsyncJobResult(jobid string, timeout int64) (json if time.Now().Unix()-currentTime > timeout { return nil, AsyncTimeoutErr } - time.Sleep(3 * time.Second) + + // Add an (extremely simple) exponential backoff like feature to prevent + // flooding the CloudStack API + if timer < 15 { + timer++ + } + + time.Sleep(timer * time.Second) } } diff --git a/cloudstack44/AsyncjobService.go b/cloudstack44/AsyncjobService.go index d3d292c..f125f25 100644 --- a/cloudstack44/AsyncjobService.go +++ b/cloudstack44/AsyncjobService.go @@ -66,7 +66,7 @@ func (s *AsyncjobService) QueryAsyncJobResult(p *QueryAsyncJobResultParams) (*Qu if err == nil { break } - time.Sleep(1 * time.Second) + time.Sleep(500 * time.Millisecond) } if err != nil { return nil, err diff --git a/cloudstack44/cloudstack.go b/cloudstack44/cloudstack.go index 8fa7abf..23ce560 100644 --- a/cloudstack44/cloudstack.go +++ b/cloudstack44/cloudstack.go @@ -228,7 +228,9 @@ var AsyncTimeoutErr = errors.New("Timeout while waiting for async job to finish" // A helper function that you can use to get the result of a running async job. If the job is not finished within the configured // timeout, the async job returns a AsyncTimeoutErr. func (cs *CloudStackClient) GetAsyncJobResult(jobid string, timeout int64) (json.RawMessage, error) { + var timer time.Duration currentTime := time.Now().Unix() + for { p := cs.Asyncjob.NewQueryAsyncJobResultParams(jobid) r, err := cs.Asyncjob.QueryAsyncJobResult(p) @@ -253,7 +255,14 @@ func (cs *CloudStackClient) GetAsyncJobResult(jobid string, timeout int64) (json if time.Now().Unix()-currentTime > timeout { return nil, AsyncTimeoutErr } - time.Sleep(3 * time.Second) + + // Add an (extremely simple) exponential backoff like feature to prevent + // flooding the CloudStack API + if timer < 15 { + timer++ + } + + time.Sleep(timer * time.Second) } } diff --git a/generate/generate.go b/generate/generate.go index e489b07..4762803 100644 --- a/generate/generate.go +++ b/generate/generate.go @@ -247,24 +247,24 @@ func (as *allServices) GeneralCode() ([]byte, error) { pn(")") pn("") pn("type CSError struct {") - pn(" ErrorCode int `json:\"errorcode\"`") - pn(" CSErrorCode int `json:\"cserrorcode\"`") - pn(" ErrorText string `json:\"errortext\"`") + pn(" ErrorCode int `json:\"errorcode\"`") + pn(" CSErrorCode int `json:\"cserrorcode\"`") + pn(" ErrorText string `json:\"errortext\"`") pn("}") pn("") pn("func (e *CSError) Error() error {") - pn(" return fmt.Errorf(\"CloudStack API error %%d (CSExceptionErrorCode: %%d): %%s\", e.ErrorCode, e.CSErrorCode, e.ErrorText)") + pn(" return fmt.Errorf(\"CloudStack API error %%d (CSExceptionErrorCode: %%d): %%s\", e.ErrorCode, e.CSErrorCode, e.ErrorText)") pn("}") pn("") pn("type CloudStackClient struct {") - pn(" HTTPGETOnly bool // If `true` only use HTTP GET calls") + pn(" HTTPGETOnly bool // If `true` only use HTTP GET calls") pn("") - pn(" client *http.Client // The http client for communicating") - pn(" baseURL string // The base URL of the API") - pn(" apiKey string // Api key") - pn(" secret string // Secret key") - pn(" async bool // Wait for async calls to finish") - pn(" timeout int64 // Max waiting timeout in seconds for async jobs to finish; defaults to 300 seconds") + pn(" client *http.Client // The http client for communicating") + pn(" baseURL string // The base URL of the API") + pn(" apiKey string // Api key") + pn(" secret string // Secret key") + pn(" async bool // Wait for async calls to finish") + pn(" timeout int64 // Max waiting timeout in seconds for async jobs to finish; defaults to 300 seconds") pn("") for _, s := range as.services { pn(" %s *%s", strings.TrimSuffix(s.name, "Service"), s.name) @@ -273,32 +273,32 @@ func (as *allServices) GeneralCode() ([]byte, error) { pn("") pn("// Creates a new client for communicating with CloudStack") pn("func newClient(apiurl string, apikey string, secret string, async bool, verifyssl bool) *CloudStackClient {") - pn(" cs := &CloudStackClient{") - pn(" client: &http.Client{") - pn(" Transport: &http.Transport{") - pn(" Proxy: http.ProxyFromEnvironment,") - pn(" TLSClientConfig: &tls.Config{InsecureSkipVerify: !verifyssl}, // If verifyssl is true, skipping the verify should be false and vice versa") - pn(" },") - pn(" Timeout: time.Duration(60 * time.Second),") - pn(" },") - pn(" baseURL: apiurl,") - pn(" apiKey: apikey,") - pn(" secret: secret,") - pn(" async: async,") - pn(" timeout: 300,") - pn(" }") + pn(" cs := &CloudStackClient{") + pn(" client: &http.Client{") + pn(" Transport: &http.Transport{") + pn(" Proxy: http.ProxyFromEnvironment,") + pn(" TLSClientConfig: &tls.Config{InsecureSkipVerify: !verifyssl}, // If verifyssl is true, skipping the verify should be false and vice versa") + pn(" },") + pn(" Timeout: time.Duration(60 * time.Second),") + pn(" },") + pn(" baseURL: apiurl,") + pn(" apiKey: apikey,") + pn(" secret: secret,") + pn(" async: async,") + pn(" timeout: 300,") + pn(" }") for _, s := range as.services { pn(" cs.%s = New%s(cs)", strings.TrimSuffix(s.name, "Service"), s.name) } - pn(" return cs") + pn(" return cs") pn("}") pn("") pn("// Default non-async client. So for async calls you need to implement and check the async job result yourself. When using") pn("// HTTPS with a self-signed certificate to connect to your CloudStack API, you would probably want to set 'verifyssl' to") pn("// false so the call ignores the SSL errors/warnings.") pn("func NewClient(apiurl string, apikey string, secret string, verifyssl bool) *CloudStackClient {") - pn(" cs := newClient(apiurl, apikey, secret, false, verifyssl)") - pn(" return cs") + pn(" cs := newClient(apiurl, apikey, secret, false, verifyssl)") + pn(" return cs") pn("}") pn("") pn("// For sync API calls this client behaves exactly the same as a standard client call, but for async API calls") @@ -306,8 +306,8 @@ func (as *allServices) GeneralCode() ([]byte, error) { pn("// job finishes successfully it will return actual object received from the API and nil, but when the timout is") pn("// reached it will return the initial object containing the async job ID for the running job and a warning.") pn("func NewAsyncClient(apiurl string, apikey string, secret string, verifyssl bool) *CloudStackClient {") - pn(" cs := newClient(apiurl, apikey, secret, true, verifyssl)") - pn(" return cs") + pn(" cs := newClient(apiurl, apikey, secret, true, verifyssl)") + pn(" return cs") pn("}") pn("") pn("// When using the async client an api call will wait for the async call to finish before returning. The default is to poll for 300 seconds") @@ -321,55 +321,64 @@ func (as *allServices) GeneralCode() ([]byte, error) { pn("// A helper function that you can use to get the result of a running async job. If the job is not finished within the configured") pn("// timeout, the async job returns a AsyncTimeoutErr.") pn("func (cs *CloudStackClient) GetAsyncJobResult(jobid string, timeout int64) (json.RawMessage, error) {") - pn(" currentTime := time.Now().Unix()") - pn(" for {") - pn(" p := cs.Asyncjob.NewQueryAsyncJobResultParams(jobid)") - pn(" r, err := cs.Asyncjob.QueryAsyncJobResult(p)") - pn(" if err != nil {") - pn(" return nil, err") - pn(" }") + pn(" var timer time.Duration") + pn(" currentTime := time.Now().Unix()") pn("") - pn(" // Status 1 means the job is finished successfully") - pn(" if r.Jobstatus == 1 {") - pn(" return r.Jobresult, nil") - pn(" }") + pn(" for {") + pn(" p := cs.Asyncjob.NewQueryAsyncJobResultParams(jobid)") + pn(" r, err := cs.Asyncjob.QueryAsyncJobResult(p)") + pn(" if err != nil {") + pn(" return nil, err") + pn(" }") pn("") - pn(" // When the status is 2, the job has failed") - pn(" if r.Jobstatus == 2 {") - pn(" if r.Jobresulttype == \"text\" {") - pn(" return nil, fmt.Errorf(string(r.Jobresult))") - pn(" } else {") - pn(" return nil, fmt.Errorf(\"Undefined error: %%s\", string(r.Jobresult))") - pn(" }") - pn(" }") + pn(" // Status 1 means the job is finished successfully") + pn(" if r.Jobstatus == 1 {") + pn(" return r.Jobresult, nil") + pn(" }") pn("") - pn(" if time.Now().Unix()-currentTime > timeout {") - pn(" return nil, AsyncTimeoutErr") - pn(" }") - pn(" time.Sleep(3 * time.Second)") - pn(" }") + pn(" // When the status is 2, the job has failed") + pn(" if r.Jobstatus == 2 {") + pn(" if r.Jobresulttype == \"text\" {") + pn(" return nil, fmt.Errorf(string(r.Jobresult))") + pn(" } else {") + pn(" return nil, fmt.Errorf(\"Undefined error: %%s\", string(r.Jobresult))") + pn(" }") + pn(" }") + pn("") + pn(" if time.Now().Unix()-currentTime > timeout {") + pn(" return nil, AsyncTimeoutErr") + pn(" }") + pn("") + pn(" // Add an (extremely simple) exponential backoff like feature to prevent") + pn(" // flooding the CloudStack API") + pn(" if timer < 15 {") + pn(" timer++") + pn(" }") + pn("") + pn(" time.Sleep(timer * time.Second)") + pn(" }") pn("}") pn("") pn("// Execute the request against a CS API. Will return the raw JSON data returned by the API and nil if") pn("// no error occured. If the API returns an error the result will be nil and the HTTP error code and CS") pn("// error details. If a processing (code) error occurs the result will be nil and the generated error") pn("func (cs *CloudStackClient) newRequest(api string, params url.Values) (json.RawMessage, error) {") - pn(" params.Set(\"apiKey\", cs.apiKey)") - pn(" params.Set(\"command\", api)") - pn(" params.Set(\"response\", \"json\")") + pn(" params.Set(\"apiKey\", cs.apiKey)") + pn(" params.Set(\"command\", api)") + pn(" params.Set(\"response\", \"json\")") pn("") - pn(" // Generate signature for API call") - pn(" // * Serialize parameters, URL encoding only values and sort them by key, done by encodeValues") - pn(" // * Convert the entire argument string to lowercase") - pn(" // * Replace all instances of '+' to '%%20'") - pn(" // * Calculate HMAC SHA1 of argument string with CloudStack secret") - pn(" // * URL encode the string and convert to base64") - pn(" s := encodeValues(params)") - pn(" s2 := strings.ToLower(s)") - pn(" s3 := strings.Replace(s2, \"+\", \"%%20\", -1)") - pn(" mac := hmac.New(sha1.New, []byte(cs.secret))") - pn(" mac.Write([]byte(s3))") - pn(" signature := base64.StdEncoding.EncodeToString(mac.Sum(nil))") + pn(" // Generate signature for API call") + pn(" // * Serialize parameters, URL encoding only values and sort them by key, done by encodeValues") + pn(" // * Convert the entire argument string to lowercase") + pn(" // * Replace all instances of '+' to '%%20'") + pn(" // * Calculate HMAC SHA1 of argument string with CloudStack secret") + pn(" // * URL encode the string and convert to base64") + pn(" s := encodeValues(params)") + pn(" s2 := strings.ToLower(s)") + pn(" s3 := strings.Replace(s2, \"+\", \"%%20\", -1)") + pn(" mac := hmac.New(sha1.New, []byte(cs.secret))") + pn(" mac.Write([]byte(s3))") + pn(" signature := base64.StdEncoding.EncodeToString(mac.Sum(nil))") pn("") pn(" var err error") pn(" var resp *http.Response") @@ -383,36 +392,36 @@ func (as *allServices) GeneralCode() ([]byte, error) { pn(" // Make a POST call") pn(" resp, err = cs.client.PostForm(cs.baseURL, params)") pn(" } else {") - pn(" // Create the final URL before we issue the request") - pn(" url := cs.baseURL + \"?\" + s + \"&signature=\" + url.QueryEscape(signature)") + pn(" // Create the final URL before we issue the request") + pn(" url := cs.baseURL + \"?\" + s + \"&signature=\" + url.QueryEscape(signature)") pn("") pn(" // Make a GET call") - pn(" resp, err = cs.client.Get(url)") + pn(" resp, err = cs.client.Get(url)") pn(" }") - pn(" if err != nil {") - pn(" return nil, err") - pn(" }") - pn(" defer resp.Body.Close()") + pn(" if err != nil {") + pn(" return nil, err") + pn(" }") + pn(" defer resp.Body.Close()") pn("") - pn(" b, err := ioutil.ReadAll(resp.Body)") - pn(" if err != nil {") - pn(" return nil, err") - pn(" }") + pn(" b, err := ioutil.ReadAll(resp.Body)") + pn(" if err != nil {") + pn(" return nil, err") + pn(" }") pn("") - pn(" // Need to get the raw value to make the result play nice") - pn(" b, err = getRawValue(b)") - pn(" if err != nil {") - pn(" return nil, err") - pn(" }") + pn(" // Need to get the raw value to make the result play nice") + pn(" b, err = getRawValue(b)") + pn(" if err != nil {") + pn(" return nil, err") + pn(" }") pn("") - pn(" if resp.StatusCode != 200 {") - pn(" var e CSError") - pn(" if err := json.Unmarshal(b, &e); err != nil {") - pn(" return nil, err") - pn(" }") - pn(" return nil, e.Error()") - pn(" }") - pn(" return b, nil") + pn(" if resp.StatusCode != 200 {") + pn(" var e CSError") + pn(" if err := json.Unmarshal(b, &e); err != nil {") + pn(" return nil, err") + pn(" }") + pn(" return nil, e.Error()") + pn(" }") + pn(" return b, nil") pn("}") pn("") pn("// Custom version of net/url Encode that only URL escapes values") @@ -443,14 +452,14 @@ func (as *allServices) GeneralCode() ([]byte, error) { pn("") pn("// Generic function to get the first raw value from a response as json.RawMessage") pn("func getRawValue(b json.RawMessage) (json.RawMessage, error) {") - pn(" var m map[string]json.RawMessage") - pn(" if err := json.Unmarshal(b, &m); err != nil {") - pn(" return nil, err") - pn(" }") - pn(" for _, v := range m {") - pn(" return v, nil") - pn(" }") - pn(" return nil, fmt.Errorf(\"Unable to extract the raw value from:\\n\\n%%s\\n\\n\", string(b))") + pn(" var m map[string]json.RawMessage") + pn(" if err := json.Unmarshal(b, &m); err != nil {") + pn(" return nil, err") + pn(" }") + pn(" for _, v := range m {") + pn(" return v, nil") + pn(" }") + pn(" return nil, fmt.Errorf(\"Unable to extract the raw value from:\\n\\n%%s\\n\\n\", string(b))") pn("}") pn("") for _, s := range as.services { @@ -948,7 +957,7 @@ func (s *service) generateNewAPICallFunc(a *API) { pn(" if err == nil {") pn(" break") pn(" }") - pn(" time.Sleep(1 * time.Second)") + pn(" time.Sleep(500 * time.Millisecond)") pn(" }") } else { pn(" resp, err := s.cs.newRequest(\"%s\", p.toURLValues())", a.Name)