Skip to content

Commit

Permalink
Use transaction with optimistic lock to eliminate race (#365)
Browse files Browse the repository at this point in the history
* getPayload optimistic locking

* PR review feedback
  • Loading branch information
metachris committed Apr 25, 2023
1 parent 7c4fa2b commit 1960654
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 50 deletions.
39 changes: 35 additions & 4 deletions datastore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ var (
activeValidatorsHours = cli.GetEnvInt("ACTIVE_VALIDATOR_HOURS", 3)
expiryActiveValidators = time.Duration(activeValidatorsHours) * time.Hour // careful with this setting - for each hour a hash set is created with each active proposer as field. for a lot of hours this can take a lot of space in redis.

RedisConfigFieldPubkey = "pubkey"
RedisStatsFieldLatestSlot = "latest-slot"
RedisStatsFieldValidatorsTotal = "validators-total"
RedisStatsFieldSlotLastPayloadDelivered = "slot-last-payload-delivered"
RedisConfigFieldPubkey = "pubkey"
RedisStatsFieldLatestSlot = "latest-slot"
RedisStatsFieldValidatorsTotal = "validators-total"

ErrFailedUpdatingTopBidNoBids = errors.New("failed to update top bid because no bids were found")
ErrSlotAlreadyDelivered = errors.New("payload for slot was already delivered")
)

type BlockBuilderStatus string
Expand Down Expand Up @@ -83,6 +83,7 @@ type RedisCache struct {
keyStats string
keyProposerDuties string
keyBlockBuilderStatus string
keyLastSlotDelivered string
}

func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
Expand Down Expand Up @@ -122,6 +123,7 @@ func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
keyStats: fmt.Sprintf("%s/%s:stats", redisPrefix, prefix),
keyProposerDuties: fmt.Sprintf("%s/%s:proposer-duties", redisPrefix, prefix),
keyBlockBuilderStatus: fmt.Sprintf("%s/%s:block-builder-status", redisPrefix, prefix),
keyLastSlotDelivered: fmt.Sprintf("%s/%s:last-slot-delivered", redisPrefix, prefix),
}, nil
}

Expand Down Expand Up @@ -270,6 +272,35 @@ func (r *RedisCache) GetActiveValidators() (map[boostTypes.PubkeyHex]bool, error
return validators, nil
}

// CheckAndSetLastSlotDelivered atomically records slot as the latest delivered
// slot. It returns ErrSlotAlreadyDelivered when a payload for an equal or later
// slot was already delivered, and redis.TxFailedErr when a concurrent writer
// won the optimistic-lock race on the watched key.
//
// More details about Redis optimistic locking:
// - https://redis.uptrace.dev/guide/go-redis-pipelines.html#transactions
// - https://github.com/redis/go-redis/blob/6ecbcf6c90919350c42181ce34c1cbdfbd5d1463/race_test.go#L183
func (r *RedisCache) CheckAndSetLastSlotDelivered(slot uint64) (err error) {
	ctx := context.Background()

	transaction := func(tx *redis.Tx) error {
		// Read the current value; a missing key (redis.Nil) counts as 0.
		prevSlot, txErr := tx.Get(ctx, r.keyLastSlotDelivered).Uint64()
		if txErr != nil && !errors.Is(txErr, redis.Nil) {
			return txErr
		}

		if slot <= prevSlot {
			return ErrSlotAlreadyDelivered
		}

		// The SET only commits if the watched key is untouched since the GET.
		_, txErr = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
			pipe.Set(ctx, r.keyLastSlotDelivered, slot, 0)
			return nil
		})
		return txErr
	}

	return r.client.Watch(ctx, transaction, r.keyLastSlotDelivered)
}

// GetLastSlotDelivered returns the most recent slot for which a payload was
// delivered. It returns redis.Nil when no payload was delivered yet.
func (r *RedisCache) GetLastSlotDelivered() (slot uint64, err error) {
	res := r.client.Get(context.Background(), r.keyLastSlotDelivered)
	return res.Uint64()
}

