Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add storage and resend for validators preferences #149

Closed
wants to merge 11 commits into from
23 changes: 13 additions & 10 deletions cmd/mev-boost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ var (
version = "dev" // is set during build process

// defaults
defaultListenAddr = getEnv("BOOST_LISTEN_ADDR", "localhost:18550")
defaultRelayTimeoutMs = getEnvInt("RELAY_TIMEOUT_MS", 2000) // timeout for all the requests to the relay
defaultRelayCheck = os.Getenv("RELAY_STARTUP_CHECK") != ""
defaultGenesisForkVersion = getEnv("GENESIS_FORK_VERSION", "")
defaultListenAddr = getEnv("BOOST_LISTEN_ADDR", "localhost:18550")
defaultRelayTimeoutMs = getEnvInt("RELAY_TIMEOUT_MS", 2000) // timeout for all the requests to the relay
defaultRelayCheck = os.Getenv("RELAY_STARTUP_CHECK") != ""
defaultGenesisForkVersion = getEnv("GENESIS_FORK_VERSION", "")
defaultRegisterValidatorInterval = getEnvInt("REGISTER_VALIDATOR_INTERVAL_SEC", 384)

// cli flags
listenAddr = flag.String("addr", defaultListenAddr, "listen-address for mev-boost server")
relayURLs = flag.String("relays", "", "relay urls - single entry or comma-separated list (schema://pubkey@ip:port)")
relayTimeoutMs = flag.Int("request-timeout", defaultRelayTimeoutMs, "timeout for requests to a relay [ms]")
relayCheck = flag.Bool("relay-check", defaultRelayCheck, "whether to check relay status on startup")
listenAddr = flag.String("addr", defaultListenAddr, "listen-address for mev-boost server")
relayURLs = flag.String("relays", "", "relay urls - single entry or comma-separated list (schema://pubkey@ip:port)")
relayTimeoutMs = flag.Int("request-timeout", defaultRelayTimeoutMs, "timeout for requests to a relay [ms]")
relayCheck = flag.Bool("relay-check", defaultRelayCheck, "whether to check relay status on startup")
registerValidatorIntervalSec = flag.Int("register-validator-interval-sec", defaultRegisterValidatorInterval, "the interval used to resend the validator preferences (fee recipient and gas limit)")

// helpers
useGenesisForkVersionMainnet = flag.Bool("mainnet", false, "use Mainnet genesis fork version 0x00000000 (for signature validation)")
Expand Down Expand Up @@ -70,13 +72,14 @@ func main() {
}

relayTimeout := time.Duration(*relayTimeoutMs) * time.Millisecond
server, err := server.NewBoostService(*listenAddr, relays, log, genesisForkVersionHex, relayTimeout)
registerValidatorInterval := time.Duration(*registerValidatorIntervalSec) * time.Second
server, err := server.NewBoostService(*listenAddr, relays, log, genesisForkVersionHex, relayTimeout, registerValidatorInterval)
if err != nil {
log.WithError(err).Fatal("failed creating the server")
}

log.Println("listening on", *listenAddr)
log.Fatal(server.StartHTTPServer())
log.Fatal(server.StartServer())
}

