Skip to content

Commit

Permalink
Don't open and close HTTP connections too often
Browse files Browse the repository at this point in the history
This adds a test to verify we aren't opening and closing HTTP
connections too often when sending events to a single host. The test
will fail in CI (it fails locally) until I push a subsequent change to
actually fix this issue.

The test tracks the count of opened HTTP connections and fails if it's
an amount more than double the expected concurrency. In practice, when
this is fixed properly, the number of opened connections should be
close to if not exactly the expected concurrency. In the case of this
test, that would be 200 new connections. As the code stands today,
when I run this on my laptop, we actually opening over 3000 new
connections in just the 1 second this test runs.

Signed-off-by: Ben Browning <bbrownin@redhat.com>
  • Loading branch information
bbrowning committed Apr 12, 2019
1 parent ef09d69 commit 5849507
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ require (
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.9.1
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4
golang.org/x/sys v0.0.0-20190219203350-90b0e4468f99 // indirect
)
128 changes: 128 additions & 0 deletions pkg/cloudevents/transport/http/transport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package http_test

import (
"context"
"fmt"
"net"
"net/http"
"sync/atomic"
"testing"
"time"

"github.com/cloudevents/sdk-go/pkg/cloudevents"
cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
"github.com/cloudevents/sdk-go/pkg/cloudevents/types"
"golang.org/x/sync/errgroup"
)

type task func() error

// We can't use net/http/httptest.Server here because it's connection
// tracking logic interferes with the connection lifecycle under test
func startTestServer(handler http.Handler) (*http.Server, error) {
listener, err := net.Listen("tcp", ":0")
if err != nil {
return nil, err
}
server := &http.Server{
Addr: listener.Addr().String(),
Handler: handler,
}
go server.Serve(listener)
return server, nil
}

func doConcurrently(concurrency int, duration time.Duration, fn task) error {
var group errgroup.Group
for i := 0; i < concurrency; i++ {
group.Go(func() error {
done := time.After(duration)
for {
select {
case <-done:
return nil
default:
if err := fn(); err != nil {
return err
}
}
}
})
}

if err := group.Wait(); err != nil {
return err
}
return nil
}

// An example of how to make a stable client under sustained
// concurrency sending to a single host
func makeStableClient(addr string) (*cehttp.Transport, error) {
ceClient, err := cehttp.New(cehttp.WithTarget(addr))
if err != nil {
return nil, err
}
netHTTPTransport := &http.Transport{
MaxIdleConnsPerHost: 1000,
MaxConnsPerHost: 5000,
}
netHTTPClient := &http.Client{
Transport: netHTTPTransport,
}
ceClient.Client = netHTTPClient
return ceClient, nil
}

func TestStableConnectionsToSingleHost(t *testing.T) {
// Start a dummy HTTP server
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(50 * time.Millisecond)
fmt.Fprintf(w, `{"success": true}`)
})
sinkServer, err := startTestServer(handler)
if err != nil {
t.Fatalf("unexpected error starting test http server %v", err.Error())
}
defer sinkServer.Close()

// Keep track of all new connections to that dummy HTTP server
var newConnectionCount uint64
sinkServer.ConnState = func(connection net.Conn, state http.ConnState) {
if state == http.StateNew {
atomic.AddUint64(&newConnectionCount, 1)
}
}

ceClient, err := makeStableClient("http://" + sinkServer.Addr)
if err != nil {
t.Fatalf("unexpected error creating CloudEvents client %v", err.Error())
}
event := cloudevents.Event{
Context: &cloudevents.EventContextV02{
SpecVersion: cloudevents.CloudEventsVersionV02,
Type: "test.event",
Source: *types.ParseURLRef("test"),
},
}

ctx := context.TODO()
concurrency := 200
duration := 1 * time.Second
err = doConcurrently(concurrency, duration, func() error {
_, err := ceClient.Send(ctx, event)
if err != nil {
return fmt.Errorf("unexpected error sending CloudEvent %v", err.Error())
}
return nil
})
if err != nil {
t.Errorf("error sending concurrent CloudEvents: %v", err)
}

// newConnectionCount usually equals concurrency, but give some
// leeway. When this fails, it fails by a lot
if newConnectionCount > uint64(concurrency*2) {
t.Errorf("too many new connections opened: expected %d, got %d", concurrency, newConnectionCount)
}
}

0 comments on commit 5849507

Please sign in to comment.