Skip to content

Commit

Permalink
test(load): add load tests CLI & e2e tests (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
steebchen committed Feb 16, 2024
1 parent 4fe2951 commit 00111d8
Show file tree
Hide file tree
Showing 13 changed files with 454 additions and 84 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ jobs:

e2e:
runs-on: ubuntu-latest
timeout-minutes: 5
timeout-minutes: 30
env:
DATABASE_URL: postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet

Expand Down Expand Up @@ -164,7 +164,7 @@ jobs:
run: |
export HATCHET_CLIENT_TOKEN="$(go run ./cmd/hatchet-admin token create --config ./generated/ --tenant-id 707d0855-80ab-4e1f-a156-f1c4546cbf52)"
go test -tags e2e ./... -p 1 -v -failfast
go test -tags e2e ./... -race -p 1 -v -failfast
- name: Teardown
run: docker compose down
66 changes: 66 additions & 0 deletions examples/loadtest/cli/cli_e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//go:build e2e

package main

import (
"testing"
"time"

"go.uber.org/goleak"

"github.com/hatchet-dev/hatchet/internal/testutils"
)

func TestLoadCLI(t *testing.T) {
testutils.Prepare(t)

type args struct {
duration time.Duration
eventsPerSecond int
delay time.Duration
wait time.Duration
concurrency int
}
tests := []struct {
name string
args args
wantErr bool
}{{
name: "test simple with unlimited concurrency",
args: args{
duration: 10 * time.Second,
eventsPerSecond: 10,
delay: 0 * time.Second,
wait: 20 * time.Second,
concurrency: 0,
},
}, {
name: "test with high step delay",
args: args{
duration: 10 * time.Second,
eventsPerSecond: 10,
delay: 10 * time.Second,
wait: 30 * time.Second,
concurrency: 0,
},
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer func() {
time.Sleep(1 * time.Second)

goleak.VerifyNone(
t,
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"),
)
}()

if err := do(tt.args.duration, tt.args.eventsPerSecond, tt.args.delay, tt.args.wait, tt.args.concurrency); (err != nil) != tt.wantErr {
t.Errorf("do() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
69 changes: 69 additions & 0 deletions examples/loadtest/cli/do.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"context"
"fmt"
"log"
"time"
)

func do(duration time.Duration, eventsPerSecond int, delay time.Duration, wait time.Duration, concurrency int) error {
log.Printf("testing with duration=%s, eventsPerSecond=%d, delay=%s, wait=%s, concurrency=%d", duration, eventsPerSecond, delay, wait, concurrency)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

after := 10 * time.Second

go func() {
time.Sleep(duration + after + wait + 5*time.Second)
cancel()
}()

ch := make(chan int64, 2)
durations := make(chan time.Duration, eventsPerSecond*int(duration.Seconds())*3)
go func() {
count, uniques := run(ctx, delay, durations, concurrency)
ch <- count
ch <- uniques
}()

time.Sleep(after)

scheduled := make(chan time.Duration, eventsPerSecond*int(duration.Seconds())*2)
emitted := emit(ctx, eventsPerSecond, duration, scheduled)
executed := <-ch
uniques := <-ch

log.Printf("ℹ️ emitted %d, executed %d, uniques %d, using %d events/s", emitted, executed, uniques, eventsPerSecond)

if executed == 0 {
return fmt.Errorf("❌ no events executed")
}

var totalDurationExecuted time.Duration
for i := 0; i < int(executed); i++ {
totalDurationExecuted += <-durations
}
durationPerEventExecuted := totalDurationExecuted / time.Duration(executed)
log.Printf("ℹ️ average duration per executed event: %s", durationPerEventExecuted)

var totalDurationScheduled time.Duration
for i := 0; i < int(emitted); i++ {
totalDurationScheduled += <-scheduled
}
scheduleTimePerEvent := totalDurationScheduled / time.Duration(emitted)
log.Printf("ℹ️ average scheduling time per event: %s", scheduleTimePerEvent)

if emitted != executed {
log.Printf("⚠️ warning: emitted and executed counts do not match: %d != %d", emitted, executed)
}

if emitted != uniques {
return fmt.Errorf("❌ emitted and unique executed counts do not match: %d != %d", emitted, uniques)
}

log.Printf("✅ success")

return nil
}
71 changes: 71 additions & 0 deletions examples/loadtest/cli/emit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package main

import (
"context"
"fmt"
"log"
"sync"
"time"

"github.com/hatchet-dev/hatchet/pkg/client"
)

type Event struct {
ID int64 `json:"id"`
CreatedAt time.Time `json:"created_at"`
}

func emit(ctx context.Context, amountPerSecond int, duration time.Duration, scheduled chan<- time.Duration) int64 {
c, err := client.New()

if err != nil {
panic(err)
}

var id int64
mx := sync.Mutex{}
go func() {
ticker := time.NewTicker(time.Second / time.Duration(amountPerSecond))
defer ticker.Stop()

timer := time.After(duration)

for {
select {
case <-ticker.C:
mx.Lock()
id += 1
mx.Unlock()

go func(id int64) {
ev := Event{CreatedAt: time.Now(), ID: id}
fmt.Println("pushed event", ev.ID)
err = c.Event().Push(context.Background(), "load-test:event", ev)
if err != nil {
panic(fmt.Errorf("error pushing event: %w", err))
}
took := time.Since(ev.CreatedAt)
fmt.Println("pushed event", ev.ID, "took", took)
scheduled <- took
}(id)
case <-timer:
log.Println("done emitting events due to timer at", id)
return
case <-ctx.Done():
log.Println("done emitting events due to interruption at", id)
return
}
}
}()

for {
select {
case <-ctx.Done():
mx.Lock()
defer mx.Unlock()
return id
default:
time.Sleep(time.Second)
}
}
}
44 changes: 44 additions & 0 deletions examples/loadtest/cli/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package main

import (
"log"
"time"

"github.com/joho/godotenv"
"github.com/spf13/cobra"
)

func main() {
var events int
var concurrency int
var duration time.Duration
var wait time.Duration
var delay time.Duration

var loadtest = &cobra.Command{
Use: "loadtest",
Run: func(cmd *cobra.Command, args []string) {
err := godotenv.Load()
if err != nil {
panic(err)
}

if err := do(duration, events, delay, wait, concurrency); err != nil {
log.Println(err)
panic("load test failed")
}
},
}

loadtest.Flags().IntVarP(&events, "events", "e", 10, "events per second")
loadtest.Flags().IntVarP(&concurrency, "concurrency", "c", 0, "concurrency specifies the maximum events to run at the same time")
loadtest.Flags().DurationVarP(&duration, "duration", "d", 10*time.Second, "duration specifies the total time to run the load test")
loadtest.Flags().DurationVarP(&delay, "delay", "D", 0, "delay specifies the time to wait in each event to simulate slow tasks")
loadtest.Flags().DurationVarP(&wait, "wait", "w", 10*time.Second, "wait specifies the total time to wait until events complete")

cmd := &cobra.Command{Use: "app"}
cmd.AddCommand(loadtest)
if err := cmd.Execute(); err != nil {
panic(err)
}
}
114 changes: 114 additions & 0 deletions examples/loadtest/cli/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package main

import (
"context"
"fmt"
"sync"
"time"

"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/worker"
)

type stepOneOutput struct {
Message string `json:"message"`
}

func getConcurrencyKey(ctx worker.HatchetContext) (string, error) {
return "my-key", nil
}

func run(ctx context.Context, delay time.Duration, executions chan<- time.Duration, concurrency int) (int64, int64) {
c, err := client.New()

if err != nil {
panic(err)
}

w, err := worker.NewWorker(
worker.WithClient(
c,
),
)

if err != nil {
panic(err)
}

mx := sync.Mutex{}
var count int64
var uniques int64
var executed []int64

var concurrencyOpts *worker.WorkflowConcurrency
if concurrency > 0 {
concurrencyOpts = worker.Concurrency(getConcurrencyKey).MaxRuns(int32(concurrency))
}

err = w.On(
worker.Event("load-test:event"),
&worker.WorkflowJob{
Name: "load-test",
Description: "Load testing",
Concurrency: concurrencyOpts,
Steps: []*worker.WorkflowStep{
worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
var input Event
err = ctx.WorkflowInput(&input)
if err != nil {
return nil, err
}

took := time.Since(input.CreatedAt)
fmt.Println("executing", input.ID, "took", took)

mx.Lock()
executions <- took
// detect duplicate in executed slice
var duplicate bool
for i := 0; i < len(executed)-1; i++ {
if executed[i] == input.ID {
duplicate = true
fmt.Println("DUPLICATE:", input.ID)
}
}
if !duplicate {
uniques += 1
}
count += 1
executed = append(executed, input.ID)
mx.Unlock()

time.Sleep(delay)

return &stepOneOutput{
Message: "This ran at: " + time.Now().Format(time.RFC3339Nano),
}, nil
}).SetName("step-one"),
},
},
)

if err != nil {
panic(err)
}

go func() {
err = w.Start(ctx)

if err != nil {
panic(err)
}
}()

for {
select {
case <-ctx.Done():
mx.Lock()
defer mx.Unlock()
return count, uniques
default:
time.Sleep(time.Second)
}
}
}
Loading

0 comments on commit 00111d8

Please sign in to comment.