func getEnv(key string, defaultValue string) string {
Expand Down
113 changes: 84 additions & 29 deletions server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/flashbots/go-boost-utils/types"
Expand Down Expand Up @@ -56,10 +57,17 @@ type BoostService struct {
serverTimeouts HTTPServerTimeouts

httpClient http.Client

// Used to stop registerValidatorAtInterval
done chan bool
Copy link
Contributor

@Ruteri Ruteri Jun 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another way to implement shutdowns will be to have a context with cancel, and storing the cancel function.
Since we are not using contexts yet this is good too.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, that's what they've been made after all.
Fixed in a2bb054

// Used by registerValidator to share new incoming registration request with the goroutine holding the ticker
Copy link
Contributor

@Ruteri Ruteri Jun 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be great to avoid fields shared across requests as they usually introduce a race

newRegistrationsRequests chan []types.SignedValidatorRegistration
// Used by registerValidatorAtInterval to share the number of successful requests with the registerValidator handler
numSuccessRequestsToRelay chan uint64
}

// NewBoostService created a new BoostService
func NewBoostService(listenAddr string, relays []RelayEntry, log *logrus.Entry, genesisForkVersionHex string, relayRequestTimeout time.Duration) (*BoostService, error) {
func NewBoostService(listenAddr string, relays []RelayEntry, log *logrus.Entry, genesisForkVersionHex string, relayRequestTimeout, validatorPreferencesResendInterval time.Duration) (*BoostService, error) {
if len(relays) == 0 {
return nil, errors.New("no relays")
}
Expand All @@ -77,6 +85,10 @@ func NewBoostService(listenAddr string, relays []RelayEntry, log *logrus.Entry,
builderSigningDomain: builderSigningDomain,
serverTimeouts: NewDefaultHTTPServerTimeouts(),
httpClient: http.Client{Timeout: relayRequestTimeout},

done: make(chan bool),
newRegistrationsRequests: make(chan []types.SignedValidatorRegistration),
numSuccessRequestsToRelay: make(chan uint64),
}, nil
}

Expand All @@ -94,8 +106,8 @@ func (m *BoostService) getRouter() http.Handler {
return loggedRouter
}

// StartHTTPServer starts the HTTP server for this boost service instance
func (m *BoostService) StartHTTPServer() error {
// StartServer starts the HTTP server for this boost service instance
func (m *BoostService) StartServer() error {
if m.srv != nil {
return errServerAlreadyRunning
}
Expand All @@ -110,10 +122,15 @@ func (m *BoostService) StartHTTPServer() error {
IdleTimeout: m.serverTimeouts.Idle,
}

// Start separate process to send validator preferences at regular interval.
go m.registerValidatorAtInterval(time.Second*384, m.done)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing this should be registerValidatorIntervalSec

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed that, thanks ! Fixed in e1d6572.

defer m.shutdown()

err := m.srv.ListenAndServe()
if err == http.ErrServerClosed {
return nil
}

return err
}

Expand All @@ -129,6 +146,62 @@ func (m *BoostService) handleStatus(w http.ResponseWriter, req *http.Request) {
fmt.Fprintf(w, `{}`)
}

// sendValidatorPreferences is used to send the validators preferences to the registered relays
func (m *BoostService) sendValidatorPreferences(log *logrus.Entry, payload []types.SignedValidatorRegistration) uint64 {
// We need a wait group to manage each routine used to perform the requests.
var wg sync.WaitGroup

// Use an atomic counter to count successful requests.
numSuccessRequestsToRelay := uint64(0)

// Send the validators preferences to each registered relay.
for _, relay := range m.relays {
wg.Add(1)

go func(relayAddr string) {
defer wg.Done()

url := relayAddr + pathRegisterValidator
log := log.WithField("url", url)

err := SendHTTPRequest(context.Background(), m.httpClient, http.MethodPost, url, payload, nil)
if err != nil {
log.WithError(err).Warn("error in registerValidator to relay")
return
}

atomic.AddUint64(&numSuccessRequestsToRelay, 1)
}(relay.Address)
}

wg.Wait()

return numSuccessRequestsToRelay
}

func (m *BoostService) registerValidatorAtInterval(interval time.Duration, done chan bool) {
var payload []types.SignedValidatorRegistration
log := m.log.WithField("method", "registerValidatorAtInterval")

ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-done:
// mev-boost has probably stopped
return
case payload = <-m.newRegistrationsRequests:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will overwrite the previous payload, but it's conceivable that there's multiple validators using one BN that would then overwrite each others registrations. perhaps better to make this additive?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm actually not sure about it, the expectation is to have a single BN connecting to mev-boost, and forcing this to be additive would mean you cannot ever remove a registration.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implemented the merging process between the old (local payload list) and the new one (received from the channel) but not pushed it, so you'll just have to tell me !

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ruteri expectation is a single BN, but this single BN could have many validator clients connected! We can not ignore this case.

One approach could be to add a time to each registration and remove old ones.

// registerValidator has received new registrations and forwards them to here
m.numSuccessRequestsToRelay <- m.sendValidatorPreferences(log, payload)
// Reset the timer to avoid overload
ticker.Reset(interval)
case <-ticker.C:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue that arises is that we can send twice in a short interval (if a registration request comes near the timer tick)
In this approach it's not easy to solve it, and I think we could come at this from a different angle:

  • make registerValidatorAtInterval the only place responsible for sending the registration
  • listen on both the timer, and a new channel that accepts new registration requests
  • on new registration request, send the preferences and reset the timer

This has some nice benefits:

  • m.vp is now an object local to registerValidatorAtInterval
  • validator preferences struct can become just a plain old map
  • no need for mutex
  • after a registration you will correctly wait until the next interval

The only downside is that you'll need to return the registration status back over a channel (or a callback), but it should be okay.

We can submit a PR to this PR if you prefer not implementing it

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally agree with this comment ! Thanks a lot for raising the issue and proposing an alternative 👍
I can make the updates don't worry.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in de04d2f

cc @Ruteri

m.sendValidatorPreferences(log, payload)
}
}
}

// RegisterValidatorV1 - returns 200 if at least one relay returns 200
func (m *BoostService) handleRegisterValidator(w http.ResponseWriter, req *http.Request) {
log := m.log.WithField("method", "registerValidator")
Expand Down Expand Up @@ -164,32 +237,10 @@ func (m *BoostService) handleRegisterValidator(w http.ResponseWriter, req *http.
}
}

numSuccessRequestsToRelay := 0
var mu sync.Mutex

// Call the relays
var wg sync.WaitGroup
for _, relay := range m.relays {
wg.Add(1)
go func(relayAddr string) {
defer wg.Done()
url := relayAddr + pathRegisterValidator
log := log.WithField("url", url)

err := SendHTTPRequest(context.Background(), m.httpClient, http.MethodPost, url, payload, nil)
if err != nil {
log.WithError(err).Warn("error in registerValidator to relay")
return
}

mu.Lock()
defer mu.Unlock()
numSuccessRequestsToRelay++
}(relay.Address)
}

// Wait for all requests to complete...
wg.Wait()
// Send the payload to the goroutine responsible for handling the resend at interval
m.newRegistrationsRequests <- payload
// Block until we get the number of successful requests back from this goroutine
numSuccessRequestsToRelay := <-m.numSuccessRequestsToRelay
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once context (timeout) handling is implemented this will become a race, the result channel (or callback) should be specific to the request.
I think pushing a pair of (payload, result chan) would be a clean way to solve it that will also work nicely with context timeouts.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like the idea of pushing the response channel, I've added this in 2d26f24


if numSuccessRequestsToRelay > 0 {
w.Header().Set("Content-Type", "application/json")
Expand Down Expand Up @@ -384,3 +435,7 @@ func (m *BoostService) handleGetPayload(w http.ResponseWriter, req *http.Request
return
}
}

func (m *BoostService) shutdown() {
m.done <- true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using a context would be better because if there's multiple things to shut down, this channel would only work for one of them. could be okay for now, and we can also do this in another PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, fixed in a2bb054

}
38 changes: 33 additions & 5 deletions server/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,14 @@ func newTestBackend(t *testing.T, numRelays int, relayTimeout time.Duration) *te
backend.relays[i] = newMockRelay(t, blsPrivateKey)
relayEntries[i] = backend.relays[i].RelayEntry
}
service, err := NewBoostService("localhost:12345", relayEntries, testLog, "0x00000000", relayTimeout)

