Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zookeeper recursive #119

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ tmp
.vagrant
.*sw?
vendor/
/.idea

synapse.jar
35 changes: 35 additions & 0 deletions config/synapse_zookeeper_recursive.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
haproxy:
reload_command: "haproxy -p /tmp/haproxy.pid -f /etc/haproxy/haproxy.cfg -sf `cat /tmp/haproxy.pid`"
config_file_path: "/etc/haproxy/haproxy.cfg"
do_writes: true
do_reloads: true
global:
- "daemon"
- "user haproxy"
- "group haproxy"
- "maxconn 4096"
- "log 127.0.0.1 local2 notice"
- "stats socket /var/run/haproxy.pid"
defaults:
- "log global"
- "mode http"
- "balance roundrobin"
- "timeout connect 5000ms"
- "timeout client 50000ms"
- "timeout server 50000ms"
shared_frontend:
- "bind 127.0.0.1:80"
services:
zookeeper_recursive:
discovery:
method: "zookeeper_recursive"
path: "/path/to/synapse"
hosts:
- "localhost:2181"
empty_backend_pool: "true"
haproxy:
server_options: "check inter 2s rise 3 fall 2"
shared_frontend:
- "acl is#[service] path_beg #[servicePath]"
- "use_backend #[service] if is#[service]"

15 changes: 15 additions & 0 deletions lib/synapse.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,21 @@ def reconfigure!
@config_updated = true
end

def append_service_watcher(service_name, service_config)
watcher = ServiceWatcher.create(service_name, service_config, self)
@service_watchers << watcher
watcher.start
end

def remove_watcher_by_name(service_name)
@service_watchers.each do |watcher|
if watcher.name == service_name
watcher.stop
@service_watchers.delete(watcher)
end
end
end

private
def create_service_watchers(services={})
service_watchers =[]
Expand Down
2 changes: 2 additions & 0 deletions lib/synapse/service_watcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "synapse/service_watcher/dns"
require "synapse/service_watcher/docker"
require "synapse/service_watcher/zookeeper_dns"
require "synapse/service_watcher/zookeeper_recursive"

module Synapse
class ServiceWatcher
Expand All @@ -15,6 +16,7 @@ class ServiceWatcher
'dns' => DnsWatcher,
'docker' => DockerWatcher,
'zookeeper_dns' => ZookeeperDnsWatcher,
'zookeeper_recursive' => ZookeeperRecursiveWatcher
}

# the method which actually dispatches watcher creation requests
Expand Down
12 changes: 9 additions & 3 deletions lib/synapse/service_watcher/zookeeper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,19 @@ def discover
numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil

log.debug "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}"
new_backends << { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id}
end
new_backends << {'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id}
end unless node[1].ephemeralOwner == 0
end

if new_backends.empty?
if @default_servers.empty?
log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}"
log.info @discovery['empty_backend_pool']
if @discovery['empty_backend_pool'].nil? or @discovery['empty_backend_pool'] == "false"
log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}"
else
log.warn "synapse: no backends and no default servers for service #{@name}; purging backends"
@backends=[]
end
else
log.warn "synapse: no backends for service #{@name}; using default servers: #{@default_servers.inspect}"
@backends = @default_servers
Expand Down
138 changes: 138 additions & 0 deletions lib/synapse/service_watcher/zookeeper_recursive.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
require "synapse/service_watcher/base"
require "zk"
require "thread"

module Synapse
class ZookeeperRecursiveWatcher < BaseWatcher

# Overriden methods start, stop, validate_discovery_opts, ping
def start
boot unless @already_started
end

def boot
@already_started = true
log.info "#{@name}: Starting @ hosts: #{@discovery["hosts"]}, path: #{@discovery["path"]}, id #{self.object_id}"
setup_zk_connection
setup_haproxy_configuration
start_watching_services
end

def stop
log.info "#{@name}: Stopping using default stop handler"
@subwatcher.each { |watcher| cleanup_service_watcher(watcher) }
@should_exit = true
end

def validate_discovery_opts
raise ArgumentError, "invalid discovery method #{@discovery["method"]}" \
unless @discovery["method"] == "zookeeper_recursive"
raise ArgumentError, "missing or invalid zookeeper host for service #{@name}" \
unless @discovery["hosts"]
raise ArgumentError, "invalid zookeeper path for service #{@name}" \
unless @discovery["path"]
end

def ping?
@zk && @zk.connected?
end

# Methods for Initializing
def setup_zk_connection
@zk_hosts = @discovery["hosts"].shuffle.join(",")
@zk = ZK.new(@zk_hosts)
end

def setup_haproxy_configuration
@haproxy_template = @haproxy.dup
#Purge the own haproxy-conf to a minimum, in order to be no haproxy-instance
@haproxy = {"listen" => @haproxy["listen"]}
end

def start_watching_services
@subwatcher = []
create_if_not_exists(@discovery["path"])
watch_services(@discovery["path"])
end

def create_if_not_exists(path)
log.debug "#{@name}: Creating ZK path: #{path}"
current = ""
path.split("/").drop(1).each { |node|
current += "/#{node}"
@zk.create(current) unless @zk.exists?(current)
}
end

# Methods for running
def watch_services(path)
log.info("Watching path #{path}")
# Register each time a event is fired, since we"re getting only one event per register
@zk.register(path, [:deleted, :child]) do |event|
if event.node_deleted?
cleanup_service_watcher(path)
else
watch_services(path)
end
end

children = @zk.children(path, :watch => true).map { |child| "#{path}/#{child}" }

persistent_children = children.select { |child| @zk.get("#{child}")[1].ephemeralOwner == 0 }
persistent_children.each { |child| watch_services(child) }


unless (path == @discovery["path"])
if (!@subwatcher.include?(path) && persistent_children.empty?) then
create_service_watcher(path)
end
if (@subwatcher.include?(path) && !persistent_children.empty?) then
cleanup_service_watcher(path)
end
end
end

def create_service_watcher(service_path)
service_name = service_path.gsub(/[\/\.]/, "_")
service_config = {
"discovery" => {
"method" => "zookeeper",
"path" => "#{service_path}",
"hosts" => @discovery["hosts"],
"empty_backend_pool" => @discovery["empty_backend_pool"]
},
"haproxy" => build_haproxy_section(service_name, service_path, @haproxy_template)
}
log.info "#{@name}: Creating new Service-Watcher for #{service_name}@ hosts: #{@zk_hosts}"
log.debug service_config
@subwatcher << service_path
@synapse.append_service_watcher(service_name, service_config)
end

def build_haproxy_section(service_name, service_path, template)
new_haproxy = {}
template.each { |key, section| new_haproxy[key] = parse_section(section, service_name, service_path) }
return new_haproxy
end

def parse_section(section, service_name, service_path)
service_url = service_path.sub(@discovery["path"], "")
service_url = "/" if service_url.empty?
if section.is_a?(String)
new_section = section.gsub(/#\[servicePath\]/, "#{service_url}").gsub(/#\[service\]/, "#{service_name}")
else
unless section.nil? || section == 0
new_section = section.map { |subsection| parse_section(subsection, service_name, service_path) }
end
end
new_section
end

def cleanup_service_watcher(service_path)
service_name = service_path.gsub(/\//, "_")
log.info("#{@name}: Removing Watcher: #{service_name}")
@synapse.remove_watcher_by_name(service_name)
@subwatcher.delete(service_path)
end
end
end
Loading