Skip to content

Commit

Permalink
Add multi beacon client support (#876)
Browse files Browse the repository at this point in the history
Clients will subscribe to specific beacons within a multi-beacon group, and relays will by default re-publish all of the beacons of a multi-beacon group.
  • Loading branch information
emmanuelm41 committed Dec 22, 2021
1 parent 02bf053 commit 2c63d68
Show file tree
Hide file tree
Showing 25 changed files with 588 additions and 227 deletions.
6 changes: 6 additions & 0 deletions chain/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"time"

"github.com/drand/drand/common/scheme"
Expand Down Expand Up @@ -60,6 +61,11 @@ func (c *Info) Hash() []byte {
return h.Sum(nil)
}

// HashString returns the value of Hash in string format
func (c *Info) HashString() string {
return hex.EncodeToString(c.Hash())
}

// Equal indicates if two Chain Info objects are equivalent
func (c *Info) Equal(c2 *Info) bool {
return c.GenesisTime == c2.GenesisTime &&
Expand Down
29 changes: 26 additions & 3 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,16 @@ func TestClientMultiple(t *testing.T) {
addr2, _, cancel2, _ := httpmock.NewMockHTTPPublicServer(t, false, sch)
defer cancel2()

httpClients := http.ForURLs([]string{"http://" + addr1, "http://" + addr2}, chainInfo.Hash())
if len(httpClients) == 0 {
t.Error("http clients is empty")
return
}

var c client.Client
var e error
c, e = client.New(
client.From(http.ForURLs([]string{"http://" + addr1, "http://" + addr2}, chainInfo.Hash())...),
client.From(httpClients...),
client.WithChainHash(chainInfo.Hash()))

if e != nil {
Expand Down Expand Up @@ -93,9 +99,15 @@ func TestClientCache(t *testing.T) {
addr1, chainInfo, cancel, _ := httpmock.NewMockHTTPPublicServer(t, false, sch)
defer cancel()

httpClients := http.ForURLs([]string{"http://" + addr1}, chainInfo.Hash())
if len(httpClients) == 0 {
t.Error("http clients is empty")
return
}

var c client.Client
var e error
c, e = client.New(client.From(http.ForURLs([]string{"http://" + addr1}, chainInfo.Hash())...),
c, e = client.New(client.From(httpClients...),
client.WithChainHash(chainInfo.Hash()), client.WithCacheSize(1))

if e != nil {
Expand Down Expand Up @@ -123,10 +135,16 @@ func TestClientWithoutCache(t *testing.T) {
addr1, chainInfo, cancel, _ := httpmock.NewMockHTTPPublicServer(t, false, sch)
defer cancel()

httpClients := http.ForURLs([]string{"http://" + addr1}, chainInfo.Hash())
if len(httpClients) == 0 {
t.Error("http clients is empty")
return
}

var c client.Client
var e error
c, e = client.New(
client.From(http.ForURLs([]string{"http://" + addr1}, chainInfo.Hash())...),
client.From(httpClients...),
client.WithChainHash(chainInfo.Hash()),
client.WithCacheSize(0))

Expand Down Expand Up @@ -230,6 +248,11 @@ func TestClientAutoWatch(t *testing.T) {
defer cancel()

httpClient := http.ForURLs([]string{"http://" + addr1}, chainInfo.Hash())
if len(httpClient) == 0 {
t.Error("http clients is empty")
return
}

r1, _ := httpClient[0].Get(context.Background(), 1)
r2, _ := httpClient[0].Get(context.Background(), 2)
results := []client.Result{r1, r2}
Expand Down
28 changes: 18 additions & 10 deletions client/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/drand/drand/chain"
"github.com/drand/drand/client"
"github.com/drand/drand/log"
"github.com/drand/drand/protobuf/common"
"github.com/drand/drand/protobuf/drand"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand All @@ -20,14 +21,15 @@ import (
const grpcDefaultTimeout = 5 * time.Second

type grpcClient struct {
address string
client drand.PublicClient
conn *grpc.ClientConn
l log.Logger
address string
chainHash []byte
client drand.PublicClient
conn *grpc.ClientConn
l log.Logger
}

// New creates a drand client backed by a GRPC connection.
func New(address, certPath string, insecure bool) (client.Client, error) {
func New(address, certPath string, insecure bool, chainHash []byte) (client.Client, error) {
opts := []grpc.DialOption{}
if certPath != "" {
creds, err := credentials.NewClientTLSFromFile(certPath, "")
Expand All @@ -48,7 +50,8 @@ func New(address, certPath string, insecure bool) (client.Client, error) {
if err != nil {
return nil, err
}
return &grpcClient{address, drand.NewPublicClient(conn), conn, log.DefaultLogger()}, nil

return &grpcClient{address, chainHash, drand.NewPublicClient(conn), conn, log.DefaultLogger()}, nil
}

func asRD(r *drand.PublicRandResponse) *client.RandomData {
Expand All @@ -67,7 +70,7 @@ func (g *grpcClient) String() string {

// Get returns a the randomness at `round` or an error.
func (g *grpcClient) Get(ctx context.Context, round uint64) (client.Result, error) {
curr, err := g.client.PublicRand(ctx, &drand.PublicRandRequest{Round: round})
curr, err := g.client.PublicRand(ctx, &drand.PublicRandRequest{Round: round, Metadata: g.getMetadata()})
if err != nil {
return nil, err
}
Expand All @@ -80,7 +83,7 @@ func (g *grpcClient) Get(ctx context.Context, round uint64) (client.Result, erro

// Watch returns new randomness as it becomes available.
func (g *grpcClient) Watch(ctx context.Context) <-chan client.Result {
stream, err := g.client.PublicRandStream(ctx, &drand.PublicRandRequest{Round: 0})
stream, err := g.client.PublicRandStream(ctx, &drand.PublicRandRequest{Round: 0, Metadata: g.getMetadata()})
ch := make(chan client.Result, 1)
if err != nil {
close(ch)
Expand All @@ -92,7 +95,7 @@ func (g *grpcClient) Watch(ctx context.Context) <-chan client.Result {

// Info returns information about the chain.
func (g *grpcClient) Info(ctx context.Context) (*chain.Info, error) {
proto, err := g.client.ChainInfo(ctx, &drand.ChainInfoRequest{})
proto, err := g.client.ChainInfo(ctx, &drand.ChainInfoRequest{Metadata: g.getMetadata()})
if err != nil {
return nil, err
}
Expand All @@ -116,10 +119,15 @@ func (g *grpcClient) translate(stream drand.Public_PublicRandStreamClient, out c
}
}

func (g *grpcClient) getMetadata() *common.Metadata {
return &common.Metadata{ChainHash: g.chainHash}
}

func (g *grpcClient) RoundAt(t time.Time) uint64 {
ctx, cancel := context.WithTimeout(context.Background(), grpcDefaultTimeout)
defer cancel()
info, err := g.client.ChainInfo(ctx, &drand.ChainInfoRequest{})

info, err := g.client.ChainInfo(ctx, &drand.ChainInfoRequest{Metadata: g.getMetadata()})
if err != nil {
return 0
}
Expand Down
4 changes: 2 additions & 2 deletions client/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestClient(t *testing.T) {
go l.Start()
defer l.Stop(context.Background())

c, err := New(addr, "", true)
c, err := New(addr, "", true, []byte(""))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestClientClose(t *testing.T) {
go l.Start()
defer l.Stop(context.Background())

c, err := New(addr, "", true)
c, err := New(addr, "", true, []byte(""))
if err != nil {
t.Fatal(err)
}
Expand Down
17 changes: 12 additions & 5 deletions client/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ type httpInfoResponse struct {
err error
}

// FetchGroupInfo attempts to initialize an httpClient when
// FetchChainInfo attempts to initialize an httpClient when
// it does not know the full group parameters for a drand group. The chain hash
// is the hash of the chain info.
func (h *httpClient) FetchChainInfo(ctx context.Context, chainHash []byte) (*chain.Info, error) {
Expand All @@ -243,7 +243,14 @@ func (h *httpClient) FetchChainInfo(ctx context.Context, chainHash []byte) (*cha
defer cancel()

go func() {
req, err := nhttp.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%sinfo", h.root), nhttp.NoBody)
url := ""
if len(chainHash) > 0 {
url = fmt.Sprintf("%s%x/info", h.root, chainHash)
} else {
url = fmt.Sprintf("%sinfo", h.root)
}

req, err := nhttp.NewRequestWithContext(ctx, "GET", url, nhttp.NoBody)
if err != nil {
resC <- httpInfoResponse{nil, fmt.Errorf("creating request: %w", err)}
return
Expand Down Expand Up @@ -295,13 +302,13 @@ type httpGetResponse struct {
err error
}

// Get returns a the randomness at `round` or an error.
// Get returns the randomness at `round` or an error.
func (h *httpClient) Get(ctx context.Context, round uint64) (client.Result, error) {
var url string
if round == 0 {
url = fmt.Sprintf("%spublic/latest", h.root)
url = fmt.Sprintf("%s%x/public/latest", h.root, h.chainInfo.Hash())
} else {
url = fmt.Sprintf("%spublic/%d", h.root, round)
url = fmt.Sprintf("%s%x/public/%d", h.root, h.chainInfo.Hash(), round)
}

resC := make(chan httpGetResponse, 1)
Expand Down
14 changes: 8 additions & 6 deletions client/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func TestHTTPClient(t *testing.T) {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
result, err := httpClient.Get(ctx, 0)
ctx1, cancel1 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel1()
result, err := httpClient.Get(ctx1, 0)
if err != nil {
t.Fatal(err)
}
Expand All @@ -44,8 +44,10 @@ func TestHTTPClient(t *testing.T) {
t.Fatal("no signature provided")
}

if _, err := httpClient.Get(ctx, full.Rnd+1); err != nil {
t.Fatal("http client should not perform verification of results")
ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel2()
if _, err := httpClient.Get(ctx2, full.Rnd+1); err != nil {
t.Fatalf("http client should not perform verification of results. err: %s", err)
}
_ = httpClient.Close()
}
Expand Down Expand Up @@ -118,7 +120,7 @@ func TestHTTPWatch(t *testing.T) {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
result := httpClient.Watch(ctx)
first, ok := <-result
Expand Down
4 changes: 3 additions & 1 deletion client/test/http/mock/httpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ func NewMockHTTPPublicServer(t *testing.T, badSecondRound bool, sch scheme.Schem
t.Fatal("could not use server after 3 attempts.")
}

handler.RegisterNewBeaconHandler(client, chainInfo.HashString())

listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}

httpServer := http.Server{Handler: handler}
httpServer := http.Server{Handler: handler.GetHTTPHandler()}
go httpServer.Serve(listener)

return listener.Addr().String(), chainInfo, func() {
Expand Down
22 changes: 21 additions & 1 deletion cmd/client/lib/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ var (
Usage: "The hash (in hex) for the chain to follow",
Aliases: []string{"chain-hash"}, // DEPRECATED
}
// HashListFlag is the CLI flag for the hashes list (in hex) for the relay to follow.
HashListFlag = &cli.StringSliceFlag{
Name: "hash-list",
Usage: "The hash list (in hex) for the relay to follow",
}
// GroupConfFlag is the CLI flag for specifying the path to the drand group configuration (TOML encoded) or chain info (JSON encoded).
GroupConfFlag = &cli.PathFlag{
Name: "group-conf",
Expand Down Expand Up @@ -155,7 +160,22 @@ func Create(c *cli.Context, withInstrumentation bool, opts ...client.Option) (cl

func buildGrpcClient(c *cli.Context, info **chain.Info) ([]client.Client, error) {
if c.IsSet(GRPCConnectFlag.Name) {
gc, err := grpc.New(c.String(GRPCConnectFlag.Name), c.String(CertFlag.Name), c.Bool(InsecureFlag.Name))
hash := make([]byte, 0)

if c.IsSet(HashFlag.Name) {
var err error

hash, err = hex.DecodeString(c.String(HashFlag.Name))
if err != nil {
return nil, err
}
}

if *info != nil && len(hash) == 0 {
hash = (*info).Hash()
}

gc, err := grpc.New(c.String(GRPCConnectFlag.Name), c.String(CertFlag.Name), c.Bool(InsecureFlag.Name), hash)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/drand-cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ func launchDrandInstances(t *testing.T, n int) ([]*drandInstance, string) {
pubPath := path.Join(tmpPath, "pub.key")

freePort := test.FreePort()
addr := "127.0.0." + strconv.Itoa(i) + ":" + freePort
addr := "127.0.0.1:" + freePort
ctrlPort := test.FreePort()
metricsPort := test.FreePort()

Expand Down
2 changes: 1 addition & 1 deletion cmd/drand-cli/public.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func getPublicRandomness(c *cli.Context) error {
var resp client.Result
var foundCorrect bool
for _, id := range ids {
grpcClient, err := grpc.New(id.Addr, certPath, !id.TLS)
grpcClient, err := grpc.New(id.Addr, certPath, !id.TLS, group.Hash())
if err != nil {
fmt.Fprintf(os.Stderr, "drand: could not connect to %s: %s", id.Addr, err)
break
Expand Down
Loading

0 comments on commit 2c63d68

Please sign in to comment.