
Why is ClientPolicy.Timeout used to create new connections during reads instead of basePolicy.connectTimeout #408

Open
juanma9613 opened this issue Jun 25, 2023 · 11 comments

Comments

@juanma9613

Hi, I would like to know why ClientPolicy.Timeout is applied when creating new connections for the pool during read operations when the pool is empty or exhausted. This is the line where it is happening.

It's happening exactly when this line is executed and there are no available connections.
record, err := c.Aerospike.Get(readPolicy, key)

I ask because the Java client's read policy has a very specific connectTimeout that is used to create new connections during read operations.

Is there a way to achieve this same behavior without modifying the clientPolicy.Timeout?

Thank you very much for your response.

@khaf
Collaborator

khaf commented Jun 25, 2023

Hi, the link you posted goes to the Go Client v4, while the latest version is v6, residing in the v6 branch. Just a heads up.

The Go client creates connections asynchronously, meaning that connections are not created inline in the transaction. That's why it does not take a connectTimeout option in the base policy.

What are you trying to achieve that this is a problem?

@juanma9613
Author

juanma9613 commented Jun 26, 2023

Hi, I'm indeed using Go client v6 and reading the code and docs from v6; maybe I sent the wrong link by mistake. Could you please tell me which link points to v4? I'll fix it.

My problem is that during some traffic spikes in my app, the number of opened/closed connections per second in the Aerospike cluster increases too much (check image 1), and this causes the p99 latency of the Aerospike reads inside my app to go way beyond 5ms (check image 3) and never come back down (kind of a snowball effect, check image 2; I even had to restart my app at 2am).

I suspect that what's causing the issue is the sudden spike in writes at 23:30 together with my BasePolicy.TotalTimeout of only 3ms. During this spike, maybe the Aerospike server becomes slightly slower, causing too many connections to close, as described here. The problem is that I need my app to respond below 3ms. Something I don't understand is why the number of opening and closing connections in image 1 doesn't stabilize when the writes from the other app stop, and why there's a snowball effect.

Additional context:

  • I'm experiencing a hotkey pattern in my app, where during some hours, more than 50% of the traffic is for a few keys (10 random keys receive more than 80% of the requests)
  • My app only reads from the Aerospike cluster, but other people write to the same cluster from different apps. Check the Aerospike cluster traffic in images 3 and 4: only ~50k reads per second are from my app; the rest is from other apps.
  • My namespace is AP, configured for availability

image 1: increasing number of connections during latency spike

image 2: fetch latency inside my app

image 3: reads from the Aerospike cluster from my app + other apps

image 4: writes to the Aerospike cluster from other apps

These are the parameters I have used for my initial tests:

ClientPolicy.ConnectionQueueSize = 1024
ClientPolicy.LoginTimeout = time.Duration(6) * time.Second
ClientPolicy.OpeningConnectionThreshold = 256

policy = MyClient.DefaultPolicy // base read policy
policy.TotalTimeout = 3 * time.Millisecond // I blame this very short timeout for the opening connections problem
policy.SocketTimeout = time.Duration(100) * time.Millisecond
policy.ReplicaPolicy = aerospike.MASTER_PROLES // read from any replica to mitigate hotkey a little bit

@juanma9613
Author

juanma9613 commented Jun 27, 2023

@khaf, we observed the server side and indeed found some timeouts when the issue started. So, according to this, it can be the hotkey plus a too-aggressive timeout on our client side. I'm going to increase the timeout on my side.

But it doesn't explain why, in image 2 above, the latency went up forever. (I saw the CPU usage during that time reached 100%, so I guess it was creating a lot of goroutines trying to open TCP connections but couldn't do it in time.)

I also need my client.Get() function to return within 3ms, without leaking goroutines and without closing the connections in case there's no response. Do you think that's possible? Do you have any insight you can share?


@khaf
Copy link
Collaborator

khaf commented Jun 28, 2023

Hi @juanma9613 , sorry for my late reply. It is this link that goes to master, which is the v4.x branch.

I suspect your issue is indeed the very short timeout, especially if your app is in a hosted datacenter (cloud environment). Over the years, we have observed a lot of inconsistency in both CPU and I/O scheduling on cloud instances. On top of that, other issues like hot keys can affect latency, as you suspect in your case.

Regarding your settings, ClientPolicy.OpeningConnectionThreshold = 256 by itself can cause issues, since the client will attempt to reopen up to 256 connections at the same time, which could in theory put too much stress on the server node, depending on the size and resources of the instance.

It seems to me that during the time of prolonged latency (image 2), you also had a corresponding connection churn period (image 1). I suspect that could explain the considerable latency increase at that same time frame. Connection churn is the root of all evil when it comes to the Aerospike clients, and a point against setting the timeout values too aggressively.

@juanma9613
Author

@khaf, I'm then going to do the following to make sure my timeout of 3ms is respected in my app:

enclosing_fn_with_3ms_timeout():
    go call aerospike asynchronously (basepolicy.totaltimeout 100ms -> basepolicy.sockettimeout 100ms)
    wait for results up to 3ms
    return results or timeoutErr if aerospike didn't respond within 3ms

This may lead to some asynchronous fn / goroutine leakage for a certain time, but the maximum number of open goroutines isn't too high.
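
For what it's worth, here is a minimal Go sketch of that pattern (assuming the v6 import path; the function name and the 3ms deadline are illustrative, not part of the client API):

import (
	"errors"
	"time"

	"github.com/aerospike/aerospike-client-go/v6"
)

// getWithDeadline runs the read in its own goroutine with a generous policy
// timeout, but hands control back to the caller after 3ms at the latest.
func getWithDeadline(client *aerospike.Client, policy *aerospike.BasePolicy, key *aerospike.Key) (*aerospike.Record, error) {
	type result struct {
		rec *aerospike.Record
		err error
	}
	ch := make(chan result, 1) // buffered so the goroutine never blocks on send

	go func() {
		rec, err := client.Get(policy, key) // policy carries the ~100ms TotalTimeout/SocketTimeout
		ch <- result{rec, err}
	}()

	select {
	case r := <-ch:
		return r.rec, r.err
	case <-time.After(3 * time.Millisecond):
		// the goroutine keeps running until the client call returns,
		// which is the temporary leakage mentioned above
		return nil, errors.New("aerospike get did not respond within 3ms")
	}
}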

Regarding ClientPolicy.OpeningConnectionThreshold = 256, what do you recommend as a setting for this value? I see that the default value is 0, and I thought that was worse because it could try to create a lot of connections if the nodes don't acknowledge the opening connections quickly. That's why I set it to 256, to limit this edge case a little bit.

@khaf
Collaborator

khaf commented Jun 29, 2023

I have seen some users do this by wrapping the client inside an API that takes a context.Context. It's not a bad compromise as far as I can tell, though I believe every use case has to be evaluated in its own environment. I have so far resisted that solution myself, since it imposes an extra goroutine per API call on the user, which is a price some of our performance-conscious customers would not like to pay.

Another thing you could do would be to set up the circuit breaker (ClientPolicy.MaxErrorRate and ClientPolicy.ErrorRateWindow) to divert your reads from the node with hotkeys to other nodes. This could potentially work if those nodes are not affected by the other program.

Regarding the value of ClientPolicy.OpeningConnectionThreshold, I cannot give you an exact value. I'd start by limiting it to 32 or 64, see how that affects the program, and increase it only if necessary. The reality is that the values in ClientPolicy have a significant effect on the behavior of your overall application during a time of crisis, so you need to pay extra attention to them, and possibly research your environment (by instrumenting your live app) to find the optimal values for them.
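
For illustration, a sketch of those ClientPolicy knobs together (the numbers are starting points to measure against, not documented recommendations):

import "github.com/aerospike/aerospike-client-go/v6"

func newTunedClientPolicy() *aerospike.ClientPolicy {
	cp := aerospike.NewClientPolicy()
	cp.ConnectionQueueSize = 1024
	// start with a modest reconnect burst (32-64) and raise it only if necessary
	cp.OpeningConnectionThreshold = 64
	// circuit breaker: roughly, the number of errors tolerated per node within a
	// window measured in cluster tend iterations before the client backs off from it
	cp.MaxErrorRate = 50
	cp.ErrorRateWindow = 1
	return cp
}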

We are open and very accommodating to user feedback and requests in improving the client behavior. Let us know about your discoveries and observations and we'll try our best to architect reasonable and general solutions into the client.

@juanma9613
Author

@khaf I was able to solve the issue by first limiting OpeningConnectionThreshold to 64, also increasing the timeouts, and, finally and much more importantly:

I had to stop using Error.Matches, Error.Is, and Error.As whenever this library returns an error from Client.Get. I just took the string of your error and created a new error from it myself.
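
For reference, a minimal sketch of that workaround (the wrapper function is illustrative and assumes the standard errors package plus the v6 client import):

func getFlattened(client *aerospike.Client, policy *aerospike.BasePolicy, key *aerospike.Key) (*aerospike.Record, error) {
	rec, aerr := client.Get(policy, key)
	if aerr != nil {
		// don't hand the client's error to errors.Is / errors.As or to a wrapper
		// that walks Unwrap; flatten it into a plain error built from its message
		return nil, errors.New(aerr.Error())
	}
	return rec, nil
}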

Do you think it's possible that there's some circular reference in your errors or a very long error stack?

@khaf
Collaborator

khaf commented Jul 28, 2023

@juanma9613 Thanks for the follow up. Do you mean that Error.Is and Error.As are taking so long that they affect the performance of your app?!

@juanma9613
Author

@khaf, it's either what you said or there's infinite error checking there. Unfortunately, the only fix I found was to just stop using the errors from the library for these error comparisons. It immediately solved the problem for me, and I didn't change anything else in my app.

@juanma9613
Author

juanma9613 commented Aug 3, 2023

@khaf For your debugging, I can share a minimal working example of the code I used to reproduce the issue. Some of the TIC prints below don't have a corresponding TOC print, which indicates a goroutine leak when using your error. Specifically, check the definition of the 'assing' variable under the second case statement of the GetValuesWithTimeout function. The line Error(unwrappedErr) runs forever:

package main

import (
	"errors"
	"fmt"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/aerospike/aerospike-client-go"
	"github.com/aerospike/aerospike-client-go/v6/types"
)

type QueryInfo struct {
	Namespace string
	Set       string
	Key       string
}

type ResultType struct {
	id    int
	value map[string]interface{}
	err   error
}

func main() {
	concurrency := 40
	var wg sync.WaitGroup
	nTest := 100000000
	timeout := time.Duration(100) * time.Microsecond // time.Millisecond // timeout of my getvalues with timeout

	clientPolicy := aerospike.NewClientPolicy()
	clientPolicy.ConnectionQueueSize = 1024
	clientPolicy.LoginTimeout = time.Duration(1) * time.Second
	clientPolicy.OpeningConnectionThreshold = 128
	hosts := make([]*aerospike.Host, 0)
	// TODO: add host here
	hosts = append(hosts, aerospike.NewHost("localhost", 30000)) // 
	username := "asdasdsaddadasd"
	password := "adasdasdasdasdas"
	clientPolicy.User = username
	clientPolicy.Password = password

	aerospikeClient, err := aerospike.NewClientWithPolicyAndHost(clientPolicy, hosts...)

	if err != nil {
		fmt.Println("found error in creation", err)
	}

	go func() {
		for {
			fmt.Println("GOROUTINES: ", runtime.NumGoroutine())
			time.Sleep(2 * time.Second)
		}
	}()

	for i := 0; i < concurrency; i++ {
		wg.Add(1) // one unit of work per worker goroutine
		go runTest(aerospikeClient, i, nTest, timeout, &wg)
	}
	wg.Wait()
}

func runTest(client *aerospike.Client, idx int, nTest int, timeout time.Duration, wg *sync.WaitGroup) {
	queries := []QueryInfo{{Namespace: "test", Set: "asd", Key: "asdasdas"}} 
	for i := 0; i < nTest; i++ {
		// run fn for 2ms
		GetValuesWithTimeout(client, timeout, queries)
		// do other stuff
		time.Sleep(time.Duration(100) * time.Microsecond)
	}
	wg.Done()

}

// this is the fn with strong timeout of 3ms

func GetValuesWithTimeout(client *aerospike.Client, timeout time.Duration, queries []QueryInfo) ([]map[string]interface{}, error) {
	size := len(queries)
	result := make([]map[string]interface{}, size)
	resChan := make(chan *ResultType, size)
	for id, query := range queries {
		go func(id int, query QueryInfo) {
			lres := &ResultType{
				id:    id,
				value: nil,
			}
			// always deliver the result (or error) to the collector loop below
			defer func() { resChan <- lres }()
			key, _ := aerospike.NewKey(query.Namespace, query.Set, query.Key)

			//
			// IMPORTANT: READ POLICY PARAMETERS
			// copy the default policy so concurrent goroutines don't mutate the shared one
			readPolicy := *client.DefaultPolicy
			readPolicy.TotalTimeout = time.Duration(50) * time.Microsecond
			readPolicy.ReplicaPolicy = aerospike.MASTER_PROLES

			record, err := client.Get(&readPolicy, key)
			if err != nil {
				fmt.Println("err getting key", err)
				switch {
				case err.Matches(types.KEY_NOT_FOUND_ERROR):
					// Aerospike.KEY_NOT_FOUND_ERROR is treated as a nil value, no error passed.
					lres.value = nil
				case err.Matches(types.TIMEOUT) || err.Matches(types.QUERY_TIMEOUT):
					lres.value = nil
					lres.err = errors.New("timeout aerospike " + err.Error())
					unwrappedErr := err.Unwrap()
					if unwrappedErr != nil {
						fmt.Println("Unwrapped err aerospike time: ", unwrappedErr.Error()) // if this is the error, then try to do things on it
						lres.err = unwrappedErr
						fmt.Println("TIC")
						assing := Error(unwrappedErr)
						fmt.Println("TOC")
						runtime.KeepAlive(assing)

					} else {
						fmt.Println("Error in aerospike: ", err.Error())
						lres.err = errors.New("Error in aerospike" + err.Error())
					}

				default:
					lres.value = nil
					unwrappedErr := err.Unwrap()
					if unwrappedErr != nil {
						fmt.Println("Unwrapped err aerospike: ", unwrappedErr.Error()) // if this is the error, then try to do things on it
						lres.err = unwrappedErr
						fmt.Println("TiC")
						assing := Error(unwrappedErr)
						fmt.Println("TOC")
						runtime.KeepAlive(assing)

					} else {
						fmt.Println("Error in aerospike: ", err.Error())
						lres.err = errors.New("Error in aerospike" + err.Error())
					}
				}
				return
			}
			lres.value = map[string]interface{}(record.Bins)
		}(id, query)
	}

	count := 0
	t := time.NewTimer(timeout)
	for {
		select {
		case r := <-resChan:
			if r.err != nil {
				if strings.Contains(r.err.Error(), "timeout") {
					return nil, errors.New("aerospike get timeout")
				}
				return nil, r.err
			}

			result[r.id] = r.value
			if count++; count >= size {
				return result, nil
			}

		case <-t.C:
			return nil, errors.New("this function timedout")
		}
	}
}

The additional serving-error code I used in my app is below:

package main

import (
	"fmt"

	"github.com/go-errors/errors"
)

const (
	Unknown = -1
)

type ServingError struct {
	err   *errors.Error
	cause Cause
	code  int
	fatal bool
}

type Cause interface {
	Error() string
	Trace() string
	GetCode() int
	IsFatal() bool
}

type wrapError interface {
	Error() string
	Unwrap() error
}

func Error(v interface{}) *ServingError {
	return wrap(v, 1)
}

func Errorf(format string, a ...interface{}) *ServingError {
	return wrap(fmt.Errorf(format, a...), 1)
}

func wrap(v interface{}, skip int) *ServingError {
	if v == nil {
		return nil
	}

	res := &ServingError{
		cause: nil,
		err:   nil,
		code:  Unknown,
		fatal: false,
	}

	switch e := v.(type) {
	case *ServingError:
		return e

	case *errors.Error:
		res.err = errors.Wrap(e, skip+1)
		res.cause = Cause(res)

	case wrapError:
		var latest *ServingError
		res.err = errors.Wrap(e, skip+1)
		res.cause, latest = findCause(e, skip+1)
		if latest != nil {
			res.code = latest.code
			res.fatal = latest.fatal
		}

	default:
		res.err = errors.Wrap(e, skip+1)
		res.cause = Cause(res)
	}

	return res
}

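
// findCause walks the error's Unwrap chain looking for an existing *ServingError;
// note that this loop only terminates when it reaches a *ServingError, a nil
// Unwrap result, or an error that does not implement Unwrap.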
func findCause(err error, skip int) (cause Cause, latest *ServingError) {
	cur := err

	for {
		switch err := cur.(type) {
		case *ServingError:
			return err.cause, err

		case wrapError:
			cur = err.Unwrap()
			if cur == nil {
				return Cause(wrap(err, skip+1)), nil
			}

		default:
			return Cause(wrap(cur, skip+1)), nil
		}
	}
}

func (e *ServingError) Cause() Cause {
	return e.cause
}

func (e *ServingError) Error() string {
	return e.err.Error()
}

func (e *ServingError) Unwrap() error {
	return e.err.Unwrap()
}

func (e *ServingError) Code(code int) *ServingError {
	if e.code < code {
		e.code = code
	}

	return e
}

func (e *ServingError) Fatal(enable bool) *ServingError {
	e.fatal = enable
	return e
}

func (e *ServingError) Trace() string {
	return string(e.err.Stack())
}

func (e *ServingError) GetCode() int {
	return e.code
}

func (e *ServingError) IsFatal() bool {
	return e.fatal
}

@juanma9613
Author

juanma9613 commented Sep 6, 2023

golang/go#34957

Hi @khaf, I think something like this can be created by the type of errors that client.Get returns, which have the Unwrap method as well. Do you think there's a possibility of that happening?
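
For context, here is a tiny self-contained illustration of that failure mode, assuming the linked issue is about a cycle in the Unwrap chain (the selfRef type is made up for the demo; it is not from the client):

package main

import (
	"errors"
	"fmt"
)

// selfRef is a toy error whose Unwrap chain loops back on itself.
type selfRef struct{ next error }

func (e *selfRef) Error() string { return "self-referential error" }
func (e *selfRef) Unwrap() error { return e.next }

func main() {
	a := &selfRef{}
	b := &selfRef{next: a}
	a.next = b // a -> b -> a: the chain never terminates

	// errors.Is keeps calling Unwrap until it returns nil; with a cycle it never
	// does, so this call spins forever, similar to the hang described above
	fmt.Println(errors.Is(a, errors.New("some target")))
}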
