Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Etcd service watcher #58

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
source 'https://rubygems.org'

gem 'docker-api', :require => 'docker'

# Specify your gem's dependencies in synapse.gemspec
gemspec
8 changes: 5 additions & 3 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ PATH
synapse (0.12.1)
aws-sdk (~> 1.39)
docker-api (~> 1.7.2)
etcd (~> 0.2.4)
zk (~> 1.9.4)

GEM
Expand All @@ -21,6 +22,8 @@ GEM
archive-tar-minitar
excon (>= 0.28)
json
etcd (0.2.4)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So while testing this I came across quite a pickle, which is that the etcd ruby gem < 0.3.0 cannot handle etcd 2.0+ (it errors all over the place regarding 404 errors)

ranjib/etcd-ruby#51 has been merged and my local testing indicates that version 0.3.0 seems to fix things.

@bobtfish do you think we should depend on an old etcd or the new one? I sorta prefer the new one but I'm not sure what the community progress on moving to etcd 2.0 is like?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we recommend installing via gem, can we just do ~> 0.2 ?

mixlib-log
excon (0.45.4)
ffi (1.9.3-java)
json (1.8.3)
Expand All @@ -30,6 +33,7 @@ GEM
multi_json (>= 1.8.4)
method_source (0.8.2)
mini_portile (0.6.2)
mixlib-log (1.6.0)
multi_json (1.11.2)
nokogiri (1.6.6.2)
mini_portile (~> 0.6.0)
Expand Down Expand Up @@ -70,11 +74,9 @@ PLATFORMS
ruby

DEPENDENCIES
docker-api
pry
pry-nav
rake
rspec (~> 3.1.0)
synapse!

BUNDLED WITH
1.10.5
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ be used in preference to the `AWS_` environment variables.
* `aws_secret_access_key`: AWS secret key or set `AWS_SECRET_ACCESS_KEY` in the environment.
* `aws_region`: AWS region (i.e. `us-east-1`) or set `AWS_REGION` in the environment.

##### etcd #####

This watcher retrieves a list from etcd, similarly to the zookeeper watcher

It takes the following options:

* `method`: etcd
* `hosts`: A list of etcd hosts to connect to, in the format "host" or "host:port"
* `path`: The path to create in etcd where ephemeral nodes will be created for each available service server

#### Listing Default Servers ####

You may list a number of default servers providing a service.
Expand Down
2 changes: 2 additions & 0 deletions lib/synapse/service_watcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

module Synapse
class ServiceWatcher

# the method which actually dispatches watcher creation requests
def self.create(name, opts, synapse)
opts['name'] = name
Expand All @@ -24,3 +25,4 @@ def self.create(name, opts, synapse)
end
end
end

166 changes: 166 additions & 0 deletions lib/synapse/service_watcher/etcd.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
require "synapse/service_watcher/base"

require 'etcd'

module Synapse
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be class Synapse::ServiceWatcher

Also can you add this watcher to the auto creation test?

class EtcdWatcher < BaseWatcher
NUMBERS_RE = /^\d+$/

def start
@etcd_hosts = @discovery['hosts'].shuffle

log.info "synapse: starting etcd watcher #{@name} @ hosts: #{@discovery['hosts']}, path: #{@discovery['path']}"
@should_exit = false

@etcd_hosts.each do |h|
host, port = h.split(':')
port = port || 4003
@etcd = ::Etcd.client(:host => host, :port => port)

connected =
begin
@etcd.leader
rescue
false
end

break if connected
end

# call the callback to bootstrap the process
discover
@synapse.reconfigure!
@watcher = Thread.new do
watch
end
end

def stop
log.warn "synapse: etcd watcher exiting"

@should_exit = true
@etcd = nil

log.info "synapse: etcd watcher cleaned up successfully"
end

def ping?
@etcd.leader
end

private
def validate_discovery_opts
raise ArgumentError, "invalid discovery method #{@discovery['method']}" \
unless @discovery['method'] == 'etcd'
raise ArgumentError, "missing or invalid etcd hosts for service #{@name}" \
unless @discovery['hosts']
raise ArgumentError, "invalid etcd path for service #{@name}" \
unless @discovery['path']
end

# helper method that ensures that the discovery path exists
def create(path)
log.debug "synapse: creating etcd path: #{path}"
@etcd.create(path, dir: true)
end

def each_node(node)
begin
host, port, name = deserialize_service_instance(node.value)
rescue StandardError => e
log.error "synapse: invalid data in etcd node #{node.inspect} at #{@discovery['path']}: #{e} DATA #{node.value}"
nil
else
server_port = @server_port_override ? @server_port_override : port

# find the numberic id in the node name; used for leader elections if enabled
numeric_id = node.key.split('/').last
numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil

log.warn "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}"
{ 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id}
end
end

def each_dir(d)
new_backends = []
d.children.each do |node|
if node.directory?
new_backends << each_dir(@etcd.get(node.key))
else
backend = each_node(node)
if backend
new_backends << backend
end
end
end
new_backends.flatten
end

# find the current backends at the discovery path; sets @backends
def discover
log.info "synapse: discovering backends for service #{@name}"

d = nil
begin
d = @etcd.get(@discovery['path'])
rescue Etcd::KeyNotFound
create(@discovery['path'])
d = @etcd.get(@discovery['path'])
end

new_backends = []
if d.directory?
new_backends = each_dir(d)
else
log.warn "synapse: path #{@discovery['path']} is not a directory"
end

if new_backends.empty?
if @default_servers.empty?
log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}"
false
else
log.warn "synapse: no backends for service #{@name}; using default servers: #{@default_servers.inspect}"
@backends = @default_servers
true
end
else
if @backends != new_backends
log.info "synapse: discovered #{new_backends.length} backends (including new) for service #{@name}"
@backends = new_backends
true
else
log.info "synapse: discovered #{new_backends.length} backends for service #{@name}"
false
end
end
end

def watch
while !@should_exit
begin
@etcd.watch(@discovery['path'], :timeout => 60, :recursive => true)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright so while testing I came across this gem. If my understanding is correct etcd makes us not only do our own heartbeats from nerve (hella writes), but we also end up basically constantly polling in this function because there is no capability to filter watch events?

Basically without etcd-io/etcd#633 or etcd-io/etcd#174 closed SmartStack scalability on etcd will be limited to ... not very much.

If that understanding is correct can we implement get children watches internally, where we do a lightweight "see if the list of children changed" operation and if that is different we actually go through loading all the data (aka run discover). This way we're at least not constantly pulling all of the etcd state?

rescue Timeout::Error
else
if discover
@synapse.reconfigure!
end
end
end
end

# decode the data at an etcd endpoint
def deserialize_service_instance(data)
log.debug "synapse: deserializing process data"
decoded = JSON.parse(data)

host = decoded['host'] || (raise ValueError, 'instance json data does not have host key')
port = decoded['port'] || (raise ValueError, 'instance json data does not have port key')
name = decoded['name'] || nil

return host, port, name
end
end
end

3 changes: 2 additions & 1 deletion synapse.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ Gem::Specification.new do |gem|
gem.test_files = gem.files.grep(%r{^(test|spec|features)/})

gem.add_runtime_dependency "aws-sdk", "~> 1.39"
gem.add_runtime_dependency "docker-api", "~> 1.7.2"
gem.add_runtime_dependency 'docker-api', "~> 1.7.2"
gem.add_runtime_dependency "zk", "~> 1.9.4"
gem.add_runtime_dependency "etcd", "~> 0.2.4"

gem.add_development_dependency "rake"
gem.add_development_dependency "rspec", "~> 3.1.0"
Expand Down