Skip to content
Permalink
Browse files
Add a simple kind of exponential backoff feature for async clients
  • Loading branch information
Sander van Harmelen committed Dec 3, 2015
1 parent 5cd72f6 commit 60afae5afce5a8052aaaa6ac3890ca87ab249361
Showing 7 changed files with 144 additions and 108 deletions.
@@ -66,7 +66,7 @@ func (s *AsyncjobService) QueryAsyncJobResult(p *QueryAsyncJobResultParams) (*Qu
if err == nil {
break
}
time.Sleep(1 * time.Second)
time.Sleep(500 * time.Millisecond)
}
if err != nil {
return nil, err
@@ -228,7 +228,9 @@ var AsyncTimeoutErr = errors.New("Timeout while waiting for async job to finish"
// A helper function that you can use to get the result of a running async job. If the job is not finished within the configured
// timeout, the async job returns a AsyncTimeoutErr.
func (cs *CloudStackClient) GetAsyncJobResult(jobid string, timeout int64) (json.RawMessage, error) {
var timer time.Duration
currentTime := time.Now().Unix()

for {
p := cs.Asyncjob.NewQueryAsyncJobResultParams(jobid)
r, err := cs.Asyncjob.QueryAsyncJobResult(p)
@@ -253,7 +255,14 @@ func (cs *CloudStackClient) GetAsyncJobResult(jobid string, timeout int64) (json
if time.Now().Unix()-currentTime > timeout {
return nil, AsyncTimeoutErr
}
time.Sleep(3 * time.Second)

// Add an (extremely simple) exponential backoff like feature to prevent
// flooding the CloudStack API
if timer < 15 {
timer++
}

time.Sleep(timer * time.Second)
}
}

@@ -66,7 +66,7 @@ func (s *AsyncjobService) QueryAsyncJobResult(p *QueryAsyncJobResultParams) (*Qu
if err == nil {
break
}
time.Sleep(1 * time.Second)
time.Sleep(500 * time.Millisecond)
}
if err != nil {
return nil, err
@@ -230,7 +230,9 @@ var AsyncTimeoutErr = errors.New("Timeout while waiting for async job to finish"
// A helper function that you can use to get the result of a running async job. If the job is not finished within the configured
// timeout, the async job returns a AsyncTimeoutErr.
func (cs *CloudStackClient) GetAsyncJobResult(jobid string, timeout int64) (json.RawMessage, error) {
var timer time.Duration
currentTime := time.Now().Unix()

for {
p := cs.Asyncjob.NewQueryAsyncJobResultParams(jobid)
r, err := cs.Asyncjob.QueryAsyncJobResult(p)
@@ -255,7 +257,14 @@ func (cs *CloudStackClient) GetAsyncJobResult(jobid string, timeout int64) (json
if time.Now().Unix()-currentTime > timeout {
return nil, AsyncTimeoutErr
}
time.Sleep(3 * time.Second)

// Add an (extremely simple) exponential backoff like feature to prevent
// flooding the CloudStack API
if timer < 15 {
timer++
}

time.Sleep(timer * time.Second)
}
}

@@ -66,7 +66,7 @@ func (s *AsyncjobService) QueryAsyncJobResult(p *QueryAsyncJobResultParams) (*Qu
if err == nil {
break
}
time.Sleep(1 * time.Second)
time.Sleep(500 * time.Millisecond)
}
if err != nil {
return nil, err
@@ -228,7 +228,9 @@ var AsyncTimeoutErr = errors.New("Timeout while waiting for async job to finish"
// A helper function that you can use to get the result of a running async job. If the job is not finished within the configured
// timeout, the async job returns a AsyncTimeoutErr.
func (cs *CloudStackClient) GetAsyncJobResult(jobid string, timeout int64) (json.RawMessage, error) {
var timer time.Duration
currentTime := time.Now().Unix()

for {
p := cs.Asyncjob.NewQueryAsyncJobResultParams(jobid)
r, err := cs.Asyncjob.QueryAsyncJobResult(p)
@@ -253,7 +255,14 @@ func (cs *CloudStackClient) GetAsyncJobResult(jobid string, timeout int64) (json
if time.Now().Unix()-currentTime > timeout {
return nil, AsyncTimeoutErr
}
time.Sleep(3 * time.Second)

// Add an (extremely simple) exponential backoff like feature to prevent
// flooding the CloudStack API
if timer < 15 {
timer++
}

time.Sleep(timer * time.Second)
}
}

@@ -247,24 +247,24 @@ func (as *allServices) GeneralCode() ([]byte, error) {
pn(")")
pn("")
pn("type CSError struct {")
pn(" ErrorCode int `json:\"errorcode\"`")
pn(" CSErrorCode int `json:\"cserrorcode\"`")
pn(" ErrorText string `json:\"errortext\"`")
pn(" ErrorCode int `json:\"errorcode\"`")
pn(" CSErrorCode int `json:\"cserrorcode\"`")
pn(" ErrorText string `json:\"errortext\"`")
pn("}")
pn("")
pn("func (e *CSError) Error() error {")
pn(" return fmt.Errorf(\"CloudStack API error %%d (CSExceptionErrorCode: %%d): %%s\", e.ErrorCode, e.CSErrorCode, e.ErrorText)")
pn(" return fmt.Errorf(\"CloudStack API error %%d (CSExceptionErrorCode: %%d): %%s\", e.ErrorCode, e.CSErrorCode, e.ErrorText)")
pn("}")
pn("")
pn("type CloudStackClient struct {")
pn(" HTTPGETOnly bool // If `true` only use HTTP GET calls")
pn(" HTTPGETOnly bool // If `true` only use HTTP GET calls")
pn("")
pn(" client *http.Client // The http client for communicating")
pn(" baseURL string // The base URL of the API")
pn(" apiKey string // Api key")
pn(" secret string // Secret key")
pn(" async bool // Wait for async calls to finish")
pn(" timeout int64 // Max waiting timeout in seconds for async jobs to finish; defaults to 300 seconds")
pn(" client *http.Client // The http client for communicating")
pn(" baseURL string // The base URL of the API")
pn(" apiKey string // Api key")
pn(" secret string // Secret key")
pn(" async bool // Wait for async calls to finish")
pn(" timeout int64 // Max waiting timeout in seconds for async jobs to finish; defaults to 300 seconds")
pn("")
for _, s := range as.services {
pn(" %s *%s", strings.TrimSuffix(s.name, "Service"), s.name)
@@ -273,41 +273,41 @@ func (as *allServices) GeneralCode() ([]byte, error) {
pn("")
pn("// Creates a new client for communicating with CloudStack")
pn("func newClient(apiurl string, apikey string, secret string, async bool, verifyssl bool) *CloudStackClient {")
pn(" cs := &CloudStackClient{")
pn(" client: &http.Client{")
pn(" Transport: &http.Transport{")
pn(" Proxy: http.ProxyFromEnvironment,")
pn(" TLSClientConfig: &tls.Config{InsecureSkipVerify: !verifyssl}, // If verifyssl is true, skipping the verify should be false and vice versa")
pn(" },")
pn(" Timeout: time.Duration(60 * time.Second),")
pn(" },")
pn(" baseURL: apiurl,")
pn(" apiKey: apikey,")
pn(" secret: secret,")
pn(" async: async,")
pn(" timeout: 300,")
pn(" }")
pn(" cs := &CloudStackClient{")
pn(" client: &http.Client{")
pn(" Transport: &http.Transport{")
pn(" Proxy: http.ProxyFromEnvironment,")
pn(" TLSClientConfig: &tls.Config{InsecureSkipVerify: !verifyssl}, // If verifyssl is true, skipping the verify should be false and vice versa")
pn(" },")
pn(" Timeout: time.Duration(60 * time.Second),")
pn(" },")
pn(" baseURL: apiurl,")
pn(" apiKey: apikey,")
pn(" secret: secret,")
pn(" async: async,")
pn(" timeout: 300,")
pn(" }")
for _, s := range as.services {
pn(" cs.%s = New%s(cs)", strings.TrimSuffix(s.name, "Service"), s.name)
}
pn(" return cs")
pn(" return cs")
pn("}")
pn("")
pn("// Default non-async client. So for async calls you need to implement and check the async job result yourself. When using")
pn("// HTTPS with a self-signed certificate to connect to your CloudStack API, you would probably want to set 'verifyssl' to")
pn("// false so the call ignores the SSL errors/warnings.")
pn("func NewClient(apiurl string, apikey string, secret string, verifyssl bool) *CloudStackClient {")
pn(" cs := newClient(apiurl, apikey, secret, false, verifyssl)")
pn(" return cs")
pn(" cs := newClient(apiurl, apikey, secret, false, verifyssl)")
pn(" return cs")
pn("}")
pn("")
pn("// For sync API calls this client behaves exactly the same as a standard client call, but for async API calls")
pn("// this client will wait until the async job is finished or until the configured AsyncTimeout is reached. When the async")
pn("// job finishes successfully it will return actual object received from the API and nil, but when the timout is")
pn("// reached it will return the initial object containing the async job ID for the running job and a warning.")
pn("func NewAsyncClient(apiurl string, apikey string, secret string, verifyssl bool) *CloudStackClient {")
pn(" cs := newClient(apiurl, apikey, secret, true, verifyssl)")
pn(" return cs")
pn(" cs := newClient(apiurl, apikey, secret, true, verifyssl)")
pn(" return cs")
pn("}")
pn("")
pn("// When using the async client an api call will wait for the async call to finish before returning. The default is to poll for 300 seconds")
@@ -321,55 +321,64 @@ func (as *allServices) GeneralCode() ([]byte, error) {
pn("// A helper function that you can use to get the result of a running async job. If the job is not finished within the configured")
pn("// timeout, the async job returns a AsyncTimeoutErr.")
pn("func (cs *CloudStackClient) GetAsyncJobResult(jobid string, timeout int64) (json.RawMessage, error) {")
pn(" currentTime := time.Now().Unix()")
pn(" for {")
pn(" p := cs.Asyncjob.NewQueryAsyncJobResultParams(jobid)")
pn(" r, err := cs.Asyncjob.QueryAsyncJobResult(p)")
pn(" if err != nil {")
pn(" return nil, err")
pn(" }")
pn(" var timer time.Duration")
pn(" currentTime := time.Now().Unix()")
pn("")
pn(" // Status 1 means the job is finished successfully")
pn(" if r.Jobstatus == 1 {")
pn(" return r.Jobresult, nil")
pn(" }")
pn(" for {")
pn(" p := cs.Asyncjob.NewQueryAsyncJobResultParams(jobid)")
pn(" r, err := cs.Asyncjob.QueryAsyncJobResult(p)")
pn(" if err != nil {")
pn(" return nil, err")
pn(" }")
pn("")
pn(" // When the status is 2, the job has failed")
pn(" if r.Jobstatus == 2 {")
pn(" if r.Jobresulttype == \"text\" {")
pn(" return nil, fmt.Errorf(string(r.Jobresult))")
pn(" } else {")
pn(" return nil, fmt.Errorf(\"Undefined error: %%s\", string(r.Jobresult))")
pn(" }")
pn(" }")
pn(" // Status 1 means the job is finished successfully")
pn(" if r.Jobstatus == 1 {")
pn(" return r.Jobresult, nil")
pn(" }")
pn("")
pn(" if time.Now().Unix()-currentTime > timeout {")
pn(" return nil, AsyncTimeoutErr")
pn(" }")
pn(" time.Sleep(3 * time.Second)")
pn(" }")
pn(" // When the status is 2, the job has failed")
pn(" if r.Jobstatus == 2 {")
pn(" if r.Jobresulttype == \"text\" {")
pn(" return nil, fmt.Errorf(string(r.Jobresult))")
pn(" } else {")
pn(" return nil, fmt.Errorf(\"Undefined error: %%s\", string(r.Jobresult))")
pn(" }")
pn(" }")
pn("")
pn(" if time.Now().Unix()-currentTime > timeout {")
pn(" return nil, AsyncTimeoutErr")
pn(" }")
pn("")
pn(" // Add an (extremely simple) exponential backoff like feature to prevent")
pn(" // flooding the CloudStack API")
pn(" if timer < 15 {")
pn(" timer++")
pn(" }")
pn("")
pn(" time.Sleep(timer * time.Second)")
pn(" }")
pn("}")
pn("")
pn("// Execute the request against a CS API. Will return the raw JSON data returned by the API and nil if")
pn("// no error occured. If the API returns an error the result will be nil and the HTTP error code and CS")
pn("// error details. If a processing (code) error occurs the result will be nil and the generated error")
pn("func (cs *CloudStackClient) newRequest(api string, params url.Values) (json.RawMessage, error) {")
pn(" params.Set(\"apiKey\", cs.apiKey)")
pn(" params.Set(\"command\", api)")
pn(" params.Set(\"response\", \"json\")")
pn(" params.Set(\"apiKey\", cs.apiKey)")
pn(" params.Set(\"command\", api)")
pn(" params.Set(\"response\", \"json\")")
pn("")
pn(" // Generate signature for API call")
pn(" // * Serialize parameters, URL encoding only values and sort them by key, done by encodeValues")
pn(" // * Convert the entire argument string to lowercase")
pn(" // * Replace all instances of '+' to '%%20'")
pn(" // * Calculate HMAC SHA1 of argument string with CloudStack secret")
pn(" // * URL encode the string and convert to base64")
pn(" s := encodeValues(params)")
pn(" s2 := strings.ToLower(s)")
pn(" s3 := strings.Replace(s2, \"+\", \"%%20\", -1)")
pn(" mac := hmac.New(sha1.New, []byte(cs.secret))")
pn(" mac.Write([]byte(s3))")
pn(" signature := base64.StdEncoding.EncodeToString(mac.Sum(nil))")
pn(" // Generate signature for API call")
pn(" // * Serialize parameters, URL encoding only values and sort them by key, done by encodeValues")
pn(" // * Convert the entire argument string to lowercase")
pn(" // * Replace all instances of '+' to '%%20'")
pn(" // * Calculate HMAC SHA1 of argument string with CloudStack secret")
pn(" // * URL encode the string and convert to base64")
pn(" s := encodeValues(params)")
pn(" s2 := strings.ToLower(s)")
pn(" s3 := strings.Replace(s2, \"+\", \"%%20\", -1)")
pn(" mac := hmac.New(sha1.New, []byte(cs.secret))")
pn(" mac.Write([]byte(s3))")
pn(" signature := base64.StdEncoding.EncodeToString(mac.Sum(nil))")
pn("")
pn(" var err error")
pn(" var resp *http.Response")
@@ -383,36 +392,36 @@ func (as *allServices) GeneralCode() ([]byte, error) {
pn(" // Make a POST call")
pn(" resp, err = cs.client.PostForm(cs.baseURL, params)")
pn(" } else {")
pn(" // Create the final URL before we issue the request")
pn(" url := cs.baseURL + \"?\" + s + \"&signature=\" + url.QueryEscape(signature)")
pn(" // Create the final URL before we issue the request")
pn(" url := cs.baseURL + \"?\" + s + \"&signature=\" + url.QueryEscape(signature)")
pn("")
pn(" // Make a GET call")
pn(" resp, err = cs.client.Get(url)")
pn(" resp, err = cs.client.Get(url)")
pn(" }")
pn(" if err != nil {")
pn(" return nil, err")
pn(" }")
pn(" defer resp.Body.Close()")
pn(" if err != nil {")
pn(" return nil, err")
pn(" }")
pn(" defer resp.Body.Close()")
pn("")
pn(" b, err := ioutil.ReadAll(resp.Body)")
pn(" if err != nil {")
pn(" return nil, err")
pn(" }")
pn(" b, err := ioutil.ReadAll(resp.Body)")
pn(" if err != nil {")
pn(" return nil, err")
pn(" }")
pn("")
pn(" // Need to get the raw value to make the result play nice")
pn(" b, err = getRawValue(b)")
pn(" if err != nil {")
pn(" return nil, err")
pn(" }")
pn(" // Need to get the raw value to make the result play nice")
pn(" b, err = getRawValue(b)")
pn(" if err != nil {")
pn(" return nil, err")
pn(" }")
pn("")
pn(" if resp.StatusCode != 200 {")
pn(" var e CSError")
pn(" if err := json.Unmarshal(b, &e); err != nil {")
pn(" return nil, err")
pn(" }")
pn(" return nil, e.Error()")
pn(" }")
pn(" return b, nil")
pn(" if resp.StatusCode != 200 {")
pn(" var e CSError")
pn(" if err := json.Unmarshal(b, &e); err != nil {")
pn(" return nil, err")
pn(" }")
pn(" return nil, e.Error()")
pn(" }")
pn(" return b, nil")
pn("}")
pn("")
pn("// Custom version of net/url Encode that only URL escapes values")
@@ -443,14 +452,14 @@ func (as *allServices) GeneralCode() ([]byte, error) {
pn("")
pn("// Generic function to get the first raw value from a response as json.RawMessage")
pn("func getRawValue(b json.RawMessage) (json.RawMessage, error) {")
pn(" var m map[string]json.RawMessage")
pn(" if err := json.Unmarshal(b, &m); err != nil {")
pn(" return nil, err")
pn(" }")
pn(" for _, v := range m {")
pn(" return v, nil")
pn(" }")
pn(" return nil, fmt.Errorf(\"Unable to extract the raw value from:\\n\\n%%s\\n\\n\", string(b))")
pn(" var m map[string]json.RawMessage")
pn(" if err := json.Unmarshal(b, &m); err != nil {")
pn(" return nil, err")
pn(" }")
pn(" for _, v := range m {")
pn(" return v, nil")
pn(" }")
pn(" return nil, fmt.Errorf(\"Unable to extract the raw value from:\\n\\n%%s\\n\\n\", string(b))")
pn("}")
pn("")
for _, s := range as.services {
@@ -948,7 +957,7 @@ func (s *service) generateNewAPICallFunc(a *API) {
pn(" if err == nil {")
pn(" break")
pn(" }")
pn(" time.Sleep(1 * time.Second)")
pn(" time.Sleep(500 * time.Millisecond)")
pn(" }")
} else {
pn(" resp, err := s.cs.newRequest(\"%s\", p.toURLValues())", a.Name)

0 comments on commit 60afae5

Please sign in to comment.