Skip to content

Commit

Permalink
Improve the per-VU buffer pool (#2879)
Browse files Browse the repository at this point in the history
* Remove buffer pool from http request

* Add sync.Pool for http resposne

* Add sync.Pool for http resposne

* Add sync.Pool for http resposne

* Add sync.Pool for http resposne

* Fix type assertion

* Remove bpool dependency

* Add benchmark test for memory consumption

* Add benchmark test for memory consumption

* Add benchmark test for memory consumption

* Add benchmark test for memory consumption

* Add benchmark test for memory consumption

* Add benchmark test for memory consumption

* Fix go.sum

* Ignore lint type assertion
  • Loading branch information
davidpst committed Mar 15, 2023
1 parent 314d9d5 commit 12f5dd8
Show file tree
Hide file tree
Showing 21 changed files with 162 additions and 586 deletions.
4 changes: 2 additions & 2 deletions cmd/tests/cmd_cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
"go.k6.io/k6/cmd"
)

func cloudTestStartSimple(t *testing.T, testRunID int) http.Handler {
func cloudTestStartSimple(tb testing.TB, testRunID int) http.Handler {
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
resp.WriteHeader(http.StatusOK)
_, err := fmt.Fprintf(resp, `{"reference_id": "%d"}`, testRunID)
assert.NoError(t, err)
assert.NoError(tb, err)
})
}

Expand Down
84 changes: 63 additions & 21 deletions cmd/tests/cmd_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,13 @@ func TestWrongEnvVarIterations(t *testing.T) {
assert.Empty(t, ts.LoggerHook.Drain())
}

