Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

check in old changes that i probably shouldn't check in because i just

found them lying around on my computer after almost a year and don't
remember making them (or much about any of this code in general).

the tests still pass and they appear to be good changes, though (adding
timeouts if we don't get back full responses from the server and making
task-adding synchronous).


git-svn-id: http://code.sixapart.com/svn/gearman/trunk/api/ruby@364 011c6a6d-750f-0410-a5f6-93fdcd050bc4
  • Loading branch information...
commit d8dc5f423eda4107273f127d8c65ed6f2a830b96 1 parent f993321
derat authored
View
3  lib/gearman/client.rb
@@ -21,9 +21,10 @@ def initialize(job_servers=nil, prefix=nil)
@sockets = {} # "host:port" -> [sock1, sock2, ...]
@socket_to_hostport = {} # sock -> "host:port"
@test_hostport = nil # make get_job_server return a given host for testing
+ @task_create_timeout_sec = 10
end
attr_reader :job_servers
- attr_accessor :prefix, :test_hostport
+ attr_accessor :prefix, :test_hostport, :task_create_timeout_sec
##
# Set the job servers to be used by this client.
View
15 lib/gearman/task.rb
@@ -30,7 +30,16 @@ def initialize(func, arg='', opts={})
@hash = nil
end
attr_accessor :uniq, :retry_count, :high_priority
- attr_reader :successful
+ attr_reader :successful, :func, :arg
+
+ ##
+ # Internal method to reset this task's state so it can be run again.
+ # Called by TaskSet#add_task.
+ def reset_state
+ @retries_done = 0
+ @successful = false
+ self
+ end
##
# Set a block of code to be executed when this task completes
@@ -82,7 +91,7 @@ def handle_failure
end
@retries_done += 1
@on_retry.call(@retries_done) if @on_retry
- return true
+ true
end
##
@@ -115,8 +124,6 @@ def get_submit_packet(prefix=nil, background=false)
func = (prefix ? prefix + "\t" : '') + @func
Util::pack_request(mode, [func, get_uniq_hash, arg].join("\0"))
end
-
- attr_reader :func, :arg, :finished
end
end
View
51 lib/gearman/taskset.rb
@@ -12,7 +12,7 @@ module Gearman
class TaskSet
def initialize(client)
@client = client
- @tasks_waiting_for_handle = []
+ @task_waiting_for_handle = nil
@tasks_in_progress = {} # "host:port//handle" -> [job1, job2, ...]
@finished_tasks = [] # tasks that have completed or failed
@sockets = {} # "host:port" -> Socket
@@ -26,9 +26,23 @@ def initialize(client)
# @return true if the task was created successfully, false otherwise
def add_task(*args)
task = Util::get_task_from_args(*args)
+ add_task_internal(task, true)
+ end
+
+ ##
+ # Internal function to add a task.
+ #
+ # @param task Task to add
+ # @param reset_state should we reset task state? true if we're adding a
+ # new task; false if we're rescheduling one that's
+ # failed
+ # @return true if the task was created successfully, false
+ # otherwise
+ def add_task_internal(task, reset_state=true)
+ task.reset_state if reset_state
req = task.get_submit_packet(@client.prefix)
- @tasks_waiting_for_handle << task
+ @task_waiting_for_handle = task
# FIXME: We need to loop here in case we get a bad job server, or the
# job creation fails (see how the server reports this to us), or ...
merge_hash = task.get_uniq_hash
@@ -37,11 +51,21 @@ def add_task(*args)
sock = (@sockets[hostport] or @client.get_socket(hostport))
Util.log "Using socket #{sock.inspect} for #{hostport}"
Util.send_request(sock, req)
- read_packet(sock) while @tasks_waiting_for_handle.size > 0
+ while @task_waiting_for_handle
+ begin
+ read_packet(sock, @client.task_create_timeout_sec)
+ rescue NetworkError
+ Util.log "Got timeout on read from #{hostport}"
+ @task_waiting_for_handle = nil
+ @client.close_socket(sock)
+ return false
+ end
+ end
@sockets[hostport] ||= sock
true
end
+ private :add_task_internal
##
# Handle a 'job_created' response from a job server.
@@ -50,12 +74,13 @@ def add_task(*args)
# @param data data returned in packet from server
def handle_job_created(hostport, data)
Util.log "Got job_created with handle #{data} from #{hostport}"
- if @tasks_waiting_for_handle.empty?
+ if not @task_waiting_for_handle
raise ProtocolError, "Got unexpected job_created notification " +
"with handle #{data} from #{hostport}"
end
js_handle = Util.handle_to_str(hostport, data)
- task = @tasks_waiting_for_handle.shift
+ task = @task_waiting_for_handle
+ @task_waiting_for_handle = nil
(@tasks_in_progress[js_handle] ||= []) << task
nil
end
@@ -99,7 +124,7 @@ def handle_work_fail(hostport, data)
end
tasks.each do |t|
if t.handle_failure
- add_task(t)
+ add_task_internal(t, false)
else
@finished_tasks << t
end
@@ -130,13 +155,13 @@ def handle_work_status(hostport, data)
# Read and process a packet from a socket.
#
# @param sock socket connected to a job server
- def read_packet(sock)
+ def read_packet(sock, timeout=nil)
hostport = @client.get_hostport_for_socket(sock)
if not hostport
raise RuntimeError, "Client doesn't know host/port for socket " +
sock.inspect
end
- type, data = Util.read_response(sock)
+ type, data = Util.read_response(sock, timeout)
case type
when :job_created
handle_job_created(hostport, data)
@@ -160,8 +185,9 @@ def read_packet(sock)
def wait(timeout=1)
end_time = Time.now.to_f + timeout
while not @tasks_in_progress.empty?
+ remaining = end_time - Time.now.to_f
ready_socks = IO::select(
- @sockets.values, nil, nil, end_time-Time.now.to_f)
+ @sockets.values, nil, nil, remaining > 0 ? remaining : 0)
if not ready_socks or not ready_socks[0]
Util.log "Timed out while waiting for tasks to finish"
# not sure what state the connections are in, so just be lame and
@@ -172,10 +198,13 @@ def wait(timeout=1)
end
ready_socks[0].each do |sock|
begin
- read_packet(sock)
+ read_packet(sock, end_time - Time.now.to_f)
rescue ProtocolError
hostport = @client.get_hostport_for_socket(sock)
- Util.err "Ignoring bad packet from #{hostport}"
+ Util.log "Ignoring bad packet from #{hostport}"
+ rescue NetworkError
+ hostport = @client.get_hostport_for_socket(sock)
+ Util.log "Got timeout on read from #{hostport}"
end
end
end
View
48 test/mock_client.rb
@@ -380,4 +380,52 @@ def test_nuls_in_data
assert_equal("1\0002\0003", res)
end
+
+ ##
+ # Test that clients time out when the server sends a partial packet and
+ # then hangs.
+ def test_read_timeouts
+ server = FakeJobServer.new(self)
+ client, sock, task, taskset, res = nil
+
+ s = TestScript.new
+ c = TestScript.new
+
+ server_thread = Thread.new { s.loop_forever }.run
+ client_thread = Thread.new { c.loop_forever }.run
+
+ c.exec { client = Gearman::Client.new("localhost:#{server.port}") }
+
+ # First, create a new task. The server claims to be sending back a
+ # packet with 1 byte of data, but actually sends an empty packet. The
+ # client should time out after 0.1 sec.
+ c.exec { taskset = Gearman::TaskSet.new(client) }
+ c.exec { task = Gearman::Task.new('foo', 'bar') }
+ c.exec { client.task_create_timeout_sec = 0.1 }
+ c.exec { res = taskset.add_task(task) }
+ s.exec { sock = server.expect_connection }
+ s.wait
+
+ s.exec { server.expect_request(sock, :submit_job, "foo\000\000bar") }
+ s.exec { server.send_response(sock, :job_created, '', 1) }
+ c.wait
+ s.wait
+
+ assert_equal(false, res)
+
+ # Now create a task, but only return a partial packet for
+ # work_complete. The client should again time out after 0.1 sec.
+ c.exec { res = taskset.add_task(task) }
+ s.exec { sock = server.expect_connection }
+ s.wait
+
+ s.exec { server.expect_request(sock, :submit_job, "foo\000\000bar") }
+ s.exec { server.send_response(sock, :job_created, 'a') }
+ c.exec { res = taskset.wait(0.1) }
+ s.exec { server.send_response(sock, :work_complete, "a\000", 3) }
+ c.wait
+ s.wait
+
+ assert_equal(false, res)
+ end
end
View
4 test/testlib.rb
@@ -37,10 +37,10 @@ def expect_request(sock, exp_type, exp_data='')
@tester.assert_equal(exp_data, data)
end
- def send_response(sock, type, data='')
+ def send_response(sock, type, data='', bogus_size=nil)
type_num = Gearman::Util::NUMS[type.to_sym]
raise RuntimeError, "Invalid type #{type}" if not type_num
- response = "\0RES" + [type_num, data.size].pack('NN') + data
+ response = "\0RES" + [type_num, (bogus_size or data.size)].pack('NN') + data
sock.write(response)
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.