// SetStats stores value under the given field of the stats hash.
func (r *RedisCache) SetStats(field string, value any) (err error) {
	ctx := context.Background()
	return r.client.HSet(ctx, r.keyStats, field, value).Err()
}
Expand Down
96 changes: 96 additions & 0 deletions datastore/redis_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package datastore

import (
"context"
"errors"
"math/big"
"sync"
"testing"
"time"

"github.com/alicebob/miniredis/v2"
"github.com/flashbots/go-boost-utils/types"
"github.com/flashbots/mev-boost-relay/common"
"github.com/go-redis/redis/v9"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -302,3 +306,95 @@ func TestRedisURIs(t *testing.T) {
_, err = NewRedisCache("", malformURL, "")
require.Error(t, err)
}

// TestCheckAndSetLastSlotDelivered covers the happy path and both
// already-delivered rejection cases of CheckAndSetLastSlotDelivered.
func TestCheckAndSetLastSlotDelivered(t *testing.T) {
	cache := setupTestRedis(t)
	deliveredSlot := uint64(123)

	// Before any delivery the key is absent, so reading yields redis.Nil.
	slot, err := cache.GetLastSlotDelivered()
	require.ErrorIs(t, err, redis.Nil)
	require.Equal(t, uint64(0), slot)

	// The first set for a fresh slot succeeds.
	require.NoError(t, cache.CheckAndSetLastSlotDelivered(deliveredSlot))

	// The stored slot is now readable.
	slot, err = cache.GetLastSlotDelivered()
	require.NoError(t, err)
	require.Equal(t, deliveredSlot, slot)

	// Re-delivering the same slot is rejected.
	err = cache.CheckAndSetLastSlotDelivered(deliveredSlot)
	require.ErrorIs(t, err, ErrSlotAlreadyDelivered)

	// Delivering an older slot is rejected as well.
	err = cache.CheckAndSetLastSlotDelivered(deliveredSlot - 1)
	require.ErrorIs(t, err, ErrSlotAlreadyDelivered)
}

// Test_CheckAndSetLastSlotDeliveredForTesting ensures the optimistic locking works
// i.e. running CheckAndSetLastSlotDelivered leading to err == redis.TxFailedErr
// Test_CheckAndSetLastSlotDeliveredForTesting ensures the optimistic locking works
// i.e. running CheckAndSetLastSlotDelivered leading to err == redis.TxFailedErr
func Test_CheckAndSetLastSlotDeliveredForTesting(t *testing.T) {
	cache := setupTestRedis(t)
	newSlot := uint64(123)
	n := 3

	errC := make(chan error, n)
	waitC := make(chan bool, n)
	syncWG := sync.WaitGroup{}

	// Kick off goroutines, that will all try to set the same slot. Each one
	// parks inside the WATCH callback (after its GET) until signaled on waitC.
	for i := 0; i < n; i++ {
		syncWG.Add(1)
		go func() {
			errC <- _CheckAndSetLastSlotDeliveredForTesting(cache, waitC, &syncWG, newSlot)
		}()
	}

	// Wait until every goroutine has read the key and is blocked on waitC.
	syncWG.Wait()

	// Continue first goroutine (should succeed)
	waitC <- true
	err := <-errC
	require.NoError(t, err)

	// Continue all other goroutines (all should return the race error redis.TxFailedErr)
	for i := 1; i < n; i++ {
		waitC <- true
		err := <-errC
		require.ErrorIs(t, err, redis.TxFailedErr)
	}

	// Any later call should return ErrSlotAlreadyDelivered. This call returns
	// before ever touching waitC or the WaitGroup, because the slot check fails
	// first — so no waitC signal is needed here. (A stray `waitC <- true` after
	// the call would be dead code, and would deadlock if the helper blocked.)
	err = _CheckAndSetLastSlotDeliveredForTesting(cache, waitC, &syncWG, newSlot)
	require.ErrorIs(t, err, ErrSlotAlreadyDelivered)
}

// _CheckAndSetLastSlotDeliveredForTesting mirrors RedisCache.CheckAndSetLastSlotDelivered,
// with a WaitGroup/channel pair inserted between the GET and the SET so the test
// can trigger the optimistic-lock race in a controlled way.
func _CheckAndSetLastSlotDeliveredForTesting(r *RedisCache, waitC chan bool, wg *sync.WaitGroup, slot uint64) (err error) {
	ctx := context.Background()

	transaction := func(tx *redis.Tx) error {
		// Read the current value; a missing key (redis.Nil) counts as 0.
		prevSlot, txErr := tx.Get(ctx, r.keyLastSlotDelivered).Uint64()
		if txErr != nil && !errors.Is(txErr, redis.Nil) {
			return txErr
		}

		if slot <= prevSlot {
			return ErrSlotAlreadyDelivered
		}

		// Signal readiness, then park until the test releases us. This widens
		// the window between GET and SET so that concurrent writers collide.
		wg.Done()
		<-waitC

		_, txErr = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
			pipe.Set(ctx, r.keyLastSlotDelivered, slot, 0)
			return nil
		})
		return txErr
	}

	return r.client.Watch(ctx, transaction, r.keyLastSlotDelivered)
}
100 changes: 54 additions & 46 deletions services/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,10 +932,12 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)

