Skip to content

Commit

Permalink
minor: replica set cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
TylerBrock committed Jul 27, 2012
1 parent 9c9bebe commit 5283701
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 67 deletions.
6 changes: 3 additions & 3 deletions lib/mongo/repl_set_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ def connect
@connect_mutex.synchronize do
return if @connected

discovered_seeds = @manager ? @manager.seeds : []
@manager = PoolManager.new(self, discovered_seeds)
seeds = @manager.nil? ? @seeds : @manager.seeds
@manager = PoolManager.new(self, seeds)

Thread.current[:managers] ||= Hash.new
Thread.current[:managers][self] = @manager
Expand Down Expand Up @@ -205,7 +205,7 @@ def refresh(opts={})
# to get the refresh lock.
def hard_refresh!
log(:info, "Initiating hard refresh...")
discovered_seeds = @manager ? @manager.seeds : []
discovered_seeds = @manager.seeds
new_manager = PoolManager.new(self, discovered_seeds | @seeds)
new_manager.connect

Expand Down
10 changes: 1 addition & 9 deletions lib/mongo/util/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class Node
def initialize(connection, host_port)
@connection = connection
@host, @port = split_node(host_port)
@address = "#{host}:#{port}"
@address = "#{@host}:#{@port}"
@config = nil
@socket = nil
end
Expand Down Expand Up @@ -111,14 +111,6 @@ def arbiters
end
end

def tags
connect unless connected?
set_config unless @config
return {} unless config['tags'] && !config['tags'].empty?

config['tags']
end

def primary?
@config['ismaster'] == true || @config['ismaster'] == 1
end
Expand Down
61 changes: 7 additions & 54 deletions lib/mongo/util/pool_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ class PoolManager

attr_reader :connection, :arbiters, :primary, :secondaries, :primary_pool,
:read_pool, :secondary_pool, :read, :secondary_pools, :hosts, :nodes,
:max_bson_size, :tags_to_pools, :tag_map, :members
:max_bson_size, :members, :seeds

# Create a new set of connection pools.
#
Expand All @@ -14,7 +14,6 @@ class PoolManager
# with the newly-discovered nodes being used first.
def initialize(connection, seeds=[])
@connection = connection
@original_seeds = connection.seeds
@seeds = seeds
@previously_connected = false
end
Expand All @@ -31,7 +30,6 @@ def connect
initialize_pools(members)
cache_discovered_seeds(members)
set_read_pool
set_tag_mappings

@members = members
@previously_connected = true
Expand Down Expand Up @@ -90,32 +88,12 @@ def closed?

def close(opts={})
begin
if @primary_pool
@primary_pool.close(opts)
end

if @secondary_pools
@secondary_pools.each do |pool|
pool.close(opts)
end
end

if @members
@members.each do |member|
member.close
end
end

rescue ConnectionFailure
pools.each { |pool| pool.close(opts) if pool }
@members.each { |member| member.close }
rescue ConnectionFailure
end
end

# The set of nodes that this class has discovered and
# successfully connected to.
def seeds
@seeds || []
end

private

def pools
Expand Down Expand Up @@ -153,8 +131,6 @@ def initialize_data
@secondary_pools = []
@hosts = Set.new
@members = Set.new
@tags_to_pools = {}
@tag_map = {}
@refresh_required = false
end

Expand All @@ -181,13 +157,6 @@ def connect_to_members
members
end

def associate_tags_with_pool(tags, pool)
tags.each_key do |key|
@tags_to_pools[{key => tags[key]}] ||= []
@tags_to_pools[{key => tags[key]}] << pool
end
end

# Initialize the connection pools for the primary and secondary nodes.
def initialize_pools(members)
members.each do |member|
Expand All @@ -212,7 +181,6 @@ def assign_primary(member)
:size => self.connection.pool_size,
:timeout => self.connection.pool_timeout,
:node => member)
associate_tags_with_pool(member.tags, @primary_pool)
end

def assign_secondary(member)
Expand All @@ -223,19 +191,6 @@ def assign_secondary(member)
:timeout => self.connection.pool_timeout,
:node => member)
@secondary_pools << pool
associate_tags_with_pool(member.tags, pool)
end

# If there's more than one pool associated with
# a given tag, choose a close one using the bucket method.
def set_tag_mappings
@tags_to_pools.each do |key, pool_list|
if pool_list.length == 1
@tag_map[key] = pool_list.first
else
@tag_map[key] = nearby_pool_from_set(pool_list)
end
end
end

# Pick a node from the set of possible secondaries.
Expand Down Expand Up @@ -280,7 +235,7 @@ def nearby_pool_from_set(pool_set)
#
# If we don't get a response, raise an exception.
def get_valid_seed_node
seed_list.each do |seed|
@seeds.each do |seed|
node = Mongo::Node.new(self.connection, seed)
if !node.connect
next
Expand All @@ -292,12 +247,10 @@ def get_valid_seed_node
end

raise ConnectionFailure, "Cannot connect to a replica set using seeds " +
"#{seed_list.map {|s| "#{s[0]}:#{s[1]}" }.join(', ')}"
"#{@seeds.map {|s| "#{s[0]}:#{s[1]}" }.join(', ')}"
end

def seed_list
@seeds | @original_seeds
end
private

def cache_discovered_seeds(members)
@seeds = members.map { |n| n.host_port }
Expand Down
2 changes: 1 addition & 1 deletion test/replica_sets/basic_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_accessors
assert @conn.secondary_pools.include?(@conn.read_pool)
assert_equal 90, @conn.refresh_interval
assert_equal @conn.refresh_mode, false
assert_equal 5, @conn.tag_map.keys.length unless @major_version < 2
#assert_equal 5, @conn.tag_map.keys.length unless @major_version < 2
end

context "Socket pools" do
Expand Down

0 comments on commit 5283701

Please sign in to comment.