Skip to content

Commit

Permalink
test(load): standalone load tests
Browse files Browse the repository at this point in the history
- run load tests separately
- run the engine from inside the test
  • Loading branch information
steebchen committed Feb 18, 2024
1 parent 00111d8 commit 0cee77f
Show file tree
Hide file tree
Showing 7 changed files with 412 additions and 260 deletions.
67 changes: 67 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,70 @@ jobs:
- name: Teardown
run: docker compose down

load:
runs-on: ubuntu-latest
timeout-minutes: 30
env:
DATABASE_URL: postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet

steps:
- uses: actions/checkout@v4

- name: Install Task
uses: arduino/setup-task@v1

- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
version: "25.1"

- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: "1.21"

- name: Compose
run: docker compose up -d

- name: Go deps
run: go mod download

- name: Generate
run: |
go run github.com/steebchen/prisma-client-go db push
task generate-certs
task generate-local-encryption-keys
- name: Prepare
run: |
cat > .env <<EOF
DATABASE_URL='postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet'
SERVER_TLS_CERT_FILE=./hack/dev/certs/cluster.pem
SERVER_TLS_KEY_FILE=./hack/dev/certs/cluster.key
SERVER_TLS_ROOT_CA_FILE=./hack/dev/certs/ca.cert
SERVER_PORT=8080
SERVER_URL=https://app.dev.hatchet-tools.com
SERVER_AUTH_COOKIE_SECRETS="something something"
SERVER_AUTH_COOKIE_DOMAIN=app.dev.hatchet-tools.com
SERVER_AUTH_COOKIE_INSECURE=false
SERVER_AUTH_SET_EMAIL_VERIFIED=true
EOF
- name: Setup
run: |
set -a
. .env
set +a
go run ./cmd/hatchet-admin quickstart --generated-config-dir ./generated/
- name: Test
run: |
export HATCHET_CLIENT_TOKEN="$(go run ./cmd/hatchet-admin token create --config ./generated/ --tenant-id 707d0855-80ab-4e1f-a156-f1c4546cbf52)"
go test -tags load ./... -p 1 -v -failfast
// TODO enable -race
- name: Teardown
run: docker compose down
257 changes: 257 additions & 0 deletions cmd/hatchet-engine/engine/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
package engine

import (
"fmt"
"os"
"sync"

"github.com/hatchet-dev/hatchet/internal/config/loader"
"github.com/hatchet-dev/hatchet/internal/services/admin"
"github.com/hatchet-dev/hatchet/internal/services/controllers/events"
"github.com/hatchet-dev/hatchet/internal/services/controllers/jobs"
"github.com/hatchet-dev/hatchet/internal/services/controllers/workflows"
"github.com/hatchet-dev/hatchet/internal/services/dispatcher"
"github.com/hatchet-dev/hatchet/internal/services/grpc"
"github.com/hatchet-dev/hatchet/internal/services/heartbeat"
"github.com/hatchet-dev/hatchet/internal/services/ingestor"
"github.com/hatchet-dev/hatchet/internal/services/ticker"
"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
)

