Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(builder): subscribe to constraints via SSE #46

Merged
merged 14 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 188 additions & 29 deletions builder/builder/builder.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package builder

import (
"bufio"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"math/big"
"net/http"
_ "os"
"slices"
"strings"
"sync"
"time"

Expand All @@ -23,6 +29,7 @@ import (
"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
utilbellatrix "github.com/attestantio/go-eth2-client/util/bellatrix"
"github.com/chainbound/shardmap"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
Expand All @@ -48,6 +55,10 @@ const (
SubmissionOffsetFromEndOfSlotSecondsDefault = 3 * time.Second
)

const (
SubscribeConstraintsPath = "/eth/v1/builder/constraints"
)

type PubkeyHex string

type ValidatorData struct {
Expand All @@ -58,7 +69,7 @@ type ValidatorData struct {

type IRelay interface {
SubmitBlock(msg *builderSpec.VersionedSubmitBlockRequest, vd ValidatorData) error
SubmitBlockWithPreconfsProofs(msg *VersionedSubmitBlockRequestWithPreconfsProofs, vd ValidatorData) error
SubmitBlockWithProofs(msg *common.VersionedSubmitBlockRequestWithProofs, vd ValidatorData) error
GetValidatorForSlot(nextSlot uint64) (ValidatorData, error)
Config() RelayConfig
Start() error
Expand All @@ -72,7 +83,6 @@ type IBuilder interface {
}

type Builder struct {
boltCCEndpoint string
ds flashbotsextra.IDatabaseService
blockConsumer flashbotsextra.BlockConsumer
relay IRelay
Expand All @@ -87,6 +97,9 @@ type Builder struct {
builderResubmitInterval time.Duration
discardRevertibleTxOnErr bool

// constraintsCache is a map from slot to the constraints made by proposers
constraintsCache *shardmap.FIFOMap[uint64, common.SignedConstraintsList]

limiter *rate.Limiter
submissionOffsetFromEndOfSlot time.Duration

Expand Down Expand Up @@ -168,7 +181,6 @@ func NewBuilder(args BuilderArgs) (*Builder, error) {

slotCtx, slotCtxCancel := context.WithCancel(context.Background())
return &Builder{
boltCCEndpoint: args.boltCCEndpoint,
ds: args.ds,
blockConsumer: args.blockConsumer,
relay: args.relay,
Expand All @@ -184,6 +196,8 @@ func NewBuilder(args BuilderArgs) (*Builder, error) {
discardRevertibleTxOnErr: args.discardRevertibleTxOnErr,
submissionOffsetFromEndOfSlot: args.submissionOffsetFromEndOfSlot,

constraintsCache: shardmap.NewFIFOMap[uint64, common.SignedConstraintsList](64, 16, shardmap.HashUint64),

limiter: args.limiter,
slotCtx: slotCtx,
slotCtxCancel: slotCtxCancel,
Expand Down Expand Up @@ -235,7 +249,156 @@ func (b *Builder) Start() error {
}
}()

return b.relay.Start()
if err := b.relay.Start(); err != nil {
return err
}

return b.SubscribeProposerConstraints()
}

// GenerateAuthenticationHeader generates an authentication string for the builder
// to subscribe to SSE constraint events emitted by relays
func (b *Builder) GenerateAuthenticationHeader() (string, error) {
// NOTE: the `slot` acts similarly to a nonce for the message to sign, to avoid replay attacks.
slot := b.slotAttrs.Slot
message, err := json.Marshal(common.ConstraintSubscriptionAuth{PublicKey: b.builderPublicKey, Slot: slot})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the slot in the authentication message?

Copy link
Contributor Author

@thedevbirb thedevbirb May 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The slot just acts similarly to a nonce for the message to sign, to avoid replay attacks. We could do something else but for the PoC I thought it was easy to implement and convenient.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect, let's document this in the comments somewhere

if err != nil {
log.Error(fmt.Sprintf("Failed to marshal auth message: %v", err))
return "", err
}
signatureEC := bls.Sign(b.builderSecretKey, message)
subscriptionSignatureJSON := `"` + phase0.BLSSignature(bls.SignatureToBytes(signatureEC)[:]).String() + `"`
authHeader := "BOLT " + subscriptionSignatureJSON + "," + string(message)
mempirate marked this conversation as resolved.
Show resolved Hide resolved
return authHeader, nil
}

// SubscribeProposerConstraints subscribes to the constraints made by Bolt proposers
// which the builder pulls from relay(s) using SSE.
func (b *Builder) SubscribeProposerConstraints() error {
// Create authentication signed message
authHeader, err := b.GenerateAuthenticationHeader()
if err != nil {
log.Error(fmt.Sprintf("Failed to generate authentication header: %v", err))
return err
}

// Check if `b.relay` is a RemoteRelayAggregator, if so we need to subscribe to
// the constraints made available by all the relays
relayAggregator, ok := b.relay.(*RemoteRelayAggregator)
if ok {
for _, relay := range relayAggregator.relays {
go b.subscribeToRelayForConstraints(relay.Config().Endpoint, authHeader)
}
} else {
go b.subscribeToRelayForConstraints(b.relay.Config().Endpoint, authHeader)
}
return nil
}

func (b *Builder) subscribeToRelayForConstraints(relayBaseEndpoint, authHeader string) error {
// Subscribe to constraints
req, err := http.NewRequest(http.MethodGet, relayBaseEndpoint+SubscribeConstraintsPath, nil)
if err != nil {
log.Error(fmt.Sprintf("Failed to create new http request: %v", err))
return err
}
req.Header.Set("Accept-Encoding", "gzip")
req.Header.Set("Authorization", authHeader)

client := http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Error(fmt.Sprintf("Failed to connect to SSE server: %v", err))
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
log.Error(fmt.Sprintf("Non-OK HTTP status: %s", resp.Status))
return err
}

var reader io.Reader

// Step 2: Check if the response is gzipped
if resp.Header.Get("Content-Encoding") == "gzip" {
// Step 3: Decompress the response body
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
return fmt.Errorf("error creating gzip reader: %v", err)
}
defer gzipReader.Close()
reader = gzipReader
} else {
reader = resp.Body
}

bufReader := bufio.NewReader(reader)
for {
line, err := bufReader.ReadString('\n')
if err != nil {
if err == io.EOF {
fmt.Println("End of stream")
break
}
log.Error("Error reading from response body: %v", err)
}
if !strings.HasPrefix(line, "data: ") {
continue
}
data := strings.TrimPrefix(line, "data: ")

// We assume the data is the JSON representation of the constraints
log.Debug("Received new constraint: %s\n", data)
constraintsSigned := make(common.SignedConstraintsList, 0, 8)
if err := json.Unmarshal([]byte(data), &constraintsSigned); err != nil {
log.Warn(fmt.Sprintf("Failed to unmarshal constraints: %v", err))
continue
}
if len(constraintsSigned) == 0 {
log.Warn("Received 0 length list of constraints")
continue
}

OUTER:
for _, constraint := range constraintsSigned {
// For every constraint, we need to check if it has already been seen for the associated slot
slotConstraints, _ := b.constraintsCache.Get(constraint.Message.Slot)
if len(slotConstraints) == 0 {
// New constraint for this slot, add it in the map and continue with the next constraint
b.constraintsCache.Put(constraint.Message.Slot, common.SignedConstraintsList{constraint})
continue
}
for _, slotConstraint := range slotConstraints {
if slotConstraint.Signature == constraint.Signature {
// The constraint has already been seen, we can continue with the next one
continue OUTER
}
}
// The constraint is new, we need to append it to the current list
b.constraintsCache.Put(constraint.Message.Slot, append(slotConstraints, constraint))
}
}

return nil
}

func (b *Builder) GetConstraintsForSlot(slot uint64) types.HashToConstraintDecoded {
constraintsDecoded := make(types.HashToConstraintDecoded)
constraintsSigned, _ := b.constraintsCache.Get(slot)

for _, constraintSigned := range constraintsSigned {
constraints := constraintSigned.Message.Constraints
for _, constraint := range constraints {
decoded := new(types.Transaction)
if err := decoded.UnmarshalBinary(constraint.Tx); err != nil {
log.Error("Failed to decode preconfirmation transaction RLP: ", err)
continue
}
constraintsDecoded[decoded.Hash()] = &types.ConstraintDecoded{Index: constraint.Index, Tx: decoded}
}
}
return constraintsDecoded
}
Comment on lines +386 to 402
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to hold the constraints as decoded in the constraints cache instead of decoding them when we need to seal the block (when this function is called inside runBuildingJob)? That way the RLP decoding overhead would be pushed to the event subscription side.

Just a nit and can be done in a further PR if it makes sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a great point and I'm sorry for not having thought of it immediately. I'd like to address it in a separate PR (I'll make the issue shortly) just to focus more on having the devnet done.


func (b *Builder) Stop() error {
Expand All @@ -244,7 +407,7 @@ func (b *Builder) Stop() error {
}

// BOLT: modify to calculate merkle inclusion proofs for preconfirmed transactions
func (b *Builder) onSealedBlock(opts SubmitBlockOpts, preconfs []*types.Transaction) error {
func (b *Builder) onSealedBlock(opts SubmitBlockOpts, constraints types.HashToConstraintDecoded) error {
executableData := engine.BlockToExecutableData(opts.Block, opts.BlockValue, opts.BlobSidecars)
var dataVersion spec.DataVersion
if b.eth.Config().IsCancun(opts.Block.Number(), opts.Block.Time()) {
Expand Down Expand Up @@ -277,9 +440,9 @@ func (b *Builder) onSealedBlock(opts SubmitBlockOpts, preconfs []*types.Transact
payloadTransactions := opts.Block.Transactions()

// BOLT: sanity check: verify that the block actually contains the preconfirmed transactions
for _, preconf := range preconfs {
if !slices.Contains(payloadTransactions, preconf) {
log.Error(fmt.Sprintf("[BOLT]: Preconfirmed transaction %s not found in block %s", preconf.Hash(), opts.Block.Hash()))
for hash, constraint := range constraints {
if !slices.Contains(payloadTransactions, constraint.Tx) {
log.Error(fmt.Sprintf("[BOLT]: Preconfirmed transaction %s not found in block %s", hash, opts.Block.Hash()))
continue
}
}
Expand Down Expand Up @@ -308,13 +471,13 @@ func (b *Builder) onSealedBlock(opts SubmitBlockOpts, preconfs []*types.Transact
rootNode.Hash()

// BOLT: calculate merkle proofs for preconfirmed transactions
preconfirmationsProofs := make([]*PreconfirmationWithProof, 0, len(preconfs))
preconfirmationsProofs := make([]*common.PreconfirmationWithProof, 0, len(constraints))

for i, preconf := range preconfs {
for hash := range constraints {
// get the index of the preconfirmed transaction in the block
preconfIndex := slices.IndexFunc(payloadTransactions, func(tx *types.Transaction) bool { return tx.Hash() == preconf.Hash() })
preconfIndex := slices.IndexFunc(payloadTransactions, func(tx *types.Transaction) bool { return tx.Hash() == hash })
if preconfIndex == -1 {
log.Error(fmt.Sprintf("Preconfirmed transaction %s not found in block %s", preconf.Hash(), opts.Block.Hash()))
log.Error(fmt.Sprintf("Preconfirmed transaction %s not found in block %s", hash, opts.Block.Hash()))
log.Error(fmt.Sprintf("block has %v transactions", len(payloadTransactions)))
continue
}
Expand All @@ -323,26 +486,26 @@ func (b *Builder) onSealedBlock(opts SubmitBlockOpts, preconfs []*types.Transact
generalizedIndex := int(math.Pow(float64(2), float64(21))) + preconfIndex

log.Info(fmt.Sprintf("[BOLT]: Calculating merkle proof for preconfirmed transaction %s with index %d. Preconf index: %d",
preconf.Hash(), generalizedIndex, preconfIndex))
hash, generalizedIndex, preconfIndex))

timeStart := time.Now()
proof, err := rootNode.Prove(generalizedIndex)
if err != nil {
log.Error("[BOLT]: could not calculate merkle proof for preconfirmed transaction", "txHash", preconf.Hash(), "err", err)
log.Error("[BOLT]: could not calculate merkle proof for preconfirmed transaction", "txHash", hash, "err", err)
continue
}
log.Info(fmt.Sprintf("[BOLT]: Calculated merkle proof for preconf %s in %s", preconf.Hash(), time.Since(timeStart)))
log.Info(fmt.Sprintf("[BOLT]: Calculated merkle proof for preconf %s in %s", hash, time.Since(timeStart)))
log.Info(fmt.Sprintf("[BOLT]: LEAF: %x, Is leaf nil? %v", proof.Leaf, proof.Leaf == nil))

merkleProof := new(SerializedMerkleProof)
merkleProof := new(common.SerializedMerkleProof)
merkleProof.FromFastSszProof(proof)

preconfirmationsProofs = append(preconfirmationsProofs, &PreconfirmationWithProof{
TxHash: phase0.Hash32(preconf.Hash()),
preconfirmationsProofs = append(preconfirmationsProofs, &common.PreconfirmationWithProof{
TxHash: phase0.Hash32(hash),
MerkleProof: merkleProof,
})

log.Info(fmt.Sprintf("[BOLT]: Added merkle proof for preconfirmed transaction %s", preconfirmationsProofs[i]))
// log.Info(fmt.Sprintf("[BOLT]: Added merkle proof for preconfirmed transaction %s", preconfirmationsProofs[i]))
}

versionedBlockRequest, err := b.getBlockRequest(executableData, dataVersion, &blockBidMsg)
Expand All @@ -351,7 +514,7 @@ func (b *Builder) onSealedBlock(opts SubmitBlockOpts, preconfs []*types.Transact
return err
}

versionedBlockRequestWithPreconfsProofs := &VersionedSubmitBlockRequestWithPreconfsProofs{
versionedBlockRequestWithPreconfsProofs := &common.VersionedSubmitBlockRequestWithProofs{
Inner: versionedBlockRequest,
Proofs: preconfirmationsProofs,
}
Expand All @@ -371,7 +534,7 @@ func (b *Builder) onSealedBlock(opts SubmitBlockOpts, preconfs []*types.Transact
} else {
// NOTE: we can ignore preconfs for `processBuiltBlock`
go b.processBuiltBlock(opts.Block, opts.BlockValue, opts.OrdersClosedAt, opts.SealedAt, opts.CommitedBundles, opts.AllBundles, opts.UsedSbundles, &blockBidMsg)
err = b.relay.SubmitBlockWithPreconfsProofs(versionedBlockRequestWithPreconfsProofs, opts.ValidatorData)
err = b.relay.SubmitBlockWithProofs(versionedBlockRequestWithPreconfsProofs, opts.ValidatorData)
if err != nil {
log.Error("could not submit block", "err", err, "verion", dataVersion, "#commitedBundles", len(opts.CommitedBundles))
return err
Expand Down Expand Up @@ -531,12 +694,8 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey phase0.

log.Debug("runBuildingJob", "slot", attrs.Slot, "parent", attrs.HeadHash, "payloadTimestamp", uint64(attrs.Timestamp))

// fetch preconfs here
preconfs, err := b.eth.Preconfirmations(b.boltCCEndpoint, attrs.Slot)
log.Info("[BOLT]: Got preconfirmations", "preconfs", len(preconfs))
if err != nil {
log.Error("[BOLT]: could not get preconfirmations", "err", err)
}
// fetch constraints here
constraints := b.GetConstraintsForSlot(attrs.Slot)

submitBestBlock := func() {
queueMu.Lock()
Expand All @@ -554,7 +713,7 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey phase0.
ValidatorData: vd,
PayloadAttributes: attrs,
}
err := b.onSealedBlock(submitBlockOpts, preconfs)
err := b.onSealedBlock(submitBlockOpts, constraints)

if err != nil {
log.Error("could not run sealed block hook", "err", err)
Expand Down Expand Up @@ -610,7 +769,7 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey phase0.
"slot", attrs.Slot,
"parent", attrs.HeadHash,
"resubmit-interval", b.builderResubmitInterval.String())
err := b.eth.BuildBlock(attrs, blockHook, preconfs)
err := b.eth.BuildBlock(attrs, blockHook, constraints)
if err != nil {
log.Warn("Failed to build block", "err", err)
}
Expand Down
Loading