Skip to content

Commit

Permalink
Ask for machines after resolving the leader
Browse files Browse the repository at this point in the history
And flush the leader and machines URI caches when the leader changes -- but don't flush the machines cache on a leader failure.
  • Loading branch information
iconara committed Aug 26, 2013
1 parent 5ba1a26 commit dbd91a9
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 15 deletions.
24 changes: 14 additions & 10 deletions lib/etcd/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ def initialize(options={})
end

def connect
@machines = machines
change_leader(leader)
self
rescue AllNodesDownError => e
Expand Down Expand Up @@ -339,9 +338,9 @@ def machines_uri
def request(method, uri, args={})
@http_client.request(method, uri, args.merge(follow_redirect: true))
rescue HTTPClient::TimeoutError => e
old_leader, old_port = @leader, @port
old_leader, old_port = @host, @port
handle_leader_down
uri.sub!("#{old_leader}:#{old_port}", "#{@leader}:#{@port}")
uri.sub!("#{old_leader}:#{old_port}", "#{@host}:#{@port}")
retry
end

Expand All @@ -366,26 +365,31 @@ def extract_info(data)

def handle_redirected(uri, response)
location = URI.parse(response.header[S_LOCATION][0])
change_leader(location.host, location.port)
change_leader(location.host, port: location.port)
@http_client.default_redirect_uri_callback(uri, response)
end

def handle_leader_down
if @machines && @machines.any?
@machines = @machines.reject { |m| m == @leader }
change_leader(@machines.shift)
@machines = @machines.reject { |m| m == @host }
change_leader(@machines.shift, flush_cache: false)
else
raise AllNodesDownError, 'All known nodes are down'
end
end

def change_leader(new_leader, port=nil)
if port
@leader, @port = new_leader, port
def change_leader(new_leader, options={})
if options[:port]
@host, @port = new_leader, options[:port]
else
@leader, port = new_leader.split(':')
@host, port = new_leader.split(':')
@port = port.to_i
end
@leader_uri = nil
@machines_uri = nil
if options[:flush_cache] != false
@machines = machines
end
end

# @private
Expand Down
24 changes: 19 additions & 5 deletions spec/etcd/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ module Etcd
end

let :port do
rand(2**16)
50_000
end

let :base_uri do
Expand All @@ -34,8 +34,8 @@ module Etcd
end

before do
stub_request(:get, machines_uri).to_return(body: machines.join(','))
stub_request(:get, leader_uri).to_return(body: "#{host}:#{port}")
stub_request(:get, machines_uri).to_return(body: machines.join(','))
end

describe '#connect' do
Expand All @@ -52,6 +52,7 @@ module Etcd

it 'discards the seed node and instead talks to the leader' do
stub_request(:get, leader_uri).to_return(body: "#{host}:#{port + 1}")
stub_request(:get, "http://#{host}:#{port + 1}/machines").to_return(body: machines.join(','))
stub_request(:get, "#{base_uri}/keys/foo").to_return(body: MultiJson.dump({'value' => 'wrong value'}))
stub_request(:get, "#{base_uri.sub(port.to_s, (port + 1).to_s)}/keys/foo").to_return(body: MultiJson.dump({'value' => 'right value'}))
client.get('/foo').should == 'right value'
Expand Down Expand Up @@ -94,19 +95,26 @@ module Etcd
end

context 'when not talking to the master' do
it 'follows redirects' do
before do
stub_request(:get, "#{base_uri}/keys/foo").to_return(status: 307, headers: {'Location' => 'http://example.com:7654/v1/keys/foo'})
stub_request(:get, 'http://example.com:7654/machines').to_return(body: %w[example.com:7654 example.com:7655].join(','))
stub_request(:get, 'http://example.com:7654/v1/keys/foo').to_return(body: MultiJson.dump({'value' => 'bar'}))
end

it 'follows redirects' do
client.get('/foo').should == 'bar'
end

it 'sets the new host as the new leader when it is redirected' do
stub_request(:get, "#{base_uri}/keys/foo").to_return(status: 307, headers: {'Location' => 'http://example.com:7654/v1/keys/foo'})
stub_request(:get, 'http://example.com:7654/v1/keys/foo').to_return(body: MultiJson.dump({'value' => 'bar'}))
client.get('/foo')
client.get('/foo').should == 'bar'
WebMock.should have_requested(:get, "#{base_uri}/keys/foo").once
end

it 'asks for a new list of cluster machines' do
client.get('/foo')
WebMock.should have_requested(:get, 'http://example.com:7654/machines').once
end
end

context 'when nodes go down' do
Expand All @@ -132,6 +140,7 @@ module Etcd
stub_request(:get, uris[0]).to_timeout
stub_request(:get, uris[1]).to_return(status: 307, headers: {'Location' => uris[2]})
stub_request(:get, uris[2]).to_return(body: MultiJson.dump({'value' => 'bar'}))
stub_request(:get, uris[2].sub('/v1/keys/thing', '/machines')).to_return(body: [uris[1], uris[2]].join(','))
stub_request(:get, uris[2].sub('/thing', '/bar')).to_return(body: MultiJson.dump({'value' => 'baz'}))
client.get('/thing').should == 'bar'
client.get('/bar').should == 'baz'
Expand Down Expand Up @@ -432,6 +441,11 @@ module Etcd
end

describe '#leader' do
before do
stub_request(:get, 'http://localhost:4001/leader').to_return(body: 'localhost:4001')
stub_request(:get, 'http://localhost:4001/machines').to_return(body: 'localhost:4001,localhost:4002')
end

it 'returns the host and port of the leader' do
stub_request(:get, leader_uri).to_return(body: 'localhost:4001')
client.leader.should == 'localhost:4001'
Expand Down

0 comments on commit dbd91a9

Please sign in to comment.