Permalink
Browse files

Merged / modified Colin's contribution for scheduled jobs, and an exa…

…mple of using EPOCH. This requires my modified gearmand that supports SUBMIT_JOB_EPOCH
  • Loading branch information...
2 parents b06aad5 + b0477bc commit 3960e19a9f86d78f30c4bc1030b06aa3a556774b @johnewart johnewart committed Jul 1, 2010
Showing with 72 additions and 15 deletions.
  1. +26 −0 examples/client_epoch.rb
  2. +18 −0 examples/worker_reverse_to_file.rb
  3. +26 −13 lib/gearman/task.rb
  4. +2 −2 lib/gearman/util.rb
View
@@ -0,0 +1,26 @@
+#!/usr/bin/env ruby
+require 'rubygems'
+require '../lib/gearman'
+#Gearman::Util.debug = true
+
+# Connect to the local server (at the default port 7003)
+client = Gearman::Client.new('localhost')
+taskset = Gearman::TaskSet.new(client)
+
+# Get something to echo
+puts '[client] Type a string to reverse:'
+input = gets.chomp
+
+puts '[client] File to write to:'
+outfile = gets.chomp
+
+# Set scheduled time to 90 seconds from now
+time = Time.now() + 30
+puts "Time as seconds: #{time.to_i}"
+data = [input, outfile].join("\0")
+task = Gearman::Task.new("reverse_to_file", data)
+task.schedule(time)
+
+# Sending the task to the server
+puts "[client] Sending task: #{task.inspect}, to the 'reverse_to_file' worker"
+taskset.add_task(task)
@@ -0,0 +1,18 @@
+require 'rubygems'
+#require 'gearman'
+require '../lib/gearman'
+
+servers = ['localhost:4730']
+w = Gearman::Worker.new(servers)
+
+# Add a handler for a "sleep" function that takes a single argument, the
+# number of seconds to sleep before reporting success.
+w.add_ability('reverse_to_file') do |data,job|
+ puts "Data: #{data.inspect}"
+ word, file = data.split("\0")
+ puts "Word: #{word}"
+ puts "File: #{file}"
+ # Report success.
+ true
+end
+loop { w.work }
View
@@ -30,9 +30,19 @@ def initialize(func, arg='', opts={})
@retries_done = 0
@hash = nil
end
- attr_accessor :uniq, :retry_count, :priority, :background
+
+ attr_accessor :uniq, :retry_count, :priority, :background, :epoch
attr_reader :successful, :func, :arg, :retries_done
+ ##
+ # Schedule this job to run at a certain time (like `cron`)
+ # XXX: But there is no wildcard??
+ #
+ # @param time Ruby Time object that represents when to run the thing
+ def schedule(time)
+ @scheduled_at = time
+ end
+
##
# Internal method to reset this task's state so it can be run again.
# Called by TaskSet#add_task.
@@ -169,20 +179,23 @@ def get_uniq_hash
#
# @return String representation of packet
def get_submit_packet()
- mode = 'submit_job'
- if(@priority)
- if(@priority == :high)
- mode += "_high"
- elsif(@priority == :low)
- mode += "_low"
+ modes = ['submit_job']
+
+ if @scheduled_at
+ modes << 'epoch'
+ args = [func, get_uniq_hash, @scheduled_at.to_i, arg]
+ else
+ if @priority
+ modes << 'high' if @priority == :high
+ modes << 'low' if @priority == :low
+ else
+ modes << 'bg' if @background
end
+ args = [func, get_uniq_hash, arg]
end
-
- if(@background)
- mode += "_bg"
- end
-
- Util::pack_request(mode, [func, get_uniq_hash, arg].join("\0"))
+
+ mode = modes.join('_')
+ Util::pack_request(mode, args.join("\0"))
end
end
View
@@ -51,14 +51,14 @@ class Util
33 => :submit_job_low, # C->J: FUNC[0]UNIQ[0]ARGS
34 => :submit_job_low_bg, # C->J: FUNC[0]UNIQ[0]ARGS
35 => :submit_job_sched, # REQ Client
- 36 => :submit_job_epoch # REQ Client
+ 36 => :submit_job_epoch # C->J: FUNC[0]UNIQ[0]EPOCH[0]ARGS
}
# Map e.g. 'can_do' => 1
NUMS = COMMANDS.invert
# Default job server port.
- DEFAULT_PORT = 7003
+ DEFAULT_PORT = 4730
def Util.logger=(logger)
@logger = logger

0 comments on commit 3960e19

Please sign in to comment.