Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

1.9 compat and commented out some debug logger calls

  • Loading branch information...
commit 20b6fdb6acab1e4e03c5590c0220f939969dd93e 1 parent bdcb114
@arya authored
View
1  lib/pandemic.rb
@@ -30,6 +30,7 @@
TCP_NO_DELAY_AVAILABLE =
RUBY_VERSION < '1.9' ? Socket.constants.include?('TCP_NODELAY') : Socket.constants.include?(:TCP_NODELAY)
+MONITOR_TIMEOUT_AVAILABLE = (RUBY_VERSION < '1.9')
def epidemic!(options = {})
if $pandemic_logger.nil?
$pandemic_logger = Logger.new(options[:log_file] || "pandemic.log")
View
2  lib/pandemic/connection_pool.rb
@@ -12,7 +12,7 @@ def initialize(options = {})
@max_connections = options[:max_connections] || 10
@min_connections = options[:min_connections] || 1
@connect_at_define = options.include?(:connect_at_define) ? options[:connect_at_define] : true
- @timeout = options[:timeout] || 3
+ @timeout = MONITOR_TIMEOUT_AVAILABLE ? options[:timeout] || 3 : nil
end
def add_connection!
View
21 lib/pandemic/server_side/client.rb
@@ -2,6 +2,7 @@ module Pandemic
module ServerSide
class Client
REQUEST_FLAGS = {:async => 'a'}
+ EMPTY_STRING = ""
REQUEST_REGEXP = /^([0-9]+)(?: ([#{REQUEST_FLAGS.values.join('')}]*))?$/
class DisconnectClient < Exception; end
include Util
@@ -22,21 +23,21 @@ def listen
@listener_thread = Thread.new do
begin
while @server.running
- debug("Waiting for incoming request")
+ # debug("Waiting for incoming request")
request = @connection.gets
- info("Received incoming request")
+ # info("Received incoming request")
@received_requests += 1
if request.nil?
- debug("Incoming request is nil")
+ # debug("Incoming request is nil")
@connection.close
@connection = nil
break
elsif request.strip! =~ REQUEST_REGEXP
- size, flags = $1.to_i, $2.to_s.split("")
- debug("Reading request body (size #{size})")
+ size, flags = $1.to_i, $2.to_s.split(EMPTY_STRING)
+ # debug("Reading request body (size #{size})")
body = @connection.read(size)
- debug("Finished reading request body")
+ # debug("Finished reading request body")
if flags.include?(REQUEST_FLAGS[:async])
Thread.new do
handle_request(body)
@@ -45,18 +46,18 @@ def listen
else
response = handle_request(body)
if response
- debug("Writing response to client")
+ # debug("Writing response to client")
# the connection could be closed, we'll let it be rescued if it is.
@connection.write("#{response.size}\n#{response}")
@connection.flush
- debug("Finished writing response to client")
+ # debug("Finished writing response to client")
else
- debug("Writing error code to client")
+ # debug("Writing error code to client")
@connection.write("-1\n")
@connection.flush
- debug("Finished writing error code to client")
+ # debug("Finished writing error code to client")
end
@responded_requests.inc
end
View
48 lib/pandemic/server_side/peer.rb
@@ -29,18 +29,18 @@ def connected?
end
def client_request(request, body)
- debug("Sending client's request to peer")
- debug("Connection pool has #{@connection_pool.available_count} of #{@connection_pool.connections_count} connections available")
+ # debug("Sending client's request to peer")
+ # debug("Connection pool has #{@connection_pool.available_count} of #{@connection_pool.connections_count} connections available")
# TODO: Consider adding back threads here if it will be faster that way in Ruby 1.9
@connection_pool.with_connection do |connection|
if connection && !connection.closed?
@pending_requests.synchronize do
@pending_requests[request.hash] = request
end
- debug("Writing client's request")
+ # debug("Writing client's request")
connection.write("PROCESS #{request.hash} #{body.size}\n#{body}")
connection.flush
- debug("Finished writing client's request")
+ # debug("Finished writing client's request")
end # TODO: else? fail silently? reconnect?
end
end
@@ -55,15 +55,15 @@ def add_incoming_connection(conn)
begin
debug("Incoming connection thread started")
while @server.running
- debug("Listening for incoming requests")
+ # debug("Listening for incoming requests")
request = connection.gets
- debug("Read incoming request from peer")
+ # debug("Read incoming request from peer")
if request.nil?
- debug("Incoming connection request is nil")
+ # debug("Incoming connection request is nil")
break
else
- debug("Received incoming (#{request.strip})")
+ # debug("Received incoming (#{request.strip})")
handle_incoming_request(request, connection) if request =~ /^PROCESS/
handle_incoming_response(request, connection) if request =~ /^RESPONSE/
end
@@ -114,23 +114,23 @@ def initialize_connection_pool
end
def handle_incoming_request(request, connection)
- debug("Identified as request")
+ # debug("Identified as request")
if request.strip =~ /^PROCESS ([A-Za-z0-9]+) ([0-9]+)$/
hash = $1
size = $2.to_i
- debug("Incoming request: #{hash} #{size}")
+ # debug("Incoming request: #{hash} #{size}")
begin
- debug("Reading request body")
+ # debug("Reading request body")
request_body = connection.read(size)
- debug("Finished reading request body")
+ # debug("Finished reading request body")
rescue EOFError, TruncatedDataError
- debug("Failed to read request body")
+ # debug("Failed to read request body")
# TODO: what to do here?
return false
rescue Exception => e
warn("Unhandled exception in incoming request read:\n#{e.inspect}\n#{e.backtrace.join("\n")}")
end
- debug("Processing body")
+ # debug("Processing body")
process_request(hash, request_body)
else
warn("Malformed incoming request: #{request.strip}")
@@ -143,13 +143,13 @@ def handle_incoming_response(response, connection)
if response.strip =~ /^RESPONSE ([A-Za-z0-9]+) ([0-9]+)$/
hash = $1
size = $2.to_i
- debug("Incoming response: #{hash} #{size}")
+ # debug("Incoming response: #{hash} #{size}")
begin
- debug("Reading response body")
+ # debug("Reading response body")
response_body = connection.read(size)
- debug("Finished reading response body")
+ # debug("Finished reading response body")
rescue EOFError, TruncatedDataError
- debug("Failed to read response body")
+ # debug("Failed to read response body")
# TODO: what to do here?
return false
rescue Exception => e
@@ -167,14 +167,14 @@ def handle_incoming_response(response, connection)
def process_request(hash, body)
Thread.new do
begin
- debug("Starting processing thread (#{hash})")
+ # debug("Starting processing thread (#{hash})")
response = @server.process(body)
- debug("Processing finished (#{hash})")
+ # debug("Processing finished (#{hash})")
@connection_pool.with_connection do |connection|
- debug( "Sending response (#{hash})")
+ # debug( "Sending response (#{hash})")
connection.write("RESPONSE #{hash} #{response.size}\n#{response}")
connection.flush
- debug( "Finished sending response (#{hash})")
+ # debug( "Finished sending response (#{hash})")
end
rescue Exception => e
warn("Unhandled exception in process request thread:\n#{e.inspect}\n#{e.backtrace.join("\n")}")
@@ -185,10 +185,10 @@ def process_request(hash, body)
def process_response(hash, body)
Thread.new do
begin
- debug("Finding original request (#{hash})")
+ # debug("Finding original request (#{hash})")
original_request = @pending_requests.synchronize { @pending_requests.delete(hash) }
if original_request
- debug("Found original request, adding response")
+ # debug("Found original request, adding response")
original_request.add_response(body)
else
warn("Original response not found (#{hash})")
View
10 lib/pandemic/server_side/request.rb
@@ -32,10 +32,10 @@ def add_response(response)
@@late_responses.inc
return
end
- debug("Adding response")
+ # debug("Adding response")
@responses << response
if @max_responses && @responses.size >= @max_responses
- debug("Hit max responses, waking up waiting thread")
+ # debug("Hit max responses, waking up waiting thread")
wakeup_waiting_thread
@complete = true
end
@@ -62,6 +62,12 @@ def wait_for_responses
return if @complete
if Config.response_timeout <= 0
@waiter.wait
+ elsif !MONITOR_TIMEOUT_AVAILABLE
+ Thread.new do
+ sleep Config.response_timeout
+ wakeup_waiting_thread
+ end
+ @waiter.wait
else
@waiter.wait(Config.response_timeout)
end
View
28 lib/pandemic/server_side/server.rb
@@ -57,14 +57,14 @@ def start
@running = true
@running_since = Time.now
- debug("Connecting to peers")
+ # debug("Connecting to peers")
@peers.values.each { |peer| peer.connect }
@listener_thread = Thread.new do
begin
while @running
begin
- debug("Listening")
+ # debug("Listening")
conn = @listener.accept
Thread.new(conn) { |c| handle_connection(c) }
rescue Errno::ECONNABORTED, Errno::EINTR # TODO: what else can wrong here? this should be more robust.
@@ -95,15 +95,15 @@ def handle_connection(connection)
connection.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) if TCP_NO_DELAY_AVAILABLE
identification = connection.gets.strip
- info("Incoming connection from #{connection.peeraddr.values_at(3,1).join(":")} (#{identification})")
+ # info("Incoming connection from #{connection.peeraddr.values_at(3,1).join(":")} (#{identification})")
if identification =~ /^SERVER ([a-zA-Z0-9.]+:[0-9]+)$/
- debug("Recognized as peer")
+ # debug("Recognized as peer")
host, port = host_port($1)
matching_peer = @peers.values.detect { |peer| [peer.host, peer.port] == [host, port] }
if matching_peer
- debug("Found matching peer")
+ # debug("Found matching peer")
else
- debug("Didn't find matching peer, adding it")
+ # debug("Didn't find matching peer, adding it")
matching_peer = @peers.synchronize do
hostport = "#{host}:#{port}"
@servers.push(hostport) unless @servers.include?(hostport)
@@ -112,13 +112,13 @@ def handle_connection(connection)
end
matching_peer.add_incoming_connection(connection)
elsif identification =~ /^CLIENT$/
- debug("Recognized as client")
+ # debug("Recognized as client")
@clients_mutex.synchronize do
@clients << Client.new(connection, self).listen
@total_clients += 1
end
elsif identification =~ /^stats$/
- debug("Stats request received")
+ # debug("Stats request received")
print_stats(connection)
else
debug("Unrecognized connection. Closing.")
@@ -130,10 +130,10 @@ def handle_connection(connection)
end
def handle_client_request(request)
- info("Handling client request")
+ # info("Handling client request")
map = @handler_instance.partition(request, connection_statuses)
request.max_responses = map.size
- debug("Sending client request to #{map.size} handlers (#{request.hash})")
+ # debug("Sending client request to #{map.size} handlers (#{request.hash})")
map.each do |peer, body|
if @peers[peer]
@@ -142,7 +142,7 @@ def handle_client_request(request)
end
if map[signature]
- debug("Processing #{request.hash}")
+ # debug("Processing #{request.hash}")
Thread.new do
begin
request.add_response(self.process(map[signature]))
@@ -154,10 +154,10 @@ def handle_client_request(request)
@requests_per_second.hit
- debug("Waiting for responses")
+ # debug("Waiting for responses")
request.wait_for_responses
- debug("Done waiting for responses, calling reduce")
+ # debug("Done waiting for responses, calling reduce")
@handler_instance.reduce(request)
end
@@ -173,7 +173,7 @@ def process(body)
end
def signature
- "#{@host}:#{@port}"
+ @signature ||= "#{@host}:#{@port}"
end
def connection_statuses
Please sign in to comment.
Something went wrong with that request. Please try again.