Skip to content

Commit

Permalink
Put GRPC and HTTP APIs on separate ports. (#274)
Browse files Browse the repository at this point in the history
* Splitting the listener into separate GRPC and HTTP/JSON listeners.
* Provide separate TLS servers for REST Public API and GRPC Public+Protocol API.
* Link pub/priv server separation into Drand node and args.
* Fix context propagation.
* Cleanup REST variants from GRPC.
* Tests use available ports.
* Update orchestrator to use public API REST address.
  • Loading branch information
petar authored May 13, 2020
1 parent a7620bd commit 16b6e56
Show file tree
Hide file tree
Showing 16 changed files with 499 additions and 376 deletions.
8 changes: 6 additions & 2 deletions beacon/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,11 @@ func (b *BeaconTest) ServeBeacon(i int) {
j := b.searchNode(i)
beaconServer := &testBeaconServer{h: b.nodes[j].handler}
b.nodes[j].server = beaconServer
b.nodes[j].listener = net.NewTCPGrpcListener(b.nodes[j].private.Public.Address(), beaconServer)
var err error
b.nodes[j].listener, err = net.NewGRPCListenerForPrivate(context.Background(), b.nodes[j].private.Public.Address(), beaconServer)
if err != nil {
panic(err)
}
fmt.Printf("\n || Serve Beacon for node %d - %p --> %s\n", j, b.nodes[j].handler, b.nodes[j].private.Public.Address())
go b.nodes[j].listener.Start()
}
Expand Down Expand Up @@ -278,7 +282,7 @@ func (b *BeaconTest) StopBeacon(i int) {
if !n.started {
return
}
n.listener.Stop()
n.listener.Stop(context.Background())
n.handler.Stop()
n.started = false
}
Expand Down
6 changes: 0 additions & 6 deletions core/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@ func TestClientPrivate(t *testing.T) {
require.NotNil(t, buff)
require.Len(t, buff, 32)

client = NewRESTClientFromCert(drands[0].opts.certmanager)
buff, err = client.Private(pub)
require.Nil(t, err)
require.NotNil(t, buff)
require.Len(t, buff, 32)

drands[0].opts.enablePrivate = false
client = NewGrpcClientFromCert(drands[0].opts.certmanager)
buff, err = client.Private(pub)
Expand Down
69 changes: 44 additions & 25 deletions core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,24 @@ type ConfigOption func(*Config)

// Config holds all relevant information for a drand node to run.
type Config struct {
configFolder string
dbFolder string
listenAddr string
controlPort string
grpcOpts []grpc.DialOption
callOpts []grpc.CallOption
dkgTimeout time.Duration
boltOpts *bolt.Options
beaconCbs []func(*beacon.Beacon)
dkgCallback func(*key.Share)
insecure bool
certPath string
keyPath string
certmanager *net.CertManager
logger log.Logger
clock clock.Clock
enablePrivate bool
configFolder string
dbFolder string
privateListenAddr string
publicListenAddr string
controlPort string
grpcOpts []grpc.DialOption
callOpts []grpc.CallOption
dkgTimeout time.Duration
boltOpts *bolt.Options
beaconCbs []func(*beacon.Beacon)
dkgCallback func(*key.Share)
insecure bool
certPath string
keyPath string
certmanager *net.CertManager
logger log.Logger
clock clock.Clock
enablePrivate bool
}

// NewConfig returns the config to pass to drand with the default options set
Expand Down Expand Up @@ -78,11 +79,20 @@ func (d *Config) Certs() *net.CertManager {
return d.certmanager
}

// ListenAddress returns the given default address or the listen address stored
// in the config thanks to WithListenAddress
func (d *Config) ListenAddress(defaultAddr string) string {
if d.listenAddr != "" {
return d.listenAddr
// PrivateListenAddress returns the given default address or the listen address stored
// in the config thanks to WithPrivateListenAddress
func (d *Config) PrivateListenAddress(defaultAddr string) string {
if d.privateListenAddr != "" {
return d.privateListenAddr
}
return defaultAddr
}

// PublicListenAddress returns the given default address or the listen address stored
// in the config thanks to WithPublicListenAddress
func (d *Config) PublicListenAddress(defaultAddr string) string {
if d.publicListenAddr != "" {
return d.publicListenAddr
}
return defaultAddr
}
Expand Down Expand Up @@ -211,12 +221,21 @@ func WithTrustedCerts(certPaths ...string) ConfigOption {
}
}

// WithListenAddress specifies the address the drand instance should bind to. It
// WithPublicListenAddress specifies the address the drand instance should bind to. It
// is useful if you want to advertise a public proxy address and the drand
// instance runs behind your network.
func WithPublicListenAddress(addr string) ConfigOption {
return func(d *Config) {
d.publicListenAddr = addr
}
}

// WithPrivateListenAddress specifies the address the drand instance should bind to. It
// is useful if you want to advertise a public proxy address and the drand
// instance runs behind your network.
func WithListenAddress(addr string) ConfigOption {
func WithPrivateListenAddress(addr string) ConfigOption {
return func(d *Config) {
d.listenAddr = addr
d.privateListenAddr = addr
}
}

Expand Down
53 changes: 42 additions & 11 deletions core/drand.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"context"
"errors"
"fmt"
"strings"
Expand All @@ -24,9 +25,10 @@ type Drand struct {
group *key.Group
index int

store key.Store
gateway net.Gateway
control net.ControlListener
store key.Store
privGateway *net.PrivateGateway
pubGateway *net.PublicGateway
control net.ControlListener

// handle all callbacks when a new beacon is found
callbacks *callbackManager
Expand Down Expand Up @@ -92,19 +94,45 @@ func initDrand(s key.Store, c *Config) (*Drand, error) {
d.callbacks.AddCallback(callbackID, d.opts.callbacks)
//d.callbacks.AddCallback(cacheID, d.cache.StoreTemp)

a := c.ListenAddress(priv.Public.Address())
// Set the private API address to the command-line flag, if given.
// Otherwise, set it to the address associated with stored private key.
privAddr := c.PrivateListenAddress(priv.Public.Address())
pubAddr := c.PublicListenAddress("")
// ctx is used to create the gateway below.
// Gateway constructors (specifically, the generated gateway stubs that require it)
// do not actually use it, so we are passing a background context to be safe.
ctx := context.Background()
if c.insecure {
var err error
d.log.Info("network", "tls-disable")
d.gateway = net.NewGrpcGatewayInsecure(a, d, d.opts.grpcOpts...)
if pubAddr != "" {
if d.pubGateway, err = net.NewRESTPublicGatewayWithoutTLS(ctx, pubAddr, d, d.opts.grpcOpts...); err != nil {
return nil, err
}
}
if d.privGateway, err = net.NewGRPCPrivateGatewayWithoutTLS(ctx, privAddr, d, d.opts.grpcOpts...); err != nil {
return nil, err
}
} else {
var err error
d.log.Info("network", "tls-enabled")
d.gateway = net.NewGrpcGatewayFromCertManager(a, c.certPath, c.keyPath, c.certmanager, d, d.opts.grpcOpts...)
if pubAddr != "" {
if d.pubGateway, err = net.NewRESTPublicGatewayWithTLS(ctx, pubAddr, c.certPath, c.keyPath, c.certmanager, d, d.opts.grpcOpts...); err != nil {
return nil, err
}
}
if d.privGateway, err = net.NewGRPCPrivateGatewayWithTLS(ctx, privAddr, c.certPath, c.keyPath, c.certmanager, d, d.opts.grpcOpts...); err != nil {
return nil, err
}
}
p := c.ControlPort()
d.control = net.NewTCPGrpcControlListener(d, p)
go d.control.Start()
d.log.Info("network_listen", a, "control_port", c.ControlPort())
d.gateway.StartAll()
d.log.Info("private_listen", privAddr, "control_port", c.ControlPort(), "public_listen", pubAddr)
d.privGateway.StartAll()
if d.pubGateway != nil {
d.pubGateway.StartAll()
}
return d, nil
}

Expand Down Expand Up @@ -259,10 +287,13 @@ func (d *Drand) StopBeacon() {
}

// Stop simply stops all drand operations.
func (d *Drand) Stop() {
func (d *Drand) Stop(ctx context.Context) {
d.StopBeacon()
d.state.Lock()
d.gateway.StopAll()
if d.pubGateway != nil {
d.pubGateway.StopAll(ctx)
}
d.privGateway.StopAll(ctx)
d.control.Stop()
d.state.Unlock()
d.exitCh <- true
Expand Down Expand Up @@ -301,7 +332,7 @@ func (d *Drand) newBeacon() (*beacon.Handler, error) {
Share: d.share,
Clock: d.opts.clock,
}
beacon, err := beacon.NewHandler(d.gateway.ProtocolClient, store, conf, d.log)
beacon, err := beacon.NewHandler(d.privGateway.ProtocolClient, store, conf, d.log)
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions core/drand_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (d *Drand) runDKG(leader bool, group *key.Group, timeout uint32, entropy *c
if err != nil {
return nil, fmt.Errorf("drand: invalid timeout: %s", err)
}
board := newBoard(d.log, d.gateway.ProtocolClient, group)
board := newBoard(d.log, d.privGateway.ProtocolClient, group)
protoConf := &dkg.Config{
DkgConfig: dkgConfig,
Auth: key.AuthScheme,
Expand Down Expand Up @@ -206,7 +206,7 @@ func (d *Drand) runResharing(leader bool, oldGroup, newGroup *key.Group, timeout
if err != nil {
return nil, err
}
board := newReshareBoard(d.log, d.gateway.ProtocolClient, oldGroup, newGroup, d.priv.Public)
board := newReshareBoard(d.log, d.privGateway.ProtocolClient, oldGroup, newGroup, d.priv.Public)
protoConf := &dkg.Config{
DkgConfig: dkgConfig,
Auth: key.AuthScheme,
Expand Down Expand Up @@ -296,7 +296,7 @@ func (d *Drand) setupAutomaticDKG(c context.Context, in *control.InitDKGPacket)
}

d.log.Debug("init_dkg", "send_key", "leader", lpeer.Address())
err = d.gateway.ProtocolClient.SignalDKGParticipant(context.Background(), lpeer, prep)
err = d.privGateway.ProtocolClient.SignalDKGParticipant(context.Background(), lpeer, prep)
if err != nil {
return nil, fmt.Errorf("drand: err when receiving group: %s", err)
}
Expand Down Expand Up @@ -386,7 +386,7 @@ func (d *Drand) setupAutomaticResharing(c context.Context, oldGroup *key.Group,
defer cancel()

d.log.Info("setup_reshare", "signalling_key_to_leader")
err = d.gateway.ProtocolClient.SignalDKGParticipant(nc, lpeer, prep)
err = d.privGateway.ProtocolClient.SignalDKGParticipant(nc, lpeer, prep)
if err != nil {
return nil, fmt.Errorf("drand: err when receiving group: %s", err)
}
Expand Down Expand Up @@ -622,7 +622,7 @@ func (d *Drand) GroupFile(ctx context.Context, in *control.GroupRequest) (*contr
}

func (d *Drand) Shutdown(ctx context.Context, in *control.ShutdownRequest) (*control.ShutdownResponse, error) {
d.Stop()
d.Stop(ctx)
return nil, nil
}

Expand Down Expand Up @@ -678,7 +678,7 @@ func (d *Drand) pushDKGInfo(to []*key.Node, packet *drand.DKGInfoPacket) error {
continue
}
go func(i *key.Identity) {
err := d.gateway.ProtocolClient.PushDKGInfo(ctx, i, packet)
err := d.privGateway.ProtocolClient.PushDKGInfo(ctx, i, packet)
if err != nil {
d.log.Error("push_dkg", err, "to", i.Address())
} else {
Expand Down
16 changes: 8 additions & 8 deletions core/drand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (d *DrandTest2) GetDrand(id string, newGroup bool) *Node {
func (d *DrandTest2) StopDrand(id string, newGroup bool) {
node := d.GetDrand(id, newGroup)
dr := node.drand
dr.Stop()
dr.Stop(context.Background())
pinger, err := net.NewControlClient(dr.opts.controlPort)
require.NoError(d.t, err)
var counter = 1
Expand Down Expand Up @@ -566,7 +566,6 @@ func TestDrandPublicGroup(t *testing.T) {
//client := NewGrpcClient()
cm := dt.nodes[0].drand.opts.certmanager
client := NewGrpcClientFromCert(cm)
rest := net.NewRestClientFromCertManager(cm)
for _, node := range dt.nodes {
d := node.drand
groupResp, err := client.Group(d.priv.Public.Address(), d.priv.Public.TLS)
Expand All @@ -591,11 +590,12 @@ func TestDrandPublicGroup(t *testing.T) {
require.True(t, found)
}

restGroup, err := rest.Group(context.TODO(), dt.nodes[0].drand.priv.Public, &drand.GroupRequest{})
require.NoError(t, err)
received, err := key.GroupFromProto(restGroup)
require.NoError(t, err)
require.True(t, group.Equal(received))
// rest := net.NewRestClientFromCertManager(cm)
// restGroup, err := rest.Group(context.TODO(), dt.nodes[0].drand.priv.Public, &drand.GroupRequest{})
// require.NoError(t, err)
// received, err := key.GroupFromProto(restGroup)
// require.NoError(t, err)
// require.True(t, group.Equal(received))
}

// Test if the we can correctly fetch the rounds after a DKG using the
Expand Down Expand Up @@ -781,7 +781,7 @@ func BatchNewDrand(n int, insecure bool, opts ...ConfigOption) ([]*Drand, *key.G
// CloseAllDrands closes all drands
func CloseAllDrands(drands []*Drand) {
for i := 0; i < len(drands); i++ {
drands[i].Stop()
drands[i].Stop(context.Background())
//os.RemoveAll(drands[i].opts.dbFolder)
}
}
Expand Down
Loading

0 comments on commit 16b6e56

Please sign in to comment.