func getSingleFileTestState(t *testing.T, script string, cliFlags []string, expExitCode exitcodes.ExitCode) *GlobalTestState {
func getSingleFileTestState(tb testing.TB, script string, cliFlags []string, expExitCode exitcodes.ExitCode) *GlobalTestState {
if cliFlags == nil {
cliFlags = []string{"-v", "--log-output=stdout"}
}

ts := NewGlobalTestState(t)
require.NoError(t, afero.WriteFile(ts.FS, filepath.Join(ts.Cwd, "test.js"), []byte(script), 0o644))
ts := NewGlobalTestState(tb)
require.NoError(tb, afero.WriteFile(ts.FS, filepath.Join(ts.Cwd, "test.js"), []byte(script), 0o644))
ts.CmdArgs = append(append([]string{"k6", "run"}, cliFlags...), "test.js")
ts.ExpectedExitCode = int(expExitCode)

Expand Down Expand Up @@ -463,12 +463,12 @@ func TestSubMetricThresholdNoData(t *testing.T) {
two..................: 42`)
}

func getTestServer(t *testing.T, routes map[string]http.Handler) *httptest.Server {
func getTestServer(tb testing.TB, routes map[string]http.Handler) *httptest.Server {
mux := http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
for methodAndRoute, handler := range routes {
methodRouteTuple := strings.SplitN(methodAndRoute, " ", 2)
regex, err := regexp.Compile(methodRouteTuple[1])
require.NoError(t, err)
require.NoError(tb, err)

if req.Method == methodRouteTuple[0] && regex.Match([]byte(req.URL.String())) {
handler.ServeHTTP(resp, req)
Expand All @@ -483,63 +483,63 @@ func getTestServer(t *testing.T, routes map[string]http.Handler) *httptest.Serve
}

func getCloudTestEndChecker(
t *testing.T, testRunID int,
tb testing.TB, testRunID int,
testStart http.Handler, expRunStatus cloudapi.RunStatus, expResultStatus cloudapi.ResultStatus,
) *httptest.Server {
testFinished := false

if testStart == nil {
testStart = cloudTestStartSimple(t, testRunID)
testStart = cloudTestStartSimple(tb, testRunID)
}

srv := getTestServer(t, map[string]http.Handler{
srv := getTestServer(tb, map[string]http.Handler{
"POST ^/v1/tests$": testStart,
fmt.Sprintf("POST ^/v1/tests/%d$", testRunID): http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
require.NotNil(t, req.Body)
require.NotNil(tb, req.Body)
buf := &bytes.Buffer{}
_, err := io.Copy(buf, req.Body)
require.NoError(t, err)
require.NoError(t, req.Body.Close())
require.NoError(tb, err)
require.NoError(tb, req.Body.Close())

body := buf.Bytes()
require.True(t, gjson.ValidBytes(body))
require.True(tb, gjson.ValidBytes(body))

runStatus := gjson.GetBytes(body, "run_status")
require.True(t, runStatus.Exists()) // important to check, since run_status can be 0
require.True(tb, runStatus.Exists()) // important to check, since run_status can be 0
assert.Equalf(
t, expRunStatus, cloudapi.RunStatus(runStatus.Int()),
tb, expRunStatus, cloudapi.RunStatus(runStatus.Int()),
"received wrong run_status value",
)

resultStatus := gjson.GetBytes(body, "result_status")
require.True(t, resultStatus.Exists())
require.True(tb, resultStatus.Exists())
assert.Equalf(
t, expResultStatus, cloudapi.ResultStatus(resultStatus.Int()),
tb, expResultStatus, cloudapi.ResultStatus(resultStatus.Int()),
"received wrong result_status value",
)
testFinished = true
}),
})

t.Cleanup(func() {
assert.Truef(t, testFinished, "expected test to have called the cloud API endpoint to finish the test")
tb.Cleanup(func() {
assert.Truef(tb, testFinished, "expected test to have called the cloud API endpoint to finish the test")
srv.Close()
})

return srv
}

func getSimpleCloudOutputTestState(
t *testing.T, script string, cliFlags []string,
tb testing.TB, script string, cliFlags []string,
expRunStatus cloudapi.RunStatus, expResultStatus cloudapi.ResultStatus, expExitCode exitcodes.ExitCode,
) *GlobalTestState {
if cliFlags == nil {
cliFlags = []string{"-v", "--log-output=stdout"}
}
cliFlags = append(cliFlags, "--out", "cloud")

srv := getCloudTestEndChecker(t, 111, nil, expRunStatus, expResultStatus)
ts := getSingleFileTestState(t, script, cliFlags, expExitCode)
srv := getCloudTestEndChecker(tb, 111, nil, expRunStatus, expResultStatus)
ts := getSingleFileTestState(tb, script, cliFlags, expExitCode)
ts.Env["K6_CLOUD_HOST"] = srv.URL
return ts
}
Expand Down Expand Up @@ -1728,6 +1728,48 @@ func TestPrometheusRemoteWriteOutput(t *testing.T) {
assert.Contains(t, stdout, "output: Prometheus remote write")
}

func BenchmarkReadResponseBody(b *testing.B) {
httpSrv := httpmultibin.NewHTTPMultiBin(b)

script := httpSrv.Replacer.Replace(`
import http from "k6/http";
import { check, sleep } from "k6";
let statusCheck = { "status is 200": (r) => r.status === 200 }
export let options = {
duration: '10s',
vus: 10
};
export default function () {
let bytes = randomIntBetween(100 * 1024, 5 * 1024 * 1024)
let response = http.get(http.url` + "`HTTPBIN_IP_URL/bytes/${bytes}`" + `)
check(response, statusCheck)
let responses = http.batch([
["GET", http.url` + "`HTTPBIN_IP_URL/stream-bytes/${bytes}`" + `],
["GET", http.url` + "`HTTPBIN_IP_URL/stream-bytes/${bytes}`" + `],
["GET", http.url` + "`HTTPBIN_IP_URL/bytes/${bytes}`" + `],
["GET", http.url` + "`HTTPBIN_IP_URL/bytes/${bytes}`" + `],
["GET", http.url` + "`HTTPBIN_IP_URL/gzip`" + `],
["GET", http.url` + "`HTTPBIN_IP_URL/deflate`" + `],
["GET", http.url` + "`HTTPBIN_IP_URL/image/jpeg`" + `],
]);
responses.forEach(res => check(res, statusCheck))
sleep(0.1)
};
function randomIntBetween(min, max) {
return Math.floor(Math.random() * (max - min + 1) + min);
}
`)

ts := getSimpleCloudOutputTestState(b, script, nil, cloudapi.RunStatusFinished, cloudapi.ResultStatusPassed, 0)
cmd.ExecuteWithGlobalState(ts.GlobalState)
}

func TestBrowserPermissions(t *testing.T) {
t.Parallel()

Expand Down
24 changes: 12 additions & 12 deletions cmd/tests/test_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@ type GlobalTestState struct {

// NewGlobalTestState returns an initialized GlobalTestState, mocking all
// GlobalState fields for use in tests.
func NewGlobalTestState(t *testing.T) *GlobalTestState {
func NewGlobalTestState(tb testing.TB) *GlobalTestState {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
tb.Cleanup(cancel)

fs := &afero.MemMapFs{}
cwd := "/test/" // TODO: Make this relative to the test?
if runtime.GOOS == "windows" {
cwd = "c:\\test\\"
}
require.NoError(t, fs.MkdirAll(cwd, 0o755))
require.NoError(tb, fs.MkdirAll(cwd, 0o755))

logger := logrus.New()
logger.SetLevel(logrus.InfoLevel)
logger.Out = testutils.NewTestOutput(t)
logger.Out = testutils.NewTestOutput(tb)
hook := &testutils.SimpleLogrusHook{HookedLevels: logrus.AllLevels}
logger.AddHook(hook)

Expand All @@ -64,14 +64,14 @@ func NewGlobalTestState(t *testing.T) *GlobalTestState {
defaultOsExitHandle := func(exitCode int) {
cancel()
osExitCalled = true
assert.Equal(t, ts.ExpectedExitCode, exitCode)
assert.Equal(tb, ts.ExpectedExitCode, exitCode)
}

t.Cleanup(func() {
tb.Cleanup(func() {
if ts.ExpectedExitCode > 0 {
// Ensure that, if we expected to receive an error, our `os.Exit()` mock
// function was actually called.
assert.Truef(t,
assert.Truef(tb,
osExitCalled,
"expected exit code %d, but the os.Exit() mock was not called",
ts.ExpectedExitCode,
Expand All @@ -81,7 +81,7 @@ func NewGlobalTestState(t *testing.T) *GlobalTestState {

outMutex := &sync.Mutex{}
defaultFlags := state.GetDefaultFlags(".config")
defaultFlags.Address = getFreeBindAddr(t)
defaultFlags.Address = getFreeBindAddr(tb)

ts.GlobalState = &state.GlobalState{
Ctx: ctx,
Expand All @@ -108,15 +108,15 @@ func NewGlobalTestState(t *testing.T) *GlobalTestState {
SignalNotify: signal.Notify,
SignalStop: signal.Stop,
Logger: logger,
FallbackLogger: testutils.NewLogger(t).WithField("fallback", true),
FallbackLogger: testutils.NewLogger(tb).WithField("fallback", true),
}

return ts
}

var portRangeStart uint64 = 6565 //nolint:gochecknoglobals

func getFreeBindAddr(t *testing.T) string {
func getFreeBindAddr(tb testing.TB) string {
for i := 0; i < 100; i++ {
port := atomic.AddUint64(&portRangeStart, 1)
addr := net.JoinHostPort("localhost", strconv.FormatUint(port, 10))
Expand All @@ -126,11 +126,11 @@ func getFreeBindAddr(t *testing.T) string {
continue // port was busy for some reason
}
defer func() {
assert.NoError(t, listener.Close())
assert.NoError(tb, listener.Close())
}()
return addr
}

t.Fatal("could not get a free port")
tb.Fatal("could not get a free port")
return ""
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ require (
github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd
github.com/mstoykov/envconfig v1.4.1-0.20220114105314-765c6d8c76f1
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
github.com/pmezard/go-difflib v1.0.0
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e
github.com/sirupsen/logrus v1.9.0
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
5 changes: 2 additions & 3 deletions js/initcontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/dop251/goja"
"github.com/oxtoacart/bpool"
"github.com/sirupsen/logrus"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -366,7 +365,7 @@ func TestRequestWithBinaryFile(t *testing.T) {
netext.NewResolver(net.LookupIP, 0, types.DNSfirst, types.DNSpreferIPv4),
)).DialContext,
},
BPool: bpool.NewBufferPool(1),
BufferPool: lib.NewBufferPool(),
Samples: make(chan metrics.SampleContainer, 500),
BuiltinMetrics: builtinMetrics,
Tags: lib.NewVUStateTags(registry.RootTagSet()),
Expand Down Expand Up @@ -513,7 +512,7 @@ func TestRequestWithMultipleBinaryFiles(t *testing.T) {
netext.NewResolver(net.LookupIP, 0, types.DNSfirst, types.DNSpreferIPv4),
)).DialContext,
},
BPool: bpool.NewBufferPool(1),
BufferPool: lib.NewBufferPool(),
Samples: make(chan metrics.SampleContainer, 500),
BuiltinMetrics: builtinMetrics,
Tags: lib.NewVUStateTags(registry.RootTagSet()),
Expand Down
17 changes: 8 additions & 9 deletions js/modules/k6/http/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/andybalholm/brotli"
"github.com/klauspost/compress/zstd"
"github.com/mccutchen/go-httpbin/httpbin"
"github.com/oxtoacart/bpool"
"github.com/sirupsen/logrus"
logtest "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -147,13 +146,13 @@ func newTestCase(t testing.TB) *httpTestCase {
samples := make(chan metrics.SampleContainer, 1000)

state := &lib.State{
Options: options,
Logger: logger,
Group: root,
TLSConfig: tb.TLSClientConfig,
Transport: tb.HTTPTransport,
BPool: bpool.NewBufferPool(1),
Samples: samples,
Options: options,
Logger: logger,
Group: root,
TLSConfig: tb.TLSClientConfig,
Transport: tb.HTTPTransport,
BufferPool: lib.NewBufferPool(),
Samples: samples,
Tags: lib.NewVUStateTags(registry.RootTagSet().WithTagsFromMap(map[string]string{
"group": root.Path,
})),
Expand Down Expand Up @@ -1973,7 +1972,7 @@ func BenchmarkHandlingOfResponseBodies(b *testing.B) {
rt := ts.runtime.VU.Runtime()
state := ts.runtime.VU.State()

state.BPool = bpool.NewBufferPool(100)
state.BufferPool = lib.NewBufferPool()

go func() {
ctxDone := tb.Context.Done()
Expand Down
15 changes: 8 additions & 7 deletions js/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"time"

"github.com/dop251/goja"
"github.com/oxtoacart/bpool"
"github.com/spf13/afero"
"golang.org/x/net/http2"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -54,8 +53,9 @@ type Runner struct {
RPSLimit *rate.Limiter
RunTags *metrics.TagSet

console *console
setupData []byte
console *console
setupData []byte
BufferPool *lib.BufferPool
}

// New returns a new Runner for the provided source
Expand Down Expand Up @@ -99,6 +99,7 @@ func NewFromBundle(piState *lib.TestPreInitState, b *Bundle) (*Runner, error) {
Resolver: netext.NewResolver(
net.LookupIP, 0, defDNS.Select.DNSSelect, defDNS.Policy.DNSPolicy),
ActualResolver: net.LookupIP,
BufferPool: lib.NewBufferPool(),
}

err = r.SetOptions(r.Bundle.Options)
Expand Down Expand Up @@ -224,7 +225,7 @@ func (r *Runner) newVU(
CookieJar: cookieJar,
TLSConfig: tlsConfig,
Console: r.console,
BPool: bpool.NewBufferPool(100),
BufferPool: r.BufferPool,
Samples: samplesOut,
scenarioIter: make(map[string]uint64),
}
Expand All @@ -237,7 +238,7 @@ func (r *Runner) newVU(
TLSConfig: vu.TLSConfig,
CookieJar: cookieJar,
RPSLimit: vu.Runner.RPSLimit,
BPool: vu.BPool,
BufferPool: vu.BufferPool,
VUID: vu.ID,
VUIDGlobal: vu.IDGlobal,
Samples: vu.Samples,
Expand Down Expand Up @@ -585,8 +586,8 @@ type VU struct {
IDGlobal uint64 // global across all instances
iteration int64

Console *console
BPool *bpool.BufferPool
Console *console
BufferPool *lib.BufferPool

Samples chan<- metrics.SampleContainer

Expand Down

0 comments on commit 12f5dd8

Please sign in to comment.