Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add marathon watcher #101

Merged
merged 15 commits into from
Oct 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ PATH
GEM
remote: https://rubygems.org/
specs:
addressable (2.3.6)
archive-tar-minitar (0.5.2)
aws-sdk (1.64.0)
aws-sdk-v1 (= 1.64.0)
aws-sdk-v1 (1.64.0)
json (~> 1.4)
nokogiri (>= 1.4.4)
coderay (1.0.9)
crack (0.4.2)
safe_yaml (~> 1.0.0)
diff-lcs (1.2.5)
docker-api (1.7.6)
archive-tar-minitar
Expand Down Expand Up @@ -57,9 +60,13 @@ GEM
rspec-mocks (3.1.3)
rspec-support (~> 3.1.0)
rspec-support (3.1.2)
safe_yaml (1.0.4)
slop (3.4.6)
spoon (0.0.4)
ffi
webmock (1.18.0)
addressable (>= 2.3.6)
crack (>= 0.3.2)
zk (1.9.5)
logging (~> 1.8.2)
zookeeper (~> 1.4.0)
Expand All @@ -75,6 +82,7 @@ DEPENDENCIES
rake
rspec (~> 3.1.0)
synapse!
webmock

BUNDLED WITH
1.10.5
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,18 @@ be used in preference to the `AWS_` environment variables.
* `aws_secret_access_key`: AWS secret key or set `AWS_SECRET_ACCESS_KEY` in the environment.
* `aws_region`: AWS region (i.e. `us-east-1`) or set `AWS_REGION` in the environment.

##### Marathon #####

This watcher polls the Marathon API and retrieves a list of instances for a
given application.

It takes the following options:

* `marathon_api_url`: Address of the marathon API (e.g. `http://marathon-master:8080`)
* `application_name`: Name of the application in Marathon
* `check_interval`: How often to request the list of tasks from Marathon (default: 10 seconds)
* `port_index`: Index of the backend port in the task's "ports" array. (default: 0)

#### Listing Default Servers ####

You may list a number of default servers providing a service.
Expand Down
112 changes: 112 additions & 0 deletions lib/synapse/service_watcher/marathon.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
require 'synapse/service_watcher/base'
require 'json'
require 'net/http'
require 'resolv'

class Synapse::ServiceWatcher
class MarathonWatcher < BaseWatcher
def start
@check_interval = @discovery['check_interval'] || 10.0
@connection = nil
@watcher = Thread.new { sleep splay; watch }
end

def stop
@connection.finish
rescue
# pass
end

private

def validate_discovery_opts
required_opts = %w[marathon_api_url application_name]

required_opts.each do |opt|
if @discovery.fetch(opt, '').empty?
raise ArgumentError,
"a value for services.#{@name}.discovery.#{opt} must be specified"
end
end
end

def attempt_marathon_connection
marathon_api_path = @discovery.fetch('marathon_api_path', '/v2/apps/%{app}/tasks')
marathon_api_path = marathon_api_path % { app: @discovery['application_name'] }

@marathon_api = URI.join(@discovery['marathon_api_url'], marathon_api_path)

begin
@connection = Net::HTTP.new(@marathon_api.host, @marathon_api.port)
@connection.open_timeout = 5
@connection.start
rescue => ex
@connection = nil
log.error "synapse: could not connect to marathon at #{@marathon_api}: #{ex}"

raise ex
end
end

def watch
until @should_exit
retry_count = 0
start = Time.now

begin
if @connection.nil?
attempt_marathon_connection
end

req = Net::HTTP::Get.new(@marathon_api.request_uri)
req['Accept'] = 'application/json'
response = @connection.request(req)

tasks = JSON.parse(response.body).fetch('tasks', [])
port_index = @discovery['port_index'] || 0
backends = tasks.keep_if { |task| task['startedAt'] }.map do |task|
{
'name' => task['host'],
'host' => task['host'],
'port' => task['ports'][port_index],
}
end.sort_by { |task| task['name'] }

invalid_backends = backends.find_all { |b| b['port'].nil? }
if invalid_backends.any?
backends = backends - invalid_backends

invalid_backends.each do |backend|
log.error "synapse: port index #{port_index} not found in task's port array!"
end
end

set_backends(backends)
rescue EOFError
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to set @connection to nil here to ensure we attempt to connect back to marathon?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think so.

# If the persistent HTTP connection is severed, we can automatically
# retry
log.info "synapse: marathon HTTP API disappeared, reconnecting..."

retry if (retry_count += 1) == 1
rescue => e
log.warn "synapse: error in watcher thread: #{e.inspect}"
log.warn e.backtrace.join("\n")
@connection = nil
ensure
elapsed_time = Time.now - start
sleep (@check_interval - elapsed_time) if elapsed_time < @check_interval
end

@should_exit = true if only_run_once? # for testability
end
end

def splay
Random.rand(@check_interval)
end

def only_run_once?
false
end
end
end
191 changes: 191 additions & 0 deletions spec/lib/synapse/service_watcher_marathon_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
require 'spec_helper'
require 'synapse/service_watcher/marathon'

