Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Add ability for control servers to broadcast to hubs via gRPC #40

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions cmd/hzn/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/hashicorp/horizon/pkg/grpc/lz4"
grpctoken "github.com/hashicorp/horizon/pkg/grpc/token"
"github.com/hashicorp/horizon/pkg/hub"
"github.com/hashicorp/horizon/pkg/netloc"
"github.com/hashicorp/horizon/pkg/pb"
"github.com/hashicorp/horizon/pkg/periodic"
"github.com/hashicorp/horizon/pkg/tlsmanage"
Expand Down Expand Up @@ -486,6 +487,20 @@ func (h *hubRunner) Run(args []string) int {

deployment := os.Getenv("K8_DEPLOYMENT")

var labels *pb.LabelSet

strLabels := os.Getenv("LOCATION_LABELS")
if strLabels != "" {
labels = pb.ParseLabelSet(os.Getenv(strLabels))
}

// Learn our network location so we can populate consul. This is used for
// control to connect to this hub for grpc messages.
locs, err := netloc.Locate(labels)
if err != nil {
log.Fatal(err)
}

// We want to have the control client filter use ConsulHealth,
// so we establish that here for use as a filter.

Expand All @@ -510,7 +525,16 @@ func (h *hubRunner) Run(args []string) int {
L.Info("consul running, leader detected", "leader", leader)
L.Info("starting consul health monitoring")

ch, err = hub.NewConsulHealth(instanceId.SpecString(), ccfg)
var addr string

for _, loc := range locs {
if loc.IsPublic() {
addr = loc.Addresses[0]
break
}
}

ch, err = hub.NewConsulHealth(instanceId.SpecString(), ccfg, addr)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -556,17 +580,7 @@ func (h *hubRunner) Run(args []string) int {
client.Close(ctx)
}()

var labels *pb.LabelSet

strLabels := os.Getenv("LOCATION_LABELS")
if strLabels != "" {
labels = pb.ParseLabelSet(os.Getenv(strLabels))
}

locs, err := client.LearnLocations(labels)
if err != nil {
log.Fatal(err)
}
client.SetLocations(locs)

err = client.BootstrapConfig(ctx)
if err != nil {
Expand Down
156 changes: 156 additions & 0 deletions pkg/control/broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package control

import (
	context "context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"sync"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-multierror"
	"github.com/hashicorp/horizon/pkg/grpc/lz4"
	grpctoken "github.com/hashicorp/horizon/pkg/grpc/token"
	"github.com/hashicorp/horizon/pkg/pb"
	"github.com/hashicorp/horizon/pkg/utils"
	"google.golang.org/grpc"
	gcreds "google.golang.org/grpc/credentials"
)

// HubCatalog is a simple interface to decouple the gathering and management of hub
// addresses from the code that broadcasts to them. This is implemented by
// ConsulMonitor and used primarily in production.
type HubCatalog interface {
// Targets returns the hub addresses that a broadcast should be sent to.
Targets() []string
}

// Broadcaster is a simple fan out value. The commands sent to it via function calls are
// fanned out to all targets in the given HubCatalog.
type Broadcaster struct {
// L is the logger used to report broadcast progress.
L hclog.Logger
// catalog supplies the list of targets to broadcast to.
catalog HubCatalog
// conn returns a hub services client for the given target address.
conn func(addr string) (pb.HubServicesClient, error)
}

// NewBroadcaster builds a Broadcaster that fans out to the targets reported by
// catalog. conn is called to obtain a client for each target; keeping it
// injectable makes this code much easier to test. In production, conn is
// usually GRPCDial.Dial. The returned error is currently always nil.
func NewBroadcaster(
	L hclog.Logger,
	catalog HubCatalog,
	conn func(addr string) (pb.HubServicesClient, error),
) (*Broadcaster, error) {
	return &Broadcaster{L: L, catalog: catalog, conn: conn}, nil
}

// AdvertiseServices fans the given account services out to every target
// currently listed in the catalog. For each target it obtains a client from the
// Broadcaster's conn function and calls AddServices on it.
//
// A failure for one target does not stop the broadcast: the remaining targets
// are still attempted and every failure is collected. The returned error is nil
// when all targets succeeded; otherwise it aggregates one error per failed
// target, each annotated with the target address and the step that failed so
// the offending hub can be identified.
func (b *Broadcaster) AdvertiseServices(ctx context.Context, as *pb.AccountServices) error {
	var topError error

	targets := b.catalog.Targets()

	b.L.Info("hub broadcasting beginning", "targets", len(targets))

	for _, tgt := range targets {
		b.L.Info("broadcasting hub update", "target", tgt)

		client, err := b.conn(tgt)
		if err != nil {
			// Keep going: one unreachable hub must not block the others.
			topError = multierror.Append(topError, fmt.Errorf("connecting to hub %q: %w", tgt, err))
			continue
		}

		if _, err := client.AddServices(ctx, as); err != nil {
			topError = multierror.Append(topError, fmt.Errorf("advertising services to hub %q: %w", tgt, err))
		}
	}

	return topError
}

// GRPCDial provides connection pooling grpc connections to hubs. It is used to
// avoid spinning up new TCP connections to hubs on every advertise operation.
type GRPCDial struct {
// token, when non-empty, is attached as per-RPC credentials on new connections.
token string
// cert is the raw certificate handed to NewGRPCDial.
cert []byte

// mu guards grpcConns.
mu sync.RWMutex
// grpcConns holds the connections opened so far, keyed by dial target.
grpcConns map[string]*grpc.ClientConn

// tlscfg is the TLS configuration used for the transport credentials of
// every connection; its RootCAs are set only when a cert was supplied.
tlscfg tls.Config
}

// NewGRPCDial constructs a GRPCDial with an empty connection pool. token is the
// authentication token included with all calls to the hubs. cert is a TLS
// certificate that, if set, becomes the only entry in the TLS RootCAs, which
// restricts the dialer to hubs using the certs managed by control. An error is
// returned only when a non-nil cert cannot be parsed.
func NewGRPCDial(token string, cert []byte) (*GRPCDial, error) {
	dialer := &GRPCDial{
		token:     token,
		cert:      cert,
		grpcConns: map[string]*grpc.ClientConn{},
	}

	// Without a cert the TLS config is left at its zero value.
	if cert == nil {
		return dialer, nil
	}

	hubCert, err := utils.ParseCertificate(cert)
	if err != nil {
		return nil, err
	}

	pool := x509.NewCertPool()
	pool.AddCert(hubCert)
	dialer.tlscfg.RootCAs = pool

	return dialer, nil
}

// Dial gets a gRPC client for target, which is used as a host:port combo. It
// returns a client backed by an existing connection to that target if there is
// one; otherwise it opens a new gRPC connection and keeps it for reuse.
func (g *GRPCDial) Dial(target string) (pb.HubServicesClient, error) {
g.mu.RLock()
cc, ok := g.grpcConns[target]
g.mu.RUnlock()

if ok {
return pb.NewHubServicesClient(cc), nil
}

g.mu.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to use both RLock and Lock here in sequence? I guess this relates to your comment about there being a race condition -- does the race condition require we use a Lock on our second check instead of an RLock (is there a difference in behavior that contributes to a race condition), or is the reasoning "we need to do a Write, but first we have to check that Read didn't flake out on us previously, and that check has to be within the same Lock as the eventual write-action"?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think of it like a table lock in SQL. RLock locks all threads from reading, then sets the current target. Such that everyone behind this current Dial call has to wait to read until it is done. Only then can hypothetically the next person in line read the value, or not to see that it was changed by the person ahead of them. But this is in the weeds for me.

defer g.mu.Unlock()

// There is a race here so we have to check again.
cc, ok = g.grpcConns[target]
if ok {
return pb.NewHubServicesClient(cc), nil
}

opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.UseCompressor(lz4.Name)),
}

if g.token != "" {
opts = append(opts, grpc.WithPerRPCCredentials(grpctoken.Token(g.token)))
}

creds := gcreds.NewTLS(&g.tlscfg)

opts = append(opts, grpc.WithTransportCredentials(creds))

cc, err := grpc.Dial(target, opts...)
if err != nil {
return nil, err
}

g.grpcConns[target] = cc

return pb.NewHubServicesClient(cc), nil
}
77 changes: 77 additions & 0 deletions pkg/control/broadcast_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package control

import (
"context"
"testing"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/horizon/pkg/pb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

// fakeCatalog is a test HubCatalog that reports a fixed list of targets.
type fakeCatalog struct {
// targets is the list returned verbatim by Targets.
targets []string
}

// Targets returns the configured targets.
func (f *fakeCatalog) Targets() []string {
return f.targets
}

// fakeClient is a test double for the hub services client that records what it
// is asked to do instead of talking to a hub.
type fakeClient struct {
// addr is set by the test's conn function to the address it was asked to dial.
addr string
// services accumulates every value passed to AddServices, in call order.
services []*pb.AccountServices
}

// AddServices records in and reports success.
func (f *fakeClient) AddServices(ctx context.Context, in *pb.AccountServices, opts ...grpc.CallOption) (*pb.Noop, error) {
f.services = append(f.services, in)
return &pb.Noop{}, nil
}

// AddLabeLink is not exercised by these tests and panics if called.
func (f *fakeClient) AddLabeLink(ctx context.Context, in *pb.LabelLinks, opts ...grpc.CallOption) (*pb.Noop, error) {
panic("not implemented") // TODO: Implement
}

// TestBroadcaster verifies that a Broadcaster delivers advertised account
// services to the hubs listed in its catalog.
func TestBroadcaster(t *testing.T) {
	t.Run("fans out new account services to all hubs", func(t *testing.T) {
		const hubAddr = "1.2.3.4"

		catalog := &fakeCatalog{targets: []string{hubAddr}}
		client := &fakeClient{}

		// Hand back the same fake for every target, remembering the address
		// that was requested.
		dial := func(addr string) (pb.HubServicesClient, error) {
			client.addr = addr
			return client, nil
		}

		bc, err := NewBroadcaster(hclog.L(), catalog, dial)
		require.NoError(t, err)

		account := &pb.Account{
			Namespace: "/",
			AccountId: pb.NewULID(),
		}
		routes := []*pb.ServiceRoute{
			{Hub: pb.NewULID()},
		}
		as := &pb.AccountServices{
			Account:  account,
			Services: routes,
		}

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		err = bc.AdvertiseServices(ctx, as)
		require.NoError(t, err)

		// The single catalog entry must have been dialed and handed the
		// exact value that was advertised.
		assert.Equal(t, hubAddr, client.addr)
		require.Equal(t, 1, len(client.services))
		assert.Equal(t, as, client.services[0])
	})
}
Loading