Skip to content

Commit

Permalink
Add TTL refresh (for etcd and skydns2)
Browse files Browse the repository at this point in the history
  • Loading branch information
sheldonh committed Oct 3, 2014
1 parent 2e55307 commit 06551c1
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 3 deletions.
18 changes: 18 additions & 0 deletions bridge.go
Expand Up @@ -27,6 +27,7 @@ type Service struct {
IP string
Tags []string
Attrs map[string]string
TTL int

pp PublishedPort
}
Expand Down Expand Up @@ -89,6 +90,8 @@ func NewService(port PublishedPort, isgroup bool) *Service {
delete(metadata, "name")
service.Attrs = metadata

service.TTL = *refreshTtl

return service
}

Expand Down Expand Up @@ -181,3 +184,18 @@ func (b *RegistryBridge) Remove(containerId string) {
}
delete(b.services, containerId)
}

func (b *RegistryBridge) Refresh() {
b.Lock()
defer b.Unlock()
for containerId, services := range b.services {
for _, service := range services {
err := b.registry.Refresh(service)
if err != nil {
log.Println("registrator: unable to refresh service:", service.ID, err)
continue
}
log.Println("registrator: refreshed:", containerId[:12], service.ID)
}
}
}
5 changes: 5 additions & 0 deletions consul.go
@@ -1,6 +1,7 @@
package main

import (
"errors"
"fmt"
"net"
"net/url"
Expand Down Expand Up @@ -84,6 +85,10 @@ func (r *ConsulRegistry) Deregister(service *Service) error {
}
}

func (r *ConsulRegistry) Refresh(service *Service) error {
return errors.New("consul backend does not support refresh (use a TTL health check instead)")
}

func (r *ConsulRegistry) deregisterWithCatalog(service *Service) error {
return r.client.Agent().ServiceDeregister(service.ID)
}
Expand Down
6 changes: 5 additions & 1 deletion etcd.go
Expand Up @@ -25,7 +25,7 @@ func (r *EtcdRegistry) Register(service *Service) error {
path := r.path + "/" + service.Name + "/" + service.ID
port := strconv.Itoa(service.Port)
addr := net.JoinHostPort(service.IP, port)
_, err := r.client.Set(path, addr, uint64(0))
_, err := r.client.Set(path, addr, uint64(service.TTL))
return err
}

Expand All @@ -34,3 +34,7 @@ func (r *EtcdRegistry) Deregister(service *Service) error {
_, err := r.client.Delete(path, false)
return err
}

func (r *EtcdRegistry) Refresh(service *Service) error {
return r.Register(service)
}
27 changes: 27 additions & 0 deletions registrator.go
@@ -1,16 +1,20 @@
package main

import (
"errors"
"flag"
"log"
"net/url"
"os"
"time"

"github.com/cenkalti/backoff"
dockerapi "github.com/fsouza/go-dockerclient"
)

var hostIp = flag.String("ip", "", "IP for ports mapped to the host")
var refreshInterval = flag.Int("ttl-refresh", 0, "Frequency with which service TTLs are refreshed")
var refreshTtl = flag.Int("ttl", 0, "TTL for services (default is no expiry)")

func getopt(name, def string) string {
if env := os.Getenv(name); env != "" {
Expand Down Expand Up @@ -40,6 +44,7 @@ func mapdefault(m map[string]string, key, default_ string) string {
type ServiceRegistry interface {
Register(service *Service) error
Deregister(service *Service) error
Refresh(service *Service) error
}

func NewServiceRegistry(uri *url.URL) ServiceRegistry {
Expand All @@ -62,6 +67,11 @@ func main() {
if *hostIp != "" {
log.Println("registrator: Forcing host IP to", *hostIp)
}
if (*refreshTtl == 0 && *refreshInterval > 0) || (*refreshTtl > 0 && *refreshInterval == 0) {
assert(errors.New("-ttl and -refresh-interval must be specified together or not at all"))
} else if *refreshTtl <= *refreshInterval {
assert(errors.New("-ttl must be greater than -refresh-interval"))
}

docker, err := dockerapi.NewClient(getopt("DOCKER_HOST", "unix:///var/run/docker.sock"))
assert(err)
Expand All @@ -82,6 +92,22 @@ func main() {
bridge.Add(listing.ID)
}

quit := make(chan struct{})
if *refreshInterval > 0 {
ticker := time.NewTicker(time.Duration(*refreshInterval) * time.Second)
go func() {
for {
select {
case <-ticker.C:
bridge.Refresh()
case <-quit:
ticker.Stop()
return
}
}
}()
}

events := make(chan *dockerapi.APIEvents)
assert(docker.AddEventListener(events))
log.Println("registrator: Listening for Docker events...")
Expand All @@ -94,5 +120,6 @@ func main() {
}
}

close(quit)
log.Fatal("registrator: docker event loop closed") // todo: reconnect?
}
8 changes: 6 additions & 2 deletions skydns2.go
Expand Up @@ -2,8 +2,8 @@ package main

import (
"net/url"
"strings"
"strconv"
"strings"

"github.com/coreos/go-etcd/etcd"
)
Expand All @@ -25,7 +25,7 @@ func NewSkydns2Registry(uri *url.URL) ServiceRegistry {
func (r *Skydns2Registry) Register(service *Service) error {
port := strconv.Itoa(service.Port)
record := `{"host":"` + service.IP + `","port":` + port + `}`
_, err := r.client.Set(r.servicePath(service), record, uint64(0))
_, err := r.client.Set(r.servicePath(service), record, uint64(service.TTL))
return err
}

Expand All @@ -34,6 +34,10 @@ func (r *Skydns2Registry) Deregister(service *Service) error {
return err
}

func (r *Skydns2Registry) Refresh(service *Service) error {
return r.Register(service)
}

func (r *Skydns2Registry) servicePath(service *Service) string {
return r.path + "/" + service.Name + "/" + service.ID
}
Expand Down

0 comments on commit 06551c1

Please sign in to comment.