describe Synapse::ServiceWatcher::MarathonWatcher do
let(:mocksynapse) { double() }
let(:marathon_host) { '127.0.0.1' }
let(:marathon_port) { '8080' }
let(:app_name) { 'foo' }
let(:check_interval) { 11 }
let(:marathon_request_uri) { "#{marathon_host}:#{marathon_port}/v2/apps/#{app_name}/tasks" }
let(:config) do
{
'name' => 'foo',
'discovery' => {
'method' => 'marathon',
'marathon_api_url' => "http://#{marathon_host}:#{marathon_port}",
'application_name' => app_name,
'check_interval' => check_interval,
},
'haproxy' => {},
}
end
let(:marathon_response) { { 'tasks' => [] } }

subject { described_class.new(config, mocksynapse) }

before do
allow(subject.log).to receive(:warn)
allow(subject.log).to receive(:info)

allow(Thread).to receive(:new).and_yield
allow(subject).to receive(:sleep)
allow(subject).to receive(:only_run_once?).and_return(true)
allow(subject).to receive(:splay).and_return(0)

stub_request(:get, marathon_request_uri).
with(:headers => { 'Accept' => 'application/json' }).
to_return(:body => JSON.generate(marathon_response))
end

context 'with a valid argument hash' do
it 'instantiates' do
expect(subject).to be_a(Synapse::ServiceWatcher::MarathonWatcher)
end
end

describe '#watch' do
context 'when synapse cannot connect to marathon' do
before do
allow(Net::HTTP).to receive(:new).
with(marathon_host, marathon_port.to_i).
and_raise(Errno::ECONNREFUSED)
end

it 'does not crash' do
expect { subject.start }.not_to raise_error
end
end

it 'requests the proper API endpoint one time' do
subject.start
expect(a_request(:get, marathon_request_uri)).to have_been_made.times(1)
end

context 'when the API path (marathon_api_path) is customized' do
let(:config) do
super().tap do |c|
c['discovery']['marathon_api_path'] = '/v3/tasks/%{app}'
end
end

let(:marathon_request_uri) { "#{marathon_host}:#{marathon_port}/v3/tasks/#{app_name}" }

it 'calls the customized path' do
subject.start
expect(a_request(:get, marathon_request_uri)).to have_been_made.times(1)
end
end

context 'with tasks returned from marathon' do
let(:marathon_response) do
{
'tasks' => [
{
'host' => 'agouti.local',
'id' => 'my-app_1-1396592790353',
'ports' => [
31336,
31337
],
'stagedAt' => '2014-04-04T06:26:30.355Z',
'startedAt' => '2014-04-04T06:26:30.860Z',
'version' => '2014-04-04T06:26:23.051Z'
},
]
}
end
let(:expected_backend_hash) do
{
'name' => 'agouti.local', 'host' => 'agouti.local', 'port' => 31336
}
end

it 'adds the task as a backend' do
expect(subject).to receive(:set_backends).with([expected_backend_hash])
subject.start
end

context 'with a custom port_index' do
let(:config) do
super().tap do |c|
c['discovery']['port_index'] = 1
end
end

let(:expected_backend_hash) do
{
'name' => 'agouti.local', 'host' => 'agouti.local', 'port' => 31337
}
end

it 'adds the task as a backend' do
expect(subject).to receive(:set_backends).with([expected_backend_hash])
subject.start
end

context 'when that port_index does not exist' do
let(:config) do
super().tap { |c| c['discovery']['port_index'] = 999 }
end

it 'does not include the backend' do
expect(subject).to receive(:set_backends).with([])
subject.start
end
end
end

context 'with a task that has not started yet' do
let(:marathon_response) do
super().tap do |resp|
resp['tasks'] << {
'host' => 'agouti.local',
'id' => 'my-app_2-1396592790353',
'ports' => [
31336,
31337
],
'stagedAt' => '2014-04-04T06:26:30.355Z',
'startedAt' => nil,
'version' => '2014-04-04T06:26:23.051Z'
}
end
end

it 'filters tasks that have no startedAt value' do
expect(subject).to receive(:set_backends).with([expected_backend_hash])
subject.start
end
end

context 'when marathon returns invalid response' do
let(:marathon_response) { [] }
it 'does not blow up' do
expect { subject.start }.to_not raise_error
end
end

context 'when the job takes a long time for some reason' do
let(:job_duration) { 10 } # seconds

before do
actual_time = Time.now
time_offset = -1 * job_duration
allow(Time).to receive(:now) do
# on first run, return the right time
# subsequently, add in our job_duration offset
actual_time + (time_offset += job_duration)
end
allow(subject).to receive(:set_backends)
end

it 'only sleeps for the difference' do
expect(subject).to receive(:sleep).with(check_interval - job_duration)
subject.start
end
end
end
end
end

1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require "#{File.dirname(__FILE__)}/../lib/synapse"
require 'pry'
require 'support/configuration'
require 'webmock/rspec'

RSpec.configure do |config|
config.run_all_when_everything_filtered = true
Expand Down
1 change: 1 addition & 0 deletions synapse.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ Gem::Specification.new do |gem|
gem.add_development_dependency "rspec", "~> 3.1.0"
gem.add_development_dependency "pry"
gem.add_development_dependency "pry-nav"
gem.add_development_dependency "webmock"
end