Skip to content

Commit

Permalink
fix: Debounce AcquireJob when no jobs are available (#5017)
Browse files Browse the repository at this point in the history
This prevents constant database spam at scale to a maximum
of 60 queries/s per coderd instance.
  • Loading branch information
kylecarbs committed Nov 10, 2022
1 parent f32748c commit 927c241
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 15 deletions.
13 changes: 9 additions & 4 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ import (
"github.com/coder/coder/provisioner/echo"
"github.com/coder/coder/provisioner/terraform"
"github.com/coder/coder/provisionerd"
"github.com/coder/coder/provisionerd/proto"
"github.com/coder/coder/provisionersdk"
"github.com/coder/coder/provisionersdk/proto"
sdkproto "github.com/coder/coder/provisionersdk/proto"
"github.com/coder/coder/tailnet"
)

Expand Down Expand Up @@ -900,7 +901,7 @@ func newProvisionerDaemon(
}

provisioners := provisionerd.Provisioners{
string(database.ProvisionerTypeTerraform): proto.NewDRPCProvisionerClient(provisionersdk.Conn(terraformClient)),
string(database.ProvisionerTypeTerraform): sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(terraformClient)),
}
// include echo provisioner when in dev mode
if dev {
Expand All @@ -921,9 +922,13 @@ func newProvisionerDaemon(
}
}
}()
provisioners[string(database.ProvisionerTypeEcho)] = proto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient))
provisioners[string(database.ProvisionerTypeEcho)] = sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient))
}
return provisionerd.New(coderAPI.ListenProvisionerDaemon, &provisionerd.Options{
return provisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
// This debounces calls to listen every second. Read the comment
// in provisionerdserver.go to learn more!
return coderAPI.ListenProvisionerDaemon(ctx, time.Second)
}, &provisionerd.Options{
Logger: logger,
PollInterval: 500 * time.Millisecond,
UpdateInterval: 500 * time.Millisecond,
Expand Down
9 changes: 6 additions & 3 deletions coderd/coderdtest/coderdtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ import (
"github.com/coder/coder/cryptorand"
"github.com/coder/coder/provisioner/echo"
"github.com/coder/coder/provisionerd"
"github.com/coder/coder/provisionerd/proto"
"github.com/coder/coder/provisionersdk"
"github.com/coder/coder/provisionersdk/proto"
sdkproto "github.com/coder/coder/provisionersdk/proto"
"github.com/coder/coder/tailnet"
"github.com/coder/coder/testutil"
)
Expand Down Expand Up @@ -325,14 +326,16 @@ func NewProvisionerDaemon(t *testing.T, coderAPI *coderd.API) io.Closer {
assert.NoError(t, err)
}()

closer := provisionerd.New(coderAPI.ListenProvisionerDaemon, &provisionerd.Options{
closer := provisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
return coderAPI.ListenProvisionerDaemon(ctx, 0)
}, &provisionerd.Options{
Filesystem: fs,
Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
PollInterval: 50 * time.Millisecond,
UpdateInterval: 250 * time.Millisecond,
ForceCancelInterval: time.Second,
Provisioners: provisionerd.Provisioners{
string(database.ProvisionerTypeEcho): proto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient)),
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient)),
},
WorkDirectory: t.TempDir(),
})
Expand Down
18 changes: 10 additions & 8 deletions coderd/provisionerdaemons.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"time"

