Permalink
Browse files

make worker time out on network reads and add tests

git-svn-id: http://code.sixapart.com/svn/gearman/trunk/api/ruby@278 011c6a6d-750f-0410-a5f6-93fdcd050bc4
  • Loading branch information...
1 parent 6f6b473 commit 1557583c1a7ab3cc5a1d5d87e6449f72043eaa70 derat committed May 10, 2007
Showing with 73 additions and 22 deletions.
  1. +36 −20 lib/gearman/worker.rb
  2. +37 −2 test/mock_worker.rb
View
@@ -30,11 +30,6 @@ module Gearman
# end
# loop { w.work }
class Worker
- # Number of seconds to sleep when we don't have work before polling the
- # job server again (if a job comes in while we're sleeping, the server
- # will wake us up).
- SLEEP_SEC = 10
-
# = Ability
#
# == Description
@@ -98,7 +93,7 @@ def initialize(job_servers=nil, prefix=nil, opts={})
@sockets = {} # "host:port" -> Socket
@abilities = {} # "funcname" -> Ability
@prefix = prefix
- @failed_servers = [] # "host:port"
+ @bad_servers = [] # "host:port"
@servers_mutex = Mutex.new
%w{client_id reconnect_sec
network_timeout_sec}.map {|s| s.to_sym }.each do |k|
@@ -122,8 +117,8 @@ def start_reconnect_thread
loop do
@servers_mutex.synchronize do
# If there are any failed servers, try to reconnect to them.
- if not @failed_servers.empty?
- update_job_servers(@sockets.keys + @failed_servers)
+ if not @bad_servers.empty?
+ update_job_servers(@sockets.keys + @bad_servers)
end
end
sleep @reconnect_sec
@@ -134,7 +129,7 @@ def start_reconnect_thread
def job_servers
servers = nil
@servers_mutex.synchronize do
- servers = @sockets.keys + @failed_servers
+ servers = @sockets.keys + @bad_servers
end
servers
end
@@ -154,7 +149,7 @@ def job_servers=(servers)
#
# @param servers "host:port"; either a single server or an array
def update_job_servers(servers)
- @failed_servers = []
+ @bad_servers = []
servers = Set.new(Util.normalize_job_servers(servers))
# Disconnect from servers that we no longer care about.
@sockets.each do |server,sock|
@@ -171,7 +166,7 @@ def update_job_servers(servers)
Util.log "Connecting to server #{server}"
@sockets[server] = connect(server)
rescue NetworkError
- @failed_servers << server
+ @bad_servers << server
Util.log "Unable to connect to #{server}"
end
end
@@ -283,40 +278,61 @@ def handle_job_assign(data, sock, hostport)
##
# Do a single job and return.
def work
+ req = Util.pack_request(:grab_job)
loop do
- req = Util.pack_request(:grab_job)
+ bad_servers = []
# We iterate through the servers in sorted order to make testing
# easier.
- @sockets.keys.sort.each do |hostport|
+ servers = nil
+ @servers_mutex.synchronize { servers = @sockets.keys.sort }
+ servers.each do |hostport|
Util.log "Sending grab_job to #{hostport}"
sock = @sockets[hostport]
Util.send_request(sock, req)
+
# Now that we've sent grab_job, we need to keep reading packets
# until we see a no_job or job_assign response (there may be a noop
# waiting for us in response to a previous pre_sleep).
loop do
- type, data = Util.read_response(sock)
+ begin
+ type, data = Util.read_response(sock, @network_timeout_sec)
+ rescue NetworkError
+ Util.log "Server #{hostport} timed out; marking bad"
+ bad_servers << hostport
+ break
+ end
case type
when :no_job
Util.log "Got no_job from #{hostport}"
break
when :job_assign
return if handle_job_assign(data, sock, hostport)
+ break
else
- # Keep on reading until we get a response to our grab_job
- # (either no_job or job_assign).
Util.log "Got #{type.to_s} from #{hostport}"
end
end
end
- Util.log "Sending pre_sleep and going to sleep for #{SLEEP_SEC} sec"
- @sockets.values.each do |sock|
- Util.send_request(sock, Util.pack_request(:pre_sleep))
+
+ @servers_mutex.synchronize do
+ bad_servers.each do |hostport|
+ @sockets[hostport].close if @sockets[hostport]
+ @bad_servers << hostport if @sockets[hostport]
+ @sockets.delete(hostport)
+ end
end
+
+ Util.log "Sending pre_sleep and going to sleep for #{2 * @reconnect_sec} sec"
+ @servers_mutex.synchronize do
+ @sockets.values.each do |sock|
+ Util.send_request(sock, Util.pack_request(:pre_sleep))
+ end
+ end
+
# FIXME: We could optimize things the next time through the 'each' by
# sending the first grab_job to one of the servers that had a socket
# with data in it. Not bothering with it for now.
- IO::select(@sockets.values, nil, nil, SLEEP_SEC)
+ IO::select(@sockets.values, nil, nil, 2 * @reconnect_sec)
end
end
end
View
@@ -121,8 +121,6 @@ def test_multiple_servers
s1.wait
s2.wait
- worker.network_timeout_sec = 0.1
-
# Stop the first job server and make the worker try to reconnect to
# both.
old_servers = worker.job_servers
@@ -162,4 +160,41 @@ def test_multiple_servers
s2.wait
w.wait
end
+
+ def test_timeout
+ server = FakeJobServer.new(self)
+ worker = nil
+ sock = nil
+
+ s = TestScript.new
+ w = TestScript.new
+
+ server_thread = Thread.new { s.loop_forever }.run
+ worker_thread = Thread.new { w.loop_forever }.run
+
+ w.exec {
+ worker = Gearman::Worker.new("localhost:#{server.port}", nil,
+ { :client_id => 'test',
+ :reconnect_sec => 0.1,
+ :network_timeout_sec => 0.1 })
+ }
+ s.exec { sock = server.expect_connection }
+ s.wait
+ s.exec { server.expect_request(sock, :set_client_id, 'test') }
+
+ w.exec { worker.add_ability('foo') {|d,j| 'bar' } }
+ s.exec { server.expect_request(sock, :can_do, 'foo') }
+
+ w.exec { worker.work }
+ s.exec { server.expect_request(sock, :grab_job) }
+ s.exec { sleep 0.11 }
+ s.wait
+
+ s.exec { sock = server.expect_connection }
+ s.wait
+ s.exec { server.expect_request(sock, :set_client_id, 'test') }
+ s.exec { server.expect_request(sock, :can_do, 'foo') }
+ s.exec { server.expect_request(sock, :grab_job) }
+ s.wait
+ end
end

0 comments on commit 1557583

Please sign in to comment.