Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi beacon client support #876

Merged
merged 46 commits into from
Dec 22, 2021
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
259d551
Add chainHash to Metadata message
raynaudoe Dec 8, 2021
67ebbcc
Update grpc and http clients to use beaconHash info
raynaudoe Dec 8, 2021
e799d67
Update http client endpoints for multibeacon
raynaudoe Dec 8, 2021
d8783f7
Adapt server to handle multiple beacons
raynaudoe Dec 8, 2021
21d4107
mod tidy
raynaudoe Dec 8, 2021
8a06f53
Apply lint fixes
raynaudoe Dec 11, 2021
a8b1a6a
Fix creating drand handler
raynaudoe Dec 12, 2021
84e4cbe
Fix endpoints urls
raynaudoe Dec 12, 2021
2627f79
Relay - Create beacon handler for beacon hash
raynaudoe Dec 12, 2021
7c2707f
Several logic fixes
raynaudoe Dec 12, 2021
c59248c
Fix tests
raynaudoe Dec 12, 2021
dbc6cdf
Fix linter issue
raynaudoe Dec 12, 2021
5af35db
apply go mod tidy
emmanuelm41 Dec 12, 2021
2d4f223
apply some lint and test fixes
emmanuelm41 Dec 12, 2021
f57188d
fix url generation on info request on client side
emmanuelm41 Dec 13, 2021
80031a1
fix get url for get op on http server
emmanuelm41 Dec 13, 2021
317b7ef
add default beacon's chain hash to mock server
emmanuelm41 Dec 13, 2021
4fe73ec
Add log info
raynaudoe Dec 13, 2021
9db8a1c
Fix endpoint url
raynaudoe Dec 13, 2021
85cb12b
Minor fix
raynaudoe Dec 13, 2021
b1acc65
Create BeaconHandlers when initiating a beacon process or when dkg fi…
raynaudoe Dec 13, 2021
ca08b7c
Do not load beacon when instantiating beacon process
raynaudoe Dec 13, 2021
bdc23a6
set beacon handler for the default beacon process
emmanuelm41 Dec 13, 2021
2a86990
Merge pull request #38 from Zondax/default-chainhash
raynaudoe Dec 14, 2021
c280c14
fix loading beacon process
emmanuelm41 Dec 14, 2021
c78bd37
Merge pull request #39 from Zondax/multi-beacon-fix
raynaudoe Dec 14, 2021
7d0d18f
change loopback ip for test nodes
emmanuelm41 Dec 14, 2021
97953b5
Add multibeacon support to relay
raynaudoe Dec 14, 2021
a6bbfd4
Fix parsing hash related flags
raynaudoe Dec 14, 2021
d484c7c
Fix flag name
raynaudoe Dec 14, 2021
ec15702
Fix linter errors
raynaudoe Dec 14, 2021
11954ac
add log for error on tests
emmanuelm41 Dec 15, 2021
7afebdd
try to fix error on test
emmanuelm41 Dec 15, 2021
4137837
apply some fixes and improvements
emmanuelm41 Dec 15, 2021
d8385c0
add some comments and rename some functions
emmanuelm41 Dec 15, 2021
57089cc
Fix test attempt
raynaudoe Dec 16, 2021
89c4b16
Increase test timeout
raynaudoe Dec 16, 2021
308730e
add beacon handler for http server through dkg callback
emmanuelm41 Dec 16, 2021
17ce485
update http handler logic to unify some things
emmanuelm41 Dec 16, 2021
3cb0fcb
apply some lint fixes and lock/unlock over beacon slice
emmanuelm41 Dec 16, 2021
49acdce
Unify required modules
raynaudoe Dec 17, 2021
4b98b74
Minor fix
raynaudoe Dec 17, 2021
1953ef1
Use RMutex on getBeaconHandler
raynaudoe Dec 17, 2021
958dd1e
apply go mod tidy
emmanuelm41 Dec 20, 2021
a161ce6
Integration test fix attempt
raynaudoe Dec 21, 2021
1463472
Retry curl if an empty response is received
raynaudoe Dec 22, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions chain/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"time"

