Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

- add crazy but apparently-working test code for the worker lib

- let worker client ids be user-definable
- fix some random cheesiness in how nil args are handled
- use /usr/bin/env in shebangs


git-svn-id: http://code.sixapart.com/svn/gearman/trunk/api/ruby@272 011c6a6d-750f-0410-a5f6-93fdcd050bc4
  • Loading branch information...
commit a071108f18178911c7ec0bd28a043a217d17b28d 1 parent b57cf18
derat authored
View
2  lib/gearman.rb
@@ -1,4 +1,4 @@
-#!/usr/bin/ruby
+#!/usr/bin/env ruby
#
# = Name
# Gearman
View
2  lib/gearman/client.rb
@@ -1,4 +1,4 @@
-#!/usr/bin/ruby
+#!/usr/bin/env ruby
require 'socket'
View
2  lib/gearman/task.rb
@@ -1,4 +1,4 @@
-#!/usr/bin/ruby
+#!/usr/bin/env ruby
module Gearman
View
2  lib/gearman/taskset.rb
@@ -1,4 +1,4 @@
-#!/usr/bin/ruby
+#!/usr/bin/env ruby
require 'socket'
require 'time'
View
5 lib/gearman/util.rb
@@ -1,4 +1,4 @@
-#!/usr/bin/ruby
+#!/usr/bin/env ruby
require 'socket'
require 'time'
@@ -65,6 +65,7 @@ def Util.debug=(v)
def Util.pack_request(type_name, arg='')
type_num = NUMS[type_name.to_sym]
raise InvalidArgsError, "Invalid type name '#{type_name}'" unless type_num
+ arg = '' if not arg
"\0REQ" + [type_num, arg.size].pack('NN') + arg
end
@@ -96,7 +97,7 @@ def Util.read_response(sock, timeout=0)
head = sock.recv(12)
magic, type, len = head.unpack('a4NN')
raise ProtocolError, "Invalid magic '#{magic}'" unless magic == "\0RES"
- buf = (len > 0) ? sock.recv(len) : ''
+ buf = len > 0 ? sock.recv(len) : ''
type = COMMANDS[type]
raise ProtocolError, "Invalid packet type #{type}" unless type
[type, buf]
View
19 lib/gearman/worker.rb
@@ -1,4 +1,4 @@
-#!/usr/bin/ruby
+#!/usr/bin/env ruby
require 'set'
require 'socket'
@@ -90,13 +90,22 @@ def report_status(numerator, denominator)
#
# @param job_servers "host:port"; either a single server or an array
# @param prefix function name prefix (namespace)
- def initialize(job_servers=nil, prefix=nil)
+ # @param opts hash of additional options
+ def initialize(job_servers=nil, prefix=nil, opts={})
chars = ('a'..'z').to_a
- @id = Array.new(30) { chars[rand(chars.size)] }.join
+ @client_id = Array.new(30) { chars[rand(chars.size)] }.join
@sockets = {}
@abilities = {}
- self.job_servers = job_servers if job_servers
@prefix = prefix
+ %w{client_id}.map {|s| s.to_sym }.each do |k|
+ instance_variable_set "@#{k}", opts[k]
+ opts.delete k
+ end
+ if opts.size > 0
+ raise InvalidArgsError,
+ 'Invalid worker args: ' + opts.keys.sort.join(', ')
+ end
+ self.job_servers = job_servers if job_servers
end
##
@@ -127,7 +136,7 @@ def job_servers=(servers)
def connect(hostport)
sock = TCPSocket.new(*hostport.split(':'))
# FIXME: catch exceptions; do something smart
- Util.send_request(sock, Util.pack_request(:set_client_id, @id))
+ Util.send_request(sock, Util.pack_request(:set_client_id, @client_id))
@abilities.each {|f,a| announce_ability(sock, f, a.timeout) }
@sockets[hostport] = sock
end
View
127 test/worker.rb
@@ -0,0 +1,127 @@
+#!/usr/bin/env ruby
+
+$:.unshift('../lib')
+require 'gearman'
+require 'socket'
+require 'test/unit'
+require 'thread'
+
+Thread.abort_on_exception = true
+
+class FakeJobServer
+ def initialize(tester)
+ @tester = tester
+ @serv = TCPserver.open(0)
+ @port = @serv.addr[1]
+ end
+ attr_reader :port
+
+ def expect_connection
+ sock = @serv.accept
+ return sock
+ end
+
+ def expect_closed(sock)
+ @tester.assert_true(sock.closed?)
+ end
+
+ def expect_request(sock, exp_type, exp_data='')
+ head = sock.recv(12)
+ magic, type, len = head.unpack('a4NN')
+ @tester.assert_equal("\0REQ", magic)
+ @tester.assert_equal(Gearman::Util::NUMS[exp_type.to_sym], type)
+ data = len > 0 ? sock.recv(len) : ''
+ @tester.assert_equal(exp_data, data)
+ end
+
+ def send_response(sock, type, data='')
+ type_num = Gearman::Util::NUMS[type.to_sym]
+ response = "\0RES" + [type_num, data.size].pack('NN') + data
+ sock.write(response)
+ end
+end
+
+class TestScript
+ def initialize
+ @mutex = Mutex.new
+ @cv = ConditionVariable.new
+ @blocks = []
+ end
+
+ def loop_forever
+ loop do
+ f = nil
+ @mutex.synchronize do
+ @cv.wait(@mutex) if @blocks.empty?
+ f = @blocks.shift
+ end
+ f.call if f
+ @mutex.synchronize do
+ @cv.signal if @blocks.empty?
+ end
+ end
+ end
+
+ def exec(&f)
+ @mutex.synchronize do
+ @blocks << f
+ @cv.signal
+ end
+ end
+
+ def wait
+ @mutex.synchronize do
+ @cv.wait(@mutex) if not @blocks.empty?
+ end
+ end
+end
+
+class TestWorker < Test::Unit::TestCase
+ def test_worker
+ server = FakeJobServer.new(self)
+ worker = nil
+ sock = nil
+
+ s = TestScript.new
+ w = TestScript.new
+
+ server_thread = Thread.new { s.loop_forever }.run
+ worker_thread = Thread.new { w.loop_forever }.run
+
+ # Create a worker and wait for it to connect to us.
+ w.exec {
+ worker = Gearman::Worker.new(
+ "localhost:#{server.port}", nil, { :client_id => 'test' })
+ }
+ s.exec { sock = server.expect_connection }
+ s.wait
+
+ # After it connects, it should send its ID, and it should tell us its
+ # abilities when we report them.
+ s.exec { server.expect_request(sock, :set_client_id, 'test') }
+ w.exec { worker.add_ability('echo') {|d,j| j.report_status(1, 1); d } }
+ s.exec { server.expect_request(sock, :can_do, 'echo') }
+
+ # It should try to grab a job when we tell it to work.
+ w.exec { worker.work }
+ s.exec { server.expect_request(sock, :grab_job) }
+
+ # If we tell it there aren't any jobs, it should go to sleep.
+ s.exec { server.send_response(sock, :no_job) }
+ s.exec { server.expect_request(sock, :pre_sleep) }
+
+ # When we send it a noop, it should wake up and ask for a job again.
+ s.exec { server.send_response(sock, :noop) }
+ s.exec { server.expect_request(sock, :grab_job) }
+
+ # When we give it a job, it should do it.
+ s.exec { server.send_response(sock, :job_assign, "a\0echo\0foo") }
+ s.exec { server.expect_request(sock, :work_status, "a\0001\0001") }
+ s.exec { server.expect_request(sock, :work_complete, "a\0foo") }
+
+ # Test that functions are unregistered correctly.
+ s.exec { worker.remove_ability('echo') }
+ s.exec { server.expect_request(sock, :cant_do, 'echo') }
+ s.wait
+ end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.