registerValidatorInterval := time.Second * 5

service, err := NewBoostService("localhost:12345", relayEntries, testLog, "0x00000000", relayTimeout, registerValidatorInterval)
require.NoError(t, err)

go service.registerValidatorAtInterval(time.Second*384, service.done)

backend.boost = service
return &backend
}
Expand All @@ -63,30 +68,34 @@ func (be *testBackend) request(t *testing.T, method string, path string, payload

func TestNewBoostServiceErrors(t *testing.T) {
t.Run("errors when no relays", func(t *testing.T) {
_, err := NewBoostService(":123", []RelayEntry{}, testLog, "0x00000000", time.Second)
_, err := NewBoostService(":123", []RelayEntry{}, testLog, "0x00000000", time.Second, time.Second)
require.Error(t, err)
})
}

func TestWebserver(t *testing.T) {
t.Run("errors when webserver is already existing", func(t *testing.T) {
backend := newTestBackend(t, 1, time.Second)
defer backend.boost.shutdown()

backend.boost.srv = &http.Server{}
err := backend.boost.StartHTTPServer()
err := backend.boost.StartServer()
require.Error(t, err)
})

t.Run("webserver error on invalid listenAddr", func(t *testing.T) {
backend := newTestBackend(t, 1, time.Second)
defer backend.boost.shutdown()

backend.boost.listenAddr = "localhost:876543"
err := backend.boost.StartHTTPServer()
err := backend.boost.StartServer()
require.Error(t, err)
})

// t.Run("webserver starts normally", func(t *testing.T) {
// backend := newTestBackend(t, 1, time.Second)
// go func() {
// err := backend.boost.StartHTTPServer()
// err := backend.boost.StartServer()
// require.NoError(t, err)
// }()
// time.Sleep(time.Millisecond * 100)
Expand All @@ -96,6 +105,7 @@ func TestWebserver(t *testing.T) {

func TestWebserverRootHandler(t *testing.T) {
backend := newTestBackend(t, 1, time.Second)
defer backend.boost.shutdown()

// Check root handler
req, _ := http.NewRequest("GET", "/", nil)
Expand All @@ -121,6 +131,8 @@ var payloadRegisterValidator = types.SignedValidatorRegistration{

func TestStatus(t *testing.T) {
backend := newTestBackend(t, 1, time.Second)
defer backend.boost.shutdown()

path := "/eth/v1/builder/status"
rr := backend.request(t, http.MethodGet, path, payloadRegisterValidator)
require.Equal(t, http.StatusOK, rr.Code)
Expand All @@ -143,13 +155,16 @@ func TestRegisterValidator(t *testing.T) {

t.Run("Normal function", func(t *testing.T) {
backend := newTestBackend(t, 1, time.Second)
defer backend.boost.shutdown()

rr := backend.request(t, http.MethodPost, path, payload)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, 1, backend.relays[0].GetRequestCount(path))
})

t.Run("Relay error response", func(t *testing.T) {
backend := newTestBackend(t, 2, time.Second)
defer backend.boost.shutdown()

rr := backend.request(t, http.MethodPost, path, payload)
require.Equal(t, http.StatusOK, rr.Code)
Expand Down Expand Up @@ -179,6 +194,8 @@ func TestRegisterValidator(t *testing.T) {

t.Run("mev-boost relay timeout works with slow relay", func(t *testing.T) {
backend := newTestBackend(t, 1, 5*time.Millisecond) // 10ms max
defer backend.boost.shutdown()

rr := backend.request(t, http.MethodPost, path, payload)
require.Equal(t, http.StatusOK, rr.Code)

Expand All @@ -203,13 +220,17 @@ func TestGetHeader(t *testing.T) {

t.Run("Okay response from relay", func(t *testing.T) {
backend := newTestBackend(t, 1, time.Second)
defer backend.boost.shutdown()

rr := backend.request(t, http.MethodGet, path, nil)
require.Equal(t, http.StatusOK, rr.Code, rr.Body.String())
require.Equal(t, 1, backend.relays[0].GetRequestCount(path))
})

t.Run("Bad response from relays", func(t *testing.T) {
backend := newTestBackend(t, 2, time.Second)
defer backend.boost.shutdown()

resp := backend.relays[0].MakeGetHeaderResponse(
12345,
"0xe28385e7bd68df656cd0042b74b69c3104b5356ed1f20eb69f1f925df47a3ab7",
Expand All @@ -235,6 +256,7 @@ func TestGetHeader(t *testing.T) {
t.Run("Use header with highest value", func(t *testing.T) {
// Create backend and register 3 relays.
backend := newTestBackend(t, 3, time.Second)
defer backend.boost.shutdown()

// First relay will return signed response with value 12345.
backend.relays[0].GetHeaderResponse = backend.relays[0].MakeGetHeaderResponse(
Expand Down Expand Up @@ -276,6 +298,7 @@ func TestGetHeader(t *testing.T) {

t.Run("Invalid relay public key", func(t *testing.T) {
backend := newTestBackend(t, 1, time.Second)
defer backend.boost.shutdown()

backend.relays[0].GetHeaderResponse = backend.relays[0].MakeGetHeaderResponse(
12345,
Expand All @@ -296,6 +319,7 @@ func TestGetHeader(t *testing.T) {

t.Run("Invalid relay signature", func(t *testing.T) {
backend := newTestBackend(t, 1, time.Second)
defer backend.boost.shutdown()

backend.relays[0].GetHeaderResponse = backend.relays[0].MakeGetHeaderResponse(
12345,
Expand Down Expand Up @@ -342,6 +366,8 @@ func TestGetPayload(t *testing.T) {

t.Run("Okay response from relay", func(t *testing.T) {
backend := newTestBackend(t, 1, time.Second)
defer backend.boost.shutdown()

rr := backend.request(t, http.MethodPost, path, payload)
require.Equal(t, http.StatusOK, rr.Code, rr.Body.String())
require.Equal(t, 1, backend.relays[0].GetRequestCount(path))
Expand All @@ -354,6 +380,8 @@ func TestGetPayload(t *testing.T) {

t.Run("Bad response from relays", func(t *testing.T) {
backend := newTestBackend(t, 2, time.Second)
defer backend.boost.shutdown()

resp := new(types.GetPayloadResponse)

// Delays are needed because otherwise one relay might never receive a request
Expand Down