"github.com/drand/drand/common/scheme"
Expand Down Expand Up @@ -60,6 +61,11 @@ func (c *Info) Hash() []byte {
return h.Sum(nil)
}

// HashString returns the value of Hash in string format
func (c *Info) HashString() string {
return hex.EncodeToString(c.Hash())
}

// Equal indicates if two Chain Info objects are equivalent
func (c *Info) Equal(c2 *Info) bool {
return c.GenesisTime == c2.GenesisTime &&
Expand Down
29 changes: 26 additions & 3 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,16 @@ func TestClientMultiple(t *testing.T) {
addr2, _, cancel2, _ := httpmock.NewMockHTTPPublicServer(t, false, sch)
defer cancel2()

httpClients := http.ForURLs([]string{"http://" + addr1, "http://" + addr2}, chainInfo.Hash())
if len(httpClients) == 0 {
t.Error("http clients is empty")
return
}

var c client.Client
var e error
c, e = client.New(
client.From(http.ForURLs([]string{"http://" + addr1, "http://" + addr2}, chainInfo.Hash())...),
client.From(httpClients...),
client.WithChainHash(chainInfo.Hash()))

if e != nil {
Expand Down Expand Up @@ -93,9 +99,15 @@ func TestClientCache(t *testing.T) {
addr1, chainInfo, cancel, _ := httpmock.NewMockHTTPPublicServer(t, false, sch)
defer cancel()

httpClients := http.ForURLs([]string{"http://" + addr1}, chainInfo.Hash())
if len(httpClients) == 0 {
t.Error("http clients is empty")
return
}

var c client.Client
var e error
c, e = client.New(client.From(http.ForURLs([]string{"http://" + addr1}, chainInfo.Hash())...),
c, e = client.New(client.From(httpClients...),
client.WithChainHash(chainInfo.Hash()), client.WithCacheSize(1))

if e != nil {
Expand Down Expand Up @@ -123,10 +135,16 @@ func TestClientWithoutCache(t *testing.T) {
addr1, chainInfo, cancel, _ := httpmock.NewMockHTTPPublicServer(t, false, sch)
defer cancel()

httpClients := http.ForURLs([]string{"http://" + addr1}, chainInfo.Hash())
if len(httpClients) == 0 {
t.Error("http clients is empty")
return
}

var c client.Client
var e error
c, e = client.New(
client.From(http.ForURLs([]string{"http://" + addr1}, chainInfo.Hash())...),
client.From(httpClients...),
client.WithChainHash(chainInfo.Hash()),
client.WithCacheSize(0))

Expand Down Expand Up @@ -230,6 +248,11 @@ func TestClientAutoWatch(t *testing.T) {
defer cancel()

httpClient := http.ForURLs([]string{"http://" + addr1}, chainInfo.Hash())
if len(httpClient) == 0 {
t.Error("http clients is empty")
return
}

r1, _ := httpClient[0].Get(context.Background(), 1)
r2, _ := httpClient[0].Get(context.Background(), 2)
results := []client.Result{r1, r2}
Expand Down
33 changes: 23 additions & 10 deletions client/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"time"

"github.com/drand/drand/protobuf/common"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this in a separate import block and not next to protobuf/drand below?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


"github.com/drand/drand/chain"
"github.com/drand/drand/client"
"github.com/drand/drand/log"
Expand All @@ -20,14 +22,15 @@ import (
const grpcDefaultTimeout = 5 * time.Second

type grpcClient struct {
address string
client drand.PublicClient
conn *grpc.ClientConn
l log.Logger
address string
chainHash []byte
client drand.PublicClient
conn *grpc.ClientConn
l log.Logger
}

// New creates a drand client backed by a GRPC connection.
func New(address, certPath string, insecure bool) (client.Client, error) {
func New(address, certPath string, insecure bool, chainHash []byte) (client.Client, error) {
emmanuelm41 marked this conversation as resolved.
Show resolved Hide resolved
opts := []grpc.DialOption{}
if certPath != "" {
creds, err := credentials.NewClientTLSFromFile(certPath, "")
Expand All @@ -48,7 +51,8 @@ func New(address, certPath string, insecure bool) (client.Client, error) {
if err != nil {
return nil, err
}
return &grpcClient{address, drand.NewPublicClient(conn), conn, log.DefaultLogger()}, nil

return &grpcClient{address, chainHash, drand.NewPublicClient(conn), conn, log.DefaultLogger()}, nil
}

func asRD(r *drand.PublicRandResponse) *client.RandomData {
Expand All @@ -67,7 +71,9 @@ func (g *grpcClient) String() string {

// Get returns a the randomness at `round` or an error.
func (g *grpcClient) Get(ctx context.Context, round uint64) (client.Result, error) {
curr, err := g.client.PublicRand(ctx, &drand.PublicRandRequest{Round: round})
metadata := common.Metadata{ChainHash: g.chainHash}

curr, err := g.client.PublicRand(ctx, &drand.PublicRandRequest{Round: round, Metadata: &metadata})
if err != nil {
return nil, err
}
Expand All @@ -80,7 +86,9 @@ func (g *grpcClient) Get(ctx context.Context, round uint64) (client.Result, erro

// Watch returns new randomness as it becomes available.
func (g *grpcClient) Watch(ctx context.Context) <-chan client.Result {
stream, err := g.client.PublicRandStream(ctx, &drand.PublicRandRequest{Round: 0})
metadata := common.Metadata{ChainHash: g.chainHash}

stream, err := g.client.PublicRandStream(ctx, &drand.PublicRandRequest{Round: 0, Metadata: &metadata})
ch := make(chan client.Result, 1)
if err != nil {
close(ch)
Expand All @@ -92,7 +100,9 @@ func (g *grpcClient) Watch(ctx context.Context) <-chan client.Result {

// Info returns information about the chain.
func (g *grpcClient) Info(ctx context.Context) (*chain.Info, error) {
proto, err := g.client.ChainInfo(ctx, &drand.ChainInfoRequest{})
metadata := common.Metadata{ChainHash: g.chainHash}

proto, err := g.client.ChainInfo(ctx, &drand.ChainInfoRequest{Metadata: &metadata})
if err != nil {
return nil, err
}
Expand All @@ -119,7 +129,10 @@ func (g *grpcClient) translate(stream drand.Public_PublicRandStreamClient, out c
func (g *grpcClient) RoundAt(t time.Time) uint64 {
ctx, cancel := context.WithTimeout(context.Background(), grpcDefaultTimeout)
defer cancel()
info, err := g.client.ChainInfo(ctx, &drand.ChainInfoRequest{})

metadata := common.Metadata{ChainHash: g.chainHash}
emmanuelm41 marked this conversation as resolved.
Show resolved Hide resolved

info, err := g.client.ChainInfo(ctx, &drand.ChainInfoRequest{Metadata: &metadata})
if err != nil {
return 0
}
Expand Down
4 changes: 2 additions & 2 deletions client/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestClient(t *testing.T) {
go l.Start()
defer l.Stop(context.Background())

c, err := New(addr, "", true)
c, err := New(addr, "", true, []byte(""))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestClientClose(t *testing.T) {
go l.Start()
defer l.Stop(context.Background())

c, err := New(addr, "", true)
c, err := New(addr, "", true, []byte(""))
if err != nil {
t.Fatal(err)
}
Expand Down
17 changes: 12 additions & 5 deletions client/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ type httpInfoResponse struct {
err error
}

// FetchGroupInfo attempts to initialize an httpClient when
// FetchChainInfo attempts to initialize an httpClient when
// it does not know the full group parameters for a drand group. The chain hash
// is the hash of the chain info.
func (h *httpClient) FetchChainInfo(ctx context.Context, chainHash []byte) (*chain.Info, error) {
Expand All @@ -243,7 +243,14 @@ func (h *httpClient) FetchChainInfo(ctx context.Context, chainHash []byte) (*cha
defer cancel()

go func() {
req, err := nhttp.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%sinfo", h.root), nhttp.NoBody)
url := ""
if len(chainHash) > 0 {
url = fmt.Sprintf("%s%x/info", h.root, chainHash)
} else {
url = fmt.Sprintf("%sinfo", h.root)
}

req, err := nhttp.NewRequestWithContext(ctx, "GET", url, nhttp.NoBody)
if err != nil {
resC <- httpInfoResponse{nil, fmt.Errorf("creating request: %w", err)}
return
Expand Down Expand Up @@ -295,13 +302,13 @@ type httpGetResponse struct {
err error
}

// Get returns a the randomness at `round` or an error.
// Get returns the randomness at `round` or an error.
func (h *httpClient) Get(ctx context.Context, round uint64) (client.Result, error) {
var url string
if round == 0 {
url = fmt.Sprintf("%spublic/latest", h.root)
url = fmt.Sprintf("%s%x/public/latest", h.root, h.chainInfo.Hash())
} else {
url = fmt.Sprintf("%spublic/%d", h.root, round)
url = fmt.Sprintf("%s%x/public/%d", h.root, h.chainInfo.Hash(), round)
}

resC := make(chan httpGetResponse, 1)
Expand Down
4 changes: 3 additions & 1 deletion client/test/http/mock/httpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ func NewMockHTTPPublicServer(t *testing.T, badSecondRound bool, sch scheme.Schem
t.Fatal("could not use server after 3 attempts.")
}

handler.HandlerDrand.CreateBeaconHandler(client, chainInfo.HashString())

listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}

httpServer := http.Server{Handler: handler}
httpServer := http.Server{Handler: handler.HandlerHTTP}
go httpServer.Serve(listener)

return listener.Addr().String(), chainInfo, func() {
Expand Down
22 changes: 21 additions & 1 deletion cmd/client/lib/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ var (
Usage: "The hash (in hex) for the chain to follow",
Aliases: []string{"chain-hash"}, // DEPRECATED
}
// HashListFlag is the CLI flag for the hashes list (in hex) for the relay to follow.
HashListFlag = &cli.StringSliceFlag{
Name: "hash-list",
Usage: "The hash list (in hex) for the relay to follow",
}
// GroupConfFlag is the CLI flag for specifying the path to the drand group configuration (TOML encoded) or chain info (JSON encoded).
GroupConfFlag = &cli.PathFlag{
Name: "group-conf",
Expand Down Expand Up @@ -155,7 +160,22 @@ func Create(c *cli.Context, withInstrumentation bool, opts ...client.Option) (cl

func buildGrpcClient(c *cli.Context, info **chain.Info) ([]client.Client, error) {
emmanuelm41 marked this conversation as resolved.
Show resolved Hide resolved
if c.IsSet(GRPCConnectFlag.Name) {
gc, err := grpc.New(c.String(GRPCConnectFlag.Name), c.String(CertFlag.Name), c.Bool(InsecureFlag.Name))
hash := make([]byte, 0)

if c.IsSet(HashFlag.Name) {
var err error

hash, err = hex.DecodeString(c.String(HashFlag.Name))
if err != nil {
return nil, err
}
}

if *info != nil && len(hash) == 0 {
hash = (*info).Hash()
}

gc, err := grpc.New(c.String(GRPCConnectFlag.Name), c.String(CertFlag.Name), c.Bool(InsecureFlag.Name), hash)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/drand-cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ func launchDrandInstances(t *testing.T, n int) ([]*drandInstance, string) {
pubPath := path.Join(tmpPath, "pub.key")

freePort := test.FreePort()
addr := "127.0.0." + strconv.Itoa(i) + ":" + freePort
addr := "127.0.0.1:" + freePort
ctrlPort := test.FreePort()
metricsPort := test.FreePort()

Expand Down
2 changes: 1 addition & 1 deletion cmd/drand-cli/public.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func getPublicRandomness(c *cli.Context) error {
var resp client.Result
var foundCorrect bool
for _, id := range ids {
grpcClient, err := grpc.New(id.Addr, certPath, !id.TLS)
grpcClient, err := grpc.New(id.Addr, certPath, !id.TLS, group.Hash())
if err != nil {
fmt.Fprintf(os.Stderr, "drand: could not connect to %s: %s", id.Addr, err)
break
Expand Down
40 changes: 35 additions & 5 deletions cmd/relay/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package main

import (
"encoding/hex"
"fmt"
"net"
"net/http"
"net/http/httptest"
"os"

dclient "github.com/drand/drand/client"
"github.com/drand/drand/cmd/client/lib"
"github.com/drand/drand/common"
dhttp "github.com/drand/drand/http"
Expand Down Expand Up @@ -44,6 +46,7 @@ var metricsFlag = &cli.StringFlag{
}

// Relay a GRPC connection to an HTTP server.
// nolint:gocyclo
func Relay(c *cli.Context) error {
version := common.GetAppVersion()

Expand All @@ -66,15 +69,42 @@ func Relay(c *cli.Context) error {
return fmt.Errorf("failed to create rest handler: %w", err)
}

if c.IsSet(lib.HashFlag.Name) {
hash, err := hex.DecodeString(c.String(lib.HashFlag.Name))
if err != nil {
return fmt.Errorf("failed to decode hash flag: %w", err)
}
handler.HandlerDrand.CreateBeaconHandler(client, string(hash))
} else {
if c.IsSet(lib.HashListFlag.Name) {
hashList := c.StringSlice(lib.HashListFlag.Name)
for _, hashHex := range hashList {
hash, err := hex.DecodeString(hashHex)
if err != nil {
return fmt.Errorf("failed to decode hash flag: %w", err)
}

c, err := lib.Create(c, c.IsSet(metricsFlag.Name), dclient.WithChainHash(hash))
if err != nil {
return err
}

handler.HandlerDrand.CreateBeaconHandler(c, fmt.Sprintf("%x", hash))
}
} else {
return fmt.Errorf("must specify flag %s or %s", lib.HashFlag.Name, lib.HashListFlag.Name)
emmanuelm41 marked this conversation as resolved.
Show resolved Hide resolved
}
}

if c.IsSet(accessLogFlag.Name) {
logFile, err := os.OpenFile(c.String(accessLogFlag.Name), os.O_CREATE|os.O_APPEND|os.O_WRONLY, accessLogPermFolder)
if err != nil {
return fmt.Errorf("failed to open access log: %w", err)
}
defer logFile.Close()
handler = handlers.CombinedLoggingHandler(logFile, handler)
handler.HandlerHTTP = handlers.CombinedLoggingHandler(logFile, handler.HandlerHTTP)
} else {
handler = handlers.CombinedLoggingHandler(os.Stdout, handler)
handler.HandlerHTTP = handlers.CombinedLoggingHandler(os.Stdout, handler.HandlerHTTP)
}

bind := "localhost:0"
Expand All @@ -89,13 +119,13 @@ func Relay(c *cli.Context) error {
// jumpstart bootup
req, _ := http.NewRequest("GET", "/public/0", http.NoBody)
rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)
handler.HandlerHTTP.ServeHTTP(rr, req)
if rr.Code != http.StatusOK {
log.DefaultLogger().Warnw("", "binary", "relay", "startup failed", rr.Code)
}

fmt.Printf("Listening at %s\n", listener.Addr())
return http.Serve(listener, handler)
return http.Serve(listener, handler.HandlerHTTP)
}

func main() {
Expand All @@ -105,7 +135,7 @@ func main() {
Name: "relay",
Version: version.String(),
Usage: "Relay a Drand group to a public HTTP Rest API",
Flags: append(lib.ClientFlags, listenFlag, accessLogFlag, metricsFlag),
Flags: append(lib.ClientFlags, lib.HashListFlag, listenFlag, accessLogFlag, metricsFlag),
Action: Relay,
}
cli.VersionPrinter = func(c *cli.Context) {
Expand Down
Loading