- add worker test for multiple servers

- add a function to do a timed network read, although we're not using
  timeouts yet
- add support for uniq on tasks
- add some code to make workers reconnect to down job servers; currently
  disabled


git-svn-id: http://code.sixapart.com/svn/gearman/trunk/api/ruby@274 011c6a6d-750f-0410-a5f6-93fdcd050bc4
commit 8340bf093f67413526b96de8efd4c539800697ad (parent: 025b575)
authored by derat
2  lib/gearman/client.rb
@@ -64,7 +64,7 @@ def get_socket(hostport, num_retries=3)
return sock
end
end
- raise RuntimeError, "Unable to connect to job server"
+ raise RuntimeError, "Unable to connect to job server #{hostport}"
end
##
11 lib/gearman/task.rb
@@ -92,6 +92,17 @@ def handle_status(numerator, denominator)
end
##
+ # Return a hash that we can use to execute identical tasks on the same
+ # job server.
+ #
+ # @return hashed value, based on @arg if @uniq is '-', on @uniq if it's
+ # set to something else, and just nil if @uniq is nil
+ def get_hash_for_merging
+ merge_on = (@uniq && @uniq == '-') ? @arg : @uniq
+ merge_on ? merge_on.hash : nil
+ end
+
+ ##
# Construct a packet to submit this task to a job server.
#
# @param background ??
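
A minimal sketch (not part of the commit) of how get_hash_for_merging is meant to behave; the Task constructor arguments and the :uniq option shown here are assumptions based on the surrounding API:

  # no uniq: nothing to merge on
  t1 = Gearman::Task.new('resize', 'photo.jpg')
  t1.get_hash_for_merging                                  # => nil
  # uniq of '-': merge on the argument
  t2 = Gearman::Task.new('resize', 'photo.jpg', :uniq => '-')
  t2.get_hash_for_merging                                  # => 'photo.jpg'.hash
  # any other uniq: merge on the uniq value itself
  t3 = Gearman::Task.new('resize', 'photo.jpg', :uniq => 'batch1')
  t3.get_hash_for_merging                                  # => 'batch1'.hash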
11 lib/gearman/taskset.rb
@@ -16,6 +16,7 @@ def initialize(client)
@tasks_in_progress = {} # "host:port//handle" -> [job1, job2, ...]
@finished_tasks = [] # tasks that have completed or failed
@sockets = {} # "host:port" -> Socket
+ @merge_hash_to_hostport = {} # Fixnum -> "host:port"
end
##
@@ -28,11 +29,13 @@ def add_task(*args)
req = task.get_submit_packet(@client.prefix)
@tasks_waiting_for_handle << task
- # we need to loop here in case we get a bad job server, or the job
- # creation fails (see how the server reports this to us), or ...
- hostport = @client.get_job_server
+ # FIXME: We need to loop here in case we get a bad job server, or the
+ # job creation fails (see how the server reports this to us), or ...
+ merge_hash = task.get_hash_for_merging
+ hostport = (@merge_hash_to_hostport[merge_hash] or @client.get_job_server)
+ @merge_hash_to_hostport[merge_hash] = hostport if merge_hash
sock = (@sockets[hostport] or @client.get_socket(hostport))
- Util.log "Using socket #{sock.inspect}"
+ Util.log "Using socket #{sock.inspect} for #{hostport}"
Util.send_request(sock, req)
read_packet(sock) while @tasks_waiting_for_handle.size > 0
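
A minimal sketch (not part of the commit) of the merging behavior added here, assuming a Client constructed with an array of "host:port" job servers: tasks with the same uniq value produce the same merge hash, so the second add_task reuses the host:port recorded in @merge_hash_to_hostport instead of asking the client for a server.

  client  = Gearman::Client.new(['host1:4730', 'host2:4730'])  # assumed ctor
  taskset = Gearman::Taskset.new(client)
  # Both tasks hash on the uniq key 'a.jpg', so they land on the same server
  # and gearmand can coalesce them.
  taskset.add_task(Gearman::Task.new('resize', 'a.jpg', :uniq => 'a.jpg'))
  taskset.add_task(Gearman::Task.new('resize', 'a.jpg', :uniq => 'a.jpg'))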
34 lib/gearman/util.rb
@@ -86,18 +86,40 @@ def Util.get_task_from_args(*args)
end
##
+ # Read from a socket, giving up if it doesn't finish quickly enough.
+ # NetworkError is thrown if we don't read all the bytes in time.
+ #
+ # @param sock Socket from which we read
+ # @param len number of bytes to read
+ # @param timeout maximum number of seconds we'll take; nil for no timeout
+ # @return full data that was read
+ def Util.timed_recv(sock, len, timeout=nil)
+ data = ''
+ end_time = Time.now.to_f + timeout if timeout
+ while data.size < len and (not timeout or Time.now.to_f < end_time) do
+ IO::select([sock], nil, nil, timeout ? end_time - Time.now.to_f : nil) \
+ or break
+ data += sock.readpartial(len - data.size)
+ end
+ if data.size < len
+ raise NetworkError, "Read #{data.size} byte(s) instead of #{len}"
+ end
+ data
+ end
+
+ ##
# Read a response packet from a socket.
#
# @param sock Socket connected to a job server
- # @param timeout timeout in seconds, 0 to disable (grr, doesn't work)
+ # @param timeout timeout in seconds, nil for no timeout
# @return array consisting of integer packet type and data
- def Util.read_response(sock, timeout=0)
- # FIXME: use a non-blocking socket and do the work ourselves, i guess...
- #sock.setsockopt(Socket::IPPROTO_TCP, Socket::SO_RCVTIMEO, timeout)
- head = sock.recv(12)
+ def Util.read_response(sock, timeout=nil)
+ end_time = Time.now.to_f + timeout if timeout
+ head = timed_recv(sock, 12, timeout)
magic, type, len = head.unpack('a4NN')
raise ProtocolError, "Invalid magic '#{magic}'" unless magic == "\0RES"
- buf = len > 0 ? sock.recv(len) : ''
+ buf = len > 0 ?
+ timed_recv(sock, len, timeout ? end_time - Time.now.to_f : nil) : ''
type = COMMANDS[type]
raise ProtocolError, "Invalid packet type #{type}" unless type
[type, buf]
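
A minimal sketch (not part of the commit) of reading a packet with the new timeout support; the localhost:4730 job server is a placeholder:

  require 'socket'

  sock = TCPSocket.new('localhost', 4730)     # placeholder job server
  begin
    # Give the whole response (12-byte header plus body) five seconds.
    type, data = Gearman::Util.read_response(sock, 5)
  rescue Gearman::NetworkError
    # timed_recv gave up before all of the bytes arrived
  end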
49 lib/gearman/worker.rb
@@ -2,6 +2,7 @@
require 'set'
require 'socket'
+require 'thread'
module Gearman
@@ -97,7 +98,9 @@ def initialize(job_servers=nil, prefix=nil, opts={})
@sockets = {}
@abilities = {}
@prefix = prefix
- %w{client_id}.map {|s| s.to_sym }.each do |k|
+ @failed_servers = []
+ @servers_mutex = Mutex.new
+ %w{client_id reconnect_sec}.map {|s| s.to_sym }.each do |k|
instance_variable_set "@#{k}", opts[k]
opts.delete k
end
@@ -105,7 +108,24 @@ def initialize(job_servers=nil, prefix=nil, opts={})
raise InvalidArgsError,
'Invalid worker args: ' + opts.keys.sort.join(', ')
end
+ @reconnect_sec = 30 if not @reconnect_sec
self.job_servers = job_servers if job_servers
+ # FIXME: clean this up, test it, and start using it
+ #start_reconnect_thread
+ end
+
+ # Start a thread to repeatedly attempt to connect to down job servers.
+ def start_reconnect_thread
+ Thread.new do
+ loop do
+ @servers_mutex.synchronize do
+ if @failed_servers.size > 0
+ job_servers_int(@sockets.keys + @failed_servers)
+ end
+ end
+ sleep @reconnect_sec
+ end
+ end.run
end
##
@@ -113,6 +133,17 @@ def initialize(job_servers=nil, prefix=nil, opts={})
#
# @param servers "host:port"; either a single server or an array
def job_servers=(servers)
+ @servers_mutex.synchronize do
+ job_servers_int(servers)
+ end
+ end
+
+ # Internal function to actually connect to servers.
+ # Caller must acquire @servers_mutex before calling us.
+ #
+ # @param servers "host:port"; either a single server or an array
+ def job_servers_int(servers)
+ @failed_servers = []
servers = Set.new(Util.normalize_job_servers(servers))
# Disconnect from servers that we no longer care about.
@sockets.each do |server,sock|
@@ -124,17 +155,27 @@ def job_servers=(servers)
# Connect to new servers.
servers.each do |server|
if not @sockets[server]
- @sockets[server] = connect(server)
+ begin
+ @sockets[server] = connect(server)
+ rescue NetworkError
+ @failed_servers << server
+ Util.log "Unable to connect to #{server}"
+ end
end
end
end
+ private :job_servers_int
##
# Connect to a job server.
#
# @param hostport "hostname:port"
def connect(hostport)
- sock = TCPSocket.new(*hostport.split(':'))
+ begin
+ sock = TCPSocket.new(*hostport.split(':'))
+ rescue Errno::ECONNREFUSED
+ raise NetworkError
+ end
# FIXME: catch exceptions; do something smart
Util.send_request(sock, Util.pack_request(:set_client_id, @client_id))
@abilities.each {|f,a| announce_ability(sock, f, a.timeout) }
@@ -236,6 +277,8 @@ def work
type, data = Util.read_response(sock)
case type
when :noop
+ # FIXME: double-check this... can we really just blindly read
+ # when we get a noop without selecting first?
Util.log "Got noop"
next
when :no_job
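
A minimal sketch (not part of the commit) of passing the new :reconnect_sec option; note that start_reconnect_thread is still disabled in this change, so failed servers are only recorded for now:

  worker = Gearman::Worker.new(
    ['localhost:4730', 'localhost:4731'],      # one of these may be down
    nil,                                       # prefix
    { :client_id => 'resizer', :reconnect_sec => 10 })
  worker.add_ability('resize') {|data, job| data }
  # Servers that refuse the connection raise NetworkError in connect() and
  # end up in @failed_servers; once start_reconnect_thread is enabled they
  # will be retried every @reconnect_sec seconds.
  worker.work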
54 test/mock_worker.rb
@@ -58,5 +58,59 @@ def test_complete
end
def test_multiple_servers
+ server1 = FakeJobServer.new(self)
+ server2 = FakeJobServer.new(self)
+ worker = nil
+ sock1, sock2 = nil
+
+ s1 = TestScript.new
+ s2 = TestScript.new
+ w = TestScript.new
+
+ server1_thread = Thread.new { s1.loop_forever }.run
+ server2_thread = Thread.new { s2.loop_forever }.run
+ worker_thread = Thread.new { w.loop_forever }.run
+
+ # Create a worker, which should connect to both servers.
+ w.exec { worker = Gearman::Worker.new(nil, nil, { :client_id => 'test' }) }
+ w.exec { worker.add_ability('foo') {|d,j| 'bar' } }
+ w.exec {
+ worker.job_servers =
+ [ "localhost:#{server1.port}", "localhost:#{server2.port}" ]
+ }
+ s1.exec { sock1 = server1.expect_connection }
+ s2.exec { sock2 = server2.expect_connection }
+ s1.wait
+ s2.wait
+
+ # It should register itself with both.
+ s1.exec { server1.expect_request(sock1, :set_client_id, 'test') }
+ s1.exec { server1.expect_request(sock1, :can_do, 'foo') }
+ s2.exec { server2.expect_request(sock2, :set_client_id, 'test') }
+ s2.exec { server2.expect_request(sock2, :can_do, 'foo') }
+
+ # It should try to get a job from both servers and then sleep.
+ w.exec { worker.work }
+ s1.exec { server1.expect_request(sock1, :grab_job) }
+ s1.exec { server1.send_response(sock1, :no_job) }
+ s2.exec { server2.expect_request(sock2, :grab_job) }
+ s2.exec { server2.send_response(sock2, :no_job) }
+ s1.exec { server1.expect_request(sock1, :pre_sleep) }
+ s2.exec { server2.expect_request(sock2, :pre_sleep) }
+
+ # If the second server wakes it up, it should again try to get a job
+ # and then do it.
+ s2.exec { server2.send_response(sock2, :noop) }
+ s1.exec { server1.expect_request(sock1, :grab_job) }
+ s1.exec { server1.send_response(sock1, :no_job) }
+ s2.exec { server2.expect_request(sock2, :grab_job) }
+ s2.exec { server2.send_response(sock2, :job_assign, "a\0foo\0") }
+ s2.exec { server2.expect_request(sock2, :work_complete, "a\0bar") }
+
+ # FIXME: We're not going through the servers in a guaranteed order, so
+ # s1 may never get a second grab_job. Need to figure out how to deal
+ # with this...
+ #s1.wait
+ s2.wait
end
end