diff --git a/cmd/mev-boost/main.go b/cmd/mev-boost/main.go index 968c4756..edfbfebb 100644 --- a/cmd/mev-boost/main.go +++ b/cmd/mev-boost/main.go @@ -21,16 +21,18 @@ var ( version = "dev" // is set during build process // defaults - defaultListenAddr = getEnv("BOOST_LISTEN_ADDR", "localhost:18550") - defaultRelayTimeoutMs = getEnvInt("RELAY_TIMEOUT_MS", 2000) // timeout for all the requests to the relay - defaultRelayCheck = os.Getenv("RELAY_STARTUP_CHECK") != "" - defaultGenesisForkVersion = getEnv("GENESIS_FORK_VERSION", "") + defaultListenAddr = getEnv("BOOST_LISTEN_ADDR", "localhost:18550") + defaultRelayTimeoutMs = getEnvInt("RELAY_TIMEOUT_MS", 2000) // timeout for all the requests to the relay + defaultRelayCheck = os.Getenv("RELAY_STARTUP_CHECK") != "" + defaultGenesisForkVersion = getEnv("GENESIS_FORK_VERSION", "") + defaultRegisterValidatorInterval = getEnvInt("REGISTER_VALIDATOR_INTERVAL_SEC", 384) // cli flags - listenAddr = flag.String("addr", defaultListenAddr, "listen-address for mev-boost server") - relayURLs = flag.String("relays", "", "relay urls - single entry or comma-separated list (schema://pubkey@ip:port)") - relayTimeoutMs = flag.Int("request-timeout", defaultRelayTimeoutMs, "timeout for requests to a relay [ms]") - relayCheck = flag.Bool("relay-check", defaultRelayCheck, "whether to check relay status on startup") + listenAddr = flag.String("addr", defaultListenAddr, "listen-address for mev-boost server") + relayURLs = flag.String("relays", "", "relay urls - single entry or comma-separated list (schema://pubkey@ip:port)") + relayTimeoutMs = flag.Int("request-timeout", defaultRelayTimeoutMs, "timeout for requests to a relay [ms]") + relayCheck = flag.Bool("relay-check", defaultRelayCheck, "whether to check relay status on startup") + registerValidatorIntervalSec = flag.Int("register-validator-interval-sec", defaultRegisterValidatorInterval, "the interval used to resend the validator preferences (fee recipient and gas limit)") // helpers useGenesisForkVersionMainnet = flag.Bool("mainnet", false, "use Mainnet genesis fork version 0x00000000 (for signature validation)") @@ -70,13 +72,14 @@ func main() { } relayTimeout := time.Duration(*relayTimeoutMs) * time.Millisecond + registerValidatorInterval := time.Duration(*registerValidatorIntervalSec) * time.Second server, err := server.NewBoostService(*listenAddr, relays, log, genesisForkVersionHex, relayTimeout) if err != nil { log.WithError(err).Fatal("failed creating the server") } log.Println("listening on", *listenAddr) - log.Fatal(server.StartHTTPServer()) + log.Fatal(server.StartServer(registerValidatorInterval)) } func getEnv(key string, defaultValue string) string { diff --git a/server/service.go b/server/service.go index 7f053674..d0acc893 100644 --- a/server/service.go +++ b/server/service.go @@ -8,6 +8,7 @@ import ( "net/http" "strconv" "sync" + "sync/atomic" "time" "github.com/flashbots/go-boost-utils/types" @@ -47,6 +48,9 @@ func NewDefaultHTTPServerTimeouts() HTTPServerTimeouts { // BoostService TODO type BoostService struct { + ctx context.Context + cancelFunc context.CancelFunc + listenAddr string relays []RelayEntry log *logrus.Entry @@ -56,6 +60,9 @@ type BoostService struct { serverTimeouts HTTPServerTimeouts httpClient http.Client + + // Used to share new incoming registration requests along with the unique channel for sharing valid responses count + registrationsRequests chan sharedRegistrationsRequests } // NewBoostService created a new BoostService @@ -69,7 +76,11 @@ func NewBoostService(listenAddr string, relays []RelayEntry, log *logrus.Entry, return nil, err } + ctx, cancel := context.WithCancel(context.Background()) return &BoostService{ + ctx: ctx, + cancelFunc: cancel, + listenAddr: listenAddr, relays: relays, log: log.WithField("module", "service"), @@ -77,6 +88,8 @@ func NewBoostService(listenAddr string, relays []RelayEntry, log *logrus.Entry, builderSigningDomain: builderSigningDomain, serverTimeouts: NewDefaultHTTPServerTimeouts(), httpClient: http.Client{Timeout: relayRequestTimeout}, + + registrationsRequests: make(chan sharedRegistrationsRequests), }, nil } @@ -94,8 +107,8 @@ func (m *BoostService) getRouter() http.Handler { return loggedRouter } -// StartHTTPServer starts the HTTP server for this boost service instance -func (m *BoostService) StartHTTPServer() error { +// StartServer starts the HTTP server for this boost service instance +func (m *BoostService) StartServer(registerValidatorInterval time.Duration) error { if m.srv != nil { return errServerAlreadyRunning } @@ -110,10 +123,15 @@ func (m *BoostService) StartHTTPServer() error { IdleTimeout: m.serverTimeouts.Idle, } + // Start separate process to send validator preferences at regular interval. + go m.registerValidatorAtInterval(registerValidatorInterval) + defer m.shutdown() + err := m.srv.ListenAndServe() if err == http.ErrServerClosed { return nil } + return err } @@ -129,6 +147,63 @@ func (m *BoostService) handleStatus(w http.ResponseWriter, req *http.Request) { fmt.Fprintf(w, `{}`) } +// sendValidatorPreferences is used to send the validators preferences to the registered relays +func (m *BoostService) sendValidatorPreferences(log *logrus.Entry, payload []types.SignedValidatorRegistration) uint64 { + // We need a wait group to manage each routine used to perform the requests. + var wg sync.WaitGroup + + // Use an atomic counter to count successful requests. + numSuccessRequestsToRelay := uint64(0) + + // Send the validators preferences to each registered relay. + for _, relay := range m.relays { + wg.Add(1) + + go func(relayAddr string) { + defer wg.Done() + + url := relayAddr + pathRegisterValidator + log := log.WithField("url", url) + + err := SendHTTPRequest(context.Background(), m.httpClient, http.MethodPost, url, payload, nil) + if err != nil { + log.WithError(err).Warn("error in registerValidator to relay") + return + } + + atomic.AddUint64(&numSuccessRequestsToRelay, 1) + }(relay.Address) + } + + wg.Wait() + + return numSuccessRequestsToRelay +} + +func (m *BoostService) registerValidatorAtInterval(interval time.Duration) { + var payload []types.SignedValidatorRegistration + log := m.log.WithField("method", "registerValidatorAtInterval") + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-m.ctx.Done(): + // mev-boost has probably stopped + return + case srr := <-m.registrationsRequests: + // registerValidator has received new registrations and forwards them to here + srr.numSuccessRequestsToRelay <- m.sendValidatorPreferences(log, srr.preferences) + payload = srr.preferences + // Reset the timer to avoid overload + ticker.Reset(interval) + case <-ticker.C: + m.sendValidatorPreferences(log, payload) + } + } +} + // RegisterValidatorV1 - returns 200 if at least one relay returns 200 func (m *BoostService) handleRegisterValidator(w http.ResponseWriter, req *http.Request) { log := m.log.WithField("method", "registerValidator") @@ -164,32 +239,16 @@ func (m *BoostService) handleRegisterValidator(w http.ResponseWriter, req *http. } } - numSuccessRequestsToRelay := 0 - var mu sync.Mutex - - // Call the relays - var wg sync.WaitGroup - for _, relay := range m.relays { - wg.Add(1) - go func(relayAddr string) { - defer wg.Done() - url := relayAddr + pathRegisterValidator - log := log.WithField("url", url) - - err := SendHTTPRequest(context.Background(), m.httpClient, http.MethodPost, url, payload, nil) - if err != nil { - log.WithError(err).Warn("error in registerValidator to relay") - return - } - - mu.Lock() - defer mu.Unlock() - numSuccessRequestsToRelay++ - }(relay.Address) + srr := sharedRegistrationsRequests{ + preferences: payload, + numSuccessRequestsToRelay: make(chan uint64), } - // Wait for all requests to complete... - wg.Wait() + // Send the payload to the goroutine responsible for handling the resend at interval + m.registrationsRequests <- srr + + // Block until we get the number of successful requests back from this goroutine + numSuccessRequestsToRelay := <-srr.numSuccessRequestsToRelay if numSuccessRequestsToRelay > 0 { w.Header().Set("Content-Type", "application/json") @@ -384,3 +443,7 @@ func (m *BoostService) handleGetPayload(w http.ResponseWriter, req *http.Request return } } + +func (m *BoostService) shutdown() { + m.cancelFunc() +} diff --git a/server/service_test.go b/server/service_test.go index 736b3534..aab82fed 100644 --- a/server/service_test.go +++ b/server/service_test.go @@ -36,9 +36,12 @@ func newTestBackend(t *testing.T, numRelays int, relayTimeout time.Duration) *te backend.relays[i] = newMockRelay(t, blsPrivateKey) relayEntries[i] = backend.relays[i].RelayEntry } + service, err := NewBoostService("localhost:12345", relayEntries, testLog, "0x00000000", relayTimeout) require.NoError(t, err) + go service.registerValidatorAtInterval(time.Second * 384) + backend.boost = service return &backend } @@ -71,22 +74,26 @@ func TestNewBoostServiceErrors(t *testing.T) { func TestWebserver(t *testing.T) { t.Run("errors when webserver is already existing", func(t *testing.T) { backend := newTestBackend(t, 1, time.Second) + defer backend.boost.shutdown() + backend.boost.srv = &http.Server{} - err := backend.boost.StartHTTPServer() + err := backend.boost.StartServer(time.Second * 384) require.Error(t, err) }) t.Run("webserver error on invalid listenAddr", func(t *testing.T) { backend := newTestBackend(t, 1, time.Second) + defer backend.boost.shutdown() + backend.boost.listenAddr = "localhost:876543" - err := backend.boost.StartHTTPServer() + err := backend.boost.StartServer(time.Second * 384) require.Error(t, err) }) // t.Run("webserver starts normally", func(t *testing.T) { // backend := newTestBackend(t, 1, time.Second) // go func() { - // err := backend.boost.StartHTTPServer() + // err := backend.boost.StartServer() // require.NoError(t, err) // }() // time.Sleep(time.Millisecond * 100) @@ -96,6 +103,7 @@ func TestWebserver(t *testing.T) { func TestWebserverRootHandler(t *testing.T) { backend := newTestBackend(t, 1, time.Second) + defer backend.boost.shutdown() // Check root handler req, _ := http.NewRequest("GET", "/", nil) @@ -121,6 +129,8 @@ var payloadRegisterValidator = types.SignedValidatorRegistration{ func TestStatus(t *testing.T) { backend := newTestBackend(t, 1, time.Second) + defer backend.boost.shutdown() + path := "/eth/v1/builder/status" rr := backend.request(t, http.MethodGet, path, payloadRegisterValidator) require.Equal(t, http.StatusOK, rr.Code) @@ -143,6 +153,8 @@ func TestRegisterValidator(t *testing.T) { t.Run("Normal function", func(t *testing.T) { backend := newTestBackend(t, 1, time.Second) + defer backend.boost.shutdown() + rr := backend.request(t, http.MethodPost, path, payload) require.Equal(t, http.StatusOK, rr.Code) require.Equal(t, 1, backend.relays[0].GetRequestCount(path)) @@ -150,6 +162,7 @@ func TestRegisterValidator(t *testing.T) { t.Run("Relay error response", func(t *testing.T) { backend := newTestBackend(t, 2, time.Second) + defer backend.boost.shutdown() rr := backend.request(t, http.MethodPost, path, payload) require.Equal(t, http.StatusOK, rr.Code) @@ -179,6 +192,8 @@ func TestRegisterValidator(t *testing.T) { t.Run("mev-boost relay timeout works with slow relay", func(t *testing.T) { backend := newTestBackend(t, 1, 5*time.Millisecond) // 10ms max + defer backend.boost.shutdown() + rr := backend.request(t, http.MethodPost, path, payload) require.Equal(t, http.StatusOK, rr.Code) @@ -203,6 +218,8 @@ func TestGetHeader(t *testing.T) { t.Run("Okay response from relay", func(t *testing.T) { backend := newTestBackend(t, 1, time.Second) + defer backend.boost.shutdown() + rr := backend.request(t, http.MethodGet, path, nil) require.Equal(t, http.StatusOK, rr.Code, rr.Body.String()) require.Equal(t, 1, backend.relays[0].GetRequestCount(path)) @@ -210,6 +227,8 @@ func TestGetHeader(t *testing.T) { t.Run("Bad response from relays", func(t *testing.T) { backend := newTestBackend(t, 2, time.Second) + defer backend.boost.shutdown() + resp := backend.relays[0].MakeGetHeaderResponse( 12345, "0xe28385e7bd68df656cd0042b74b69c3104b5356ed1f20eb69f1f925df47a3ab7", @@ -235,6 +254,7 @@ func TestGetHeader(t *testing.T) { t.Run("Use header with highest value", func(t *testing.T) { // Create backend and register 3 relays. backend := newTestBackend(t, 3, time.Second) + defer backend.boost.shutdown() // First relay will return signed response with value 12345. backend.relays[0].GetHeaderResponse = backend.relays[0].MakeGetHeaderResponse( @@ -276,6 +296,7 @@ func TestGetHeader(t *testing.T) { t.Run("Invalid relay public key", func(t *testing.T) { backend := newTestBackend(t, 1, time.Second) + defer backend.boost.shutdown() backend.relays[0].GetHeaderResponse = backend.relays[0].MakeGetHeaderResponse( 12345, @@ -296,6 +317,7 @@ func TestGetHeader(t *testing.T) { t.Run("Invalid relay signature", func(t *testing.T) { backend := newTestBackend(t, 1, time.Second) + defer backend.boost.shutdown() backend.relays[0].GetHeaderResponse = backend.relays[0].MakeGetHeaderResponse( 12345, @@ -342,6 +364,8 @@ func TestGetPayload(t *testing.T) { t.Run("Okay response from relay", func(t *testing.T) { backend := newTestBackend(t, 1, time.Second) + defer backend.boost.shutdown() + rr := backend.request(t, http.MethodPost, path, payload) require.Equal(t, http.StatusOK, rr.Code, rr.Body.String()) require.Equal(t, 1, backend.relays[0].GetRequestCount(path)) @@ -354,6 +378,8 @@ func TestGetPayload(t *testing.T) { t.Run("Bad response from relays", func(t *testing.T) { backend := newTestBackend(t, 2, time.Second) + defer backend.boost.shutdown() + resp := new(types.GetPayloadResponse) // Delays are needed because otherwise one relay might never receive a request diff --git a/server/validator_preferences.go b/server/validator_preferences.go new file mode 100644 index 00000000..b273beb1 --- /dev/null +++ b/server/validator_preferences.go @@ -0,0 +1,10 @@ +package server + +import "github.com/flashbots/go-boost-utils/types" + +// sharedRegistrationsRequests is used by handleRegisterValidator to send incoming requests along with a unique +// channel for registerValidatorAtInterval to send back the number of successful registered validator preferences. +type sharedRegistrationsRequests struct { + preferences []types.SignedValidatorRegistration + numSuccessRequestsToRelay chan uint64 +}