Skip to content

Commit

Permalink
Add more visibility to IP address resolution when using DNS and stdcl…
Browse files Browse the repository at this point in the history
…ient socket count (#556)

* Add log to show the ip address DNS resolve to

* Add GetIpAddress method HTTPClient

* Fast client get ip address report

* get ip address for std client

* remove todo and add error handling

* resolve code climate comments

* fix the data race issue

* change fmt.Print to log.Info

* sort ip address by its usage count

* update the log to show ip address per thread

* remove go routine to get ip address when using stdclient

* add socket count for stdclient and add unit test

* Add a todo to move the IPCountMap field

* Add unit test for ip distribution

* change the sort ip distribution list logic

* move the getIPUsageCount to httprunner_test.go

* change the client init order to avoid using pointer for socket count

* Fix the issue that can cause nil pointer error and add unit test to cover it
  • Loading branch information
wuhaoyujerry committed May 5, 2022
1 parent 7052338 commit 58f458f
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 38 deletions.
66 changes: 47 additions & 19 deletions fhttp/http_client.go
Expand Up @@ -46,6 +46,8 @@ type Fetcher interface {
// Close() cleans up connections and state - must be paired with NewClient calls.
// returns how many sockets have been used (Fastclient only)
Close() int
// GetIPAddress() get the ip address that DNS resolves to
GetIPAddress() string
}

const (
Expand Down Expand Up @@ -308,6 +310,7 @@ type Client struct {
bodyContainsUUID bool // if body contains the "{uuid}" pattern (lowercase)
logErrors bool
id int
socketCount int
}

// Close cleans up any resources used by NewStdClient.
Expand All @@ -324,7 +327,7 @@ func (c *Client) Close() int {
if c.transport != nil {
c.transport.CloseIdleConnections()
}
return 0 // TODO: find a way to track std client socket usage.
return c.socketCount
}

// ChangeURL only for standard client, allows fetching a different URL.
Expand Down Expand Up @@ -396,6 +399,11 @@ func (c *Client) Fetch() (int, []byte, int) {
return code, data, 0
}

// GetIPAddress get the ip address that DNS resolves to when using stdClient.
func (c *Client) GetIPAddress() string {
return c.req.RemoteAddr
}

// NewClient creates either a standard or fast client (depending on
// the DisableFastClient flag).
func NewClient(o *HTTPOptions) (Fetcher, error) {
Expand All @@ -415,6 +423,23 @@ func NewStdClient(o *HTTPOptions) (*Client, error) {
if req == nil {
return nil, err
}

client := Client{
url: o.URL,
path: req.URL.Path,
pathContainsUUID: strings.Contains(req.URL.Path, uuidToken),
rawQuery: req.URL.RawQuery,
rawQueryContainsUUID: strings.Contains(req.URL.RawQuery, uuidToken),
body: o.PayloadString(),
bodyContainsUUID: strings.Contains(o.PayloadString(), uuidToken),
req: req,
client: &http.Client{
Timeout: o.HTTPReqTimeOut,
},
id: o.ID,
logErrors: o.LogErrors,
}

tr := http.Transport{
MaxIdleConns: o.NumConnections,
MaxIdleConnsPerHost: o.NumConnections,
Expand All @@ -426,35 +451,33 @@ func NewStdClient(o *HTTPOptions) (*Client, error) {
if o.Resolve != "" {
addr = o.Resolve + addr[strings.LastIndex(addr, ":"):]
}
return (&net.Dialer{
conn, err := (&net.Dialer{
Timeout: o.HTTPReqTimeOut,
}).DialContext(ctx, network, addr)

if conn != nil {
newRemoteAddress := conn.RemoteAddr().String()
if req.RemoteAddr != "" {
log.Infof("Standard client IP address changed from %s to %s", req.RemoteAddr, newRemoteAddress)
}
req.RemoteAddr = newRemoteAddress
client.socketCount++
}

return conn, err
},
TLSHandshakeTimeout: o.HTTPReqTimeOut,
}

client.client.Transport = &tr
client.transport = &tr

if o.https {
tr.TLSClientConfig, err = o.TLSOptions.TLSClientConfig()
if err != nil {
return nil, err
}
}
client := Client{
url: o.URL,
path: req.URL.Path,
pathContainsUUID: strings.Contains(req.URL.Path, uuidToken),
rawQuery: req.URL.RawQuery,
rawQueryContainsUUID: strings.Contains(req.URL.RawQuery, uuidToken),
body: o.PayloadString(),
bodyContainsUUID: strings.Contains(o.PayloadString(), uuidToken),
req: req,
client: &http.Client{
Timeout: o.HTTPReqTimeOut,
Transport: &tr,
},
transport: &tr,
id: o.ID,
logErrors: o.LogErrors,
}
if !o.FollowRedirects {
// Lets us see the raw response instead of auto following redirects.
client.client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
Expand Down Expand Up @@ -511,6 +534,11 @@ type FastClient struct {
tlsConfig *tls.Config
}

// GetIPAddress get the ip address that DNS resolves to when using fast client.
func (c *FastClient) GetIPAddress() string {
return c.dest.String()
}

// Close cleans up any resources used by FastClient.
func (c *FastClient) Close() int {
log.Debugf("Closing %p %s socket count %d", c, c.url, c.socketCount)
Expand Down
18 changes: 17 additions & 1 deletion fhttp/http_test.go
Expand Up @@ -676,7 +676,7 @@ func TestSmallBufferAndNoKeepAlive(t *testing.T) {
cli.Close()
}

func TestBadUrl(t *testing.T) {
func TestBadUrlFastClient(t *testing.T) {
opts := NewHTTPOptions("not a valid url")
cli, err := NewFastClient(opts)
if cli != nil || err == nil {
Expand All @@ -691,6 +691,22 @@ func TestBadUrl(t *testing.T) {
}
}

func TestBadURLStdClient(t *testing.T) {
opts := NewHTTPOptions("not a valid url")
cli, err := NewStdClient(opts)
if cli != nil || err == nil {
t.Errorf("config1: got a client %v despite bogus url %s", cli, opts.URL)
cli.Close()
}

opts.URL = "http://doesnotexist.fortio.org"
cli, _ = NewStdClient(opts)
code, _, _ := cli.Fetch()
if code != -1 {
t.Errorf("config2: client can send request despite bogus url %s", opts.URL)
}
}

func TestDefaultPort(t *testing.T) {
// TODO: change back to fortio demo server once setup
url := "http://istio.io/" // shall imply port 80
Expand Down
32 changes: 26 additions & 6 deletions fhttp/httprunner.go
Expand Up @@ -35,8 +35,9 @@ import (
// Also is the internal type used per thread/goroutine.
type HTTPRunnerResults struct {
periodic.RunnerResults
client Fetcher
RetCodes map[int]int64
client Fetcher
RetCodes map[int]int64
IPCountMap map[string]int // TODO: Move it to a shared results struct where all runner should have this field
// internal type/data
sizes *stats.Histogram
headerSizes *stats.Histogram
Expand Down Expand Up @@ -98,6 +99,7 @@ func RunHTTPTest(o *HTTPRunnerOptions) (*HTTPRunnerResults, error) {
total := HTTPRunnerResults{
HTTPOptions: o.HTTPOptions,
RetCodes: make(map[int]int64),
IPCountMap: make(map[string]int),
sizes: stats.NewHistogram(0, 100),
headerSizes: stats.NewHistogram(0, 5),
AbortOn: o.AbortOn,
Expand Down Expand Up @@ -182,6 +184,11 @@ func RunHTTPTest(o *HTTPRunnerOptions) (*HTTPRunnerResults, error) {
// unused ones. We also must cleanup all the created clients.
keys := []int{}
for i := 0; i < numThreads; i++ {
// Get the report on the IP address each thread use to send traffic
ip := httpstate[i].client.GetIPAddress()
log.Infof("[%d] %s resolve to IP address: %s\n", i, o.URL, ip)
total.IPCountMap[ip]++

total.SocketCount += httpstate[i].client.Close()
// Q: is there some copying each time stats[i] is used?
for k := range httpstate[i].RetCodes {
Expand All @@ -193,14 +200,27 @@ func RunHTTPTest(o *HTTPRunnerOptions) (*HTTPRunnerResults, error) {
total.sizes.Transfer(httpstate[i].sizes)
total.headerSizes.Transfer(httpstate[i].headerSizes)
}

// Sort the ip address form largest to smallest based on its usage count
ipList := make([]string, 0, len(total.IPCountMap))
for k := range total.IPCountMap {
ipList = append(ipList, k)
}

sort.Slice(ipList, func(i, j int) bool {
return total.IPCountMap[ipList[i]] > total.IPCountMap[ipList[j]]
})

// Cleanup state:
r.Options().ReleaseRunners()
sort.Ints(keys)
totalCount := float64(total.DurationHistogram.Count)
if !o.DisableFastClient {
_, _ = fmt.Fprintf(out, "Sockets used: %d (for perfect keepalive, would be %d)\n", total.SocketCount, r.Options().NumThreads)
}
_, _ = fmt.Fprintf(out, "Sockets used: %d (for perfect keepalive, would be %d)\n", total.SocketCount, r.Options().NumThreads)
_, _ = fmt.Fprintf(out, "Uniform: %t, Jitter: %t\n", total.Uniform, total.Jitter)
_, _ = fmt.Fprintf(out, "IP addresses distribution:\n")
for _, v := range ipList {
_, _ = fmt.Fprintf(out, "%s: %d\n", v, total.IPCountMap[v])
}
for _, k := range keys {
_, _ = fmt.Fprintf(out, "Code %3d : %d (%.1f %%)\n", k, total.RetCodes[k], 100.*float64(total.RetCodes[k])/totalCount)
}
Expand All @@ -216,7 +236,7 @@ func RunHTTPTest(o *HTTPRunnerOptions) (*HTTPRunnerResults, error) {
return &total, nil
}

// A errgroup is a collection of goroutines working on subtasks that are part of
// An errgroup is a collection of goroutines working on subtasks that are part of
// the same overall task.
type errgroup struct {
wg sync.WaitGroup
Expand Down
40 changes: 28 additions & 12 deletions fhttp/httprunner_test.go
Expand Up @@ -60,6 +60,10 @@ func TestHTTPRunner(t *testing.T) {
if res.SocketCount != res.RunnerResults.NumThreads {
t.Errorf("%d socket used, expected same as thread# %d", res.SocketCount, res.RunnerResults.NumThreads)
}
count := getIPUsageCount(res.IPCountMap)
if count != res.RunnerResults.NumThreads {
t.Errorf("Total IP usage count %d, expected same as thread %d", count, res.RunnerResults.NumThreads)
}
// Test raw client, should get warning about non init timeout:
rawOpts := HTTPOptions{
URL: opts.URL,
Expand Down Expand Up @@ -121,11 +125,8 @@ func testHTTPNotLeaking(t *testing.T, opts *HTTPRunnerOptions) {
if ngAfter > ngBefore2+8 {
t.Errorf("Goroutines after test %d, expected it to stay near %d", ngAfter, ngBefore2)
}
if !opts.DisableFastClient {
// only fast client so far has a socket count
if res.SocketCount != res.RunnerResults.NumThreads {
t.Errorf("%d socket used, expected same as thread# %d", res.SocketCount, res.RunnerResults.NumThreads)
}
if res.SocketCount != res.RunnerResults.NumThreads {
t.Errorf("%d socket used, expected same as thread# %d", res.SocketCount, res.RunnerResults.NumThreads)
}
}

Expand Down Expand Up @@ -190,17 +191,16 @@ func TestHTTPRunnerClientRace(t *testing.T) {
}
}

func TestClosingAndSocketCount(t *testing.T) {
func testClosingAndSocketCount(t *testing.T, o *HTTPRunnerOptions) {
mux, addr := DynamicHTTPServer(false)
mux.HandleFunc("/echo42/", EchoHandler)
URL := fmt.Sprintf("http://localhost:%d/echo42/?close=true", addr.Port)
opts := HTTPRunnerOptions{}
opts.Init(URL)
opts.QPS = 10
o.Init(URL)
o.QPS = 10
numReq := int64(50) // can't do too many without running out of fds on mac
opts.Exactly = numReq
opts.NumThreads = 5
res, err := RunHTTPTest(&opts)
o.Exactly = numReq
o.NumThreads = 5
res, err := RunHTTPTest(o)
if err != nil {
t.Fatal(err)
}
Expand All @@ -217,6 +217,14 @@ func TestClosingAndSocketCount(t *testing.T) {
}
}

func TestClosingAndSocketCountFastClient(t *testing.T) {
testClosingAndSocketCount(t, &HTTPRunnerOptions{})
}

func TestClosingAndSocketCountStdClient(t *testing.T) {
testClosingAndSocketCount(t, &HTTPRunnerOptions{HTTPOptions: HTTPOptions{DisableFastClient: true}})
}

func TestHTTPRunnerBadServer(t *testing.T) {
// Using http to an https server (or the current 'close all' dummy https server)
// should fail:
Expand Down Expand Up @@ -462,3 +470,11 @@ func TestAbortOn(t *testing.T) {
t.Errorf("Abort2 not working, did %d requests expecting ideally 1 and <= %d", count, o.NumThreads)
}
}

func getIPUsageCount(ipCountMap map[string]int) (count int) {
for _, v := range ipCountMap {
count += v
}

return count
}

0 comments on commit 58f458f

Please sign in to comment.