Commit 98b5d4a
add leaky bucket for crdb health checking, rate-limit pgx connections
this ensures that small network blips don't inadvertently mark a node
as unhealthy, and that new nodes coming online don't get flooded with
new connections
ecordell committed May 19, 2023
1 parent e72689b commit 98b5d4a
Showing 11 changed files with 119 additions and 27 deletions.
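The "leaky bucket" here is golang.org/x/time/rate's token-bucket limiter, used two ways: a burst-2, once-per-minute bucket that absorbs transient health-check failures, and a burst-1 bucket that paces new pool connections. A standalone sketch (not code from this commit) of both behaviors:

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Error tolerance: burst 2, one token refilled per minute. Two quick
	// failures are absorbed; a third within the window is "too many".
	errLimiter := rate.NewLimiter(rate.Every(1*time.Minute), 2)
	fmt.Println(errLimiter.Allow()) // true: first blip tolerated
	fmt.Println(errLimiter.Allow()) // true: second blip tolerated
	fmt.Println(errLimiter.Allow()) // false: sustained failure

	// Connection pacing: burst 1, one token per second. Wait blocks until a
	// token is available, so reconnects to a node are spread out over time.
	connLimiter := rate.NewLimiter(rate.Every(1*time.Second), 1)
	_ = connLimiter.Wait // each new connection calls Wait(ctx) before proceeding
}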
2 changes: 1 addition & 1 deletion e2e/go.mod
@@ -5,7 +5,7 @@ go 1.19
require (
github.com/authzed/authzed-go v0.8.1-0.20230511215435-5ca552307f34
github.com/authzed/grpcutil v0.0.0-20220104222419-f813f77722e5
-	github.com/authzed/spicedb v1.5.0
+	github.com/authzed/spicedb v1.21.0
github.com/brianvoe/gofakeit/v6 v6.15.0
github.com/ecordell/optgen v0.0.9
github.com/jackc/pgx/v5 v5.3.2-0.20230516120255-eab316e200b1
50 changes: 37 additions & 13 deletions internal/datastore/crdb/crdb.go
@@ -90,20 +90,20 @@ func newCRDBDatastore(url string, options ...Option) (datastore.Datastore, error
}
config.writePoolOpts.ConfigurePgx(writePoolConfig)

-	initCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	initCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

healthChecker, err := pool.NewNodeHealthChecker(url)
if err != nil {
return nil, fmt.Errorf(errUnableToInstantiate, err)
}

-	writePool, err := pool.NewRetryPool(initCtx, "write", writePoolConfig.Copy(), healthChecker, config.maxRetries)
+	writePool, err := pool.NewRetryPool(initCtx, "write", writePoolConfig.Copy(), healthChecker, config.maxRetries, config.connectRate)
if err != nil {
return nil, fmt.Errorf(errUnableToInstantiate, err)
}

-	readPool, err := pool.NewRetryPool(initCtx, "read", readPoolConfig.Copy(), healthChecker, config.maxRetries)
+	readPool, err := pool.NewRetryPool(initCtx, "read", readPoolConfig.Copy(), healthChecker, config.maxRetries, config.connectRate)
if err != nil {
return nil, fmt.Errorf(errUnableToInstantiate, err)
}
@@ -344,18 +344,42 @@ func (cds *crdbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState,
return datastore.ReadyState{}, err
}

-	if version == headMigration {
-		return datastore.ReadyState{IsReady: true}, nil
+	if version != headMigration {
+		return datastore.ReadyState{
+			Message: fmt.Sprintf(
+				"datastore is not migrated: currently at revision `%s`, but requires `%s`. Please run `spicedb migrate`.",
+				version,
+				headMigration,
+			),
+			IsReady: false,
+		}, nil
	}

-	return datastore.ReadyState{
-		Message: fmt.Sprintf(
-			"datastore is not migrated: currently at revision `%s`, but requires `%s`. Please run `spicedb migrate`.",
-			version,
-			headMigration,
-		),
-		IsReady: false,
-	}, nil
+	// Wait for either minconns or maxconns/2 connections to be available,
+	// whichever is smaller.
+	writeMin := cds.writePool.MinConns()
+	if halfMax := cds.writePool.MaxConns() / 2; halfMax < writeMin {
+		writeMin = halfMax
+	}
+	readMin := cds.readPool.MinConns()
+	if halfMax := cds.readPool.MaxConns() / 2; halfMax < readMin {
+		readMin = halfMax
+	}
+	writeTotal := uint32(cds.writePool.Stat().TotalConns())
+	readTotal := uint32(cds.readPool.Stat().TotalConns())
+	if writeTotal < writeMin || readTotal < readMin {
+		return datastore.ReadyState{
+			Message: fmt.Sprintf(
+				"spicedb does not have the required minimum connection count to the datastore. Read: %d/%d, Write: %d/%d",
+				readTotal,
+				readMin,
+				writeTotal,
+				writeMin,
+			),
+			IsReady: false,
+		}, nil
+	}
+	return datastore.ReadyState{IsReady: true}, nil
}

func (cds *crdbDatastore) Close() error {
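The readiness gate added above is min(MinConns, MaxConns/2) per pool. A hypothetical helper (readyThreshold is not a name from this diff) makes the arithmetic explicit:

// readyThreshold returns min(minConns, maxConns/2). With MinConns=20 and
// MaxConns=30 the pool reports ready at 15 established connections, so
// readiness isn't gated on a minimum the pool hasn't warmed up to yet.
func readyThreshold(minConns, maxConns uint32) uint32 {
	if halfMax := maxConns / 2; halfMax < minConns {
		return halfMax
	}
	return minConns
}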
10 changes: 10 additions & 0 deletions internal/datastore/crdb/options.go
@@ -9,6 +9,7 @@ import (

type crdbOptions struct {
readPoolOpts, writePoolOpts pgxcommon.PoolOptions
+	connectRate time.Duration

watchBufferLength uint16
revisionQuantization time.Duration
@@ -44,6 +45,7 @@

defaultEnablePrometheusStats = false
defaultEnableConnectionBalancing = true
+	defaultConnectRate               = 1 * time.Second
)

// Option provides the facility to configure how clients within the CRDB
@@ -64,6 +66,7 @@ func generateConfig(options []Option) (crdbOptions, error) {
disableStats: false,
enablePrometheusStats: defaultEnablePrometheusStats,
enableConnectionBalancing: defaultEnableConnectionBalancing,
+		connectRate:               defaultConnectRate,
}

for _, option := range options {
@@ -256,6 +259,13 @@ func GCWindow(window time.Duration) Option {
return func(po *crdbOptions) { po.gcWindow = window }
}

+// ConnectRate is the rate at which new datastore connections can be made.
+//
+// This is a duration; the effective rate is one new connection per period.
+func ConnectRate(rate time.Duration) Option {
+	return func(po *crdbOptions) { po.connectRate = rate }
+}

// MaxRetries is the maximum number of times a retriable transaction will be
// client-side retried.
// Default: 5
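As a usage sketch, modeled on the pkg/cmd/datastore wiring later in this diff (the exported constructor name is assumed), a caller inside the module would pass the option alongside the others:

// A ConnectRate of 100ms caps pool growth at roughly ten new
// connections per second per pool.
ds, err := crdb.NewCRDBDatastore(
	url,
	crdb.ConnectRate(100*time.Millisecond),
	crdb.MaxRetries(5),
)
if err != nil {
	return nil, err
}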
1 change: 0 additions & 1 deletion internal/datastore/crdb/pool/balancer.go
@@ -160,7 +160,6 @@ func (p *nodeConnectionBalancer[P, C]) pruneConnections(ctx context.Context) {
// Delete metrics for nodes we no longer have connections for
p.healthTracker.RLock()
for node := range p.healthTracker.nodesEverSeen {
-		// TODO: does this handle network interruptions correctly?
if _, ok := connectionCounts[node]; !ok {
connectionsPerCRDBNodeCountGauge.DeletePartialMatch(map[string]string{
"pool": p.pool.ID(),
28 changes: 24 additions & 4 deletions internal/datastore/crdb/pool/health.go
@@ -9,10 +9,13 @@ import (
"github.com/jackc/pgx/v5"
"github.com/lthibault/jitterbug"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"

log "github.com/authzed/spicedb/internal/logging"
)

+const errorBurst = 2

var healthyCRDBNodeCountGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "crdb_healthy_nodes",
Help: "the number of healthy crdb nodes detected by spicedb",
@@ -31,7 +34,8 @@ type NodeHealthTracker struct {
sync.RWMutex
connConfig *pgx.ConnConfig
healthyNodes map[uint32]struct{}
-	nodesEverSeen map[uint32]struct{}
+	nodesEverSeen map[uint32]*rate.Limiter
+	newLimiter    func() *rate.Limiter
}

// NewNodeHealthChecker builds a health checker that polls the cluster at the given url.
@@ -44,7 +48,10 @@ func NewNodeHealthChecker(url string) (*NodeHealthTracker, error) {
return &NodeHealthTracker{
connConfig: connConfig,
healthyNodes: make(map[uint32]struct{}, 0),
-		nodesEverSeen: make(map[uint32]struct{}, 0),
+		nodesEverSeen: make(map[uint32]*rate.Limiter, 0),
+		newLimiter: func() *rate.Limiter {
+			return rate.NewLimiter(rate.Every(1*time.Minute), errorBurst)
+		},
}, nil
}

@@ -85,7 +92,7 @@ func (t *NodeHealthTracker) tryConnect(interval time.Duration) {
t.SetNodeHealth(nodeID(conn), true)
t.Lock()
defer t.Unlock()
-	t.nodesEverSeen[nodeID(conn)] = struct{}{}
+	t.nodesEverSeen[nodeID(conn)] = t.newLimiter()
}

// SetNodeHealth marks a node as either healthy or unhealthy.
@@ -95,11 +102,24 @@ func (t *NodeHealthTracker) SetNodeHealth(nodeID uint32, healthy bool) {
defer func() {
healthyCRDBNodeCountGauge.Set(float64(len(t.healthyNodes)))
}()

+	if _, ok := t.nodesEverSeen[nodeID]; !ok {
+		t.nodesEverSeen[nodeID] = t.newLimiter()
+	}
+
	if healthy {
		t.healthyNodes[nodeID] = struct{}{}
+		t.nodesEverSeen[nodeID] = t.newLimiter()
		return
	}
-	delete(t.healthyNodes, nodeID)
+
+	// If the limiter allows the request, it means we haven't seen more than
+	// 2 failures in the past 1m, so the node shouldn't be marked unhealthy yet.
+	// If the limiter denies the request, we've hit too many errors and the node
+	// is marked unhealthy.
+	if !t.nodesEverSeen[nodeID].Allow() {
+		delete(t.healthyNodes, nodeID)
+	}
}

// IsHealthy returns true if the given nodeID has been marked healthy.
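A sketch of the resulting transitions for a tracker built as above (the sequence mirrors the test below): each healthy observation installs a fresh limiter, resetting the node's error budget, while failures consume tokens and only an empty bucket marks the node unhealthy.

tracker.SetNodeHealth(1, true)  // healthy; node 1 gets a fresh limiter
tracker.SetNodeHealth(1, false) // 1st failure: token consumed, still healthy
tracker.SetNodeHealth(1, false) // 2nd failure: token consumed, still healthy
tracker.SetNodeHealth(1, false) // 3rd failure inside the window: unhealthy
tracker.SetNodeHealth(1, true)  // success installs a fresh limiter, budget restored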
15 changes: 14 additions & 1 deletion internal/datastore/crdb/pool/health_test.go
@@ -2,13 +2,19 @@ package pool

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)

func TestNodeHealthTracker(t *testing.T) {
tracker := &NodeHealthTracker{
-		healthyNodes: make(map[uint32]struct{}),
+		healthyNodes:  make(map[uint32]struct{}),
+		nodesEverSeen: make(map[uint32]*rate.Limiter),
+		newLimiter: func() *rate.Limiter {
+			return rate.NewLimiter(rate.Every(1*time.Minute), 2)
+		},
}

tracker.SetNodeHealth(1, true)
@@ -21,6 +27,13 @@
require.True(t, tracker.IsHealthy(2))
require.Equal(t, tracker.HealthyNodeCount(), 2)

+	// just one unhealthy mark isn't enough to flip the node to unhealthy
+	tracker.SetNodeHealth(1, false)
+	require.True(t, tracker.IsHealthy(1))
+	require.True(t, tracker.IsHealthy(2))
+	require.Equal(t, tracker.HealthyNodeCount(), 2)

tracker.SetNodeHealth(1, false)
tracker.SetNodeHealth(1, false)
require.False(t, tracker.IsHealthy(1))
require.True(t, tracker.IsHealthy(2))
19 changes: 16 additions & 3 deletions internal/datastore/crdb/pool/pool.go
@@ -13,6 +13,7 @@ import (
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"

log "github.com/authzed/spicedb/internal/logging"
)
@@ -42,7 +43,7 @@ type RetryPool struct {
gc map[*pgx.Conn]struct{}
}

-func NewRetryPool(ctx context.Context, name string, config *pgxpool.Config, healthTracker *NodeHealthTracker, maxRetries uint8) (*RetryPool, error) {
+func NewRetryPool(ctx context.Context, name string, config *pgxpool.Config, healthTracker *NodeHealthTracker, maxRetries uint8, connectRate time.Duration) (*RetryPool, error) {
config = config.Copy()
p := &RetryPool{
id: name,
@@ -52,6 +53,7 @@ func NewRetryPool(ctx context.Context, name string, config *pgxpool.Config, heal
gc: make(map[*pgx.Conn]struct{}, 0),
}

+	limiter := rate.NewLimiter(rate.Every(connectRate), 1)
afterConnect := config.AfterConnect
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
if afterConnect != nil {
@@ -67,6 +69,11 @@ func NewRetryPool(ctx context.Context, name string, config *pgxpool.Config, heal
delete(p.gc, conn)

healthTracker.SetNodeHealth(nodeID(conn), true)

+		if err := limiter.Wait(ctx); err != nil {
+			return err
+		}

p.nodeForConn[conn] = nodeID(conn)

return nil
@@ -137,6 +144,11 @@ func (p *RetryPool) MaxConns() uint32 {
return uint32(p.pool.Config().MaxConns)
}

+// MinConns returns the MinConns configured on the underlying pool
+func (p *RetryPool) MinConns() uint32 {
+	return uint32(p.pool.Config().MinConns)
+}

// ExecFunc is a replacement for pgxpool.Pool.Exec that allows resetting the
// connection on error, or retrying on a retryable error.
func (p *RetryPool) ExecFunc(ctx context.Context, tagFunc func(ctx context.Context, tag pgconn.CommandTag, err error) error, sql string, arguments ...any) error {
@@ -268,12 +280,13 @@ func (p *RetryPool) withRetries(ctx context.Context, fn func(conn *pgxpool.Conn)
if errors.As(err, &resettable) || conn.Conn().IsClosed() {
log.Ctx(ctx).Info().Err(err).Uint8("retries", retries).Msg("resettable error")

-			nodeID := p.nodeForConn[conn.Conn()]
+			nodeID := p.Node(conn.Conn())
p.GC(conn.Conn())
conn.Release()

-			// After a resettable error, mark the node as unhealthy
-			// TODO: configurable error count / circuit-breaker
+			// The health tracker enforces an error rate, so a single request
+			// failing will not mark the node as globally unhealthy.
if nodeID > 0 {
p.healthTracker.SetNodeHealth(nodeID, false)
}
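Because the limiter is built with burst 1, the first connection proceeds immediately and each subsequent one waits roughly connectRate. A standalone sketch (not repo code) of that pacing:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// With burst 1 and a 1s period, three Waits complete at ~0s, ~1s, ~2s,
	// so a reconnect storm against a freshly started node is spread out.
	limiter := rate.NewLimiter(rate.Every(1*time.Second), 1)
	start := time.Now()
	for i := 0; i < 3; i++ {
		if err := limiter.Wait(context.Background()); err != nil {
			panic(err)
		}
		fmt.Printf("conn %d established after %v\n", i, time.Since(start).Round(time.Second))
	}
}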
2 changes: 1 addition & 1 deletion internal/services/integrationtesting/healthcheck_test.go
@@ -57,5 +57,5 @@ func runHealthChecks(require *require.Assertions, conn *grpc.ClientConn) {
resp, err := hclient.Check(context.Background(), &healthpb.HealthCheckRequest{Service: v1.PermissionsService_ServiceDesc.ServiceName})
require.NoError(err)
return healthpb.HealthCheckResponse_SERVING == resp.GetStatus()
-	}, 5*time.Second, 100*time.Millisecond)
+	}, 30*time.Second, 100*time.Millisecond)
}
6 changes: 3 additions & 3 deletions magefiles/test.go
@@ -16,9 +16,9 @@ type Test mg.Namespace
func (t Test) All() error {
ds := Testds{}
c := Testcons{}
-	mg.Deps(t.Unit(), t.Integration(), t.Image(), t.Analyzers(),
-		ds.Crdb(), ds.Postgres(), ds.Spanner(), ds.Mysql(),
-		c.Crdb(), c.Spanner(), c.Postgres(), c.Mysql())
+	mg.Deps(t.Unit, t.Integration, t.Image, t.Analyzers,
+		ds.Crdb, ds.Postgres, ds.Spanner, ds.Mysql,
+		c.Crdb, c.Spanner, c.Postgres, c.Mysql)
return nil
}

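The magefile change is behavioral, not cosmetic: mg.Deps expects function values and runs them itself (in parallel, each at most once per run), whereas the old form invoked every target serially during argument evaluation and handed Deps only their results. In sketch form:

mg.Deps(t.Unit, t.Integration)     // mage schedules and deduplicates the targets
mg.Deps(t.Unit(), t.Integration()) // old form: targets already ran, one after another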
4 changes: 4 additions & 0 deletions pkg/cmd/datastore/datastore.go
@@ -125,6 +125,7 @@ type Config struct {
OverlapKey string `debugmap:"visible"`
OverlapStrategy string `debugmap:"visible"`
EnableConnectionBalancing bool `debugmap:"visible"`
+	ConnectRate time.Duration `debugmap:"visible"`

// Postgres
GCInterval time.Duration `debugmap:"visible"`
@@ -190,6 +191,7 @@ func RegisterDatastoreFlagsWithPrefix(flagSet *pflag.FlagSet, prefix string, opt
flagSet.StringVar(&opts.OverlapStrategy, flagName("datastore-tx-overlap-strategy"), "static", `strategy to generate transaction overlap keys ("prefix", "static", "insecure") (cockroach driver only)`)
flagSet.StringVar(&opts.OverlapKey, flagName("datastore-tx-overlap-key"), "key", "static key to touch when writing to ensure transactions overlap (only used if --datastore-tx-overlap-strategy=static is set; cockroach driver only)")
flagSet.BoolVar(&opts.EnableConnectionBalancing, flagName("datastore-connection-balancing"), defaults.EnableConnectionBalancing, "enable connection balancing between database nodes (cockroach driver only)")
+	flagSet.DurationVar(&opts.ConnectRate, flagName("datastore-connect-rate"), 1*time.Second, "rate at which new connections are allowed to the datastore (at a rate of 1/duration) (cockroach driver only)")
flagSet.StringVar(&opts.SpannerCredentialsFile, flagName("datastore-spanner-credentials"), "", "path to service account key credentials file with access to the cloud spanner instance (omit to use application default credentials)")
flagSet.StringVar(&opts.SpannerEmulatorHost, flagName("datastore-spanner-emulator-host"), "", "URI of spanner emulator instance used for development and testing (e.g. localhost:9010)")
flagSet.StringVar(&opts.TablePrefix, flagName("datastore-mysql-table-prefix"), "", "prefix to add to the name of all SpiceDB database tables")
@@ -224,6 +226,7 @@ func DefaultDatastoreConfig() *Config {
MaxRetries: 10,
OverlapKey: "key",
OverlapStrategy: "static",
+		ConnectRate:               1 * time.Second,
EnableConnectionBalancing: true,
GCInterval: 3 * time.Minute,
GCMaxOperationTime: 1 * time.Minute,
@@ -355,6 +358,7 @@ func newCRDBDatastore(opts Config) (datastore.Datastore, error) {
crdb.DisableStats(opts.DisableStats),
crdb.WithEnablePrometheusStats(opts.EnableDatastoreMetrics),
crdb.WithEnableConnectionBalancing(opts.EnableConnectionBalancing),
+		crdb.ConnectRate(opts.ConnectRate),
)
}

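Operators set the new knob with --datastore-connect-rate; programmatically it is a field on Config, e.g. (a sketch using DefaultDatastoreConfig from this file):

cfg := datastore.DefaultDatastoreConfig()
cfg.ConnectRate = 100 * time.Millisecond // override the 1s default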
9 changes: 9 additions & 0 deletions pkg/cmd/datastore/zz_generated.options.go

Some generated files are not rendered by default.
