From 932330084fb50149ddbb711cfb653faeccdaea96 Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Sun, 13 Apr 2014 08:03:33 +0100 Subject: [PATCH 1/6] synapse etcd service watcher --- lib/synapse/service_watcher.rb | 2 + lib/synapse/service_watcher/etcd.rb | 154 ++++++++++++++++++++++++++++ synapse.gemspec | 1 + 3 files changed, 157 insertions(+) create mode 100644 lib/synapse/service_watcher/etcd.rb diff --git a/lib/synapse/service_watcher.rb b/lib/synapse/service_watcher.rb index 44227465..3af96864 100644 --- a/lib/synapse/service_watcher.rb +++ b/lib/synapse/service_watcher.rb @@ -3,6 +3,7 @@ module Synapse class ServiceWatcher + # the method which actually dispatches watcher creation requests def self.create(name, opts, synapse) opts['name'] = name @@ -24,3 +25,4 @@ def self.create(name, opts, synapse) end end end + diff --git a/lib/synapse/service_watcher/etcd.rb b/lib/synapse/service_watcher/etcd.rb new file mode 100644 index 00000000..dc4ec277 --- /dev/null +++ b/lib/synapse/service_watcher/etcd.rb @@ -0,0 +1,154 @@ +require "synapse/service_watcher/base" + +require 'etcd' + +module Synapse + class EtcdWatcher < BaseWatcher + NUMBERS_RE = /^\d+$/ + + def start + etcd_hosts = @discovery['host'] + + log.info "synapse: starting etcd watcher #{@name} @ host: #{@discovery['host']}, path: #{@discovery['path']}" + @should_exit = false + @etcd = ::Etcd.client(:host => @discovery['host'], :port => @discovery['port']) + + # call the callback to bootstrap the process + discover + @synapse.reconfigure! + @watcher = Thread.new do + watch + end + end + + def stop + log.warn "synapse: etcd watcher exiting" + + @should_exit = true + @etcd = nil + + log.info "synapse: etcd watcher cleaned up successfully" + end + + def ping? + @etcd.leader + end + + private + def validate_discovery_opts + raise ArgumentError, "invalid discovery method #{@discovery['method']}" \ + unless @discovery['method'] == 'etcd' + raise ArgumentError, "missing or invalid etcd host for service #{@name}" \ + unless @discovery['host'] + raise ArgumentError, "missing or invalid etcd port for service #{@name}" \ + unless @discovery['port'] + raise ArgumentError, "invalid etcd path for service #{@name}" \ + unless @discovery['path'] + end + + # helper method that ensures that the discovery path exists + def create(path) + log.debug "synapse: creating etcd path: #{path}" + @etcd.create(path, dir: true) + end + + def each_node(node) + begin + host, port, name = deserialize_service_instance(node.value) + rescue StandardError => e + log.error "synapse: invalid data in etcd node #{node.inspect} at #{@discovery['path']}: #{e} DATA #{node.value}" + nil + else + server_port = @server_port_override ? @server_port_override : port + + # find the numberic id in the node name; used for leader elections if enabled + numeric_id = node.key.split('/').last + numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil + + log.warn "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" + { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id} + end + end + + def each_dir(d) + new_backends = [] + d.children.each do |node| + if node.directory? + new_backends << each_dir(@etcd.get(node.key)) + else + backend = each_node(node) + if backend + new_backends << backend + end + end + end + new_backends.flatten + end + + # find the current backends at the discovery path; sets @backends + def discover + log.info "synapse: discovering backends for service #{@name}" + + d = nil + begin + d = @etcd.get(@discovery['path']) + rescue Etcd::KeyNotFound + create(@discovery['path']) + d = @etcd.get(@discovery['path']) + end + + new_backends = [] + if d.directory? + new_backends = each_dir(d) + else + log.warn "synapse: path #{@discovery['path']} is not a directory" + end + + if new_backends.empty? + if @default_servers.empty? + log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}" + false + else + log.warn "synapse: no backends for service #{@name}; using default servers: #{@default_servers.inspect}" + @backends = @default_servers + true + end + else + if @backends != new_backends + log.info "synapse: discovered #{new_backends.length} backends (including new) for service #{@name}" + @backends = new_backends + true + else + log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" + false + end + end + end + + def watch + while !@should_exit + begin + @etcd.watch(@discovery['path'], :timeout => 60, :recursive => true) + rescue Timeout::Error + else + if discover + @synapse.reconfigure! + end + end + end + end + + # decode the data at a zookeeper endpoint + def deserialize_service_instance(data) + log.debug "synapse: deserializing process data" + decoded = JSON.parse(data) + + host = decoded['host'] || (raise ValueError, 'instance json data does not have host key') + port = decoded['port'] || (raise ValueError, 'instance json data does not have port key') + name = decoded['name'] || nil + + return host, port, name + end + end +end + diff --git a/synapse.gemspec b/synapse.gemspec index 86e238b3..4c23cae3 100644 --- a/synapse.gemspec +++ b/synapse.gemspec @@ -19,6 +19,7 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "aws-sdk", "~> 1.39" gem.add_runtime_dependency "docker-api", "~> 1.7.2" gem.add_runtime_dependency "zk", "~> 1.9.4" + gem.add_runtime_dependency "etcd", "~> 0.2.4" gem.add_development_dependency "rake" gem.add_development_dependency "rspec", "~> 3.1.0" From 5fc104fb7c96da9ed308ba148c45dd09ff8f590a Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Tue, 22 Sep 2015 21:04:56 -0700 Subject: [PATCH 2/6] Unfuck the Gemfile to do the right thing w docker --- Gemfile | 2 ++ Gemfile.lock | 8 +++++--- synapse.gemspec | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/Gemfile b/Gemfile index cf1d57e5..b94f21bf 100644 --- a/Gemfile +++ b/Gemfile @@ -1,4 +1,6 @@ source 'https://rubygems.org' +gem 'docker-api', :require => 'docker' + # Specify your gem's dependencies in synapse.gemspec gemspec diff --git a/Gemfile.lock b/Gemfile.lock index 07b0aba3..1bb63688 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -4,6 +4,7 @@ PATH synapse (0.12.1) aws-sdk (~> 1.39) docker-api (~> 1.7.2) + etcd (~> 0.2.4) zk (~> 1.9.4) GEM @@ -21,6 +22,8 @@ GEM archive-tar-minitar excon (>= 0.28) json + etcd (0.2.4) + mixlib-log excon (0.45.4) ffi (1.9.3-java) json (1.8.3) @@ -30,6 +33,7 @@ GEM multi_json (>= 1.8.4) method_source (0.8.2) mini_portile (0.6.2) + mixlib-log (1.6.0) multi_json (1.11.2) nokogiri (1.6.6.2) mini_portile (~> 0.6.0) @@ -70,11 +74,9 @@ PLATFORMS ruby DEPENDENCIES + docker-api pry pry-nav rake rspec (~> 3.1.0) synapse! - -BUNDLED WITH - 1.10.5 diff --git a/synapse.gemspec b/synapse.gemspec index 4c23cae3..ab3e60d9 100644 --- a/synapse.gemspec +++ b/synapse.gemspec @@ -17,7 +17,7 @@ Gem::Specification.new do |gem| gem.test_files = gem.files.grep(%r{^(test|spec|features)/}) gem.add_runtime_dependency "aws-sdk", "~> 1.39" - gem.add_runtime_dependency "docker-api", "~> 1.7.2" + gem.add_runtime_dependency 'docker-api', "~> 1.7.2" gem.add_runtime_dependency "zk", "~> 1.9.4" gem.add_runtime_dependency "etcd", "~> 0.2.4" From 35b1a25c96708d6b53b0e92224923cf96323780d Mon Sep 17 00:00:00 2001 From: Alan Smith Date: Thu, 14 Aug 2014 16:11:59 -0400 Subject: [PATCH 3/6] Switched EtcdWatcher to use the `hosts` parameter and rotate through the hosts until a reachable host is found. --- lib/synapse/service_watcher/etcd.rb | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/lib/synapse/service_watcher/etcd.rb b/lib/synapse/service_watcher/etcd.rb index dc4ec277..37df5165 100644 --- a/lib/synapse/service_watcher/etcd.rb +++ b/lib/synapse/service_watcher/etcd.rb @@ -7,11 +7,15 @@ class EtcdWatcher < BaseWatcher NUMBERS_RE = /^\d+$/ def start - etcd_hosts = @discovery['host'] + @etcd_hosts = @discovery['hosts'].shuffle log.info "synapse: starting etcd watcher #{@name} @ host: #{@discovery['host']}, path: #{@discovery['path']}" @should_exit = false - @etcd = ::Etcd.client(:host => @discovery['host'], :port => @discovery['port']) + + host, port = @etcd_hosts[0].split(':') + host = host || '127.0.0.1' + port = port || 4003 + @etcd = ::Etcd.client(:host => host, :port => port) # call the callback to bootstrap the process discover @@ -31,6 +35,22 @@ def stop end def ping? + etcd_connected? + end + + private + def etcd_connected? + unless @etcd.leader + @etcd_hosts.each do |h| + host, port = h.split(':') + host = host || '127.0.0.1' + port = port || 4003 + @etcd = ::Etcd.client(:host => host, :port => port) + + break if @etcd.leader + end + end + @etcd.leader end From 479d3935cee2780de1f073497ec61d7ec7d66de8 Mon Sep 17 00:00:00 2001 From: Alan Smith Date: Fri, 15 Aug 2014 10:38:26 -0400 Subject: [PATCH 4/6] Switched to less resilient failover for etcd in favor of fewer connections made to etcd. --- lib/synapse/service_watcher/etcd.rb | 34 ++++++++++++----------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/lib/synapse/service_watcher/etcd.rb b/lib/synapse/service_watcher/etcd.rb index 37df5165..6b00e582 100644 --- a/lib/synapse/service_watcher/etcd.rb +++ b/lib/synapse/service_watcher/etcd.rb @@ -12,10 +12,20 @@ def start log.info "synapse: starting etcd watcher #{@name} @ host: #{@discovery['host']}, path: #{@discovery['path']}" @should_exit = false - host, port = @etcd_hosts[0].split(':') - host = host || '127.0.0.1' - port = port || 4003 - @etcd = ::Etcd.client(:host => host, :port => port) + @etcd_hosts.each do |h| + host, port = h.split(':') + port = port || 4003 + @etcd = ::Etcd.client(:host => host, :port => port) + + connected = + begin + @etcd.leader + rescue + false + end + + break if connected + end # call the callback to bootstrap the process discover @@ -35,22 +45,6 @@ def stop end def ping? - etcd_connected? - end - - private - def etcd_connected? - unless @etcd.leader - @etcd_hosts.each do |h| - host, port = h.split(':') - host = host || '127.0.0.1' - port = port || 4003 - @etcd = ::Etcd.client(:host => host, :port => port) - - break if @etcd.leader - end - end - @etcd.leader end From 16567defc925a03e9181e2dfb5b1fdf4e6265293 Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Tue, 22 Sep 2015 21:41:52 -0700 Subject: [PATCH 5/6] Add README --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index ffa51f00..3e17561e 100644 --- a/README.md +++ b/README.md @@ -186,6 +186,16 @@ be used in preference to the `AWS_` environment variables. * `aws_secret_access_key`: AWS secret key or set `AWS_SECRET_ACCESS_KEY` in the environment. * `aws_region`: AWS region (i.e. `us-east-1`) or set `AWS_REGION` in the environment. +##### etcd ##### + +This watcher retrieves a list from etcd, similarly to the zookeeper watcher + +It takes the following options: + +* `method`: etcd +* `hosts`: A list of etcd hosts to connect to, in the format "host" or "host:port" +* `path`: The path to create in etcd where ephemeral nodes will be created for each available service server + #### Listing Default Servers #### You may list a number of default servers providing a service. From 9426ba7dc954e0bb73ef0943c1b4b747670bd416 Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Tue, 22 Sep 2015 21:42:01 -0700 Subject: [PATCH 6/6] Adjust this so that it may actually work --- lib/synapse/service_watcher/etcd.rb | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/lib/synapse/service_watcher/etcd.rb b/lib/synapse/service_watcher/etcd.rb index 6b00e582..0fa43d67 100644 --- a/lib/synapse/service_watcher/etcd.rb +++ b/lib/synapse/service_watcher/etcd.rb @@ -9,14 +9,14 @@ class EtcdWatcher < BaseWatcher def start @etcd_hosts = @discovery['hosts'].shuffle - log.info "synapse: starting etcd watcher #{@name} @ host: #{@discovery['host']}, path: #{@discovery['path']}" + log.info "synapse: starting etcd watcher #{@name} @ hosts: #{@discovery['hosts']}, path: #{@discovery['path']}" @should_exit = false @etcd_hosts.each do |h| host, port = h.split(':') port = port || 4003 @etcd = ::Etcd.client(:host => host, :port => port) - + connected = begin @etcd.leader @@ -52,10 +52,8 @@ def ping? def validate_discovery_opts raise ArgumentError, "invalid discovery method #{@discovery['method']}" \ unless @discovery['method'] == 'etcd' - raise ArgumentError, "missing or invalid etcd host for service #{@name}" \ - unless @discovery['host'] - raise ArgumentError, "missing or invalid etcd port for service #{@name}" \ - unless @discovery['port'] + raise ArgumentError, "missing or invalid etcd hosts for service #{@name}" \ + unless @discovery['hosts'] raise ArgumentError, "invalid etcd path for service #{@name}" \ unless @discovery['path'] end @@ -152,7 +150,7 @@ def watch end end - # decode the data at a zookeeper endpoint + # decode the data at an etcd endpoint def deserialize_service_instance(data) log.debug "synapse: deserializing process data" decoded = JSON.parse(data)