// Log at start and end of request
log.Info("request initiated")
defer log.WithFields(logrus.Fields{
"timestampRequestFin": time.Now().UTC().UnixMilli(),
"requestDurationMs": time.Since(receivedAt).Milliseconds(),
}).Info("request finished")
defer func() {
log.WithFields(logrus.Fields{
"timestampRequestFin": time.Now().UTC().UnixMilli(),
"requestDurationMs": time.Since(receivedAt).Milliseconds(),
}).Info("request finished")
}()

// Read the body first, so we can decode it later
body, err := io.ReadAll(req.Body)
Expand Down Expand Up @@ -1030,17 +1032,48 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
log = log.WithField("timestampAfterSignatureVerify", time.Now().UTC().UnixMilli())
log.Info("getPayload request received")

// Check whether getPayload has already been called
slotLastPayloadDelivered, err := api.redis.GetStatsUint64(datastore.RedisStatsFieldSlotLastPayloadDelivered)
if err != nil && !errors.Is(err, redis.Nil) {
log.WithError(err).Error("failed to get delivered payload slot from redis")
} else if payload.Slot() <= slotLastPayloadDelivered {
log.Warn("getPayload was already called for this slot")
api.RespondError(w, http.StatusBadRequest, "payload for this slot was already delivered")
return
// TODO: store signed blinded block in database (always)

// Get the response - from Redis, Memcache or DB
// note that recent mev-boost versions only send getPayload to relays that provided the bid
getPayloadResp, err := api.datastore.GetGetPayloadResponse(payload.Slot(), proposerPubkey.String(), payload.BlockHash())
if err != nil || getPayloadResp == nil {
log.WithError(err).Warn("failed getting execution payload (1/2)")
time.Sleep(time.Duration(timeoutGetPayloadRetryMs) * time.Millisecond)

// Try again
getPayloadResp, err = api.datastore.GetGetPayloadResponse(payload.Slot(), proposerPubkey.String(), payload.BlockHash())
if err != nil {
log.WithError(err).Error("failed getting execution payload (2/2) - due to error")
api.RespondError(w, http.StatusBadRequest, err.Error())
return
} else if getPayloadResp == nil {
log.Warn("failed getting execution payload (2/2)")
api.RespondError(w, http.StatusBadRequest, "no execution payload for this request")
return
}
}

log = log.WithField("timestampAfterAlreadyDelivered", time.Now().UTC().UnixMilli())
// Now we know this relay also has the payload
log = log.WithField("timestampAfterLoadResponse", time.Now().UTC().UnixMilli())

// Check whether getPayload has already been called -- TODO: do we need to allow multiple submissions of one blinded block?
err = api.redis.CheckAndSetLastSlotDelivered(payload.Slot())
log = log.WithField("timestampAfterAlreadyDeliveredCheck", time.Now().UTC().UnixMilli())
if err != nil {
if errors.Is(err, datastore.ErrSlotAlreadyDelivered) {
// BAD VALIDATOR, 2x GETPAYLOAD
log.Warn("validator called getPayload twice")
api.RespondError(w, http.StatusBadRequest, "payload for this slot was already delivered")
return
} else if errors.Is(err, redis.TxFailedErr) {
// BAD VALIDATOR, 2x GETPAYLOAD + RACE
log.Warn("validator called getPayload twice (race)")
api.RespondError(w, http.StatusBadRequest, "payload for this slot was already delivered (race)")
return
}
log.WithError(err).Error("redis.CheckAndSetLastSlotDelivered failed")
}

// Handle early/late requests
if msIntoSlot < 0 {
Expand All @@ -1066,26 +1099,7 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
return
}

// Get the response - from Redis, Memcache or DB
// note that recent mev-boost versions only send getPayload to relays that provided the bid
getPayloadResp, err := api.datastore.GetGetPayloadResponse(payload.Slot(), proposerPubkey.String(), payload.BlockHash())
if err != nil || getPayloadResp == nil {
log.WithError(err).Warn("failed getting execution payload (1/2)")
time.Sleep(time.Duration(timeoutGetPayloadRetryMs) * time.Millisecond)

// Try again
getPayloadResp, err = api.datastore.GetGetPayloadResponse(payload.Slot(), proposerPubkey.String(), payload.BlockHash())
if err != nil {
log.WithError(err).Error("failed getting execution payload (2/2) - due to error")
api.RespondError(w, http.StatusBadRequest, err.Error())
return
} else if getPayloadResp == nil {
log.Warn("failed getting execution payload (2/2)")
api.RespondError(w, http.StatusBadRequest, "no execution payload for this request")
return
}
}

// Introduce a random delay (disabled by default)
if getPayloadPublishDelayMs > 0 {
delayMillis := rand.Intn(getPayloadPublishDelayMs) //nolint:gosec
time.Sleep(time.Duration(delayMillis) * time.Millisecond)
Expand Down Expand Up @@ -1114,14 +1128,6 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
log = log.WithField("timestampAfterPublishing", timeAfterPublish)
log.WithField("msNeededForPublishing", msNeededForPublishing).Info("block published through beacon node")

// Remember that getPayload has already been called
go func() {
err := api.redis.SetStats(datastore.RedisStatsFieldSlotLastPayloadDelivered, payload.Slot())
if err != nil {
log.WithError(err).Error("failed to save delivered payload slot to redis")
}
}()

// give the beacon network some time to propagate the block
time.Sleep(time.Duration(getPayloadResponseDelayMs) * time.Millisecond)

Expand Down Expand Up @@ -1183,10 +1189,12 @@ func (api *RelayAPI) handleSubmitNewBlock(w http.ResponseWriter, req *http.Reque

// Log at start and end of request
log.Info("request initiated")
defer log.WithFields(logrus.Fields{
"timestampRequestFin": time.Now().UTC().UnixMilli(),
"requestDurationMs": time.Since(receivedAt).Milliseconds(),
}).Info("request finished")
defer func() {
log.WithFields(logrus.Fields{
"timestampRequestFin": time.Now().UTC().UnixMilli(),
"requestDurationMs": time.Since(receivedAt).Milliseconds(),
}).Info("request finished")
}()

// If cancellations are disabled but builder requested it, return error
if isCancellationEnabled && !api.ffEnableCancellations {
Expand Down Expand Up @@ -1359,7 +1367,7 @@ func (api *RelayAPI) handleSubmitNewBlock(w http.ResponseWriter, req *http.Reque
}

// Reject new submissions once the payload for this slot was delivered - TODO: store in memory as well
slotLastPayloadDelivered, err := api.redis.GetStatsUint64(datastore.RedisStatsFieldSlotLastPayloadDelivered)
slotLastPayloadDelivered, err := api.redis.GetLastSlotDelivered()
if err != nil && !errors.Is(err, redis.Nil) {
log.WithError(err).Error("failed to get delivered payload slot from redis")
} else if payload.Slot() <= slotLastPayloadDelivered {
Expand Down

0 comments on commit 1960654

Please sign in to comment.