Permalink
Browse files

preliminary connection reviver thread for client

  • Loading branch information...
1 parent de22f71 commit fbfa9f29d1df7824365bbb4a0922c446c853574e Arya Asemanfar committed Aug 5, 2009
Showing with 46 additions and 11 deletions.
  1. +1 −1 Rakefile
  2. +44 −9 lib/pandemic/client_side/cluster_connection.rb
  3. +1 −1 pandemic.gemspec
View
@@ -2,7 +2,7 @@ require 'rubygems'
require 'rake'
require 'echoe'
-Echoe.new('pandemic', '0.4.7') do |p|
+Echoe.new('pandemic', '0.4.8') do |p|
p.description = "A framework for distributing work for real-time services and offline tasks."
p.url = "https://github.com/arya/pandemic/"
p.author = "Arya Asemanfar"
@@ -26,15 +26,11 @@ def initialize
@connection_proxies[key] = ConnectionProxy.new(key, self)
host, port = host_port(server_addr)
Config.min_connections_per_server.times do
- connection = create_connection(key)
- if connection.alive?
- @connections << connection
- @available << connection
- @grouped_connections[key] << connection
- @grouped_available[key] << connection
- end
+ add_connection_for_key(key)
end
end
+
+ maintain_minimum_connections!
end
@@ -98,7 +94,11 @@ def checkout_connection(key)
if select_from.size > 0
connection = select_from.pop
connection.ensure_alive!
- break unless connection.alive?
+ if !connection.alive?
+ # it's dead
+ delete_connection(connection)
+ next
+ end
if key.nil?
@grouped_available[key].delete(connection)
@@ -149,7 +149,42 @@ def create_connection(key)
Connection.new(host, port, key)
end
- #TODO: a thread to manage killing and reviving connections
+ def add_connection_for_key(key)
+ connection = create_connection(key)
+ if connection.alive?
+ @connections << connection
+ @available << connection
+ @grouped_connections[key] << connection
+ @grouped_available[key] << connection
+ end
+ end
+
+ def delete_connection(connection)
+ @connections.delete(connection)
+ @available.delete(connection)
+ @grouped_connections[connection.key].delete(connection)
+ @grouped_available[connection.key].delete(connection)
+ end
+
+ def maintain_minimum_connections!
+ return if @maintain_minimum_connections_thread
+ @maintain_minimum_connections_thread = Thread.new do
+ loop do
+ sleep 60 #arbitrary
+ @mutex.synchronize do
+ @grouped_connections.keys.each do |key|
+ currently_exist = @grouped_connections[key].size
+ if currently_exist < Config.min_connections_per_server
+ (Config.min_connections_per_server - currently_exist).times do
+ add_connection_for_key(key)
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+
end
end
end
View
@@ -2,7 +2,7 @@
Gem::Specification.new do |s|
s.name = %q{pandemic}
- s.version = "0.4.7"
+ s.version = "0.4.8"
s.required_rubygems_version = Gem::Requirement.new(">= 1.2") if s.respond_to? :required_rubygems_version=
s.authors = ["Arya Asemanfar"]

0 comments on commit fbfa9f2

Please sign in to comment.