func StartEngineOrDie(cf *loader.ConfigLoader, interruptCh <-chan interface{}) {
sc, err := cf.LoadServerConfig()

if err != nil {
panic(err)
}

errCh := make(chan error)
ctx, cancel := cmdutils.InterruptContextFromChan(interruptCh)
wg := sync.WaitGroup{}

shutdown, err := telemetry.InitTracer(&telemetry.TracerOpts{
ServiceName: sc.OpenTelemetry.ServiceName,
CollectorURL: sc.OpenTelemetry.CollectorURL,
})

if err != nil {
panic(fmt.Sprintf("could not initialize tracer: %s", err))
}

defer shutdown(ctx) // nolint: errcheck

if sc.HasService("grpc") {
wg.Add(1)

// create the dispatcher
d, err := dispatcher.New(
dispatcher.WithTaskQueue(sc.TaskQueue),
dispatcher.WithRepository(sc.Repository),
dispatcher.WithLogger(sc.Logger),
)

if err != nil {
errCh <- err
return
}

go func() {
defer wg.Done()
err := d.Start(ctx)

if err != nil {
panic(err)
}
}()

// create the event ingestor
ei, err := ingestor.NewIngestor(
ingestor.WithEventRepository(
sc.Repository.Event(),
),
ingestor.WithTaskQueue(sc.TaskQueue),
)

if err != nil {
errCh <- err
return
}

adminSvc, err := admin.NewAdminService(
admin.WithRepository(sc.Repository),
admin.WithTaskQueue(sc.TaskQueue),
)

if err != nil {
errCh <- err
return
}

grpcOpts := []grpc.ServerOpt{
grpc.WithConfig(sc),
grpc.WithIngestor(ei),
grpc.WithDispatcher(d),
grpc.WithAdmin(adminSvc),
grpc.WithLogger(sc.Logger),
grpc.WithTLSConfig(sc.TLSConfig),
grpc.WithPort(sc.Runtime.GRPCPort),
grpc.WithBindAddress(sc.Runtime.GRPCBindAddress),
}

if sc.Runtime.GRPCInsecure {
grpcOpts = append(grpcOpts, grpc.WithInsecure())
}

// create the grpc server
s, err := grpc.NewServer(
grpcOpts...,
)

if err != nil {
errCh <- err
return
}

go func() {
err = s.Start(ctx)

if err != nil {
errCh <- err
return
}
}()
}

if sc.HasService("eventscontroller") {
// create separate events controller process
go func() {
ec, err := events.New(
events.WithTaskQueue(sc.TaskQueue),
events.WithRepository(sc.Repository),
events.WithLogger(sc.Logger),
)

if err != nil {
errCh <- err
return
}

err = ec.Start(ctx)

if err != nil {
errCh <- err
}
}()
}

if sc.HasService("jobscontroller") {
// create separate jobs controller process
go func() {
jc, err := jobs.New(
jobs.WithTaskQueue(sc.TaskQueue),
jobs.WithRepository(sc.Repository),
jobs.WithLogger(sc.Logger),
)

if err != nil {
errCh <- err
return
}

err = jc.Start(ctx)

if err != nil {
errCh <- err
}
}()
}

if sc.HasService("workflowscontroller") {
// create separate jobs controller process
go func() {
jc, err := workflows.New(
workflows.WithTaskQueue(sc.TaskQueue),
workflows.WithRepository(sc.Repository),
workflows.WithLogger(sc.Logger),
)

if err != nil {
errCh <- err
return
}

err = jc.Start(ctx)

if err != nil {
errCh <- err
}
}()
}

if sc.HasService("ticker") {
// create a ticker
go func() {
t, err := ticker.New(
ticker.WithTaskQueue(sc.TaskQueue),
ticker.WithRepository(sc.Repository),
ticker.WithLogger(sc.Logger),
)

if err != nil {
errCh <- err
return
}

err = t.Start(ctx)

if err != nil {
errCh <- err
}
}()
}

if sc.HasService("heartbeater") {
go func() {
h, err := heartbeat.New(
heartbeat.WithTaskQueue(sc.TaskQueue),
heartbeat.WithRepository(sc.Repository),
heartbeat.WithLogger(sc.Logger),
)

if err != nil {
errCh <- err
return
}

err = h.Start(ctx)

if err != nil {
errCh <- err
}
}()
}

Loop:
for {
select {
case err := <-errCh:
fmt.Fprintf(os.Stderr, "%s", err)

// exit with non-zero exit code
os.Exit(1) //nolint:gocritic
case <-interruptCh:
break Loop
}
}

cancel()

wg.Wait()

err = sc.Disconnect()

if err != nil {
panic(err)
}
}
Loading

0 comments on commit 0cee77f

Please sign in to comment.