Browse files

whaddya know, testing works! fix a silly bug in the uniq code.

add a bunch more client tests.


git-svn-id: http://code.sixapart.com/svn/gearman/trunk/api/ruby@281 011c6a6d-750f-0410-a5f6-93fdcd050bc4
  • Loading branch information...
1 parent 23e2bf7 commit ab83a860d958e5896fc1817f39d4a2e945e2ad2f derat committed May 12, 2007
Showing with 219 additions and 7 deletions.
  1. +4 −2 lib/gearman/client.rb
  2. +6 −4 lib/gearman/task.rb
  3. +1 −1 lib/gearman/taskset.rb
  4. +208 −0 test/mock_client.rb
View
6 lib/gearman/client.rb
@@ -20,9 +20,10 @@ def initialize(job_servers=nil, prefix=nil)
@prefix = prefix
@sockets = {} # "host:port" -> [sock1, sock2, ...]
@socket_to_hostport = {} # sock -> "host:port"
+ @test_hostport = nil # make get_job_server return a given host for testing
end
attr_reader :job_servers
- attr_accessor :prefix
+ attr_accessor :prefix, :test_hostport
##
# Set the job servers to be used by this client.
@@ -39,7 +40,8 @@ def job_servers=(servers)
#
# @return "host:port"
def get_job_server
- @job_servers[rand(@job_servers.size)]
+ # Return a specific server if one's been set.
+ @test_hostport or @job_servers[rand(@job_servers.size)]
end
##
View
10 lib/gearman/task.rb
@@ -27,6 +27,7 @@ def initialize(func, arg='', opts={})
@retry_count ||= 0
@successful = false
@retries_done = 0
+ @hash = nil
end
attr_accessor :uniq, :retry_count, :high_priority
attr_reader :successful
@@ -97,9 +98,10 @@ def handle_status(numerator, denominator)
#
# @return hashed value, based on @arg if @uniq is '-', on @uniq if it's
# set to something else, and just nil if @uniq is nil
- def get_hash_for_merging
- merge_on = @uniq and @uniq == '-' ? @arg : @uniq
- merge_on ? merge_on.hash : nil
+ def get_uniq_hash
+ return @hash if @hash
+ merge_on = (@uniq and @uniq == '-') ? @arg : @uniq
+ @hash = merge_on ? merge_on.hash.to_s : ''
end
##
@@ -111,7 +113,7 @@ def get_submit_packet(prefix=nil, background=false)
mode = 'submit_job' +
(background ? '_bg' : @high_priority ? '_high' : '')
func = (prefix ? prefix + "\t" : '') + @func
- Util::pack_request(mode, [func, (@uniq or ''), arg].join("\0"))
+ Util::pack_request(mode, [func, get_uniq_hash, arg].join("\0"))
end
attr_reader :func, :arg, :finished
View
2 lib/gearman/taskset.rb
@@ -31,7 +31,7 @@ def add_task(*args)
@tasks_waiting_for_handle << task
# FIXME: We need to loop here in case we get a bad job server, or the
# job creation fails (see how the server reports this to us), or ...
- merge_hash = task.get_hash_for_merging
+ merge_hash = task.get_uniq_hash
hostport = (@merge_hash_to_hostport[merge_hash] or @client.get_job_server)
@merge_hash_to_hostport[merge_hash] = hostport if merge_hash
sock = (@sockets[hostport] or @client.get_socket(hostport))
View
208 test/mock_client.rb
@@ -9,6 +9,8 @@
Thread.abort_on_exception = true
class TestClient < Test::Unit::TestCase
+ ##
+ # Do a simple test of the functionality of the client code.
def test_client
server = FakeJobServer.new(self)
client, task1, task2, taskset, sock, res1, res2 = nil
@@ -47,13 +49,50 @@ def test_client
c.exec { taskset.wait }
s.exec { server.send_response(sock, :work_complete, "b\00015") }
c.wait
+ s.wait
# Check that we got the right answers.
assert_equal(7, res1)
assert_equal(15, res2)
end
##
+ # Test Client#do_task.
+ def test_do_task
+ server = FakeJobServer.new(self)
+ client, sock, res = nil
+
+ s = TestScript.new
+ c = TestScript.new
+
+ server_thread = Thread.new { s.loop_forever }.run
+ client_thread = Thread.new { c.loop_forever }.run
+
+ c.exec { client = Gearman::Client.new("localhost:#{server.port}") }
+
+ c.exec { res = client.do_task('add', '5 2').to_i }
+ s.exec { sock = server.expect_connection }
+ s.wait
+
+ s.exec { server.expect_request(sock, :submit_job, "add\000\0005 2") }
+ s.exec { server.send_response(sock, :job_created, 'a') }
+ s.exec { server.send_response(sock, :work_complete, "a\0007") }
+ c.wait
+ s.wait
+
+ assert_equal(7, res)
+
+ c.exec { res = client.do_task('add', '1 2') }
+ s.exec { server.expect_request(sock, :submit_job, "add\000\0001 2") }
+ s.exec { server.send_response(sock, :job_created, 'a') }
+ s.exec { server.send_response(sock, :work_fail, 'a') }
+ c.wait
+ s.wait
+
+ assert_equal(nil, res)
+ end
+
+ ##
# Test that Gearman::Task's callback's get called when they should.
def test_callbacks
server = FakeJobServer.new(self)
@@ -145,4 +184,173 @@ def test_failure
assert_equal(true, fail2)
assert_equal(false, setres)
end
+
+ ##
+ # Test that user-supplied uniq values are handled correctly.
+ def test_uniq
+ server1 = FakeJobServer.new(self)
+ server2 = FakeJobServer.new(self)
+ client = nil
+ sock1, sock2 = nil
+ taskset = nil
+ task1, task2, task3, task4 = nil
+ res1, res2, res3, res4 = nil
+ hostport1 = "localhost:#{server1.port}"
+ hostport2 = "localhost:#{server2.port}"
+
+ s1 = TestScript.new
+ s2 = TestScript.new
+ c = TestScript.new
+
+ server1_thread = Thread.new { s1.loop_forever }.run
+ server2_thread = Thread.new { s2.loop_forever }.run
+ client_thread = Thread.new { c.loop_forever }.run
+
+ c.exec { client = Gearman::Client.new }
+ c.exec { client.job_servers = [hostport1, hostport2] }
+ c.exec { taskset = Gearman::TaskSet.new(client) }
+
+ # Submit a task with uniq key 'u' to the first server.
+ c.exec { client.test_hostport = hostport1 }
+ c.exec { task1 = Gearman::Task.new('func1', 'arg', { :uniq => 'u' }) }
+ c.exec { task1.on_complete {|d| res1 = d.to_i } }
+ c.exec { taskset.add_task(task1) }
+
+ s1.exec { sock1 = server1.expect_connection }
+ s1.wait
+
+ s1.exec { server1.expect_request(
+ sock1, :submit_job, "func1\000#{'u'.hash}\000arg") }
+ s1.exec { server1.send_response(sock1, :job_created, 'a') }
+
+ # If we submit a second task with the same key, it should get sent to
+ # the same server.
+ c.exec { client.test_hostport = hostport2 }
+ c.exec { task2 = Gearman::Task.new('func1', 'arg2', { :uniq => 'u' }) }
+ c.exec { task2.on_complete {|d| res2 = d.to_i } }
+ c.exec { taskset.add_task(task2) }
+
+ s1.exec { server1.expect_request(
+ sock1, :submit_job, "func1\000#{'u'.hash}\000arg2") }
+ s1.exec { server1.send_response(sock1, :job_created, 'a') }
+
+ # When we create a task with key 'a', it should go to the second
+ # server.
+ c.exec { task3 = Gearman::Task.new('func1', 'arg', { :uniq => 'a' }) }
+ c.exec { task3.on_complete {|d| res3 = d.to_i } }
+ c.exec { taskset.add_task(task3) }
+
+ s2.exec { sock2 = server2.expect_connection }
+ s2.wait
+
+ s2.exec { server2.expect_request(
+ sock2, :submit_job, "func1\000#{'a'.hash}\000arg") }
+ s2.exec { server2.send_response(sock2, :job_created, 'b') }
+
+ # If we tell the client to use the first server again and create
+ # another job with no uniq key, it should go back to the first server.
+ c.exec { client.test_hostport = hostport1 }
+ c.exec { task4 = Gearman::Task.new('func1', 'arg') }
+ c.exec { task4.on_complete {|d| res4 = d.to_i } }
+ c.exec { taskset.add_task(task4) }
+
+ s1.exec { server1.expect_request(
+ sock1, :submit_job, "func1\000\000arg") }
+ s1.exec { server1.send_response(sock1, :job_created, 'c') }
+
+ # Send back responses for all the handles we've handed out and make
+ # sure that we got what we expected.
+ c.exec { taskset.wait }
+ s1.exec { server1.send_response(sock1, :work_complete, "a\0001") }
+ s2.exec { server2.send_response(sock2, :work_complete, "b\0002") }
+ s1.exec { server1.send_response(sock1, :work_complete, "c\0003") }
+
+ c.wait
+ s1.wait
+ s2.wait
+
+ assert_equal(1, res1)
+ assert_equal(1, res2)
+ assert_equal(2, res3)
+ assert_equal(3, res4)
+
+ c.wait
+ s1.wait
+ s2.wait
+ end
+
+ ##
+ # Test that '-' uniq values work correctly.
+ def test_uniq_dash
+ server1 = FakeJobServer.new(self)
+ server2 = FakeJobServer.new(self)
+ client, taskset, sock1, sock2 = nil
+ task1, task2, task3 = nil
+ res1, res2, res3 = nil
+ hostport1 = "localhost:#{server1.port}"
+ hostport2 = "localhost:#{server2.port}"
+
+ s1 = TestScript.new
+ s2 = TestScript.new
+ c = TestScript.new
+
+ server1_thread = Thread.new { s1.loop_forever }.run
+ server2_thread = Thread.new { s2.loop_forever }.run
+ client_thread = Thread.new { c.loop_forever }.run
+
+ c.exec { client = Gearman::Client.new }
+ c.exec { client.job_servers = [hostport1, hostport2] }
+ c.exec { taskset = Gearman::TaskSet.new(client) }
+
+ # The first task uses uniq = '-' with the argument 'arg'.
+ c.exec { client.test_hostport = hostport1 }
+ c.exec { task1 = Gearman::Task.new('func1', 'arg', { :uniq => '-' }) }
+ c.exec { task1.on_complete {|d| res1 = d.to_i } }
+ c.exec { taskset.add_task(task1) }
+
+ s1.exec { sock1 = server1.expect_connection }
+ s1.wait
+
+ s1.exec { server1.expect_request(
+ sock1, :submit_job, "func1\000#{'arg'.hash}\000arg") }
+ s1.exec { server1.send_response(sock1, :job_created, 'a') }
+
+ # The second task uses the same arg, so it should be merged with the
+ # first by the server (and also be executed on the first server, even
+ # though we've changed the client to use the second by default).
+ c.exec { client.test_hostport = hostport2 }
+ c.exec { task2 = Gearman::Task.new('func1', 'arg', { :uniq => '-' }) }
+ c.exec { task2.on_complete {|d| res2 = d.to_i } }
+ c.exec { taskset.add_task(task2) }
+
+ s1.exec { server1.expect_request(
+ sock1, :submit_job, "func1\000#{'arg'.hash}\000arg") }
+ s1.exec { server1.send_response(sock1, :job_created, 'a') }
+
+ # The third task uses 'arg2', so it should not be merged and instead
+ # run on the second server.
+ c.exec { task3 = Gearman::Task.new('func1', 'arg2', { :uniq => '-' }) }
+ c.exec { task3.on_complete {|d| res3 = d.to_i } }
+ c.exec { taskset.add_task(task3) }
+
+ s2.exec { sock2 = server2.expect_connection }
+ s2.wait
+
+ s2.exec { server2.expect_request(
+ sock2, :submit_job, "func1\000#{'arg2'.hash}\000arg2") }
+ s2.exec { server2.send_response(sock2, :job_created, 'b') }
+
+ # Send back results for the two handles that we've handed out.
+ c.exec { taskset.wait }
+ s1.exec { server1.send_response(sock1, :work_complete, "a\0001") }
+ s2.exec { server2.send_response(sock2, :work_complete, "b\0002") }
+
+ c.wait
+ s1.wait
+ s2.wait
+
+ assert_equal(1, res1)
+ assert_equal(1, res2)
+ assert_equal(2, res3)
+ end
end

0 comments on commit ab83a86

Please sign in to comment.