Skip to content

Commit

Permalink
Embedded NATS Broker (#1110)
Browse files Browse the repository at this point in the history
* if the address is produced by a default route don't hash it

* embedded nats

* fix url parsing

* don't override help

* add ready flag
  • Loading branch information
asim committed Jan 14, 2020
1 parent b699d96 commit 1d311ab
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 41 deletions.
13 changes: 4 additions & 9 deletions broker/http_broker.go
Expand Up @@ -106,8 +106,9 @@ func newTransport(config *tls.Config) *http.Transport {

func newHttpBroker(opts ...Option) Broker {
options := Options{
Codec: json.Marshaler{},
Context: context.TODO(),
Codec: json.Marshaler{},
Context: context.TODO(),
Registry: registry.DefaultRegistry,
}

for _, o := range opts {
Expand All @@ -120,17 +121,11 @@ func newHttpBroker(opts ...Option) Broker {
addr = options.Addrs[0]
}

// get registry
reg, ok := options.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
}

h := &httpBroker{
id: uuid.New().String(),
address: addr,
opts: options,
r: reg,
r: options.Registry,
c: &http.Client{Transport: newTransport(options.TLSConfig)},
subscribers: make(map[string][]*httpSubscriber),
exit: make(chan chan error),
Expand Down
254 changes: 231 additions & 23 deletions broker/nats/nats.go
Expand Up @@ -4,23 +4,44 @@ package nats
import (
"context"
"errors"
"net"
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec/json"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/util/addr"
"github.com/micro/go-micro/util/log"
"github.com/nats-io/nats-server/v2/server"
nats "github.com/nats-io/nats.go"
)

type natsBroker struct {
sync.Once
sync.RWMutex
addrs []string
conn *nats.Conn
opts broker.Options
nopts nats.Options

// indicate if we're connected
connected bool

addrs []string
conn *nats.Conn
opts broker.Options
nopts nats.Options

// should we drain the connection
drain bool
closeCh chan (error)

// embedded server
server *server.Server
// configure to use local server
local bool
// server exit channel
exit chan bool
}

type subscriber struct {
Expand Down Expand Up @@ -62,14 +83,15 @@ func (n *natsBroker) Address() string {
if n.conn != nil && n.conn.IsConnected() {
return n.conn.ConnectedUrl()
}

if len(n.addrs) > 0 {
return n.addrs[0]
}

return ""
}

func setAddrs(addrs []string) []string {
func (n *natsBroker) setAddrs(addrs []string) []string {
//nolint:prealloc
var cAddrs []string
for _, addr := range addrs {
Expand All @@ -81,16 +103,178 @@ func setAddrs(addrs []string) []string {
}
cAddrs = append(cAddrs, addr)
}
if len(cAddrs) == 0 {
// if there's no address and we weren't told to
// embed a local server then use the default url
if len(cAddrs) == 0 && !n.local {
cAddrs = []string{nats.DefaultURL}
}
return cAddrs
}

// serve stats a local nats server if needed
func (n *natsBroker) serve(exit chan bool) error {
var host string
var port int
var local bool

// with no address we just default it
// this is a local client address
if len(n.addrs) == 0 || n.local {
host = "127.0.0.1"
port = -1
local = true
// with a local address we parse it
} else {
address := n.addrs[0]
if strings.HasPrefix(address, "nats://") {
address = strings.TrimPrefix(address, "nats://")
}

if addr.IsLocal(address) {
h, p, err := net.SplitHostPort(address)
if err == nil {
host = h
port, _ = strconv.Atoi(p)
local = true
}
}
}

// we only setup a server for local things
if !local {
return nil
}

// 1. create new server
// 2. register the server
// 3. connect to other servers
var cOpts server.ClusterOpts
var routes []*url.URL

// get existing nats servers to connect to
services, err := n.opts.Registry.GetService("go.micro.nats.broker")
if err == nil {
for _, service := range services {
for _, node := range service.Nodes {
u, err := url.Parse("nats://" + node.Address)
if err != nil {
log.Log(err)
continue
}
// append to the cluster routes
routes = append(routes, u)
}
}
}

// try get existing server
s := n.server

// get a host address
caddr, err := addr.Extract("")
if err != nil {
caddr = "0.0.0.0"
}

// set cluster opts
cOpts = server.ClusterOpts{
Host: caddr,
Port: -1,
}

if s == nil {
var err error
s, err = server.NewServer(&server.Options{
// Specify the host
Host: host,
// Use a random port
Port: port,
// Set the cluster ops
Cluster: cOpts,
// Set the routes
Routes: routes,
NoLog: true,
NoSigs: true,
MaxControlLine: 2048,
TLSConfig: n.opts.TLSConfig,
})
if err != nil {
return err
}

// save the server
n.server = s
}

// start the server
go s.Start()

var ready bool

// wait till its ready for connections
for i := 0; i < 3; i++ {
if s.ReadyForConnections(time.Second) {
ready = true
break
}
}

if !ready {
return errors.New("server not ready")
}

// set the client address
n.addrs = []string{s.ClientURL()}

go func() {
// register the cluster address
for {
select {
case <-exit:
// deregister on exit
n.opts.Registry.Deregister(&registry.Service{
Name: "go.micro.nats.broker",
Version: "v2",
Nodes: []*registry.Node{
{Id: s.ID(), Address: s.ClusterAddr().String()},
},
})
s.Shutdown()
return
default:
// register the broker
n.opts.Registry.Register(&registry.Service{
Name: "go.micro.nats.broker",
Version: "v2",
Nodes: []*registry.Node{
{Id: s.ID(), Address: s.ClusterAddr().String()},
},
}, registry.RegisterTTL(time.Minute))
time.Sleep(time.Minute)
}
}
}()

return nil
}

func (n *natsBroker) Connect() error {
n.Lock()
defer n.Unlock()

if !n.connected {
// create exit chan
n.exit = make(chan bool)

// start the server if needed
if err := n.serve(n.exit); err != nil {
return err
}

// set to connected
n.connected = true
}

status := nats.CLOSED
if n.conn != nil {
status = n.conn.Status()
Expand Down Expand Up @@ -122,11 +306,29 @@ func (n *natsBroker) Connect() error {
func (n *natsBroker) Disconnect() error {
n.RLock()
defer n.RUnlock()

// drain the connection if specified
if n.drain {
n.conn.Drain()
return <-n.closeCh
}

// close the client connection
n.conn.Close()

// shutdown the local server
// and deregister
if n.server != nil {
select {
case <-n.exit:
default:
close(n.exit)
}
}

// set not connected
n.connected = false

return nil
}

Expand Down Expand Up @@ -191,21 +393,6 @@ func (n *natsBroker) String() string {
return "nats"
}

func NewBroker(opts ...broker.Option) broker.Broker {
options := broker.Options{
// Default codec
Codec: json.Marshaler{},
Context: context.Background(),
}

n := &natsBroker{
opts: options,
}
n.setOption(opts...)

return n
}

func (n *natsBroker) setOption(opts ...broker.Option) {
for _, o := range opts {
o(&n.opts)
Expand All @@ -219,10 +406,15 @@ func (n *natsBroker) setOption(opts ...broker.Option) {
n.nopts = nopts
}

local, ok := n.opts.Context.Value(localServerKey{}).(bool)
if ok {
n.local = local
}

// broker.Options have higher priority than nats.Options
// only if Addrs, Secure or TLSConfig were not set through a broker.Option
// we read them from nats.Option
if len(n.opts.Addrs) == 0 {
if len(n.opts.Addrs) == 0 && !n.local {
n.opts.Addrs = n.nopts.Servers
}

Expand All @@ -233,7 +425,7 @@ func (n *natsBroker) setOption(opts ...broker.Option) {
if n.opts.TLSConfig == nil {
n.opts.TLSConfig = n.nopts.TLSConfig
}
n.addrs = setAddrs(n.opts.Addrs)
n.addrs = n.setAddrs(n.opts.Addrs)

if n.opts.Context.Value(drainConnectionKey{}) != nil {
n.drain = true
Expand All @@ -254,3 +446,19 @@ func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err e
n.closeCh <- err
}
}

func NewBroker(opts ...broker.Option) broker.Broker {
options := broker.Options{
// Default codec
Codec: json.Marshaler{},
Context: context.Background(),
Registry: registry.DefaultRegistry,
}

n := &natsBroker{
opts: options,
}
n.setOption(opts...)

return n
}
6 changes: 6 additions & 0 deletions broker/nats/options.go
Expand Up @@ -7,12 +7,18 @@ import (

type optionsKey struct{}
type drainConnectionKey struct{}
type localServerKey struct{}

// Options accepts nats.Options
func Options(opts nats.Options) broker.Option {
return setBrokerOption(optionsKey{}, opts)
}

// LocalServer embeds a local server rather than connecting to one
func LocalServer() broker.Option {
return setBrokerOption(localServerKey{}, true)
}

// DrainConnection will drain subscription on close
func DrainConnection() broker.Option {
return setBrokerOption(drainConnectionKey{}, struct{}{})
Expand Down

0 comments on commit 1d311ab

Please sign in to comment.