Add metrics endpoint to stress test tool #1910

Merged: 5 commits, Dec 15, 2017
cmd/containerd-stress/main.go (220 changes: 94 additions & 126 deletions)
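This change adds two reporting paths to the stress tool. A new --metrics/-m flag serves the docker/go-metrics HTTP handler while the test runs, and a new --json/-j flag emits the final results as a single JSON document, raising the log level to Warn so routine logs stay out of the output. Run timing and aggregation move out of test() into small run and result types, and the worker type and its methods are removed from this file. A typical invocation of the new flags might look like containerd-stress --metrics 127.0.0.1:9090 --json (the binary name is inferred from the cmd/containerd-stress path; the address is an example).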
@@ -2,29 +2,68 @@ package main
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"net/http"
 	"os"
 	"os/signal"
-	"path/filepath"
 	"runtime"
-	"strconv"
-	"strings"
 	"sync"
 	"syscall"
 	"time"
 
 	"github.com/containerd/containerd"
-	"github.com/containerd/containerd/cio"
 	"github.com/containerd/containerd/containers"
 	"github.com/containerd/containerd/namespaces"
 	"github.com/containerd/containerd/oci"
-	specs "github.com/opencontainers/runtime-spec/specs-go"
+	metrics "github.com/docker/go-metrics"
	"github.com/sirupsen/logrus"
 	"github.com/urfave/cli"
 )
 
 const imageName = "docker.io/library/alpine:latest"
 
+type run struct {
+	total    int
+	failures int
+
+	started time.Time
+	ended   time.Time
+}
+
+func (r *run) start() {
+	r.started = time.Now()
+}
+
+func (r *run) end() {
+	r.ended = time.Now()
+}
+
+func (r *run) seconds() float64 {
+	return r.ended.Sub(r.started).Seconds()
+}
+
+func (r *run) gather(workers []*worker) *result {
+	for _, w := range workers {
+		r.total += w.count
+		r.failures += w.failures
+	}
+	sec := r.seconds()
+	return &result{
+		Total:               r.total,
+		Seconds:             sec,
+		ContainersPerSecond: float64(r.total) / sec,
+		SecondsPerContainer: sec / float64(r.total),
+	}
+}
+
+type result struct {
+	Total               int     `json:"total"`
+	Seconds             float64 `json:"seconds"`
+	ContainersPerSecond float64 `json:"containersPerSecond"`
+	SecondsPerContainer float64 `json:"secondsPerContainer"`
+}
+
 func main() {
 	// morr power!
 	runtime.GOMAXPROCS(runtime.NumCPU())
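As a standalone illustration of the arithmetic in gather and of the JSON shape that result produces (the struct is copied from the diff above; the numbers are invented), the sketch below encodes a hypothetical run of 1500 containers over 60 seconds. One caveat worth noting: a run that completes zero containers yields an infinite SecondsPerContainer, which encoding/json refuses to encode.

package main

import (
	"encoding/json"
	"os"
)

type result struct {
	Total               int     `json:"total"`
	Seconds             float64 `json:"seconds"`
	ContainersPerSecond float64 `json:"containersPerSecond"`
	SecondsPerContainer float64 `json:"secondsPerContainer"`
}

func main() {
	// Hypothetical numbers: 1500 containers over 60 seconds.
	total, sec := 1500, 60.0
	r := result{
		Total:               total,
		Seconds:             sec,
		ContainersPerSecond: float64(total) / sec, // 25 c/sec
		SecondsPerContainer: sec / float64(total), // 0.04 sec/c
	}
	// Prints: {"total":1500,"seconds":60,"containersPerSecond":25,"secondsPerContainer":0.04}
	if err := json.NewEncoder(os.Stdout).Encode(r); err != nil {
		panic(err)
	}
}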
@@ -56,11 +95,22 @@ func main() {
 			Name:  "exec",
 			Usage: "add execs to the stress tests",
 		},
+		cli.BoolFlag{
+			Name:  "json,j",
+			Usage: "output results in json format",
+		},
+		cli.StringFlag{
+			Name:  "metrics,m",
+			Usage: "address to serve the metrics API",
+		},
 	}
 	app.Before = func(context *cli.Context) error {
 		if context.GlobalBool("debug") {
 			logrus.SetLevel(logrus.DebugLevel)
 		}
+		if context.GlobalBool("json") {
+			logrus.SetLevel(logrus.WarnLevel)
+		}
 		return nil
 	}
 	app.Action = func(context *cli.Context) error {
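In urfave/cli, a comma inside Name registers a short alias, so these parse as --json or -j and --metrics or -m. The app.Before hook then drops the log level to Warn whenever --json is set, so routine per-container Info logging is suppressed while machine-readable output is in use.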
@@ -69,6 +119,11 @@ func main() {
 			Duration:    context.GlobalDuration("duration"),
 			Concurrency: context.GlobalInt("concurrent"),
 			Exec:        context.GlobalBool("exec"),
+			JSON:        context.GlobalBool("json"),
+			Metrics:     context.GlobalString("metrics"),
+		}
+		if config.Metrics != "" {
+			return serve(config)
 		}
 		return test(config)
 	}
@@ -83,12 +138,23 @@ type config struct {
 	Duration    time.Duration
 	Address     string
 	Exec        bool
+	JSON        bool
+	Metrics     string
 }
 
 func (c config) newClient() (*containerd.Client, error) {
 	return containerd.New(c.Address)
 }
 
+func serve(c config) error {
+	go func() {
+		if err := http.ListenAndServe(c.Metrics, metrics.Handler()); err != nil {
+			logrus.WithError(err).Error("listen and serve")
+		}
+	}()
+	return test(c)
+}
+
 func test(c config) error {
 	var (
 		wg sync.WaitGroup
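serve starts the listener in a goroutine and then falls through to the normal test path, so the endpoint stays live for the whole run. Because metrics.Handler() is installed as the root handler of ListenAndServe, any path on that address returns the Prometheus text exposition. A minimal scrape sketch, assuming the tool was started with --metrics 127.0.0.1:9090 (an example address):

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumes containerd-stress is running with --metrics 127.0.0.1:9090.
	// The handler is mounted at the root, so GET / serves the metrics.
	resp, err := http.Get("http://127.0.0.1:9090/")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(body))
}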
@@ -119,13 +185,18 @@ func test(c config) error {
 
 	var (
 		workers []*worker
-		start   = time.Now()
+		r       = &run{}
 	)
 	logrus.Info("starting stress test run...")
 	args := oci.WithProcessArgs("true")
 	if c.Exec {
 		args = oci.WithProcessArgs("sleep", "10")
 	}
+	v, err := client.Version(ctx)
+	if err != nil {
+		return err
+	}
+	// create the workers along with their spec
 	for i := 0; i < c.Concurrency; i++ {
 		wg.Add(1)
 		spec, err := oci.GenerateSpec(ctx, client,
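The new client.Version call captures the daemon's build revision and hands it to each worker as commit. The worker type no longer lives in this file (its old definition is deleted below), so the field's use is not visible in this view; presumably it labels the emitted metrics with the containerd revision.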
@@ -143,140 +214,37 @@
 			image:  image,
 			client: client,
 			doExec: c.Exec,
+			commit: v.Revision,
 		}
 		workers = append(workers, w)
-		go w.run(ctx, tctx)
 	}
+	// start the timer and run the workers
+	r.start()
+	for _, w := range workers {
+		go w.run(ctx, tctx)
+	}
+	// wait and end the timer
 	wg.Wait()
+	r.end()
 
-	var (
-		total    int
-		failures int
-		end      = time.Now().Sub(start).Seconds()
-	)
-	logrus.Infof("ending test run in %0.3f seconds", end)
-	for _, w := range workers {
-		total += w.count
-		failures += w.failures
-	}
-	logrus.WithField("failures", failures).Infof(
+	results := r.gather(workers)
+	logrus.Infof("ending test run in %0.3f seconds", results.Seconds)
+
+	logrus.WithField("failures", r.failures).Infof(
 		"create/start/delete %d containers in %0.3f seconds (%0.3f c/sec) or (%0.3f sec/c)",
-		total,
-		end,
-		float64(total)/end,
-		end/float64(total),
+		results.Total,
+		results.Seconds,
+		results.ContainersPerSecond,
+		results.SecondsPerContainer,
 	)
+	if c.JSON {
+		if err := json.NewEncoder(os.Stdout).Encode(results); err != nil {
+			return err
+		}
+	}
 	return nil
 }
 
-type worker struct {
-	id       int
-	wg       *sync.WaitGroup
-	count    int
-	failures int
-
-	client *containerd.Client
-	image  containerd.Image
-	spec   *specs.Spec
-	doExec bool
-}
-
-func (w *worker) run(ctx, tctx context.Context) {
-	defer func() {
-		w.wg.Done()
-		logrus.Infof("worker %d finished", w.id)
-	}()
-	for {
-		select {
-		case <-tctx.Done():
-			return
-		default:
-		}
-
-		w.count++
-		id := w.getID()
-		logrus.Debugf("starting container %s", id)
-		if err := w.runContainer(ctx, id); err != nil {
-			if err != context.DeadlineExceeded ||
-				!strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
-				w.failures++
-				logrus.WithError(err).Errorf("running container %s", id)
-
-			}
-		}
-	}
-}
-
-func (w *worker) runContainer(ctx context.Context, id string) error {
-	// fix up cgroups path for a default config
-	w.spec.Linux.CgroupsPath = filepath.Join("/", "stress", id)
-	c, err := w.client.NewContainer(ctx, id,
-		containerd.WithNewSnapshot(id, w.image),
-		containerd.WithSpec(w.spec, oci.WithUsername("games")),
-	)
-	if err != nil {
-		return err
-	}
-	defer c.Delete(ctx, containerd.WithSnapshotCleanup)
-
-	task, err := c.NewTask(ctx, cio.NullIO)
-	if err != nil {
-		return err
-	}
-	defer task.Delete(ctx, containerd.WithProcessKill)
-
-	statusC, err := task.Wait(ctx)
-	if err != nil {
-		return err
-	}
-	if err := task.Start(ctx); err != nil {
-		return err
-	}
-	if w.doExec {
-		for i := 0; i < 256; i++ {
-			if err := w.exec(ctx, i, task); err != nil {
-				w.failures++
-				logrus.WithError(err).Error("exec failure")
-			}
-		}
-		if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
-			return err
-		}
-	}
-	status := <-statusC
-	_, _, err = status.Result()
-	if err != nil {
-		if err == context.DeadlineExceeded || err == context.Canceled {
-			return nil
-		}
-		w.failures++
-	}
-	return nil
-}
-
-func (w *worker) exec(ctx context.Context, i int, t containerd.Task) error {
-	pSpec := *w.spec.Process
-	pSpec.Args = []string{"true"}
-	process, err := t.Exec(ctx, strconv.Itoa(i), &pSpec, cio.NullIO)
-	if err != nil {
-		return err
-	}
-	defer process.Delete(ctx)
-	status, err := process.Wait(ctx)
-	if err != nil {
-		return err
-	}
-	if err := process.Start(ctx); err != nil {
-		return err
-	}
-	<-status
-	return nil
-}
-
-func (w *worker) getID() string {
-	return fmt.Sprintf("%d-%d", w.id, w.count)
-}
-
 // cleanup cleans up any containers in the "stress" namespace before the test run
 func cleanup(ctx context.Context, client *containerd.Client) error {
 	containers, err := client.Containers(ctx)
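Downstream tooling can consume the --json output directly. A sketch under stated assumptions: a containerd-stress binary on PATH (the name is inferred from the cmd/ directory) and a local result type mirroring the one in the diff:

package main

import (
	"encoding/json"
	"fmt"
	"os/exec"
)

// Mirrors the result type from the diff above.
type result struct {
	Total               int     `json:"total"`
	Seconds             float64 `json:"seconds"`
	ContainersPerSecond float64 `json:"containersPerSecond"`
	SecondsPerContainer float64 `json:"secondsPerContainer"`
}

func main() {
	// --json moves the log level to Warn, so stdout carries only the
	// final JSON document; --duration is the existing run-length flag.
	out, err := exec.Command("containerd-stress", "--json", "--duration", "1m").Output()
	if err != nil {
		panic(err)
	}
	var r result
	if err := json.Unmarshal(out, &r); err != nil {
		panic(err)
	}
	fmt.Printf("%d containers at %.3f c/sec\n", r.Total, r.ContainersPerSecond)
}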