Permalink
Browse files

- add more worker tests

- turn on worker reconnection code
- clean up worker polling code


git-svn-id: http://code.sixapart.com/svn/gearman/trunk/api/ruby@277 011c6a6d-750f-0410-a5f6-93fdcd050bc4
  • Loading branch information...
1 parent 8340bf0 commit 6f6b473fc3c583a33a1e05636941389cb7303a89 derat committed May 10, 2007
Showing with 117 additions and 38 deletions.
  1. +53 −32 lib/gearman/worker.rb
  2. +54 −5 test/mock_worker.rb
  3. +10 −1 test/testlib.rb
View
@@ -95,12 +95,13 @@ def report_status(numerator, denominator)
def initialize(job_servers=nil, prefix=nil, opts={})
chars = ('a'..'z').to_a
@client_id = Array.new(30) { chars[rand(chars.size)] }.join
- @sockets = {}
- @abilities = {}
+ @sockets = {} # "host:port" -> Socket
+ @abilities = {} # "funcname" -> Ability
@prefix = prefix
- @failed_servers = []
+ @failed_servers = [] # "host:port"
@servers_mutex = Mutex.new
- %w{client_id reconnect_sec}.map {|s| s.to_sym }.each do |k|
+ %w{client_id reconnect_sec
+ network_timeout_sec}.map {|s| s.to_sym }.each do |k|
instance_variable_set "@#{k}", opts[k]
opts.delete k
end
@@ -109,53 +110,65 @@ def initialize(job_servers=nil, prefix=nil, opts={})
'Invalid worker args: ' + opts.keys.sort.join(', ')
end
@reconnect_sec = 30 if not @reconnect_sec
+ @network_timeout_sec = 5 if not @network_timeout_sec
self.job_servers = job_servers if job_servers
- # FIXME: clean this up, test it, and start using it
- #start_reconnect_thread
+ start_reconnect_thread
end
+ attr_accessor :client_id, :reconnect_sec, :network_timeout_sec
# Start a thread to repeatedly attempt to connect to down job servers.
def start_reconnect_thread
Thread.new do
loop do
@servers_mutex.synchronize do
- if @failed_servers.size > 0
- job_servers_int(@sockets.keys + @failed_servers)
+ # If there are any failed servers, try to reconnect to them.
+ if not @failed_servers.empty?
+ update_job_servers(@sockets.keys + @failed_servers)
end
end
sleep @reconnect_sec
end
end.run
end
+ def job_servers
+ servers = nil
+ @servers_mutex.synchronize do
+ servers = @sockets.keys + @failed_servers
+ end
+ servers
+ end
+
##
# Connect to job servers to be used by this worker.
#
# @param servers "host:port"; either a single server or an array
def job_servers=(servers)
@servers_mutex.synchronize do
- job_servers_int(servers)
+ update_job_servers(servers)
end
end
# Internal function to actually connect to servers.
# Caller must acquire @servers_mutex before calling us.
#
# @param servers "host:port"; either a single server or an array
- def job_servers_int(servers)
+ def update_job_servers(servers)
@failed_servers = []
servers = Set.new(Util.normalize_job_servers(servers))
# Disconnect from servers that we no longer care about.
@sockets.each do |server,sock|
- if not servers[server]
- sock.disconnect
+ if not servers.include? server
+ Util.log "Disconnecting from old server #{server}"
+ sock.close
@sockets.delete(server)
end
end
# Connect to new servers.
servers.each do |server|
if not @sockets[server]
begin
+ Util.log "Connecting to server #{server}"
@sockets[server] = connect(server)
rescue NetworkError
@failed_servers << server
@@ -164,14 +177,15 @@ def job_servers_int(servers)
end
end
end
- private :job_servers_int
+ private :update_job_servers
##
# Connect to a job server.
#
# @param hostport "hostname:port"
def connect(hostport)
begin
+ # FIXME: handle timeouts
sock = TCPSocket.new(*hostport.split(':'))
rescue Errno::ECONNREFUSED
raise NetworkError
@@ -228,21 +242,23 @@ def remove_ability(func)
##
# Handle a job_assign packet.
#
- # @param data data in the packet
- # @param sock Socket on which the packet arrived
- def handle_job_assign(data, sock)
+ # @param data data in the packet
+ # @param sock Socket on which the packet arrived
+ # @param hostport "host:port"
+ def handle_job_assign(data, sock, hostport)
handle, func, data = data.split("\0", 3)
if not func
- Util.err "Ignoring job_assign with no function"
+ Util.err "Ignoring job_assign with no function from #{hostport}"
return false
end
- Util.log "Got job_assign with handle #{handle} and #{data.size} byte(s)"
+ Util.log "Got job_assign with handle #{handle} and #{data.size} byte(s) " +
+ "from #{hostport}"
ability = @abilities[func]
if not ability
Util.err "Ignoring job_assign for unsupported func #{func} " +
- "with handle #{handle}"
+ "with handle #{handle} from #{hostport}"
Util.send_request(sock, Util.pack_request(:work_fail, handle))
return false
end
@@ -252,10 +268,11 @@ def handle_job_assign(data, sock)
cmd = nil
if ret
ret = ret.to_s
- Util.log "Sending work_complete for #{handle} with #{ret.size} byte(s)"
+ Util.log "Sending work_complete for #{handle} with #{ret.size} byte(s) " +
+ "to #{hostport}"
cmd = Util.pack_request(:work_complete, "#{handle}\0#{ret}")
else
- Util.log "Sending work_fail for #{handle}"
+ Util.log "Sending work_fail for #{handle} to #{hostport}"
cmd = Util.pack_request(:work_fail, handle)
end
@@ -267,34 +284,38 @@ def handle_job_assign(data, sock)
# Do a single job and return.
def work
loop do
- @sockets.values.each do |sock|
- Util.log "Sending grab_job"
- Util.send_request(sock, Util.pack_request(:grab_job))
+ req = Util.pack_request(:grab_job)
+ # We iterate through the servers in sorted order to make testing
+ # easier.
+ @sockets.keys.sort.each do |hostport|
+ Util.log "Sending grab_job to #{hostport}"
+ sock = @sockets[hostport]
+ Util.send_request(sock, req)
# Now that we've sent grab_job, we need to keep reading packets
# until we see a no_job or job_assign response (there may be a noop
# waiting for us in response to a previous pre_sleep).
loop do
type, data = Util.read_response(sock)
case type
- when :noop
- # FIXME: double-check this... can we really just blindly read
- # when we get a noop without selecting first?
- Util.log "Got noop"
- next
when :no_job
- Util.log "Got no_job"
+ Util.log "Got no_job from #{hostport}"
break
when :job_assign
- return if handle_job_assign(data, sock)
+ return if handle_job_assign(data, sock, hostport)
else
- Util.log "Got #{type.to_s}"
+ # Keep on reading until we get a response to our grab_job
+ # (either no_job or job_assign).
+ Util.log "Got #{type.to_s} from #{hostport}"
end
end
end
Util.log "Sending pre_sleep and going to sleep for #{SLEEP_SEC} sec"
@sockets.values.each do |sock|
Util.send_request(sock, Util.pack_request(:pre_sleep))
end
+ # FIXME: We could optimize things the next time through the 'each' by
+ # sending the first grab_job to one of the servers that had a socket
+ # with data in it. Not bothering with it for now.
IO::select(@sockets.values, nil, nil, SLEEP_SEC)
end
end
View
@@ -60,6 +60,14 @@ def test_complete
def test_multiple_servers
server1 = FakeJobServer.new(self)
server2 = FakeJobServer.new(self)
+ # This is cheesy. We want to know the order that Worker#work will
+ # iterate through the servers, so we make sure that server1 will be the
+ # first one when the names are lexicographically sorted.
+ if server2.port.to_s < server1.port.to_s
+ tmp = server1
+ server1 = server2
+ server2 = tmp
+ end
worker = nil
sock1, sock2 = nil
@@ -72,7 +80,9 @@ def test_multiple_servers
worker_thread = Thread.new { w.loop_forever }.run
# Create a worker, which should connect to both servers.
- w.exec { worker = Gearman::Worker.new(nil, nil, { :client_id => 'test' }) }
+ w.exec {
+ worker = Gearman::Worker.new(
+ nil, nil, { :client_id => 'test', :reconnect_sec => 0.1 }) }
w.exec { worker.add_ability('foo') {|d,j| 'bar' } }
w.exec {
worker.job_servers =
@@ -107,10 +117,49 @@ def test_multiple_servers
s2.exec { server2.send_response(sock2, :job_assign, "a\0foo\0") }
s2.exec { server2.expect_request(sock2, :work_complete, "a\0bar") }
- # FIXME: We're not going through the servers in a guaranteed order, so
- # s1 may never get a second grab_job. Need to figure out how to deal
- # with this...
- #s1.wait
+ w.wait
+ s1.wait
+ s2.wait
+
+ worker.network_timeout_sec = 0.1
+
+ # Stop the first job server and make the worker try to reconnect to
+ # both.
+ old_servers = worker.job_servers
+ server1.stop
+ worker.job_servers = []
+ worker.job_servers = old_servers
+ s2.exec { sock2 = server2.expect_connection }
+ s2.wait
+
+ # It shouldn't have any trouble with the second server. Tell it to go
+ # to work.
+ s2.exec { server2.expect_request(sock2, :set_client_id, 'test') }
+ s2.exec { server2.expect_request(sock2, :can_do, 'foo') }
+ w.exec { worker.work }
+ s2.exec { server2.expect_request(sock2, :grab_job) }
+ s2.exec { server2.send_response(sock2, :no_job) }
+ s2.exec { server2.expect_request(sock2, :pre_sleep) }
+ s2.wait
+
+ # Start the first server and wait for the worker to connect to it and
+ # register.
+ server1.start
+ s1.exec { sock1 = server1.expect_connection }
+ s1.wait
+ s1.exec { server1.expect_request(sock1, :set_client_id, 'test') }
+ s1.exec { server1.expect_request(sock1, :can_do, 'foo') }
+ s1.wait
+
+ # Let the second server wake the worker up and then give it a job.
+ s2.exec { server2.send_response(sock2, :noop) }
+ s1.exec { server1.expect_request(sock1, :grab_job) }
+ s1.exec { server1.send_response(sock1, :no_job) }
+ s2.exec { server2.expect_request(sock2, :grab_job) }
+ s2.exec { server2.send_response(sock2, :job_assign, "a\0foo\0") }
+ s2.exec { server2.expect_request(sock2, :work_complete, "a\0bar") }
+ s1.wait
s2.wait
+ w.wait
end
end
View
@@ -11,6 +11,14 @@ def initialize(tester)
end
attr_reader :port
+ def stop
+ @serv.close
+ end
+
+ def start
+ @serv = TCPserver.open(@port)
+ end
+
def expect_connection
sock = @serv.accept
return sock
@@ -48,10 +56,11 @@ def loop_forever
f = nil
@mutex.synchronize do
@cv.wait(@mutex) if @blocks.empty?
- f = @blocks.shift
+ f = @blocks[0] if not @blocks.empty?
end
f.call if f
@mutex.synchronize do
+ @blocks.shift
@cv.signal if @blocks.empty?
end
end

0 comments on commit 6f6b473

Please sign in to comment.