Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support delegation to external load testing tools #90

Merged
merged 17 commits into from Mar 12, 2019
Merged
17 changes: 17 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions cmd/loadtester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ var (
func init() {
flag.StringVar(&logLevel, "log-level", "debug", "Log level can be: debug, info, warning, error.")
flag.StringVar(&port, "port", "9090", "Port to listen on.")
flag.DurationVar(&timeout, "timeout", time.Hour, "Command exec timeout.")
flag.BoolVar(&logCmdOutput, "log-cmd-output", true, "Log command output to stderr")
flag.DurationVar(&timeout, "timeout", time.Hour, "Load test exec timeout.")
flag.BoolVar(&zapReplaceGlobals, "zap-replace-globals", false, "Whether to change the logging level of the global zap logger.")
flag.StringVar(&zapEncoding, "zap-encoding", "json", "Zap logger encoding.")
}
Expand All @@ -44,7 +43,7 @@ func main() {

stopCh := signals.SetupSignalHandler()

taskRunner := loadtester.NewTaskRunner(logger, timeout, logCmdOutput)
taskRunner := loadtester.NewTaskRunner(logger, timeout)

go taskRunner.Start(100*time.Millisecond, stopCh)

Expand Down
35 changes: 5 additions & 30 deletions pkg/loadtester/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@ package loadtester

import (
"context"
"encoding/hex"
"fmt"
"go.uber.org/zap"
"hash/fnv"
"os/exec"
"sync"
"sync/atomic"
"time"
Expand All @@ -21,24 +17,12 @@ type TaskRunner struct {
logCmdOutput bool
}

type Task struct {
Canary string
Command string
}

func (t Task) Hash() string {
fnvHash := fnv.New32()
fnvBytes := fnvHash.Sum([]byte(t.Canary + t.Command))
return hex.EncodeToString(fnvBytes[:])
}

func NewTaskRunner(logger *zap.SugaredLogger, timeout time.Duration, logCmdOutput bool) *TaskRunner {
func NewTaskRunner(logger *zap.SugaredLogger, timeout time.Duration) *TaskRunner {
return &TaskRunner{
logger: logger,
todoTasks: new(sync.Map),
runningTasks: new(sync.Map),
timeout: timeout,
logCmdOutput: logCmdOutput,
}
}

Expand Down Expand Up @@ -69,24 +53,15 @@ func (tr *TaskRunner) runAll() {
// increment the total exec counter
atomic.AddUint64(&tr.totalExecs, 1)

tr.logger.With("canary", t.Canary).Infof("command starting %s", t.Command)
cmd := exec.CommandContext(ctx, "sh", "-c", t.Command)
tr.logger.With("canary", t.Canary()).Infof("task starting %s", t)

// execute task
out, err := cmd.CombinedOutput()
if err != nil {
tr.logger.With("canary", t.Canary).Errorf("command failed %s %v %s", t.Command, err, out)
} else {
if tr.logCmdOutput {
fmt.Printf("%s\n", out)
}
tr.logger.With("canary", t.Canary).Infof("command finished %s", t.Command)
}
// run task with the timeout context
t.Run(ctx)

// remove task from the running list
tr.runningTasks.Delete(t.Hash())
} else {
tr.logger.With("canary", t.Canary).Infof("command skipped %s is already running", t.Command)
tr.logger.With("canary", t.Canary()).Infof("command skipped %s is already running", t)
}
}(task)
return true
Expand Down
13 changes: 4 additions & 9 deletions pkg/loadtester/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,13 @@ import (
func TestTaskRunner_Start(t *testing.T) {
stop := make(chan struct{})
logger, _ := logging.NewLogger("debug")
tr := NewTaskRunner(logger, time.Hour, false)
tr := NewTaskRunner(logger, time.Hour)

go tr.Start(10*time.Millisecond, stop)

task1 := Task{
Canary: "podinfo.default",
Command: "sleep 0.6",
}
task2 := Task{
Canary: "podinfo.default",
Command: "sleep 0.7",
}
taskFactory, _ := GetTaskFactory(TaskTypeShell)
task1, _ := taskFactory(map[string]string{"cmd": "sleep 0.6"}, "podinfo.default", logger)
task2, _ := taskFactory(map[string]string{"cmd": "sleep 0.7"}, "podinfo.default", logger)

tr.Add(task1)
tr.Add(task2)
Expand Down
23 changes: 16 additions & 7 deletions pkg/loadtester/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,25 @@ func ListenAndServe(port string, timeout time.Duration, logger *zap.SugaredLogge
}

if len(payload.Metadata) > 0 {
if cmd, ok := payload.Metadata["cmd"]; ok {
taskRunner.Add(Task{
Canary: fmt.Sprintf("%s.%s", payload.Name, payload.Namespace),
Command: cmd,
})
} else {
metadata := payload.Metadata
var typ, ok = metadata["type"]
if !ok {
typ = TaskTypeShell
}
taskFactory, ok := GetTaskFactory(typ)
if !ok {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(fmt.Sprintf("unknown task type %s", typ)))
return
}
canary := fmt.Sprintf("%s.%s", payload.Name, payload.Namespace)
task, err := taskFactory(metadata, canary, logger)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("cmd not found in metadata"))
w.Write([]byte(err.Error()))
return
}
taskRunner.Add(task)
} else {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("metadata not found in payload"))
Expand Down
40 changes: 40 additions & 0 deletions pkg/loadtester/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package loadtester

import (
"context"
"encoding/hex"
"go.uber.org/zap"
"hash/fnv"
"sync"
)

// Modeling a loadtester task
type Task interface {
Hash() string
Run(ctx context.Context) bool
String() string
Canary() string
}

type TaskBase struct {
canary string
logger *zap.SugaredLogger
}

func (task *TaskBase) Canary() string {
return task.canary
}
func hash(str string) string {
fnvHash := fnv.New32()
fnvBytes := fnvHash.Sum([]byte(str))
return hex.EncodeToString(fnvBytes[:])
}

var taskFactories = new(sync.Map)

type TaskFactory = func(metadata map[string]string, canary string, logger *zap.SugaredLogger) (Task, error)

func GetTaskFactory(typ string) (TaskFactory, bool) {
factory, ok := taskFactories.Load(typ)
return factory.(TaskFactory), ok
}
158 changes: 158 additions & 0 deletions pkg/loadtester/task_ngrinder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package loadtester

import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"go.uber.org/zap"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"
)

const TaskTypeNGrinder = "ngrinder"

func init() {
taskFactories.Store(TaskTypeNGrinder, func(metadata map[string]string, canary string, logger *zap.SugaredLogger) (Task, error) {
server := metadata["server"]
clone := metadata["clone"]
username := metadata["username"]
passwd := metadata["passwd"]
pollInterval := metadata["pollInterval"]
if server == "" || clone == "" || username == "" || passwd == "" {
return nil, errors.New("server, clone, username and passwd are required metadata")
}
baseUrl, err := url.Parse(server)
if err != nil {
return nil, errors.New(fmt.Sprintf("invalid url: %s", server))
}
cloneId, err := strconv.Atoi(clone)
if err != nil {
return nil, errors.New("metadata clone must be integer")
}

passwdDecoded, err := base64.StdEncoding.DecodeString(passwd)
if err != nil {
return nil, errors.New("metadata auth provided is invalid, base64 encoded username:password required")
}
interval, err := strconv.Atoi(pollInterval)
if err != nil {
interval = 1
}

return &NGrinderTask{
TaskBase{canary, logger},
baseUrl, cloneId, username, string(passwdDecoded), -1, time.Duration(interval),
}, nil
})
}

type NGrinderTask struct {
TaskBase
// base url of ngrinder server, e.g. http://ngrinder:8080
baseUrl *url.URL
// template test to clone from
cloneId int
// http basic auth
username string
passwd string
// current ngrinder test id
testId int
// task status polling interval
pollInterval time.Duration
}

func (task *NGrinderTask) Hash() string {
return hash(task.canary + string(task.cloneId))
}

// nGrinder REST endpoints
func (task *NGrinderTask) CloneAndStartEndpoint() *url.URL {
path, _ := url.Parse(fmt.Sprintf("perftest/api/%d/clone_and_start", task.cloneId))
return task.baseUrl.ResolveReference(path)
}
func (task *NGrinderTask) StatusEndpoint() *url.URL {
path, _ := url.Parse(fmt.Sprintf("perftest/api/%d/status", task.testId))
return task.baseUrl.ResolveReference(path)
}
func (task *NGrinderTask) StopEndpoint() *url.URL {
path, _ := url.Parse(fmt.Sprintf("perftest/api/%d?action=stop", task.testId))
return task.baseUrl.ResolveReference(path)
}

// initiate a clone_and_start request and get new test id from response
func (task *NGrinderTask) Run(ctx context.Context) bool {
url := task.CloneAndStartEndpoint().String()
result, err := task.request("POST", url, ctx)
if err != nil {
task.logger.With("canary", task.canary).Errorf("failed to clone and start ngrinder test %s: %s", url, err.Error())
return false
}
id := result["id"]
task.testId = int(id.(float64))
return task.PollStatus(ctx)
}

func (task *NGrinderTask) String() string {
return task.canary + task.CloneAndStartEndpoint().String()
}

// polling execution status of the new test and check if finished
func (task *NGrinderTask) PollStatus(ctx context.Context) bool {
// wait until ngrinder test finished/canceled or timedout
tickChan := time.NewTicker(time.Second * task.pollInterval).C
for {
select {
case <-tickChan:
result, err := task.request("GET", task.StatusEndpoint().String(), ctx)
if err == nil {
statusArray, ok := result["status"].([]interface{})
if ok && len(statusArray) > 0 {
status := statusArray[0].(map[string]interface{})
statusId := status["status_id"]
task.logger.Debugf("status of ngrinder task %d is %s", task.testId, statusId)
if statusId == "FINISHED" {
return true
} else if statusId == "STOP_BY_ERROR" || statusId == "CANCELED" || statusId == "UNKNOWN" {
return false
}
}
}
case <-ctx.Done():
task.logger.Warnf("context timedout, top ngrinder task %d forcibly", task.testId)
task.request("PUT", task.StopEndpoint().String(), nil)
return false
}
}
}

// send request, handle error, and eavl response json
func (task *NGrinderTask) request(method, url string, ctx context.Context) (map[string]interface{}, error) {
task.logger.Debugf("send %s request to %s", method, url)
req, _ := http.NewRequest(method, url, nil)
req.SetBasicAuth(task.username, task.passwd)
if ctx != nil {
req = req.WithContext(ctx)
}
resp, err := http.DefaultClient.Do(req)
if resp != nil {
defer resp.Body.Close()
}
if err != nil {
task.logger.Errorf("bad request: %s", err.Error())
return nil, err
}
respBytes, err := ioutil.ReadAll(resp.Body)
res := make(map[string]interface{})
err = json.Unmarshal(respBytes, &res)
if err != nil {
task.logger.Errorf("bad response, %s ,json expected:\n %s", err.Error(), string(respBytes))
} else if success, ok := res["success"]; ok && success == false {
err = errors.New(res["message"].(string))
}
return res, err
}