"github.com/google/uuid"
"github.com/moby/moby/pkg/namesgenerator"
Expand Down Expand Up @@ -55,7 +56,7 @@ func (api *API) provisionerDaemons(rw http.ResponseWriter, r *http.Request) {

// ListenProvisionerDaemon is an in-memory connection to a provisionerd. Useful when starting coderd and provisionerd
// in the same process.
func (api *API) ListenProvisionerDaemon(ctx context.Context) (client proto.DRPCProvisionerDaemonClient, err error) {
func (api *API) ListenProvisionerDaemon(ctx context.Context, acquireJobDebounce time.Duration) (client proto.DRPCProvisionerDaemonClient, err error) {
clientSession, serverSession := provisionersdk.TransportPipe()
defer func() {
if err != nil {
Expand All @@ -77,13 +78,14 @@ func (api *API) ListenProvisionerDaemon(ctx context.Context) (client proto.DRPCP

mux := drpcmux.New()
err = proto.DRPCRegisterProvisionerDaemon(mux, &provisionerdserver.Server{
AccessURL: api.AccessURL,
ID: daemon.ID,
Database: api.Database,
Pubsub: api.Pubsub,
Provisioners: daemon.Provisioners,
Telemetry: api.Telemetry,
Logger: api.Logger.Named(fmt.Sprintf("provisionerd-%s", daemon.Name)),
AccessURL: api.AccessURL,
ID: daemon.ID,
Database: api.Database,
Pubsub: api.Pubsub,
Provisioners: daemon.Provisioners,
Telemetry: api.Telemetry,
Logger: api.Logger.Named(fmt.Sprintf("provisionerd-%s", daemon.Name)),
AcquireJobDebounce: acquireJobDebounce,
})
if err != nil {
return nil, err
Expand Down
22 changes: 22 additions & 0 deletions coderd/provisionerdserver/provisionerdserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"net/url"
"reflect"
"sync"
"time"

"github.com/google/uuid"
Expand All @@ -27,6 +28,11 @@ import (
sdkproto "github.com/coder/coder/provisionersdk/proto"
)

var (
lastAcquire time.Time
lastAcquireMutex sync.RWMutex
)

type Server struct {
AccessURL *url.URL
ID uuid.UUID
Expand All @@ -35,10 +41,23 @@ type Server struct {
Database database.Store
Pubsub database.Pubsub
Telemetry telemetry.Reporter

AcquireJobDebounce time.Duration
}

// AcquireJob queries the database to lock a job.
func (server *Server) AcquireJob(ctx context.Context, _ *proto.Empty) (*proto.AcquiredJob, error) {
// This prevents loads of provisioner daemons from consistently
// querying the database when no jobs are available.
//
// The debounce only occurs when no job is returned, so if loads of
// jobs are added at once, they will start after at most this duration.
lastAcquireMutex.RLock()
if !lastAcquire.IsZero() && time.Since(lastAcquire) < server.AcquireJobDebounce {
lastAcquireMutex.RUnlock()
return &proto.AcquiredJob{}, nil
}
lastAcquireMutex.RUnlock()
// This marks the job as locked in the database.
job, err := server.Database.AcquireProvisionerJob(ctx, database.AcquireProvisionerJobParams{
StartedAt: sql.NullTime{
Expand All @@ -54,6 +73,9 @@ func (server *Server) AcquireJob(ctx context.Context, _ *proto.Empty) (*proto.Ac
if errors.Is(err, sql.ErrNoRows) {
// The provisioner daemon assumes no jobs are available if
// an empty struct is returned.
lastAcquireMutex.Lock()
lastAcquire = time.Now()
lastAcquireMutex.Unlock()
return &proto.AcquiredJob{}, nil
}
if err != nil {
Expand Down
28 changes: 28 additions & 0 deletions coderd/provisionerdserver/provisionerdserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"net/url"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
Expand All @@ -22,6 +23,33 @@ import (

func TestAcquireJob(t *testing.T) {
t.Parallel()
t.Run("Debounce", func(t *testing.T) {
t.Parallel()
db := databasefake.New()
pubsub := database.NewPubsubInMemory()
srv := &provisionerdserver.Server{
ID: uuid.New(),
Logger: slogtest.Make(t, nil),
AccessURL: &url.URL{},
Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho},
Database: db,
Pubsub: pubsub,
Telemetry: telemetry.NewNoop(),
AcquireJobDebounce: time.Hour,
}
job, err := srv.AcquireJob(context.Background(), nil)
require.NoError(t, err)
require.Equal(t, &proto.AcquiredJob{}, job)
_, err = srv.Database.InsertProvisionerJob(context.Background(), database.InsertProvisionerJobParams{
ID: uuid.New(),
InitiatorID: uuid.New(),
Provisioner: database.ProvisionerTypeEcho,
})
require.NoError(t, err)
job, err = srv.AcquireJob(context.Background(), nil)
require.NoError(t, err)
require.Equal(t, &proto.AcquiredJob{}, job)
})
t.Run("NoJobs", func(t *testing.T) {
t.Parallel()
srv := setup(t)
Expand Down

0 comments on commit 927c241

Please sign in to comment.