Skip to content

Commit

Permalink
compose: add vouch to compose (#1814)
Browse files Browse the repository at this point in the history
Adds vouch to compose and also made some changes to beaconmock which were needed by vouch specifically.
Note: Vouch doesn't support block proposals because of RANDAO check in go-eth2-client.

category: feature
ticket: #1403
  • Loading branch information
dB2510 committed Feb 24, 2023
1 parent eeff3cd commit 32a6879
Show file tree
Hide file tree
Showing 11 changed files with 335 additions and 15 deletions.
3 changes: 3 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,10 @@ func createMockValidators(pubkeys []eth2p0.BLSPubKey) beaconmock.ValidatorSet {
Status: eth2v1.ValidatorStateActiveOngoing,
Validator: &eth2p0.Validator{
WithdrawalCredentials: []byte("12345678901234567890123456789012"),
EffectiveBalance: eth2p0.Gwei(31300000000),
PublicKey: pubkey,
ExitEpoch: 18446744073709551615,
WithdrawableEpoch: 18446744073709551615,
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
golang.org/x/time v0.3.0
golang.org/x/tools v0.6.0
google.golang.org/protobuf v1.28.2-0.20220831092852-f930b1dc76e8
gopkg.in/cenkalti/backoff.v1 v1.1.0
)

require (
Expand Down Expand Up @@ -174,7 +175,6 @@ require (
golang.org/x/text v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
93 changes: 79 additions & 14 deletions testutil/beaconmock/headproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,29 @@ import (
"fmt"
"math/rand"
"net/http"
"strconv"
"sync"
"time"

eth2v1 "github.com/attestantio/go-eth2-client/api/v1"
eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/gorilla/mux"
"github.com/r3labs/sse/v2"

"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
)

const sseStreamID = "head_events"
const (
topicHead = "head"
topicBlock = "block"
)

func newHeadProducer() *headProducer {
server := sse.New()
server.CreateStream(sseStreamID)

return &headProducer{
server: server,
quit: make(chan struct{}),
server: sse.New(),
streamsByTopic: make(map[string][]string),
quit: make(chan struct{}),
}
}

Expand All @@ -49,8 +54,9 @@ type headProducer struct {
quit chan struct{}

// Mutable state
mu sync.Mutex
currentHead *eth2v1.HeadEvent
mu sync.Mutex
currentHead *eth2v1.HeadEvent
streamsByTopic map[string][]string
}

// Start starts the internal slot ticker that updates head.
Expand Down Expand Up @@ -95,20 +101,55 @@ func (p *headProducer) setCurrentHead(currentHead *eth2v1.HeadEvent) {
p.currentHead = currentHead
}

func (p *headProducer) getStreamIDs(topic string) []string {
p.mu.Lock()
defer p.mu.Unlock()

return p.streamsByTopic[topic]
}

func (p *headProducer) setStreamIDs(topic string, streamID string) {
p.mu.Lock()
defer p.mu.Unlock()

p.streamsByTopic[topic] = append(p.streamsByTopic[topic], streamID)
}

// updateHead updates current head based on provided slot.
func (p *headProducer) updateHead(slot eth2p0.Slot) {
currentHead := pseudoRandomHeadEvent(slot)
p.setCurrentHead(currentHead)

data, err := json.Marshal(currentHead)
currentBlock := &eth2v1.BlockEvent{
Slot: slot,
Block: currentHead.Block,
}

headData, err := json.Marshal(currentHead)
if err != nil {
panic(err) // This should never happen and this is test code sorry ;)
}

p.server.Publish(sseStreamID, &sse.Event{
Event: []byte("head"),
Data: data,
})
blockData, err := json.Marshal(currentBlock)
if err != nil {
panic(err) // This should never happen and this is test code sorry ;)
}

// Publish head events.
for _, streamID := range p.getStreamIDs(topicHead) {
p.server.Publish(streamID, &sse.Event{
Event: []byte(topicHead),
Data: headData,
})
}

// Publish block events.
for _, streamID := range p.getStreamIDs(topicBlock) {
p.server.Publish(streamID, &sse.Event{
Event: []byte(topicBlock),
Data: blockData,
})
}
}

type getBlockRootResponseJSON struct {
Expand Down Expand Up @@ -173,9 +214,33 @@ func (p *headProducer) handleGetBlockRoot(w http.ResponseWriter, r *http.Request

// handleEvents is a http handler to handle "/eth/v1/events".
func (p *headProducer) handleEvents(w http.ResponseWriter, r *http.Request) {
//nolint:gosec
streamID := strconv.Itoa(rand.Int())
p.server.CreateStream(streamID)

query := r.URL.Query()
query.Set("stream", sseStreamID) // Add sseStreamID for sse server to serve events on.
query.Set("stream", streamID) // Add sseStreamID for sse server to serve events on.
r.URL.RawQuery = query.Encode()

for _, topic := range query["topics"] {
if topic != topicHead && topic != topicBlock {
log.Warn(context.Background(), "Unsupported topic requested", nil, z.Str("topic", topic))
w.WriteHeader(http.StatusInternalServerError)
resp, err := json.Marshal(errorMsgJSON{
Code: 500,
Message: "unknown topic",
})
if err != nil {
panic(err) // This should never happen and this is test code sorry ;)
}
_, _ = w.Write(resp)

return
}

p.setStreamIDs(topic, streamID)
}

p.server.ServeHTTP(w, r)
}

Expand Down
123 changes: 123 additions & 0 deletions testutil/beaconmock/headproducer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright © 2022 Obol Labs Inc.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along with
// this program. If not, see <http://www.gnu.org/licenses/>.

package beaconmock_test

import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"testing"

"github.com/r3labs/sse/v2"
"github.com/stretchr/testify/require"
"gopkg.in/cenkalti/backoff.v1"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/testutil/beaconmock"
)

func TestHeadProducer(t *testing.T) {
bmock, err := beaconmock.New()
require.NoError(t, err)

defer bmock.Close()

base, err := url.Parse(bmock.Address())
require.NoError(t, err)

unsupportedTopicErr := errors.New("unknown topic requested")

tests := []struct {
name string
topics []string
statusCode int
expectErr bool
}{
{
name: "2 supported topics requested",
topics: []string{"head", "block"},
statusCode: http.StatusOK,
},
{
name: "head topic",
topics: []string{"head"},
statusCode: http.StatusOK,
},
{
name: "block topic",
topics: []string{"block"},
statusCode: http.StatusOK,
},
{
name: "unsupported topic",
topics: []string{"exit"},
statusCode: http.StatusInternalServerError,
expectErr: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
endpoint, err := url.Parse(fmt.Sprintf("eth/v1/events?topics=%s", strings.Join(test.topics, "&topics=")))
require.NoError(t, err)

addr := base.ResolveReference(endpoint).String()

requiredTopics := make(map[string]bool)
for _, topic := range test.topics {
requiredTopics[topic] = true
}

client := sse.NewClient(addr, func(c *sse.Client) {
c.ResponseValidator = func(c *sse.Client, resp *http.Response) error {
require.Equal(t, test.statusCode, resp.StatusCode)

if resp.StatusCode == http.StatusInternalServerError {
data, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Contains(t, string(data), "unknown topic")

return backoff.Permanent(unsupportedTopicErr)
}

if len(requiredTopics) == 0 {
return backoff.Permanent(nil)
}

return nil
}
})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if test.expectErr {
require.ErrorIs(t, client.SubscribeWithContext(ctx, addr, func(msg *sse.Event) {}), unsupportedTopicErr)
} else {
require.NoError(t, client.SubscribeWithContext(ctx, addr, func(msg *sse.Event) {
require.True(t, requiredTopics[string(msg.Event)])
delete(requiredTopics, string(msg.Event))
if len(requiredTopics) == 0 {
cancel()
}
}))
}
})
}
}
22 changes: 22 additions & 0 deletions testutil/beaconmock/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ import (

eth2client "github.com/attestantio/go-eth2-client"
eth2http "github.com/attestantio/go-eth2-client/http"
eth2spec "github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/bellatrix"
"github.com/gorilla/mux"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/testutil"
)

//go:embed static.json
Expand Down Expand Up @@ -100,6 +103,25 @@ func newHTTPServer(addr string, optionalHandlers map[string]http.HandlerFunc, ov
case <-r.Context().Done():
}
},
"/eth/v2/beacon/blocks/{block_id}": func(w http.ResponseWriter, r *http.Request) {
type signedBlockResponseJSON struct {
Version *eth2spec.DataVersion `json:"version"`
Data *bellatrix.SignedBeaconBlock `json:"data"`
}

version := eth2spec.DataVersionBellatrix
resp, err := json.Marshal(signedBlockResponseJSON{
Version: &version,
Data: testutil.RandomBellatrixSignedBeaconBlock(),
})
if err != nil {
panic(err) // This should never happen and this is test code sorry ;)
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(resp)
},
}

for path, handler := range optionalHandlers {
Expand Down
1 change: 1 addition & 0 deletions testutil/compose/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
VCMock VCType = "mock"
VCTeku VCType = "teku"
VCLighthouse VCType = "lighthouse"
VCVouch VCType = "vouch"
)

// KeyGen defines a key generation process.
Expand Down
4 changes: 4 additions & 0 deletions testutil/compose/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func Run(ctx context.Context, dir string, conf Config) (TmplData, error) {
// getVC returns the validator client template data for the provided type and index.
func getVC(typ VCType, nodeIdx int, numVals int, insecure bool) (TmplVC, error) {
vcByType := map[VCType]TmplVC{
VCVouch: {
Label: string(VCVouch),
Build: "vouch",
},
VCLighthouse: {
Label: string(VCLighthouse),
Build: "lighthouse",
Expand Down
9 changes: 9 additions & 0 deletions testutil/compose/static/vouch/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM wealdtech/ethdo:1.25.3 as ethdo

FROM attestant/vouch:1.6.2

COPY --from=ethdo /app/ethdo /app/ethdo

RUN apt-get update && apt-get install -y curl jq wget

ENTRYPOINT ["/compose/vouch/run.sh"]
Loading

0 comments on commit 32a6879

Please sign in to comment.