Skip to content

Commit

Permalink
Added Watch functionality to libreg (#38)
Browse files Browse the repository at this point in the history
* Added Watch functionality to libreg

* Interface Update

* Fixed Mock Interfaces
  • Loading branch information
king-jam authored and valcharry committed Oct 18, 2016
1 parent 5547017 commit cbb74f1
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 0 deletions.
150 changes: 150 additions & 0 deletions libreg/registry/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (
"github.com/hashicorp/consul/api"
)

const (
// DefaultWatchWaitTime is how long we will block when watching
// for services or nodes to change/update
DefaultWatchWaitTime = 15 * time.Second
)

var (
// ErrMultipleEndpointsUnsupported is thrown when there are
// multiple endpoints specified for Consul
Expand Down Expand Up @@ -156,6 +162,53 @@ func (s *Consul) Nodes(options *registry.QueryOptions) ([]*registry.Node, error)
return retNodes, err
}

// NodesWatch watches for changes to the nodes in a given DC
func (s *Consul) NodesWatch(options *registry.QueryOptions, stopChan <-chan struct{}) (<-chan []*registry.Node, error) {
catalog := s.client.Catalog()
queryOps := s.getQueryOptions(options)
watchCh := make(chan []*registry.Node)

go func() {
defer close(watchCh)

// Override the wait time option to create the watch
queryOps.WaitTime = DefaultWatchWaitTime

for {
select {
case <-stopChan:
return
default:
}

nodes, meta, err := catalog.Nodes(queryOps)
if err != nil {
return
}

// If LastIndex didn't change then it means `Get` returned
// because of the WaitTime and the key didn't change.
if queryOps.WaitIndex == meta.LastIndex {
continue
}

queryOps.WaitIndex = meta.LastIndex

var retNodes []*registry.Node
for _, v := range nodes {
retNodes = append(retNodes, &registry.Node{
Node: v.Node,
Address: v.Address,
})
}

watchCh <- retNodes
}
}()

return watchCh, nil
}

// Services lists all services in a given DC
func (s *Consul) Services(options *registry.QueryOptions) (map[string][]string, error) {
catalog := s.client.Catalog()
Expand All @@ -164,6 +217,47 @@ func (s *Consul) Services(options *registry.QueryOptions) (map[string][]string,
return services, err
}

// ServicesWatch watches for changes to the list of services in a given DC
func (s *Consul) ServicesWatch(options *registry.QueryOptions, stopChan <-chan struct{}) (<-chan map[string][]string, error) {
catalog := s.client.Catalog()
queryOps := s.getQueryOptions(options)
watchCh := make(chan map[string][]string)

go func() {
defer close(watchCh)

// Override the wait time option to create the watch
queryOps.WaitTime = DefaultWatchWaitTime

for {
select {
case <-stopChan:
return
default:
}

services, meta, err := catalog.Services(queryOps)
if err != nil {
return
}

// If LastIndex didn't change then it means `Get` returned
// because of the WaitTime and the key didn't change.
if queryOps.WaitIndex == meta.LastIndex {
continue
}

queryOps.WaitIndex = meta.LastIndex

if services != nil {
watchCh <- services
}
}
}()

return watchCh, nil
}

// Service lists the nodes in a given service
func (s *Consul) Service(service, tag string, options *registry.QueryOptions) ([]*registry.CatalogService, error) {
catalog := s.client.Catalog()
Expand All @@ -188,6 +282,62 @@ func (s *Consul) Service(service, tag string, options *registry.QueryOptions) ([
return retServices, err
}

// ServiceWatch watches for changes to the list of nodes under a given service
func (s *Consul) ServiceWatch(service, tag string, options *registry.QueryOptions, stopChan <-chan struct{}) (<-chan []*registry.CatalogService, error) {
catalog := s.client.Catalog()
queryOps := s.getQueryOptions(options)
watchCh := make(chan []*registry.CatalogService)

go func() {
defer close(watchCh)

// Override the wait time option to create the watch
queryOps.WaitTime = DefaultWatchWaitTime

for {
select {
case <-stopChan:
return
default:
}

services, meta, err := catalog.Service(
service,
tag,
queryOps)
if err != nil {
return
}

// If LastIndex didn't change then it means `Get` returned
// because of the WaitTime and the key didn't change.
if queryOps.WaitIndex == meta.LastIndex {
continue
}

queryOps.WaitIndex = meta.LastIndex

var retServices []*registry.CatalogService
for _, v := range services {
retServices = append(retServices, &registry.CatalogService{
Node: v.Node,
Address: v.Address,
ServiceID: v.ServiceID,
ServiceName: v.ServiceName,
ServiceAddress: v.ServiceAddress,
ServiceTags: v.ServiceTags,
ServicePort: v.ServicePort,
ServiceEnableTagOverride: v.ServiceEnableTagOverride,
})
}

watchCh <- retServices
}
}()

return watchCh, nil
}

// Node lists the services provided by a given node
func (s *Consul) Node(node string, options *registry.QueryOptions) (*registry.CatalogNode, error) {
catalog := s.client.Catalog()
Expand Down
18 changes: 18 additions & 0 deletions libreg/registry/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func (m *Mock) Nodes(options *registry.QueryOptions) ([]*registry.Node, error) {
return keys, nil
}

// NodesWatch watches for changes to the nodes in a given DC
func (m *Mock) NodesWatch(options *registry.QueryOptions, stopChan <-chan struct{}) (<-chan []*registry.Node, error) {
watchCh := make(chan []*registry.Node)
return watchCh, nil
}

// Services lists all services in a given DC
func (m *Mock) Services(options *registry.QueryOptions) (map[string][]string, error) {
serviceMap := make(map[string][]string)
Expand All @@ -115,6 +121,12 @@ func (m *Mock) Services(options *registry.QueryOptions) (map[string][]string, er
return serviceMap, nil
}

// ServicesWatch watches for changes to the list of services in a given DC
func (m *Mock) ServicesWatch(options *registry.QueryOptions, stopChan <-chan struct{}) (<-chan map[string][]string, error) {
watchCh := make(chan map[string][]string)
return watchCh, nil
}

// Service lists the nodes in a given service
func (m *Mock) Service(service, tag string, options *registry.QueryOptions) ([]*registry.CatalogService, error) {
var c []*registry.CatalogService
Expand Down Expand Up @@ -151,6 +163,12 @@ func (m *Mock) Service(service, tag string, options *registry.QueryOptions) ([]*
return c, nil
}

// ServiceWatch watches for changes to the list of nodes under a given service
func (m *Mock) ServiceWatch(service, tag string, options *registry.QueryOptions, stopChan <-chan struct{}) (<-chan []*registry.CatalogService, error) {
watchCh := make(chan []*registry.CatalogService)
return watchCh, nil
}

// Node lists the services provided by a given node
func (m *Mock) Node(node string, options *registry.QueryOptions) (*registry.CatalogNode, error) {
if node == "" {
Expand Down
9 changes: 9 additions & 0 deletions libreg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,21 @@ type Registry interface {
// Nodes lists all nodes in a given DC
Nodes(options *QueryOptions) ([]*Node, error)

// NodesWatch watches for changes to the nodes list
NodesWatch(options *QueryOptions, stopCh <-chan struct{}) (<-chan []*Node, error)

// Services lists all services in a given DC
Services(options *QueryOptions) (map[string][]string, error)

// ServicesWatch watches for changes to the list of services in a given DC
ServicesWatch(options *QueryOptions, stopCh <-chan struct{}) (<-chan map[string][]string, error)

// Service lists the nodes in a given service
Service(service, tag string, options *QueryOptions) ([]*CatalogService, error)

// ServiceWatch watches for changes to a service in a given DC
ServiceWatch(service, tag string, options *QueryOptions, stopChan <-chan struct{}) (<-chan []*CatalogService, error)

// Node lists the services provided by a given node
Node(node string, options *QueryOptions) (*CatalogNode, error)

Expand Down

0 comments on commit cbb74f1

Please sign in to comment.